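Two datasets, each with several rows and columns, share one column that serves as the key. The task is to join them on that key so that each output row shows the fields from both datasets.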
data1.txt
zhangsan man 25 teacher
lisi man 28 police
wangwu man 26 student
guoxiao man 26 salesman
wangmeng women 26 student
huangshi women 28 programmer
zhouchu man 25 manager
yangchao man 27 doctor
data2.txt
zhanghuan liaoning huludao
huangshi liaoning unknown
xutao henan luoyang
guoxiao shanxi datong
zhangchao yunnan qujing
zhangsan known known
wangwu henan luoyang
zhouchu hubei enshi
luopeng sichuan dazho
The first column is the name. It is unique within each dataset, so it serves as the unique key.
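The join relies on the name being unique within each file, which can be checked before the job is submitted. The following is a minimal sketch, not part of the original scripts: duplicate_keys is a hypothetical helper name, and the first column is assumed to be delimited by blanks or tabs.

```shell
# duplicate_keys FILE: print every first-column value that occurs more than once.
# Prints nothing when the first column is a unique key.
duplicate_keys() {
    awk '{ seen[$1]++ } END { for (k in seen) if (seen[k] > 1) print k }' "$1"
}
```

Running `duplicate_keys data1.txt` and `duplicate_keys data2.txt` should print nothing if the uniqueness assumption holds.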
The expected result is as follows:
huangshi women 28 programmer liaoning unknown
guoxiao man 26 salesman shanxi datong
zhangsan man 25 teacher known known
wangwu man 26 student henan luoyang
zhouchu man 25 manager hubei enshi
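As a cross-check on a single machine, the same inner join can be sketched with the standard sort and join utilities. This is only a sketch under assumptions: local_join is a hypothetical helper name, both files are assumed small enough to sort locally, and fields are assumed to be separated by blanks or tabs. The rows come out sorted by name and separated by single spaces, so their order may differ from the listing above.

```shell
# local_join FILE1 FILE2: inner-join two files on their first column.
local_join() {
    tmp1=$(mktemp) || return 1
    tmp2=$(mktemp) || { rm -f "$tmp1"; return 1; }
    # join needs both inputs sorted on the join field with the same collation.
    LC_ALL=C sort -k1,1 "$1" > "$tmp1"
    LC_ALL=C sort -k1,1 "$2" > "$tmp2"
    # Prints: key, remaining fields of FILE1, remaining fields of FILE2.
    LC_ALL=C join "$tmp1" "$tmp2"
    rm -f "$tmp1" "$tmp2"
}
```

For example, `local_join data1.txt data2.txt` prints one line for each name that appears in both files.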
Mapper
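#!/bin/bash

# Print "data1" or "data2" depending on which input file this map task reads.
# Hadoop streaming exports the input path as mapreduce_map_input_file.
function get_data_source() {
    # For local testing, uncomment the next line:
    #local mapreduce_map_input_file=data2asdf.txt
    echo "${mapreduce_map_input_file}" | awk '{
        if ($1 ~ "data1") {
            print "data1"
        } else if ($1 ~ "data2") {
            print "data2"
        }
    }'
    return 0
}

# Append a tab-separated source tag to every record: 1 for data1, 2 for data2.
function filter() {
    local data_source=$(get_data_source)
    if [ "${data_source}" == "data1" ]
    then
        awk 'BEGIN { OFS = "\t" } { print $0, 1 }'
    elif [ "${data_source}" == "data2" ]
    then
        awk 'BEGIN { OFS = "\t" } { print $0, 2 }'
    fi
}

filter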
Reducer
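#!/bin/bash

# Input fields are assumed to be tab-separated, with the source tag last.
# Create both temporary files up front so the join step never hits a missing file.
: > 1.txt
: > 2.txt

# Split the tagged input by its last field, dropping the trailing tab and tag.
awk '{
    if ($NF == 1) {
        print substr($0, 1, length($0) - 2) > "1.txt"
    }
    if ($NF == 2) {
        print substr($0, 1, length($0) - 2) > "2.txt"
    }
}'

# Join the two files on the first column. ARGIND is a gawk extension.
awk 'BEGIN {
    FS = "\t"
    OFS = "\t"
} {
    if (ARGIND == 1) {
        dict[$1] = $0
    } else if (ARGIND == 2) {
        if ($1 in dict) {
            print dict[$1], $2, $3
        }
    }
}' 1.txt 2.txt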
Summary:
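The streaming framework passes configuration to the mapper and reducer programs through environment variables. Using the mapreduce_map_input_file environment variable, we can obtain the input file path and so tell which file the data read by a map task comes from. Depending on the source file, the mapper appends a suffix tag to each output record. There are two files here, so the tags 1 and 2 are used to tell them apart.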
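The tagging step can be tried outside Hadoop by setting the variable by hand. The sketch below is a compact variant of the mapper logic, not the original script: tag_for_path and tag_records are hypothetical names, and the fallback to map_input_file (the name older Hadoop releases are understood to export) is an assumption that should be checked against the cluster in use.

```shell
# tag_for_path PATH: print 1 for data1 inputs and 2 for data2 inputs.
# Returns non-zero for any other path (hypothetical helper).
tag_for_path() {
    case "$1" in
        *data1*) echo 1 ;;
        *data2*) echo 2 ;;
        *) return 1 ;;
    esac
}

# tag_records: read records on stdin and append a tab plus the source tag.
tag_records() {
    # Assumption: fall back to map_input_file when the newer name is unset.
    input_path="${mapreduce_map_input_file:-${map_input_file:-}}"
    tag=$(tag_for_path "$input_path") || return 1
    awk -v tag="$tag" 'BEGIN { OFS = "\t" } { print $0, tag }'
}
```

After `export mapreduce_map_input_file=/tmp/data1.txt`, running `tag_records < data1.txt` prints every record with a trailing tab and the tag 1.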
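Every line of the reducer's input ends with the suffix 1 or 2. Within each reducer task, the lines are split by suffix into two temporary files in the current working directory. This works because all records with the same key (the name, which is the text before the first tab) are routed to the same reducer. Processing the two files the same way as on a single machine then yields the intersection we need.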
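Before submitting the job, the whole flow can be approximated on one machine by piping each input through the mapper, sorting by key, and feeding the result to the reducer. This is a rough sketch under assumptions: simulate_streaming_join is a hypothetical helper, the mapper and reducer are passed in as commands (for instance scripts saved as ./mapper.sh and ./reducer.sh, names chosen here for illustration), and a single local sort stands in for the shuffle, so it behaves like a job with exactly one reducer.

```shell
# simulate_streaming_join MAPPER REDUCER INPUT...
# Runs MAPPER once per input file, sorts by the first tab-separated field,
# then runs REDUCER once over the merged stream.
simulate_streaming_join() {
    mapper=$1
    reducer=$2
    shift 2
    tab=$(printf '\t')
    for input in "$@"; do
        # Hadoop sets this variable for each map task; here it is set by hand.
        mapreduce_map_input_file=$input "$mapper" < "$input"
    done | LC_ALL=C sort -t "$tab" -k1,1 | "$reducer"
}
```

A call such as `simulate_streaming_join ./mapper.sh ./reducer.sh data1.txt data2.txt` should print the joined rows. The reducer still writes its temporary files into the current directory, so it is best run from a scratch directory.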
Original article: https://www.cnblogs.com/zwcoding/p/9269767.html