一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现

1:首先搞好实体类对象:

  write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法

  1 package com.areapartition;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6
  7 import org.apache.hadoop.io.Writable;
  8 import org.apache.hadoop.io.WritableComparable;
  9
 10 /***
 11  *
 12  * @author Administrator
 13  * 1:write 是把每个对象序列化到输出流
 14  * 2:readFields是把输入流字节反序列化
 15  * 3:实现WritableComparable
 16  *      Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法
 17  *
 18  */
 19 public class FlowBean implements WritableComparable<FlowBean>{
 20
 21
 22     private String phoneNumber;//电话号码
 23     private long upFlow;//上行流量
 24     private long downFlow;//下行流量
 25     private long sumFlow;//总流量
 26
 27
 28
 29     public String getPhoneNumber() {
 30         return phoneNumber;
 31     }
 32     public void setPhoneNumber(String phoneNumber) {
 33         this.phoneNumber = phoneNumber;
 34     }
 35     public long getUpFlow() {
 36         return upFlow;
 37     }
 38     public void setUpFlow(long upFlow) {
 39         this.upFlow = upFlow;
 40     }
 41     public long getDownFlow() {
 42         return downFlow;
 43     }
 44     public void setDownFlow(long downFlow) {
 45         this.downFlow = downFlow;
 46     }
 47     public long getSumFlow() {
 48         return sumFlow;
 49     }
 50     public void setSumFlow(long sumFlow) {
 51         this.sumFlow = sumFlow;
 52     }
 53
 54     //为了对象数据的初始化方便,加入一个带参的构造函数
 55     public FlowBean(String phoneNumber, long upFlow, long downFlow) {
 56         this.phoneNumber = phoneNumber;
 57         this.upFlow = upFlow;
 58         this.downFlow = downFlow;
 59         this.sumFlow = upFlow + downFlow;
 60     }
 61     //在反序列化时候,反射机制需要调用空参的构造函数,所以定义了一个空参的构造函数
 62     public FlowBean() {
 63     }
 64
 65     //重写toString()方法
 66     @Override
 67     public String toString() {
 68         return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
 69     }
 70
 71
 72     //从数据流中反序列出对象的数据
 73     //从数据流中读取字段时必须和序列化的顺序保持一致
 74     @Override
 75     public void readFields(DataInput in) throws IOException {
 76         phoneNumber = in.readUTF();
 77         upFlow = in.readLong();
 78         downFlow = in.readLong();
 79         sumFlow = in.readLong();
 80
 81     }
 82
 83     //将对象数据序列化到流中
 84     @Override
 85     public void write(DataOutput out) throws IOException {
 86         out.writeUTF(phoneNumber);
 87         out.writeLong(upFlow);
 88         out.writeLong(downFlow);
 89         out.writeLong(sumFlow);
 90
 91     }
 92
 93     //流量比较的实现方法
 94     @Override
 95     public int compareTo(FlowBean o) {
 96
 97         //大就返回-1,小于等于返回1,进行倒序排序
 98         return sumFlow > o.sumFlow ? -1 : 1;
 99     }
100
101
102
103 }

