MR(MapReduce)运行过程
client程序--》提交job至JobTracker--》分配job ID--》JobTracker检查输入文件存在,输出文件不存在--》进行输入分片--》Jobtracker分配资源--》初始化job(初始化就是创建一个正在运行的job对象(封装任务和记录信息),以便JobTracker跟踪job的状态和进程。)--》为每一个任务创建map任务--》任务分配(运行taskTracker与JobTracker进行沟通)--》执行任务--》任务完成,job状态为成功
MR作业处理(K:key,V:value)
文件 Map输入 Map进程 Map输出-Reduce输入 Reduce进程 Reduce输出
Input --> Splitting --> Mapping --> Shuffling --> Reducing --> Output(Hdfs)
K1,V1 K2,V2 K3,V3
输入--》分片--》分片执行Mapper run函数(setUp--while{map}--cleanUp)--》自定义combiner、sort、shuffling(数据处理,排序,merge整合)--》每个Reducer执行run(setUp--while{reduce}--cleanUp)函数--》输出到磁盘
(Input-->Splitting-->Mapping-->Shuffling)
MAP工作流程:
Map作业从输入数据中抽取出键值对(K1,V1),每一个键值对都作为参数传递给map函数,map函数产生的中间键值对(K2,V2)被缓存在内存中。
Map对数据进行两次排序: 1 每次溢出时,磁盘上产生一个溢写文件,溢写文件对溢出的数据划分好相应的分区(对应reduce),并排序 --第一次排序
2 结束时,将所有溢出时的溢写文件进行分区(对应reduce)排序整合 --第二次排序
(Shuffling)
MAP-Reduce交接:
1.缓存的中间键值(K2,V2)对会被定期(溢出时或指定时间内)写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker。
2.master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业(谁让分区少呢),所以排序是必须的(将所有Map进程产生的已经整合过的溢写文件提取相应的分区,并排序整合--第三次排序)。
(Shuffling-->Reducing-->Output(Hdfs))
reduce工作流程:
reduce worker遍历排序后的中间键值对(K2,V2),对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出(K3,V3)会添加到这个分区的输出文件中。
FileOutputFormat --MapReduce使用OutputFormat类将数据输出存入文件中,其基本与InputFormat类似。输出没有分块,每个Reducer将它的输出直接写到自己的文件中。输出文件存在于一 个共有目录当中,一般被命名为part-nnnnn,nnnnn是Reducer的分区ID。
FileInputFormat --继承InputFormat抽象类实现List和CreateRecordReader方法
1.FileInputFormat实现了InputFormat的getSplits()方法,将输入的文件划分为InputSplit(输入块)。
2.protected List listStatus(JobContext job ) throws IOException递归获取输入数据中的文件,其中的job包含前面的那几个参数,是系统的配置Configuration
3.切分之后有RecordReader来读取,FileInputFormat没有对应的RecordReader,他的两个子类:SequenceFileInputFormat二进制形式存放的键/值文件TextInputFormat是文本文件 的处理,他们的createRecordReader()分别返回SequenceFileRecordReader,LineRecordReader实例
InputSplit:包含一个以字节为单位的长度和一组储存位置。一个分片并不包含数据本身,而是指向数据的引用。存储位置供MapReduce系统使用以便将map任务尽量放在分片数据附近,而长度用来排序分片,以便优先处理最大的分片,从而最小化作业运行时间。
MapReduce程序:
1.获取配置信息 Configuration
2.新建Job
3.编写Mapper类和Reduce类
4.Job设置运行时的Mapper和Reduce类
5.Job设置输入输出文件
6.Jop设置输出参数类型
7.Job进程等待
8.Job进程结束