9.3.2 map端连接-CompositeInputFormat连接类

1.1.1         map端连接-CompositeInputFormat连接类

(1)使用CompositeInputFormat连接类需要满足三个条件

1)两个数据集都是大的数据集,不能用缓存文件的方式。

2)数据集都是按照相同的键进行排序;

3)数据集有相同的分区数,同一个键的所有记录在同一个分区中,输出文件不可分割;

要满足这三个条件,输入数据在达到map端连接函数之前,两个数据集被reduce处理,reduce任务数量相同都为n,两个数据集被分区输出到n个文件,同一个键的所有记录在同一个分区中,且数据集中的数据都是按照连接键进行排序的。reduce数量相同、键相同且都是按键排序、输出文件是不可切分的(小于一个HDFS块,或通过gzip压缩实现),则就满足map端连接的前提条件。利用org.apach.hadoop.mapreduce.join包中的CompositeInputFormat类来运行一个map端连接。

(2)CompositeInputFormat类简介

CompositeInputFormat类的作用就将job的输入格式设置为job.setInputFormatClass(CompositeInputFormat.class);同时通过conf的set(String name, String value)方法设置两个数据集的连接表达式,表达式内容包括三个要素:连接方式(inner、outer、override、tbl等) ,读取两个数据集的输入方式,两个数据集的路径。这三个要素按照一定的格式组织成字符串作为表达式设置到conf中。

//设置输入格式为 CompositeInputFormat
job.setInputFormatClass(CompositeInputFormat.class);
//conf设置连接的表达式public static final String JOIN_EXPR = "mapreduce.join.expr";
Configuration conf = job.getConfiguration();
conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(
        "inner", KeyValueTextInputFormat.class,
        FileInputFormat.getInputPaths(job)));
//等价转换之后就是如下表达式
//conf.set("mapreduce.join.expr", CompositeInputFormat.compose(
       // "inner", KeyValueTextInputFormat.class, userPath,commentPath));
 

CompositeInputFormat类的源码如下

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce.lib.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.join.Parser.CNode;
import org.apache.hadoop.mapreduce.lib.join.Parser.Node;
import org.apache.hadoop.mapreduce.lib.join.Parser.WNode;

@Public
@Stable
public class CompositeInputFormat<K extends WritableComparable> extends InputFormat<K, TupleWritable> {
    public static final String JOIN_EXPR = "mapreduce.join.expr";
    public static final String JOIN_COMPARATOR = "mapreduce.join.keycomparator";
    private Node root;

    public CompositeInputFormat() {
    }

    public void setFormat(Configuration conf) throws IOException {
        this.addDefaults();
        this.addUserIdentifiers(conf);
        this.root = Parser.parse(conf.get("mapreduce.join.expr", (String)null), conf);
    }

    protected void addDefaults() {
        try {//有默认的四种连接方式,每种连接方式都有对应的Reader
            CNode.addIdentifier("inner", InnerJoinRecordReader.class);
            CNode.addIdentifier("outer", OuterJoinRecordReader.class);
            CNode.addIdentifier("override", OverrideRecordReader.class);
            WNode.addIdentifier("tbl", WrappedRecordReader.class);
        } catch (NoSuchMethodException var2) {
            throw new RuntimeException("FATAL: Failed to init defaults", var2);
        }
    }

    private void addUserIdentifiers(Configuration conf) throws IOException {
        Pattern x = Pattern.compile("^mapreduce\\.join\\.define\\.(\\w+)$");
        Iterator i$ = conf.iterator();

        while(i$.hasNext()) {
            Entry<String, String> kv = (Entry)i$.next();
            Matcher m = x.matcher((CharSequence)kv.getKey());
            if (m.matches()) {
                try {
                    CNode.addIdentifier(m.group(1), conf.getClass(m.group(0), (Class)null, ComposableRecordReader.class));
                } catch (NoSuchMethodException var7) {
                    throw new IOException("Invalid define for " + m.group(1), var7);
                }
            }
        }

    }

    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        this.setFormat(job.getConfiguration());
        job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 9223372036854775807L);
        return this.root.getSplits(job);
    }

    public RecordReader<K, TupleWritable> createRecordReader(InputSplit split, TaskAttemptContext taskContext) throws IOException, InterruptedException {
        this.setFormat(taskContext.getConfiguration());
        return this.root.createRecordReader(split, taskContext);
    }
//按格式组织连接表达式
    public static String compose(Class<? extends InputFormat> inf, String path) {
        return compose(inf.getName().intern(), path, new StringBuffer()).toString();
    }
