Hadoop日记Day14---MapReduce源代码回顾总结

一、回顾单词统计源码

 1 package counter;
 2
 3 import java.net.URI;
 4
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
22     static final String OUT_PATH = "hdfs://hadoop:9000/out";
23
24     public static void main(String[] args) throws Exception {
25
26         Configuration conf = new Configuration();
27
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29         final Path outPath = new Path(OUT_PATH);
30
31         if(fileSystem.exists(outPath)){
32             fileSystem.delete(outPath, true);
33         }
34
35         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
36
37         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里
38
39         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
40
41         job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类
42         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
43         job.setMapOutputValueClass(LongWritable.class);
44
45         job.setPartitionerClass(HashPartitioner.class);//1.3 分区
46         job.setNumReduceTasks(1);//有一个reduce任务运行
47
48         job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类
49         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
50         job.setOutputValueClass(LongWritable.class);
51
52         FileOutputFormat.setOutputPath(job, outPath);//2.3 指定写出到哪里
53
54         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
55
56         job.waitForCompletion(true);//把job提交给JobTracker运行
57     }
58
59     /**
60      * KEYIN    即k1        表示行的偏移量
61      * VALUEIN    即v1        表示行文本内容
62      * KEYOUT    即k2        表示行中出现的单词
63      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
64      */
65     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
66         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
67         //    final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
68
69             final String line = v1.toString();
70         /*    if(line.contains("hello")){
71                 //记录敏感词出现在一行中
72                 helloCounter.increment(1L);
73             }*/
74             final String[] splited = line.split(" ");
75             for (String word : splited) {
76                 context.write(new Text(word), new LongWritable(1));
77             }
78         };
79     }
80
81     /**
82      * KEYIN    即k2        表示行中出现的单词
83      * VALUEIN    即v2        表示行中出现的单词的次数
84      * KEYOUT    即k3        表示文本中出现的不同单词
85      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
86      *
87      */
88     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
89         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
90             long times = 0L;
91             for (LongWritable count : v2s) {
92                 times += count.get();
93             }
94             ctx.write(k2, new LongWritable(times));
95         };
96     }
97
98 }

代码1.1

二、原理与代码解析

2.1 MapReduce的任务与原理

2.1.1 MapReduce的工作原理

  MapReduce的工作原理如下图2.1所示。

图 2.1

  在图中我们已看出,关于File有两种划分,一个是split分片一个是block,注意分片只是逻辑划分,并不是像划分block那样,将文件真是的划分为多个部分,他只是逻辑上的的划分,可以说是只是读取时候按分片来读取。关于分片的大小默认为块大小,为什么要这样呢?那因为MapReduce作业 处理的文件是存放在DataNode上的,而且文件在DataNode上是按block存放的,而不同的block可是存放在不同的DataNode上的,如果分片大小大于block块大小,那么说明一个块满足不 了该分片,那么就需要再读取一个block块,这样当这两个block块位于不同的DataNode上 时,就要通过网络访问另一个节点,这样就可能造成网络延迟影响Mapreduce的执行效率,所以一般分片大小会默认为block块大小。

  在分析一下该图,不难看出,每一个split都分配了一个MappperTask,每个MapperTask又有三个箭头,有三个不同的走向表示分了三个区,那就有三个ReducerTask,而最终的结果会分不同的痛的部分存放在DataNode目录中。我们也可以对比下面这张图来对比理解MapReduce的工作原理,如图2.2所示。

图 2.2

2.1.2 map()和reduce的任务

<1>map任务处理
  1) 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
  2) 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
  3) 对输出的key、value进行分区。
  4) 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
<2>reduce任务处理
  1) 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
  2) 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
  3) 把reduce的输出保存到文件中。

2.2源码任务的对比分析

  关于任务和源码的对应分析主要是针对map的第一项和第二项任务。第一项任务是:读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。第二项任务是:写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

2.2.1 第一项任务

  从上面代码1.1中,可以看出这项任务是由下面这段代码来完成,如代码2.1所示。

1 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里
2 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对       

代码 2.1

  分析这段代码,可以知道,由代码中的TextInputFromat这个类主要是来完成分割的任务的,下面先来看一下该类的树结构,如下图2.1所示。

