我们都知道一个地址拥有着多家公司,本案例将通过两种类型输入文件:address类(地址)和company类(公司)进行一对多的关联查询,得到地址名(例如:Beijing)与公司名(例如:Beijing JD、Beijing Red Star)的关联信息。
开发环境
硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点)
软件环境:Java 1.7.0_45、hadoop-1.2.1
1、 Map过程
首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容并存入< key,value>例如<0,” 1:Beijing”>。Map过程首先按照输入文件的类型不同对输入信息进行不同的处理,例如,对于address类型输入文件将value值(”1:Beijing”)处理成<”1”,”address:Beijing”>,对于company类型输入文件将value值(”Beijing Red Star:1”)处理成<”1”,”company:Beijing Red Star”>,如图所示:
Map端核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。
public static class MapClass extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
Text addressId = new Text();
Text info = new Text();
String[] line = value.toString().split(":");// 获取文件的每一行数据,并以":"分割
String path = ((FileSplit) context.getInputSplit()).getPath().toString();
if (line.length < 2) {
return;
}
if (path.indexOf("company") >= 0) {//处理company文件的value信息: "Beijing Red Star:1"
addressId.set(line[1]);//"1"
info.set("company" + ":" + line[0]);//"company:Beijing Red Star"
context.write(addressId,info);//<key,value> --<"1","company:Beijing Red Star">
} else if (path.indexOf("address") >= 0) {//处理adress文件的value信息:"1:Beijing"
addressId.set(line[0]);//"1"
info.set("address" + ":" + line[1]);//"address:Beijing"
context.write(addressId,info);//<key,value> --<"1","address:Beijing">
}
}
}
2、 Reduce过程
Reduce过程首先对输入< key,values>即<”1”,[“company:Beijing Red Star”,”company:Beijing JD”,”address:Beijing”]>的values值进行遍历获取到单元信息value(例如”company:Beijing Red Star”),然后根据value中的标识符(company和address)将公司名和地址名分别存入到company集合和address集合,最后对company集合和address集合进行笛卡尔积运算得到company与address的关系,并进行输出,如图所示。
Reduce端核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。
public static class ReduceClass extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
List<String> companys = new ArrayList<String>();
List<String> addresses = new ArrayList<String>();
//["company:Beijing Red Star","company:Beijing JD","address:Beijing"]
Iterator<Text> it = values.iterator();
while(it.hasNext()){
String value = it.next().toString();//"company:Beijing Red Star"
String[] result = value.split(":");
if(result.length >= 2){
if(result[0].equals("company")){
companys.add(result[1]);
}else if(result[0].equals("address")){
addresses.add(result[1]);
}
}
}
// 求笛卡尔积
if(0 != companys.size()&& 0 != addresses.size()){
for(int i=0;i<companys.size();i++){
for(int j=0;j<addresses.size();j++){
context.write(new Text(companys.get(i)), new Text(addresses.get(j)));//<key,value>--<"Beijing JD","Beijing">
}
}
}
}
}
3、 驱动实现
驱动核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: company Join address <companyTableDir> <addressTableDir> <out>");
System.exit(2);
}
Job job = new Job(conf, "company Join address");
//设置Job入口类
job.setJarByClass(CompanyJoinAddress.class);
// 设置Map和Reduce处理类
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//companyTableDir
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));//addressTableDir
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));//out
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
4、部署运行
1)启动Hadoop集群
[[email protected] ~]$ start-dfs.sh
[[email protected] ~]$ start-mapred.sh
[[email protected] ~]$ jps
5283 SecondaryNameNode
5445 JobTracker
5578 Jps
5109 NameNode
2)部署源码
#设置工作环境
[[email protected] ~]$ mkdir -p /usr/hadoop/workspace/MapReduce
#部署源码
将CompanyJoinAddress文件夹拷贝到/usr/hadoop/workspace/MapReduce/ 路径下;
… 你可以直接 下载 CompanyJoinAddress
3)编译文件
#切换工作目录
[[email protected] ~]$ cd /usr/hadoop/workspace/MapReduce/CompanyJoinAddress
#编译文件
[[email protected] CompanyJoinAddress]$ javac -classpath /usr/hadoop/hadoop-core-1.2.1.jar:/usr/hadoop/lib/commons-cli-1.2.jar -d bin src/com/zonesion/tablejoin/CompanyJoinAddress.java
[[email protected] CompanyJoinAddress]$ ls bin/com/zonesion/tablejoin/* -la
-rw-rw-r-- 1 hadoop hadoop 1909 8月 1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress.class
-rw-rw-r-- 1 hadoop hadoop 2199 8月 1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class
-rw-rw-r-- 1 hadoop hadoop 2242 8月 1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class
4)打包jar文件
[[email protected] CompanyJoinAddress]$ jar -cvf CompanyJoinAddress.jar -C bin/ .
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class(in = 2273) (out= 951)(deflated 58%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class(in = 2242) (out= 1029)(deflated 54%)
adding: com/zonesion/tablejoin/CompanyJoinAddress.class(in = 1909) (out= 983)(deflated 48%)
5)上传输入文件
#创建company输入文件夹
[[email protected] CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/company/
#创建address输入文件夹
[[email protected] CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/address/
#上传文件到company输入文件夹
[[email protected] CompanyJoinAddress]$ hadoop fs -put input/company* CompanyJoinAddress/input/company/
#上传文件到address输入文件夹
[[email protected] CompanyJoinAddress]$ hadoop fs -put input/address* CompanyJoinAddress/input/address/
6)运行Jar文件
[[email protected] CompanyJoinAddress]$ hadoop jar CompanyJoinAddress.jar com.zonesion.tablejoin.CompanyJoinAddress CompanyJoinAddress/input/company/ CompanyJoinAddress/input/address/ CompanyJoinAddress/output
14/08/01 10:50:05 INFO input.FileInputFormat: Total input paths to process : 4
14/08/01 10:50:05 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/08/01 10:50:05 WARN snappy.LoadSnappy: Snappy native library not loaded
14/08/01 10:50:05 INFO mapred.JobClient: Running job: job_201408010921_0008
14/08/01 10:50:06 INFO mapred.JobClient: map 0% reduce 0%
14/08/01 10:50:09 INFO mapred.JobClient: map 50% reduce 0%
14/08/01 10:50:10 INFO mapred.JobClient: map 100% reduce 0%
14/08/01 10:50:17 INFO mapred.JobClient: map 100% reduce 100%
14/08/01 10:50:17 INFO mapred.JobClient: Job complete: job_201408010921_0008
14/08/01 10:50:17 INFO mapred.JobClient: Counters: 29
......
7)查看输出结果
[[email protected] CompanyJoinAddress]$ hadoop fs -ls CompanyJoinAddress/output
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_logs
-rw-r--r-- 1 hadoop supergroup 241 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/part-r-00000
[[email protected] CompanyJoinAddress]$ hadoop fs -cat CompanyJoinAddress/output/part-r-00000
Beijing Red Star Beijing
Beijing Rising Beijing
Back of Beijing Beijing
Beijing JD Beijing
xiaomi Beijing
Guangzhou Honda Guangzhou
Guangzhou Development Bank Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
aiplay hangzhou
huawei wuhan
您可能喜欢
时间: 2024-10-04 13:40:20