执行mapreduce有两种方式,一种是原生Java写的mr,一种是直接使用Streaming方式,这种方式是在Java上面做了一个封装,可以通过其它语言调用Java原生的mr接口。
- 优点
- 可以使用自己喜欢的语言来编写MapReduce程序(换句话说,不必写Java XD)
- 不需要像写Java的MR程序那样import一大堆库,在代码里做一大堆配置,很多东西都抽象到了stdio上,代码量显著减少
- 因为没有库的依赖,调试方便,并且可以脱离Hadoop先在本地用管道模拟调试
- 缺点
- 只能通过命令行参数来控制MapReduce框架,不像Java的程序那样可以在代码里使用API,控制力比较弱,有些东西鞭长莫及
- 因为中间隔着一层处理,效率会比较慢
下面是一个例子:
第一个文件test.dat:
0067011990999991950051507004...9999999N9+00001+99999999999... 0043011990999991950051512004...9999999N9+00221+99999999999... 0043011990999991950051518004...9999999N9-00111+99999999999... 0043012650999991949032412004...0500001N9+01111+99999999999... 0043012650999991949032418004...0500001N9+01121+99999999999... 0043012650999991949032418004...0500001N9+01221+99999999999...
第二个文件:map.py
#!usr/bin/python import re import sys for line in sys.stdin: val=line.strip() (year,temp)=(val[15:19],val[40:45]) print "%s\t%s" % (year,temp)
第三个文件:red.py
#!usr/bin/python import sys (last_key,max_val)=(None,0) for line in sys.stdin: (key,val)=line.strip().split('\t') if last_key and last_key!=key: print '%s\t%s' % (last_key, max_val) (last_key, max_val)=(key,int(val)) else: (last_key, max_val)=(key,max(max_val,int(val))) if last_key: print '%s\t%s' % (last_key, max_val)
执行的mr命令:
hadoop jar /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/jars/hadoop-streaming-2.5.0-cdh5.3.2.jar -mapper 'python map.py' -file /usr/local/tables/map.py -reducer 'python red.py' -file /usr/local/tables/red.py -input /tmp/logs/test.dat -output /tmp/test/
为了让Hadoop将程序分发给其他机器,需要再加一个-file参数用于指明要分发的程序放在哪里。
时间: 2024-10-12 03:03:29