MapReduce Map Join 样例

1. 样例数据

011990-99999    SIHCCAJAVRI
012650-99999    TYNSET-HANSMOEN
012650-99999    194903241200    111
012650-99999    194903241800    78
011990-99999    195005150700    0
011990-99999    195005151200    22
011990-99999    195005151800    -11

2. 需求

3. 思路、代码

足够小的关联文件(即气象台信息)添加到分布式缓存,然后在每个 Mapper 端读取被缓存到本地全量气象台信息,再与天气信息相关联。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class MapJoin {

    static class RecordMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Map<String, String> stationMap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //预处理,把要关联的文件加载到缓存中
            Path[] paths = context.getLocalCacheFiles();
            //新的检索缓存文件的API是 context.getCacheFiles() ,而 context.getLocalCacheFiles() 被弃用
            //然而 context.getCacheFiles() 返回的是 HDFS 路径; context.getLocalCacheFiles() 返回的才是本地路径

            //这里只缓存了一个文件,所以取第一个即可
            BufferedReader reader = new BufferedReader(new FileReader(paths[0].toString()));
            String line = null;
            try {
                while ((line = reader.readLine()) != null) {
                    String[] vals = line.split("\\t");
                    if (vals.length == 2) {
                        stationMap.put(vals[0], vals[1]);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                reader.close();
            }
            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] vals = value.toString().split("\\t");
            if (vals.length == 3) {
                String stationName = stationMap.get(vals[0]); //Join
                stationName = stationName == null ? "" : stationName;
                context.write(new Text(vals[0]),
                        new Text(stationName + "\t" + vals[1] + "\t" + vals[2]));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Parameter number is wrong, please enter three parameters:<ncdc input> <station input> <output>");
            System.exit(-1);
        }

        Path inputPath = new Path(otherArgs[0]);
        Path stationPath = new Path(otherArgs[1]);
        Path outputPath = new Path(otherArgs[2]);

        Job job = Job.getInstance(conf, "MapJoin");
        job.setJarByClass(MapJoin.class);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.addCacheFile(stationPath.toUri()); //添加缓存文件,可添加多个

        job.setMapperClass(RecordMapper.class);
        job.setMapOutputKeyClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4. 运行结果

时间: 2024-11-05 18:20:23

MapReduce Map Join 样例的相关文章

Hadoop辅助排序样例一

1. 样例数据 011990-99999 SIHCCAJAVRI 012650-99999 TYNSET-HANSMOEN 012650-99999 194903241200 111 012650-99999 194903241800 78 011990-99999 195005150700 0 011990-99999 195005151200 22 011990-99999 195005151800 -11 2. 需求 3. 思路.代码 将气象站ID相同的气象站信息和天气信息交由同一个 Re

Hadoop.2.x_高级应用_二次排序及MapReduce端join

一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 ===> b,-3 c,2 b,-2 b,-2 b,1 a,100 b,6 b,-3 c,-7 c,-7 c,2 2. 分析[MapRedice过程] 1> 分析数据传入通过input()传入map() 2> map()对数据进行层层过滤,以达到我们想要的数据源, 3> 过滤方法中可添加自

Hadoop0.20.2 Bloom filter应用演示样例

1. 简单介绍 參见<Hadoop in Action>P102 以及 <Hadoop实战(第2版)>(陆嘉恒)P69 2. 案例 网上大部分的说明不过依照<Hadoop in Action>中的演示样例代码给出.这里是Hadoop0.20.2版本号,在该版本号中已经实现了BloomFilter. 案例文件例如以下: customers.txt 1,Stephanie Leung,555-555-5555 2,Edward Kim,123-456-7890 3,Jose

MapGuide应用程序演示样例——你好,MapGuide!

图 3?4显示了基于MapGuide的Web应用程序的开发流程,整个开发流程能够分为五个阶段.图中,矩形代表任务,椭圆形被任务使用的或被任务创建的实体,箭头代表数据流. 1) 载入文件类型的数据,配置到外部数据库的连接,通过联接(Join)一个要素源到还有一个要素源扩展要素数据. 2) 通过引用要素源的数据和为要素应用样式创建图层. 3) 将图层结合起来创建地图. 4) 通过Internet或Intrant公布地图,使用户能够通过client浏览地图. 5) 使用MapGuide API为ser

[Python] SQLBuilder 演示样例代码

用Python写一个SQLBuilder.Java版能够从 http://www.java2s.com/Code/Java/Database-SQL-JDBC/SQLBuilder.htm 看到. 附上代码: 演示样例代码(一): class SQLDirector: @classmethod def buildSQL(cls, builder): sql = "" sql += builder.getCommand() sql += builder.getTable() sql +=

Hadoop辅助排序样例二

1. 需求 求每年的最高温度 2. 样例数据 1995 10 1996 11 1995 16 1995 22 1996 26 1995 3 1996 7 1996 10 1996 20 1996 33 1995 21 1996 9 1995 31 1995 -13 1995 22 1997 -2 1997 28 1997 15 1995 8 3. 思路.代码 将记录按年份分组并按温度降序排序,然后才将同一年份的所有记录送到一个 reducer 组,则各组的首条记录就是这一年的最高温度.实现此方案

hadoop的WordCount样例

package cn.lmj.mapreduce; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org

Python word_cloud 样例 标签云系列(三)

转载地址:https://zhuanlan.zhihu.com/p/20436642word_cloud/examples at master · amueller/word_cloud · GitHub 上面是官方样例.这一篇里的大部分尝试都基于这些样例进行修改.前提是你已经完成了安装,依照上一篇修改了 FONT_PATH . 还记得 http://zhuanlan.zhihu.com/666666/20432734 里提到的中文分词方法吧,这次我们就不再赘述对文本的预处理了.有所不同的是,在

ADF Faces 表格应用基础案例二:动态字段+事件处理【附样例工程】

本文提供一个基于ADF Face组件开发样例工程,实现表格开发中常见的处理: 1.Map对象+Bean对象填充表格的数据行. 2.使用静态列.动态列.嵌套列的实现方法. 3.介绍表格中表单组件的使用方法. 4.介绍表格单行选中事件的处理过程. 本文是基于"ADF Faces 表格应用基础案例一:应用List<Class>填充文本表格"编写的,会省去许多细节部分的介绍. 实现的基本思路: 将样例工程的创建过程分为几个小的阶段,每个阶段实现了不同的目标. 第一阶段: 表格数据: