wordcount实例

scala的wordcount实例

package com.wondersgroup.myscala

import scala.actors.{Actor, Future}
import scala.collection.mutable.ListBuffer
import scala.io.Source

//首先统计每个文本中出现的频率=》汇总
case class SubmitTask(f:String)
case object StopTask

//统计一个文本中单词出现的次数

class ActorTest3 extends Actor{

  override def act() :Unit = {
    while (true) {
      receive{
        case SubmitTask(f) => {
          //把文件的一行内容作为一个元素存入list
          val lines = Source.fromFile(f).getLines().toList
          //文件中的每一个单词作为一个元素存入list
          val words = lines.flatMap(_.split(" "))
          print("----------"+words)
          println("================"+words.map((_,1)))
          //得到一个map ,当前文本的单词,以及相应单词出现的次数
          println("++++++"+words.map((_,1)).groupBy(_._1))
          val result = words.map((_,1)).groupBy(_._1).mapValues(_.size)
          println("&&&&&&&&&&&&&&&&"+result)

          sender ! result

        }

        case StopTask => exit()
      }
    }
  }

}

object ActorTest3{
  def main(args: Array[String]): Unit = {
    //把文本分析任务提交给actor
    val replys = new ListBuffer[Future[Any]]
    val results = new ListBuffer[Map[String,Int]]
    val files = Array("src/wordcount.txt","src/wordcount1.txt")
    for(f <- files) {
      val actor = new ActorTest3
      actor.start()
      val reply = actor !! SubmitTask(f)
      //把处理结果放到replys
      replys += reply
    }

    //对多个文件的处理结果汇总
    while (replys.size > 0) {
      //判断结果是否可取
      val done = replys.filter(_.isSet)
      print("@@@@@@@@@@@"+done)
      for(res <- done) {
        results += res.apply().asInstanceOf[Map[String,Int]]
        replys -= res
      }
      Thread.sleep(5000)
    }

    //对各个分析结果进行汇总
    val res2 = results.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2))
    println("******************"+res2)

  }
}  

输出

@@@@@@@@@@@ListBuffer()----------List(python, is, a, very, brief, language, It, is, also, a, shell, language, we, like, python)================List((python,1), (is,1), (a,1), (very,1), (brief,1), (language,1), (It,1), (is,1), (also,1), (a,1), (shell,1), (language,1), (we,1), (like,1), (python,1))
----------List(python, java, go, python, c++, c++, java, ruby, c, javascript, c++)================List((python,1), (java,1), (go,1), (python,1), (c++,1), (c++,1), (java,1), (ruby,1), (c,1), (javascript,1), (c++,1))
++++++Map(java -> List((java,1), (java,1)), c++ -> List((c++,1), (c++,1), (c++,1)), go -> List((go,1)), python -> List((python,1), (python,1)), c -> List((c,1)), ruby -> List((ruby,1)), javascript -> List((javascript,1)))
++++++Map(is -> List((is,1), (is,1)), shell -> List((shell,1)), a -> List((a,1), (a,1)), also -> List((also,1)), language -> List((language,1), (language,1)), brief -> List((brief,1)), python -> List((python,1), (python,1)), It -> List((It,1)), very -> List((very,1)), we -> List((we,1)), like -> List((like,1)))
&&&&&&&&&&&&&&&&Map(is -> 2, shell -> 1, a -> 2, also -> 1, language -> 2, brief -> 1, python -> 2, It -> 1, very -> 1, we -> 1, like -> 1)
&&&&&&&&&&&&&&&&Map(java -> 2, c++ -> 3, go -> 1, python -> 2, c -> 1, ruby -> 1, javascript -> 1)
@@@@@@@@@@@ListBuffer(<function0>, <function0>)******************Map(is -> 2, shell -> 1, a -> 2, java -> 2, c++ -> 3, go -> 1, also -> 1, language -> 2, brief -> 1, python -> 4, It -> 1, c -> 1, ruby -> 1, very -> 1, we -> 1, like -> 1, javascript -> 1)

mapreduce的wordcount

mapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//import org.apache.hadoop.io.*;
//import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
/**
 * 输入key   LongWritable  行号
 * 输入的value Text   一行内容
 * 输出的key  Text  单词
 * 输出的value IntWritable  单词的个数
 * @author lenovo
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

	Text k =new Text();
	IntWritable v = new IntWritable(1);
//	@SuppressWarnings("unused")
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		//  1 将一行内容转化为String
		String line = value.toString();

		// 2 切分
		String[] words = line.split(" ");

		// 3 循环写出到下一个阶段   写
		for (String word : words) {

			k.set(word);
			context.write(k,v);//写入
		}
	}
}  

reducer

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable>{

	// hello 1
	// hello 1

	@Override
	//相同的进来
	protected void reduce(Text key, Iterable<IntWritable> values,Context context)
			throws IOException, InterruptedException {
		//  1 汇总 单词总个数
		int sum = 0;
		for (IntWritable count : values) {
			sum +=count.get();
		}

		// 2 输出单词的总个数

		context.write(key, new IntWritable(sum));
	}
}  

driver

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		// 1获取job信息
		Configuration configuration = new Configuration();

		// 开启 map 端输出压缩
		configuration.setBoolean("mapreduce.map.output.compress", true);
		// 设置 map 端输出压缩方式
//		configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
		configuration.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);

		Job job = Job.getInstance(configuration);

		// 2 获取jar包位置

		job.setJarByClass(WordCountDriver.class);

		// 3 关联mapper he reducer
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		// 4 设置map输出数据类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 5 设置最终输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// 9 添加combiner     进入reduce之前先进行合并,不是所有的map都能合并,需要满足要求
//		job.setCombinerClass(WordcountCombiner.class);

		// 8 设置读取输入文件切片的类     多个小文件的处理方式 使用CombineTextInputFormat     系统默认TextInputFormat

//		job.setInputFormatClass(CombineTextInputFormat.class);
//		CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
//		CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
		// 6 设置数据输入 输出文件的 路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 设置 reduce 端输出压缩开启
		FileOutputFormat.setCompressOutput(job, true);
		// 设置压缩的方式
		 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
		// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
		// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 

		// 7提交代码

		boolean result = job.waitForCompletion(true);
		System.exit(result?0:1);
	}
}  

combiner

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		// 1 汇总
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}

		// 2 输出
		context.write(key, new IntWritable(sum));
	}
} 

原文地址:https://www.cnblogs.com/snow-wolf-1/p/11827088.html

时间: 2024-08-02 04:35:37

wordcount实例的相关文章

Hadoop3 在eclipse中访问hadoop并运行WordCount实例

前言:       毕业两年了,之前的工作一直没有接触过大数据的东西,对hadoop等比较陌生,所以最近开始学习了.对于我这样第一次学的人,过程还是充满了很多疑惑和不解的,不过我采取的策略是还是先让环境跑起来,然后在能用的基础上在多想想为什么.       通过这三个礼拜(基本上就是周六周日,其他时间都在加班啊T T)的探索,我目前主要完成的是: 1.在Linux环境中伪分布式部署hadoop(SSH免登陆),运行WordCount实例成功. http://www.cnblogs.com/Pur

storm wordcount实例

在storm环境部署完毕,并正确启动之后,现在就可以真正进入storm开发了,按照惯例,以wordcount作为开始.这个例子很简单,核心组件包括:一个spout,两个bolt,一个Topology.spout从一个路径读取文件,然后readLine,向bolt发射,一个文件处理完毕后,重命名,以不再重复处理.第一个bolt将从spout接收到的字符串按空格split,产生word,发射给下一个bolt.第二个bolt接收到word后,统计.计数,放到HashMap<string, intege

Spark编程环境搭建及WordCount实例

基于Intellij IDEA搭建Spark开发环境搭建 基于Intellij IDEA搭建Spark开发环境搭——参考文档 ● 参考文档http://spark.apache.org/docs/latest/programming-guide.html ● 操作步骤 ·a)创建maven 项目 ·b)引入依赖(Spark 依赖.打包插件等等) 基于Intellij IDEA搭建Spark开发环境—maven vs sbt ● 哪个熟悉用哪个 ● Maven也可以构建scala项目 基于Inte

【Flink】Flink基础之WordCount实例(Java与Scala版本)

简述 WordCount(单词计数)作为大数据体系的标准示例,一直是入门的经典案例,下面用java和scala实现Flink的WordCount代码: 采用IDEA + Maven + Flink 环境:文末附 pom 文件和相关技术点总结: Java批处理版本 import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apa

Ubuntu14.04安装配置Hadoop2.6.0(完全分布式)与 wordcount实例运行

我的环境是:Ubuntu14.04+Hadoop2.6.0+JDK1.8.0_25 官网2.6.0的安装教程:http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/SingleCluster.html 为了方面配置,我在每台机器上都使用了hadoop用户来操作,这样做的确够方便. 结点信息:(分布式集群架构:master为主节点,其余为从节点) 机器名 IP 作用 master 122.205.135.254

Storm的HelloWorld实例

====实例需求分析 数据源会源源不断的产生海量的英文语句. 我们需要实时的获取到单词的词频,或者是TopN,来观察词频是如何变化的. 设想这是不同商品的用户行为操作数据,我们是不是就可以实时观测到用户关注商品的热点呢? ====与Hadoop的对比 ====Storm编程模型 详细请参考后续文章中的部分.这里只进行简单介绍. (1)消息源Spout组件 是消息源的接入组件.通常我们有两个方法来实现这个组件: ①.继承BaseRichSpout类: 相对来说比较简单的方法,并且需要重写的方法也比

三:Storm设计一个Topology用来统计单词的TopN的实例

Storm的单词统计设计 一:Storm的wordCount和Hadoop的wordCount实例对比 二:Storm的wordCount的方案实例设计 三:建立maven项目,添加maven相关依赖包(1)输入:search.maven.org网址,在其中找到storm的核心依赖(2)将核心依赖添加到pom.xml文件中 <dependency>            <groupId>com.github.aloomaio</groupId>            

Storm设计一个Topology用来统计单词的TopN的实例

Storm的单词统计设计 一:Storm的wordCount和Hadoop的wordCount实例对比 二:Storm的wordCount的方案实例设计 三:建立maven项目,添加maven相关依赖包(1)输入:search.maven.org网址,在其中找到storm的核心依赖(2)将核心依赖添加到pom.xml文件中 <dependency>            <groupId>com.github.aloomaio</groupId>            

CentOS 7下Hadoop2.6伪分布模式安装

1.Hadoop核心配置文件: # gedit core-site.xml <configuration> <property> <name>fs.default.name</name> <value>hdfs://localhost:9000</value> </property> </configuration> # gedit hdfs-site.xml <configuration> <