图 2.2

  从图中可知,TextInputFormat的继承的关系为,TextInputFormat--->FileInputformat--->InputFormat,那么看进入TextInputFormat类,看一下该类的注释,和其中的方法,如下代码2.2,2.3,注释中的@link表示后面跟的是一个连接,可以点击查看。

 1 * <code>InputFormat</code> describes the input-specification for a          InputFormat用来描述Map-Reduce的输入规格
 2  * Map-Reduce job.
 3  *
 4  * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the  Map-reduce框架依赖于一个job的InputFormat
 5  * job to:<p>
 6  * <ol>
 7  *   <li>
 8  *   Validate the input-specification of the job.                  验证job的输入规格
 9  *   <li>
10  *   Split-up the input file(s) into logical {@link InputSplit}s, each of    把输入文件拆分成逻辑Inputsplit,每一个
11  *   which is then assigned to an individual {@link Mapper}.           InputSplit都会被分配到一个独立的Mapper
12  *   </li>
13  *   <li>
14  *   Provide the {@link RecordReader} implementation to be used to glean     提供实现类RcordReader,用于为Mapper任务,从逻辑InputSplit
15  *   input records from the logical <code>InputSplit</code> for processing by  收集输入记录。
16  *   the {@link Mapper}.
17  *   </li>
18  * </ol>

代码 2.2

 1 /**
 2    * Logically split the set of input files for the job.                      为job逻辑切分输入文件
 3    * @param context job configuration.
 4    * @return an array of {@link InputSplit}s for the job.
 5    */
 6  public abstract
 7     List<InputSplit> getSplits(JobContext context
 8                                ) throws IOException, InterruptedException;
 9
10 /**
11    * Create a record reader for a given split. The framework will call    为分片创建一个记录读取器
12    * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
13    * the split is used.
14    * @param split the split to be read
15    * @param context the information about the task
16    * @return a new record reader
17    * @throws IOException
18    * @throws InterruptedException
19    */
20   public abstract
21     RecordReader<K,V> createRecordReader(InputSplit split,
22                                          TaskAttemptContext context
23                                         ) throws IOException,
24                                                  InterruptedException;

代码 2.3



  从上面的代码中可以知道InputFormat是一个抽象类,两面有两个抽象方法getSplit和createRecordReader,由于抽象类中只有方法的声明,并没有方法的实现,所以要分析该类的实现类FileInputFormat,在该实现类中,实现了他的父类InputFormat的getSplits()方法,查看该类的源码及注释如下代码2.4所示。

 1  /**
 2    * Generate the list of files and make them into FileSplits.                 生成一个文件列表并创建FileSplits
 3    */
 4   public List<InputSplit> getSplits(JobContext job
 5                                     ) throws IOException {
 6     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //该值等于1
 7     long maxSize = getMaxSplitSize(job); //该值等于263-1
 8     // generate splits
 9     List<InputSplit> splits = new ArrayList<InputSplit>();
10     List<FileStatus> files = listStatus(job);  //读取文件夹下的所有文件
11     for (FileStatus file: files) {      //遍历文件夹下的所有文件
12       Path path = file.getPath();    //获取文件路径
13       FileSystem fs = path.getFileSystem(job.getConfiguration());     //根据该路径获取文件系统
14       long length = file.getLen();    //文件长度
15       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);    //块位置
16       if ((length != 0) && isSplitable(job, path)) {     //判断文件数量是否不为空且文件允许被拆分
17         long blockSize = file.getBlockSize();
18         long splitSize = computeSplitSize(blockSize, minSize, maxSize);  //计算分片大小,该分片大小和blockSize, minSize, maxSize有关系,默认为block块大小
19         long bytesRemaining = length;  //文件初始长度
20         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {    //分片
21           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
22           splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
23                                    blkLocations[blkIndex].getHosts()));
24           bytesRemaining -= splitSize;
25         }
26
27         if (bytesRemaining != 0) {
28           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
29                      blkLocations[blkLocations.length-1].getHosts()));
30         }
31       } else if (length != 0) {    //如果该文件不能够被切分就,就直接生成分片
32         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
33       } else {
34         //Create empty hosts array for zero length files
35         splits.add(new FileSplit(path, 0, length, new String[0]));
36       }
37     }
38     // Save the number of input files in the job-conf
39     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
40     LOG.debug("Total # of splits: " + splits.size());
41     return splits;
42   }