//连接方式(inner、outer、override、tbl等) 、读取两个数据集的输入方式、两个数据集的路径
    public static String compose(String op, Class<? extends InputFormat> inf, String... path) {
        String infname = inf.getName();//org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
        StringBuffer ret = new StringBuffer(op + ‘(‘);
        String[] arr$ = path;
        int len$ = path.length;

        for(int i$ = 0; i$ < len$; ++i$) {
            String p = arr$[i$];
            compose(infname, p, ret);
            ret.append(‘,‘);
        }

        ret.setCharAt(ret.length() - 1, ‘)‘);
        return ret.toString();
    }

    public static String compose(String op, Class<? extends InputFormat> inf, Path... path) {
        ArrayList<String> tmp = new ArrayList(path.length);
        Path[] arr$ = path;
        int len$ = path.length;

        for(int i$ = 0; i$ < len$; ++i$) {
            Path p = arr$[i$];
            tmp.add(p.toString());
        }

        return compose(op, inf, (String[])tmp.toArray(new String[0]));
    }

    private static StringBuffer compose(String inf, String path, StringBuffer sb) {
        sb.append("tbl(" + inf + ",\"");
        sb.append(path);
        sb.append("\")");
        return sb;
    }
}

其中主要的函数就是compose函数,他是一个重载函数:

public static String compose(String op, Class<? extends InputFormat> inf, String... path);

op表示连接类型(inner、outer、override、tbl),inf表示数据集的输入方式,path表示输入数据集的文件路径。这个函数的作用是将传入的表达式三要素:连接方式(inner、outer、override、tbl等) 、读取两个数据集的输入方式、两个数据集的路径组成字符串。假设conf按如下方式传入三要素:

conf.set("mapreduce.join.expr", CompositeInputFormat.compose(

"inner", KeyValueTextInputFormat.class,“/hdfs/inputpath/userpath”, “/hdfs/inputpath/commentpath”));

compose函数最终得出的表达式为:

inner(tbl(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat,” /hdfs/inputpath/userpath”),tbl(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat,” /hdfs/inputpath/ commentpath”))

现在我只能深入到这里,至于为什么要满足三个条件才可以连接?设置表达式之后内部又是如何实现连接?有知道的欢迎留言讨论。

(3)CompositeInputFormat实现map端连接的实例

成绩数据和名字数据通过CompositeInputFormat实现map连接

成绩数据:

1,yuwen,100

1,shuxue,99

2,yuwen,99

2,shuxue,88

3,yuwen,99

3,shuxue,56

4,yuwen,33

4,shuxue,99名字数据:

1,yaoshuya,25

2,yaoxiaohua,29

3,yaoyuanyie,15

4,yaoshupei,26

文件夹定义如下:

代码:

package Temperature;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileUtil;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;

import org.apache.hadoop.mapreduce.lib.join.TupleWritable;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import java.io.File;

import java.io.IOException;

public class CompositeJoin extends Configured implements Tool {

    private static class CompositeJoinMapper extends Mapper<Text, TupleWritable,Text,TupleWritable>

    {

        @Override

        protected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {

            context.write(key,value);

        }

    }

    public int run(String[] args) throws Exception {

        Path userPath = new Path(args[0]);

        Path commentPath = new Path(args[1]);

        Path output = new Path(args[2]);

        Job job=null;

        try {

            job = new Job(getConf(), "mapinnerjoin");

        } catch (IOException e) {

            e.printStackTrace();

        }

        job.setJarByClass(getClass());

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(TupleWritable.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(TupleWritable.class);

        // 设置两个输入数据集的目录

        FileInputFormat.addInputPaths(job, args[0]);

        FileInputFormat.addInputPaths(job, args[1]);

        //设置输出目录

        FileOutputFormat.setOutputPath(job,output);

        Configuration conf = job.getConfiguration();

        //设置输入格式为 CompositeInputFormat

        job.setInputFormatClass(CompositeInputFormat.class);

        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

        //conf设置连接的表达式public static final String JOIN_EXPR = "mapreduce.join.expr";

        //conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(

              //  "inner", KeyValueTextInputFormat.class,

             //   FileInputFormat.getInputPaths(job)));

        //等价转换之后就是如下表达式

        String strExpretion=CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, userPath,commentPath);

       conf.set("mapreduce.join.expr",strExpretion );

        job.setOutputFormatClass(TextOutputFormat.class);

        job.setNumReduceTasks(0);//map端连接,reduce为0,不使用reduce

        job.setMapperClass(CompositeJoinMapper.class);

        //键值属性分隔符设置为空格

        //删除结果目录,重新生成

        FileUtil.fullyDelete(new File(args[2]));

        return job.waitForCompletion(true)?0:1;

    }

    public static void main(String[] args) throws Exception

    {

        //三个参数,两个连接的数据路径,一个输出路径

        int exitCode= ToolRunner.run(new CompositeJoin(),args);

        System.exit(exitCode);

    }

}

设置run->edit Configuration设置输入输出路径,两个输入,一个输出

运行该类的main函数得到结果

自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

https://www.cnblogs.com/bclshuai/p/11380657.html

原文地址:https://www.cnblogs.com/bclshuai/p/12329505.html

时间: 2024-11-14 17:22:55

9.3.2 map端连接-CompositeInputFormat连接类的相关文章

使用map端连接结合分布式缓存机制实现Join算法