2:流量分区处理操作的步骤:

   2. 1:对流量原始日志进行流量统计,将不同的省份的用户统计结果输出到不同文件;

   2.2:需要自定义改造两个机制:

    2.2.1:改造分区的逻辑,自定义一个partitioner

    2.2.2:自定义reducer task的并发任务数

  1 package com.areapartition;
  2
  3 import java.io.IOException;
  4
  5 import org.apache.commons.lang.StringUtils;
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Reducer;
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15
 16 /***
 17  * 流量分区处理操作
 18  * @author Administrator
 19  * 1:对流量原始日志进行流量统计,将不同的省份的用户统计结果输出到不同文件;
 20  * 2:需要自定义改造两个机制:
 21  *      2.1:改造分区的逻辑,自定义一个partitioner
 22  *      2.2:自定义reducer task的并发任务数
 23  */
 24 public class FlowSumArea {
 25
 26
 27     public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
 28         @Override
 29         protected void map(LongWritable key, Text value, Context context)
 30                 throws IOException, InterruptedException {
 31             //拿到一行数据
 32             String line = value.toString();
 33             //切分成各个字段
 34             String[] fields = StringUtils.split(line, "\t");
 35
 36             //获取到我们需要的字段
 37             String phoneNumber = fields[1];
 38             long up_flow = Long.parseLong(fields[7]);
 39             long down_flow = Long.parseLong(fields[8]);
 40
 41             //封装成key-value并且输出
 42             context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
 43         }
 44     }
 45
 46
 47     public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
 48         @Override
 49         protected void reduce(Text key, Iterable<FlowBean> values, Context context)
 50                 throws IOException, InterruptedException {
 51             //遍历求和
 52             long up_flowSum = 0;
 53             long down_flowSum = 0;
 54             for(FlowBean fb : values){
 55                 up_flowSum += fb.getUpFlow();
 56                 down_flowSum += fb.getDownFlow();
 57             }
 58
 59             //封装成key-value并且输出
 60             context.write(key, new FlowBean(key.toString(),up_flowSum,down_flowSum));
 61         }
 62
 63     }
 64
 65
 66     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
 67         //创建配置文件
 68         Configuration conf = new Configuration();
 69         //获取一个作业
 70         Job job = Job.getInstance(conf);
 71
 72         //设置整个job所用的那些类在哪个jar包
 73         job.setJarByClass(FlowSumArea.class);
 74         //本job使用的mapper和reducer的类
 75         job.setMapperClass(FlowSumAreaMapper.class);
 76         job.setReducerClass(FlowSumAreaReducer.class);
 77
 78         //设置我们自定义的分组逻辑定义
 79         job.setPartitionerClass(AreaPartitioner.class);
 80
 81         //指定mapper的输出数据key-value类型
 82         job.setMapOutputKeyClass(Text.class);
 83         job.setMapOutputValueClass(FlowBean.class);
 84
 85         //指定reduce的输出数据key-value类型Text
 86         job.setOutputKeyClass(Text.class);
 87         job.setOutputValueClass(FlowBean.class);
 88
 89
 90         //设置reduce的任务并发数,应该跟分组的数量保持一致
 91         job.setNumReduceTasks(7);
 92
 93         //指定要处理的输入数据存放路径
 94         //FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,
 95         //FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。
 96         //至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。
 97         FileInputFormat.setInputPaths(job, new Path(args[0]));
 98
 99         //指定处理结果的输出数据存放路径
100         FileOutputFormat.setOutputPath(job, new Path(args[1]));
101
102         //将job提交给集群运行
103         //job.waitForCompletion(true);
104         //正常执行成功返回0,否则返回1
105         System.exit(job.waitForCompletion(true) ? 0 : 1);;
106
107     }
108
109 }

3:从key中拿到手机号,查询手机归属地字典,不同的省份返回不同的组号:

  3.1:Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

  3.2:HashPartitioner是mapreduce的默认partitioner。计算方法是 which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

 1 package com.areapartition;
 2
 3 import java.util.HashMap;
 4
 5 import org.apache.hadoop.mapreduce.Partitioner;
 6
 7 public class AreaPartitioner<KEY,VALUE> extends Partitioner<KEY, VALUE>{
 8
 9     private static HashMap<String, Integer> areaMap = new HashMap<String,Integer>();
10
11     static{
12         areaMap.put("135", 0);
13         areaMap.put("136", 1);
14         areaMap.put("137", 2);
15         areaMap.put("138", 3);
16         areaMap.put("139", 4);
17         areaMap.put("841", 5);
18     }
19
20     @Override
21     public int getPartition(KEY key, VALUE value, int numPartitions) {
22         //从key中拿到手机号,查询手机归属地字典,不同的省份返回不同的组号
23         Integer areaCoder = areaMap.get(key.toString().subSequence(0, 3)) == null ? 6 : areaMap.get(key.toString().subSequence(0, 3));
24
25
26         return areaCoder;
27     }
28
29
30 }

4:将打好的jar包上传到虚拟机上面:

然后启动搭建的集群start-dfs.sh,start-yarn.sh:

然后操作如下所示:

  1 [[email protected] hadoop]# hadoop jar flowarea.jar com.areapartition.FlowSumArea /flow/data /flow/areaoutput4
  2 17/09/25 15:36:38 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
  3 17/09/25 15:36:38 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
  4 17/09/25 15:36:38 INFO input.FileInputFormat: Total input paths to process : 1
  5 17/09/25 15:36:38 INFO mapreduce.JobSubmitter: number of splits:1
  6 17/09/25 15:36:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1506324201206_0004
  7 17/09/25 15:36:38 INFO impl.YarnClientImpl: Submitted application application_1506324201206_0004
  8 17/09/25 15:36:38 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1506324201206_0004/
  9 17/09/25 15:36:38 INFO mapreduce.Job: Running job: job_1506324201206_0004
 10 17/09/25 15:36:43 INFO mapreduce.Job: Job job_1506324201206_0004 running in uber mode : false
 11 17/09/25 15:36:43 INFO mapreduce.Job:  map 0% reduce 0%
 12 17/09/25 15:36:48 INFO mapreduce.Job:  map 100% reduce 0%
 13 17/09/25 15:36:56 INFO mapreduce.Job:  map 100% reduce 14%
 14 17/09/25 15:37:04 INFO mapreduce.Job:  map 100% reduce 29%
 15 17/09/25 15:37:08 INFO mapreduce.Job:  map 100% reduce 43%
 16 17/09/25 15:37:10 INFO mapreduce.Job:  map 100% reduce 71%
 17 17/09/25 15:37:11 INFO mapreduce.Job:  map 100% reduce 86%
 18 17/09/25 15:37:12 INFO mapreduce.Job:  map 100% reduce 100%
 19 17/09/25 15:37:12 INFO mapreduce.Job: Job job_1506324201206_0004 completed successfully
 20 17/09/25 15:37:12 INFO mapreduce.Job: Counters: 49
 21     File System Counters
 22         FILE: Number of bytes read=1158
 23         FILE: Number of bytes written=746635
 24         FILE: Number of read operations=0
 25         FILE: Number of large read operations=0
 26         FILE: Number of write operations=0
 27         HDFS: Number of bytes read=2322
 28         HDFS: Number of bytes written=526
 29         HDFS: Number of read operations=24
 30         HDFS: Number of large read operations=0
 31         HDFS: Number of write operations=14
 32     Job Counters
 33         Launched map tasks=1
 34         Launched reduce tasks=7
 35         Data-local map tasks=1
 36         Total time spent by all maps in occupied slots (ms)=2781
 37         Total time spent by all reduces in occupied slots (ms)=98540
 38         Total time spent by all map tasks (ms)=2781
 39         Total time spent by all reduce tasks (ms)=98540
 40         Total vcore-seconds taken by all map tasks=2781
 41         Total vcore-seconds taken by all reduce tasks=98540
 42         Total megabyte-seconds taken by all map tasks=2847744
 43         Total megabyte-seconds taken by all reduce tasks=100904960
 44     Map-Reduce Framework
 45         Map input records=22
 46         Map output records=22
 47         Map output bytes=1072
 48         Map output materialized bytes=1158
 49         Input split bytes=93
 50         Combine input records=0
 51         Combine output records=0
 52         Reduce input groups=21
 53         Reduce shuffle bytes=1158
 54         Reduce input records=22
 55         Reduce output records=21
 56         Spilled Records=44
 57         Shuffled Maps =7
 58         Failed Shuffles=0
 59         Merged Map outputs=7
 60         GC time elapsed (ms)=1751
 61         CPU time spent (ms)=4130
 62         Physical memory (bytes) snapshot=570224640
 63         Virtual memory (bytes) snapshot=2914865152
 64         Total committed heap usage (bytes)=234950656
 65     Shuffle Errors
 66         BAD_ID=0
 67         CONNECTION=0
 68         IO_ERROR=0
 69         WRONG_LENGTH=0
 70         WRONG_MAP=0
 71         WRONG_REDUCE=0
 72     File Input Format Counters
 73         Bytes Read=2229
 74     File Output Format Counters
 75         Bytes Written=526
 76 [[email protected] hadoop]# hadoop fs -ls /flow/
 77 Found 10 items
 78 drwxr-xr-x   - root supergroup          0 2017-09-25 15:25 /flow/areaoutput
 79 drwxr-xr-x   - root supergroup          0 2017-09-25 15:34 /flow/areaoutput2
 80 drwxr-xr-x   - root supergroup          0 2017-09-25 15:35 /flow/areaoutput3
 81 drwxr-xr-x   - root supergroup          0 2017-09-25 15:37 /flow/areaoutput4
 82 -rw-r--r--   1 root supergroup       2229 2017-09-20 10:00 /flow/data
 83 drwxr-xr-x   - root supergroup          0 2017-09-20 09:35 /flow/output
 84 drwxr-xr-x   - root supergroup          0 2017-09-20 09:47 /flow/output2
 85 drwxr-xr-x   - root supergroup          0 2017-09-20 10:01 /flow/output3
 86 drwxr-xr-x   - root supergroup          0 2017-09-20 10:21 /flow/output4
 87 drwxr-xr-x   - root supergroup          0 2017-09-21 19:32 /flow/sortoutput
 88 [[email protected] hadoop]# hadoop fs -ls /flow/areaoutput4
 89 Found 8 items
 90 -rw-r--r--   1 root supergroup          0 2017-09-25 15:37 /flow/areaoutput4/_SUCCESS
 91 -rw-r--r--   1 root supergroup         77 2017-09-25 15:36 /flow/areaoutput4/part-r-00000
 92 -rw-r--r--   1 root supergroup         49 2017-09-25 15:37 /flow/areaoutput4/part-r-00001
 93 -rw-r--r--   1 root supergroup        104 2017-09-25 15:37 /flow/areaoutput4/part-r-00002
 94 -rw-r--r--   1 root supergroup         22 2017-09-25 15:37 /flow/areaoutput4/part-r-00003
 95 -rw-r--r--   1 root supergroup        102 2017-09-25 15:37 /flow/areaoutput4/part-r-00004
 96 -rw-r--r--   1 root supergroup         24 2017-09-25 15:37 /flow/areaoutput4/part-r-00005
 97 -rw-r--r--   1 root supergroup        148 2017-09-25 15:37 /flow/areaoutput4/part-r-00006
 98 [[email protected] hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00000
 99 13502468823    102    7335    7437
100 13560436666    954    200    1154
101 13560439658    5892    400    6292
102 [[email protected] hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00001
103 13602846565    12    1938    1950
104 13660577991    9    6960    6969
105 [[email protected] hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00002
106 13719199419    0    200    200
107 13726230503    2481    24681    27162
108 13726238888    2481    24681    27162
109 13760778710    120    200    320
110 [[email protected] hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00003
111 13826544101    0    200    200
112 [[email protected] hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00004
113 13922314466    3008    3720    6728
114 13925057413    63    11058    11121
115 13926251106    0    200    200
116 13926435656    1512    200    1712
117 [[email protected] hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00005
118 84138413    4116    1432    5548
119 [[email protected] hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00006
120 13480253104    180    200    380
121 15013685858    27    3659    3686
122 15920133257    20    3156    3176
123 15989002119    3    1938    1941
124 18211575961    12    1527    1539
125 18320173382    18    9531    9549


5:复制多份测试数据操作如下,测试map的多线程执行:

  5.1:map task 的并发数是切片的数量决定的,有多少个切片,就启动多少个map task。

  5.2:切片是一个逻辑的概念,指的就是文件中数据的偏移量的范围。

  5.3:切片的具体大小应该根据所处理的文件的大小来调整。

[[email protected] hadoop]# hadoop fs -mkdir /flow/data/
[[email protected] hadoop]# hadoop fs -put HTTP_20130313143750.dat /flow/data/
[[email protected] hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.2
[[email protected] hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.3
[[email protected] hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.4
[[email protected] hadoop]# hadoop fs -ls /flow/data/
Found 4 items
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat.2
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.3
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.4
[[email protected] hadoop]# 

6:Combiners编程

  6.1:每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

  6.2:combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。

  6.3: 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

  6.4:注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

7:shuffle机制:

   7.1:每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

   7.2:写磁盘前,要partition,sort。如果有combiner,combine排序后数据。

   7.3:等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。

   7.4:Reducer通过Http方式得到输出文件的分区。

   7.5:TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。

   7.6:排序阶段合并map输出。然后走Reduce阶段。

时间: 2024-10-10 18:13:40

一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现的相关文章

一脸懵逼学习hadoop之HDFS的java客户端编写

1:eclipse创建一个项目,然后导入对应的jar包: 鼠标右击项目,点击properties或者alt+enter快捷键--->java build path--->libraries--->add library--->user library--->next--->user libraries--->new--->hdfsLib(根据自己的需要填写)--->add external jars(添加自己的需求包): 2:开始添加自己的需求包,路径

hadoop-初学者写map-reduce程序中容易出现的问题 3

1.写hadoop的map-reduce程序之前所必须知道的基础知识: 1)hadoop map-reduce的自带的数据类型: Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较.(如果是自定义的key,value的数据类型,必须也要写其大小比较的方法) BooleanWritable:标准布尔型数值 ByteWritable:单字节数值 DoubleWritable:

从Hadoop框架与MapReduce模式中谈海量数据处理(含淘宝技术架构)

从hadoop框架与MapReduce模式中谈海量数据处理 前言 几周前,当我最初听到,以致后来初次接触Hadoop与MapReduce这两个东西,我便稍显兴奋,认为它们非常是神奇,而神奇的东西常能勾起我的兴趣,在看过介绍它们的文章或论文之后,认为Hadoop是一项富有趣味和挑战性的技术,且它还牵扯到了一个我更加感兴趣的话题:海量数据处理. 由此,近期凡是空暇时,便在看"Hadoop","MapReduce""海量数据处理"这方面的论文.但在看论

Hadoop学习之路(5)Mapreduce程序完成wordcount

程序使用的测试文本数据: Dear River Dear River Bear Spark Car Dear Car Bear Car Dear Car River Car Spark Spark Dear Spark 1编写主要类 (1)Maper类 首先是自定义的Maper类代码 public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWrit

使用Eclipse运行Hadoop 2.x MapReduce程序常见问题

1. 当我们编写好MapReduce程序,点击Run on Hadoop的时候,Eclipse控制台输出如下内容: 这个信息告诉我们没有找到log4j.properties文件.如果没有这个文件,程序运行出错的时候,就没有打印日志,因此我们会很难调试. 解决方法:复制$HADOOP_HOME/etc/hadoop/目录下的log4j.properties文件到MapReduce项目 src文件夹下. 2.当执行MapReduce程序的时候,Eclipse可能会报告堆益处的错误. 此时,MapRe

在Eclipse中开发MapReduce程序

一.Eclipse的安装与设置 1.在Eclipse官网上下载eclipse-jee-oxygen-3a-linux-gtk-x86_64.tar.gz文件并将其拷贝到/home/jun/Resources下,然后再将文件拷贝到/home/jun下并解压. [[email protected] ~]$ cp /home/jun/Resources/eclipse-jee-oxygen-3a-linux-gtk-x86_64.tar.gz /home/jun/ [[email protected]

从hadoop框架与MapReduce模式中谈海量数据处理

前言 几周前,当我最初听到,以致后来初次接触Hadoop与MapReduce这两个东西,我便稍显兴奋,觉得它们很是神秘,而神秘的东西常能勾起我的兴趣,在看过介绍它们的文章或论文之后,觉得Hadoop是一项富有趣味和挑战性的技术,且它还牵扯到了一个我更加感兴趣的话题:海量数据处理. 由此,最近凡是空闲时,便在看“Hadoop”,“MapReduce”“海量数据处理”这方面的论文.但在看论文的过程中,总觉得那些论文都是浅尝辄止,常常看的很不过瘾,总是一个东西刚要讲到紧要处,它便结束了,让我好生“愤懑

海量数据处理之从Hadoop框架与MapReduce模式中谈海量数据处理(淘宝技术架构)

几周前,当我最初听到,以致后来初次接触Hadoop与MapReduce这两个东西,我便稍显兴奋,觉得它们很是神秘,而神秘的东西常能勾起我的兴趣,在看过介绍它们的文章或论文之后,觉得Hadoop是一项富有趣味和挑战性的技术,且它还牵扯到了一个我更加感兴趣的话题:海量数据处理. 由此,最近凡是空闲时,便在看"Hadoop","MapReduce""海量数据处理"这方面的论文.但在看论文的过程中,总觉得那些论文都是浅尝辄止,常常看的很不过瘾,总是一个东

一脸懵逼学习KafKa集群的安装搭建--(一种高吞吐量的分布式发布订阅消息系统)

1:KafKa的官方网址:http://kafka.apache.org/ 开发流程图,如: 2:KafKa的基础知识: 2.1:kafka是一个分布式的消息缓存系统2.2:kafka集群中的服务器都叫做broker2.3:kafka有两类客户端,一类叫producer(消息生产者),一类叫做consumer(消息消费者),客户端和broker服务器之间采用tcp协议连接2.4:kafka中不同业务系统的消息可以通过topic进行区分,而且每一个消息topic都会被分区,以分担消息读写的负载2.