代码 2.4

  注意,分片FileinputSplit只是逻辑划分,并不是像划分block那样,将文件真是的划分为多个部分,他只是逻辑上的的划分,可以说是只是读取时候按分片来读取,分片InputSplit大小默认为块大小,为什么要这样呢?那因为MapReduce作业 处理的文件是存放在datanode上的,而且文件在DataNode上是按block存放的,如果分片大小大于block块大小,那么说明一个块满足不了该分片需要再读取一个block块,而不同的block可是存放在不同的DataNode上的,这样当这两个block块位于不同的DataNode上时,就要通过网络访问另一个节点,这样就可能造成网络延迟影响Mapre-duce的执行效率,所以一般分片大小会默认为block块大小。



  我们知道FileInputFormat实现了,inputFormat的 getSplits()的抽象方法,那么另一个抽象方法createRecordReader由谁来实现呢,我们看一下该类的两个实现类FileIn putFormat和TextInputFormat这两个实现类的源码,看一发现createRecordReader是在TextInputFormat这个实现类中实现的,我们看一下该类的源码如下代码2.5所示。

 1 /** An {@link InputFormat} for plain text files.  Files are broken into lines.      文件被解析成行
 2  * Either linefeed or carriage-return are used to signal end of line.  Keys are    无论是换行符还是回车符都表示一行结束
 3  * the position in the file, and values are the line of text.. */           键是该行在文件中的位置,值为该行的内容
 4 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
 5
 6   @Override
 7   public RecordReader<LongWritable, Text>
 8     createRecordReader(InputSplit split,
 9                        TaskAttemptContext context) {
10     return new LineRecordReader();
11   }
12
13   @Override
14   protected boolean isSplitable(JobContext context, Path file) {
15     CompressionCodec codec =
16       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
17     if (null == codec) {
18       return true;
19     }
20     return codec instanceof SplittableCompressionCodec;
21   }
22
23 }

代码2.5

  我们再分析一下createRecordReader()方法的返回值,他的返回值类型为RecordReader,返回值是new LineRecordReader (),而他继承了RecordReader,我们先看一下RecordReader源码如代码2.6所示。

 1 package org.apache.hadoop.mapreduce;
 2
 3 import java.io.Closeable;
 4 import java.io.IOException;
 5
 6 /**
 7  * The record reader breaks the data into key/value pairs for input to the       将数据解析成Mapper能够处理的键值对
 8  * {@link Mapper}.
 9  * @param <KEYIN>
10  * @param <VALUEIN>
11  */
12 public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
13
14   /**
15    * Called once at initialization.
16    * @param split the split that defines the range of records to read
17    * @param context the information about the task
18    * @throws IOException
19    * @throws InterruptedException
20    */
21   public abstract void initialize(InputSplit split,
22                                   TaskAttemptContext context
23                                   ) throws IOException, InterruptedException;
24
25   /**
26    * Read the next key, value pair.
27    * @return true if a key/value pair was read
28    * @throws IOException
29    * @throws InterruptedException
30    */
31   public abstract boolean nextKeyValue() throws IOException, InterruptedException;
32
33   public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
34   public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
35   public abstract float getProgress() throws IOException, InterruptedException;
36   public abstract void close() throws IOException;
37 }

代码 2.6

  从上面的代码中我们可以发现,RecordReader类是一个抽象类,其中的抽象方法initialize(),主要是用来将内容解析成键值对的,nextKeyValue(), getCurrentKey() ,getCurrentValue() 主要是用来获取键值对的内容的,他们的使用方法如下面代码2.7所示。

1 while(xxx.nextKeyValue()){
2         key=xxx.getCurrenKey();
3         value=xxx.getCurrentValue();
4     }

代码 2.7

从RecordReader的类中回到 LineRecordReader类我们可以看到,该类对RecordReader类的三个抽象方法nextKeyValue(), getCurrentKey(),getCurrentValue()进行了实现,LineRecordReader类源码如代码2.8所示。

 1 public boolean nextKeyValue() throws IOException {
 2     if (key == null) {
 3       key = new LongWritable();
 4     }
 5     key.set(pos);  //第一次调用时pos为零,key也就为零,key表示该行的偏移量
 6     if (value == null) {
 7       value = new Text();
 8     }
 9     int newSize = 0;  //表示当前读取的字节数
10     // We always read one extra line, which lies outside the upper
11     // split limit i.e. (end - 1)
12     while (getFilePosition() <= end) {  //读取一行内容给value
13       newSize = in.readLine(value, maxLineLength,
14           Math.max(maxBytesToConsume(pos), maxLineLength));
15       if (newSize == 0) {
16         break;
17       }
18       pos += newSize;  //读取一行重置pos
19       if (newSize < maxLineLength) {
20         break;
21       }
22
23       // line too long. try again
24       LOG.info("Skipped line of size " + newSize + " at pos " +
25                (pos - newSize));
26     }
27     if (newSize == 0) {
28       key = null;
29       value = null;
30       return false;
31     } else {
32       return true;
33     }
34   }