前面我们介绍了MapReduce中的Join算法,我们提到了可以通过map端连接或reduce端连接实现join算法,在文章中,我们只给出了reduce端连接的例子,下面我们说说使用map端连接结合分布式缓存机制实现Join算法 1.介绍 我们使用频道类型数据集和机顶盒用户数据集,进行连接,统计出每天.每个频道.每分钟的收视人数 2.数据集 频道类型数据集就是channelType.csv文件,如下示例 机顶盒用户数据集来源于“08.统计电视机顶盒中无效用户数据,并以压缩格式输出有效数据”这个实

9.3.1 map端连接- DistributedCache分布式缓存小数据集

1.1.1         map端连接- DistributedCache分布式缓存小数据集 当一个数据集非常小时,可以将小数据集发送到每个节点,节点缓存到内存中,这个数据集称为边数据.用map函数将小数据集中的数据按键聚合到大的数据集中,输出连接数据集,进行连接操作. (1)   分布式缓存指定缓存文件 执行命令行时,采用hadoop  jar hadoop-example.jar MapSideJoinMain  -files input/cityfile/tb_dim_city.dat

Asp.Net SignalR - 持久连接类

持久连接类 通过SignalR持久连接类可以快速的构建一个即时通讯的应用,上篇博文已经我们创建一个owin Startup类和一个持久连接类来完成我们的工作,然后在Startup类的Configuration方法中添加了我们的中间件,配置名称 myconnection1 简单看一下持久连接类,里面有四个可供我们重写的方法,从字面上就可以看出是什么意思,需要一提的是持久连接类是享元模式的实践,因为一个客户端和服务器的连接过程中只会创建一个对象,后面都不会再创建对象. 在调试窗口也可以看到我们的Wr

ADO.NET基础巩固-----连接类和非连接类

      最近的一段时间自己的状态还是不错的,早上,跑步,上自习看书,下午宿舍里面编程实战,晚上要么练习代码,要么去打球(在不打就没机会了),生活还是挺丰富的. 关于C#的基础回顾就先到前面哪里,这些要自己在工作中慢慢的去体会,不是说看书就可以掌握的.我们都是从学生时代过来的知道每个人的学习情况是不一样的,所以找到自己的学习节奏是最好不过的. 下面是关于访问数据库[ADO.NET]的学习,之前刚开始学习的时候把这些基本的都过了一遍,但是长时间不使用,一些基本的用法还是会遗忘的.     一:关

Java基础中map接口和实现类

1.Map接口常用的实现类有HashMap和TreeMap. 2.HashMap类实现的Map集合对于添加和删除映射关系效率更高.HashMap是基于哈希表的Map接口的实现,HashMap通过哈希码对其内部的映射关系进行快速查询,由HashMap类实现的Map集合对于添加或删除映射关系效率较高. 3.TreeMap中的映射关系存在一定的顺序,如果希望Map集合中的对象存在一定的顺序,该使用TreeMap类实现Map集合. HashMap类 ①此类不保证映射的顺序,特别是不保证该顺序恒久不变 ②

9.4-全栈Java笔记:Map接口和实现类

现实生活中,我们经常需要成对存储某些信息.比如,我们使用的微信,一个手机号只能对应一个微信账户.这就是一种成对存储的关系. Map就是用来存储"键(key)-值(value) 对".  Map类中存储的"键值对"通过键来标识,所以"键对象"不能重复. Map 接口的实现类有: HashMap.TreeMap.HashTable.Properties等. 常用的方法: 方法 说明 Object   put(Object key, Object va

PHP mysqli方式连接类

分享一个PHP以mysqli方式连接类完整代码实例,有关mysqli用法实例. 一个在PHP中以mysqli方式连接数据库的一个数据库类实例,该数据库类是从一个PHP的CMS中整理出来的,可实现PHP连接数据库类,MySQLi版,兼容PHP4,对于有针对性需要的朋友可根据此代码进行优化和修改.<?php#=================================================================================================

黑马程序员——黑马基础——Map,集合框架工具类Conlections和Arrays

黑马程序员--黑马基础--Map,集合框架工具类Conlections和Arrays ------Java培训.Android培训.iOS培训..Net培训.期待与您交流! ------- 一,Map集合 Map<K,V>集合是一个接口,和List集合及Set集合不同的是,它是双列集合,并且可以给对象加上名字,即键(Key). 特点: 1)该集合存储键值对,一对一对往里存 2)要保证键的唯一性. Map集合的子类 Map |--Hashtable:底层是哈希表数据结构,不可以存入null键nu

JavaSE入门学习37:Java集合框架之Map接口及其实现类HashMap和TreeMap

一Map接口 Map接口中的每个成员方法由一个关键字(key)和一个值(value)构成.Map接口不直接继承于Collection接口,因 为它包装的是一组成对的"键-值"对象的集合,而且在Map接口的集合中也不能有重复的key出现,因为每个键只能与 一个成员元素相对应. Map接口定义了存储"键(key)--值(value)映射对"的方法.实现Map接口的类用来存储键值对.Map接口中包含 了一个keySet()方法,用于返回Map中所有key组成的Set集合.