代码 2.8

  通过以上对TextInputFormat的一系列分析,我们可以知道文件是如何分片的,分片是如何被解析成键值对的。那么这些键值对是如何被提交到Mapper上的呢?我们一步步分析,首先我们知道,分片是被createRecordReader()解析成键值对的,他的返回值是new LineRecordReader (),代表被解析成的键值对,那么我们就分析一下 LineRecordRe ader和Mapper的关系。好那么我们就看一下,Mapper的源码,如代码2.9所示。

 1 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 2
 3   public class Context
 4     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
 5     public Context(Configuration conf, TaskAttemptID taskid,
 6                    RecordReader<KEYIN,VALUEIN> reader,
 7                    RecordWriter<KEYOUT,VALUEOUT> writer,
 8                    OutputCommitter committer,
 9                    StatusReporter reporter,
10                    InputSplit split) throws IOException, InterruptedException {
11       super(conf, taskid, reader, writer, committer, reporter, split);
12     }
13   }
14
15   /**
16    * Called once at the beginning of the task.
17    */
18   protected void setup(Context context
19                        ) throws IOException, InterruptedException {
20     // NOTHING
21   }
22
23   /**
24    * Called once for each key/value pair in the input split. Most applications
25    * should override this, but the default is the identity function.
26    */
27   @SuppressWarnings("unchecked")
28   protected void map(KEYIN key, VALUEIN value,
29                      Context context) throws IOException, InterruptedException {
30     context.write((KEYOUT) key, (VALUEOUT) value);
31   }
32
33   /**
34    * Called once at the end of the task.
35    */
36   protected void cleanup(Context context
37                          ) throws IOException, InterruptedException {
38     // NOTHING
39   }
40
41   /**
42    * Expert users can override this method for more complete control over the
43    * execution of the Mapper.
44    * @param context
45    * @throws IOException
46    */
47   public void run(Context context) throws IOException, InterruptedException {
48     setup(context);
49     while (context.nextKeyValue()) {
50       map(context.getCurrentKey(), context.getCurrentValue(), context);
51     }
52     cleanup(context);
53   }
54 }

代码 2.9

  我们分析一下这段代码,其中的getCurrentKey(),getCurrentValue(),nextKeyValue(),在RecordReader也见过,那么是不是他的呢?我们点击getCurrentKey(),然后进入到,MapContext类,看一下他的一段代码如代码2.10所示。

 1 public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
 2   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
 3   private RecordReader<KEYIN,VALUEIN> reader;
 4   private InputSplit split;
 5
 6   public MapContext(Configuration conf, TaskAttemptID taskid,
 7                     RecordReader<KEYIN,VALUEIN> reader,
 8                     RecordWriter<KEYOUT,VALUEOUT> writer,
 9                     OutputCommitter committer,
10                     StatusReporter reporter,
11                     InputSplit split) {
12     super(conf, taskid, writer, committer, reporter);
13     this.reader = reader;
14     this.split = split;
15   }
16
17   /**
18    * Get the input split for this map.
19    */
20   public InputSplit getInputSplit() {
21     return split;
22   }
23
24   @Override
25   public KEYIN getCurrentKey() throws IOException, InterruptedException {
26     return reader.getCurrentKey();
27   }
28
29   @Override
30   public VALUEIN getCurrentValue() throws IOException, InterruptedException {
31     return reader.getCurrentValue();
32   }
33
34   @Override
35   public boolean nextKeyValue() throws IOException, InterruptedException {
36     return reader.nextKeyValue();
37   }
38
39 }
40      

代码 2.10

  我们从上面的代码,发现Reader的类型就是RecordReader类型,我们又知道他的子类就是,LineRecordReader我们这样就知道了他与Mapper之间的关系了。那么我们也就清楚了计算机在Mapper第一阶段所做的事如图2.4所示。

图 2.4



  与TextInputFormat相对应的是OutputFormat,他的继承关系结构如图2.3所示,关于对他们的分析,可依据前面对InputFormat的分析方法进行分析在这里不再分析。

图 2.3

  

2.2.2 第二项任务

  这项任务主要是由我们自己来做,通过对map()函数进行覆盖来实现我们的业务逻辑,这也是我们在MapReduce编程过程中的主要工作量。在单词统计的项目中,在未经map()函数处理时,初始键值对<K1,V1>中,键K1表示存储位置,V2表示某一行的内容。由于我们要统计单词的个数,为了便于实现我们的目的,所以我们的中间结果<K2,V2>,K2表示单词,V2用特定的值1来表示。然后在经过reduce函数处理,得到我们的最终结果。

时间: 2024-09-28 16:42:46

Hadoop日记Day14---MapReduce源代码回顾总结的相关文章

Hadoop日记系列目录

下面是Hadoop日记系列的目录,由于目前时间不是很充裕,以后的更新的速度会变慢,会按照一星期发布一期的原则进行,希望能和大家相互学习.交流. 目录安排 1>  Hadoop日记Day1---Hadoop介绍 2>  Hadoop日记Day2---虚拟机中搭建Linux 3>  Hadoop日记Day3---Hadoop的伪分布式安装 4>  Hadoop日记Day4---去除HADOOP_HOME is deprecated 5>  Hadoop日记Day5---HDFS介

每天收获一点点------Hadoop之初始MapReduce

一.神马是高大上的MapReduce MapReduce是Google的一项重要技术,它首先是一个编程模型,用以进行大数据量的计算.对于大数据量的计算,通常采用的处理手法就是并行计算.但对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,它使得那些没有多有多少并行计算经验的开发人员也可以开发并行应用程序.这也就是MapReduce的价值所在,通过简化编程模型,降低了开发并行应用的入门门槛. 1.1 MapReduce是什么 Hadoop

MapReduce源代码分析之JobSubmitter(一)

JobSubmitter.顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外.对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的全部业务逻辑. 本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter. 首先,我们先看下JobSubmitter的类成员变量.例如以下: // 文件系统FileSystem实例 private FileSys

【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现. MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1. Hadoop示例程序WordCount详解及实例 2. hadoop 学习笔

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

大数据分析:结合 Hadoop或 Elastic MapReduce使用 Hunk

作者 Jonathan Allen ,译者 张晓鹏 Hunk是Splunk公司一款比較新的产品,用来对Hadoop和其他NoSQL数据存储进行探測和可视化,它的新版本号将会支持亚马逊的Elastic MapReduce. 结合Hadoop使用Hunk Hadoop由两个单元组成,首先是被称为HDFS的存储单元,HDFS能够分布在成千上万个复制的节点上.接下来是MapReduce单元,它负责跟踪和管理被命名为map-reduce jobs的作业. 之前,开发人员会用到Splunk Hadoop C

从Hadoop框架与MapReduce模式中谈海量数据处理(含淘宝技术架构)

从hadoop框架与MapReduce模式中谈海量数据处理 前言 几周前,当我最初听到,以致后来初次接触Hadoop与MapReduce这两个东西,我便稍显兴奋,认为它们非常是神奇,而神奇的东西常能勾起我的兴趣,在看过介绍它们的文章或论文之后,认为Hadoop是一项富有趣味和挑战性的技术,且它还牵扯到了一个我更加感兴趣的话题:海量数据处理. 由此,近期凡是空暇时,便在看"Hadoop","MapReduce""海量数据处理"这方面的论文.但在看论

hadoop入门笔记MapReduce Shuffle简介(五)

1. MapReduce 定义 Hadoop 中的 MapReduce是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错式并行处理TB级别的数据集 2. MapReduce 特点 MapReduce 为什么如此受欢迎?尤其现在互联网+时代,互联网+公司都在使用 MapReduce.MapReduce 之所以如此受欢迎,它主要有以下几个特点. - MapReduce 易于编程.它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可

初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现,MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1.Hadoop示例程序WordCount详解及实例 2.hadoop 学习笔记:m

YARN Apache Hadoop 的下一代MapReduce

之前自己做的hadoop项目是基于0.20.2版本的,查了一下资料,知道了自己以前学的是原map/reduce模型. 官方说明: 1.1.X - current stable version, 1.1 release 1.2.X - current beta version, 1.2 release 2.X.X - current alpha version 0.23.X - simmilar to 2.X.X but missing NN HA. 0.22.X - does not inclu