reduce端连接-分区分组聚合

1.1.1         reduce端连接-分区分组聚合

reduce端连接则是利用了reduce的分区功能将stationid相同的分到同一个分区,在利用reduce的分组聚合功能,将同一个stationid的气象站数据和温度记录数据分为一组,reduce函数读取分组后的第一个记录(就是气象站的名称)与其他记录组合后输出,实现连接。例如连接下面气象站数据集和温度记录数据集。先用几条数据做分析说明,实际肯定不只这点数据。

气象站数据集,气象站id和名称数据表

StationId StationName

1~hangzhou

2~shanghai

3~beijing

温度记录数据集

StationId  TimeStamp Temperature

3~20200216~6

3~20200215~2

3~20200217~8

1~20200211~9

1~20200210~8

2~20200214~3

2~20200215~4

目标:是将上面两个数据集进行连接,将气象站名称按照气象站id加入气象站温度记录中最输出结果:

1~hangzhou ~20200211~9

1~hangzhou ~20200210~8

2~shanghai ~20200214~3

2~shanghai ~20200215~4

3~beijing ~20200216~6

3~beijing ~20200215~2

3~beijing ~20200217~8

详细步骤如下

(1)   两个maper读取两个数据集的数据输出到同一个文件

因为是不同的数据格式,所以需要创建两个不同maper分别读取,输出到同一个文件中,所以要用MultipleInputs设置两个文件路径,设置两个mapper。

(2)   创建一个组合键<stationed,mark>用于map输出结果排序。

组合键使得map输出按照stationid升序排列,stationid相同的按照第二字段升序排列。mark只有两个值,气象站中读取的数据,mark为0,温度记录数据集中读取的数据mark为1。这样就能保证stationid相同的记录中第一条就是气象站名称,其余的是温度记录数据。组合键TextPair定义如下

package Temperature;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public int compareTo(TextPair o) {
        int cmp=first.compareTo(o.getFirst());
        if (cmp!=0)//第一字段不同按第一字段升序排列
        {
            return cmp;
        }
        ///第一字段相同,按照第二字段升序排列
        return second.compareTo(o.getSecond());
    }

    public void write(DataOutput dataOutput) throws IOException {
        first.write(dataOutput);
        second.write(dataOutput);
    }

    public void readFields(DataInput dataInput) throws IOException {
        first.readFields(dataInput);
        second.readFields(dataInput);
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }
    public void setSecond(Text second) {
        this.second = second;
    }
}

定义maper输出的结果如下,前面是组合键,后面是值。

<1,0>    hangzhou

<1,1>    20200211~9

<1,1>    20200210~8

<2,0>    shanghai

<2,1>    20200214~3

<2,1>    20200215~4

<3,0>    beijing

<3,1>    20200216~6

<3,1>    20200215~2

<3,1>    20200217~8

(3)map结果传入reducestationid分区再分组聚合

map输出结果会按照组合键第一个字段stationid升序排列,相同stationid的记录按照第二个字段升序排列,气象站数据和记录数据混合再一起,shulfe过程中,map将数据传给reduce,会经过partition分区,相同stationid的数据会被分到同一个reduce,一个reduce中stationid相同的数据会被分为一组。假设采用两个reduce任务,分区按照stationid%2,则分区后的结果为

分区1

<1,0>    hangzhou

<1,1>    20200211~9

<1,1>    20200210~8

<3,0>    beijing

<3,1>    20200216~6

<3,1>    20200215~2

<3,1>    20200217~8

分区2

<2,0>    shanghai

<2,1>    20200214~3

<2,1>    20200215~4

4)分区之后再将每个分区的数据按照stationid分组聚合

分区1

分组1

<1,0>    <Hangzhou, 20200211~9, 20200210~8>

分组2

<3,0>    <Beijing, 20200216~6, 20200215~2, 20200217~8>

分区2

<2,0> <shanghai, 20200214~3, 20200215~4>

5)将分组聚合后的数据传入reduce函数,将车站加入到后面的温度记录输出。

因为数据是经过mark升序排列的,所以每组中第一个数据就是气象站的名称数据,剩下的是改气象的温度记录数据,mark字段的作用就是为了保证气象站数据在第一条。所以读取每组中第一个value,既是气象站名称。与其他value组合输出,即实现了数据集的连接。

1~hangzhou ~20200211~9

1~hangzhou ~20200210~8

2~shanghai ~20200214~3

2~shanghai ~20200215~4

3~beijing ~20200216~6

3~beijing ~20200215~2

3~beijing ~20200217~8

6)详细的代码实例

package Temperature;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class JoinRecordWithStationId extends Configured implements Tool {
    //气象站名称数据集map处理类
   public static class StationMapper extends Mapper<LongWritable,Text,TextPair,Text>{
       protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
           //1~hangzhou
           String[] values=value.toString().split("~");
           if (values.length!=2)
           {
               return;
           }
           //组合键第一字段为stationid,第二字段为默认0,表示车站名字数据
           context.write(new TextPair(new Text(values[0]),new Text("0")),new Text(values[1]));
       }
   }
   //温度记录数据集处理mapper类
   public static class TemperatureRecordMapper extends Mapper<LongWritable,Text,TextPair,Text>{
       protected void map(TextPair key, Text value, Context context) throws IOException, InterruptedException {
           String[] values=value.toString().split("~");
           if (values.length!=3)
           {
               return;
           }
           //组合键第一字段为stationid,第二字段为默认1,表示温度记录数据
           //3~20200216~6
           String outputValue=values[1]+"~"+values[2];
           context.write(new TextPair(new Text(values[0]),new Text("1")),new Text(outputValue));
       }
   }
   //按照statitionid分区的partioner类
    public static class FirstPartitioner extends Partitioner<TextPair,Text>{

       public int getPartition(TextPair textPair, Text text, int i) {
           //按照第一字段stationid取余reduce任务数,得到分区id
           return Integer.parseInt(textPair.getFirst().toString())%i;
       }
   }
   //分组比较类
   public static class GroupingComparator extends WritableComparator
   {
       public int compare(WritableComparable a, WritableComparable b) {
           TextPair pairA=(TextPair)a;
           TextPair pairB=(TextPair)b;
           //stationid相同,返回值为0的分为一组
           return pairA.getFirst().compareTo(pairB.getFirst());
       }
   }
   //reudce将按键分组的后数据,去values中第一个数据(气象站名称),聚合values后面的温度记录输出到文件
    public static class JoinReducer extends Reducer<TextPair,Text,Text,Text>
    {
        @Override
        protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator it =values.iterator();
            String stationName=it.next().toString();
            while (it.hasNext())
            {
                String outputValue="~"+stationName+"~"+it.toString();
                context.write(key.getFirst(),new Text(outputValue));
            }
        }
    }
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
       if (args.length!=3)
       {
           return -1;
       }
        Job job=new Job(getConf(),"joinStationTemperatueRecord");
       if (job==null)
       {
           return -1;
       }
       job.setJarByClass(this.getClass());
      //设置两个输入路径,一个输出路径
       Path StationPath=new Path(args[0]);
       Path TemperatureRecordPath= new Path(args[1]);
       Path outputPath=new Path(args[2]);
       MultipleInputs.addInputPath(job,StationPath, TextInputFormat.class,StationMapper.class);
       MultipleInputs.addInputPath(job,TemperatureRecordPath,TextInputFormat.class,TemperatureRecordMapper.class);
       FileOutputFormat.setOutputPath(job,outputPath);
       //设置分区类、分组类、reduce类
       job.setPartitionerClass(FirstPartitioner.class);
       job.setGroupingComparatorClass(GroupingComparator.class);
       job.setReducerClass(JoinReducer.class);
       //设置输出类型
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Text.class);
       job.setMapOutputKeyClass(TextPair.class);
       job.setMapOutputValueClass(Text.class);
       return job.waitForCompletion(true)? 0:1;
    }
    public static void main(String[] args) throws Exception
    {
        //三个参数,参数1:气象站数据集路径,参数2:温度记录数据集路径,参数3:输出路径
       int exitCode= ToolRunner.run(new JoinRecordWithStationId(),args);
       System.exit(exitCode);
    }

}

执行任务命令

% hadoop jar temperature-example.jar JoinRecordWithStationId input/station/all input/ncdc/all output

自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

https://www.cnblogs.com/bclshuai/p/11380657.html

原文地址:https://www.cnblogs.com/bclshuai/p/12319490.html

时间: 2024-10-11 17:36:27

reduce端连接-分区分组聚合的相关文章

reduce连接是怎么按组合键分组聚合功能原理详解

1.reduce连接实现目标 气象站数据集,气象站id和名称数据表 StationId StationName 1~hangzhou 2~shanghai 3~beijing 温度记录数据集 StationId  TimeStamp Temperature 3~20200216~6 3~20200215~2 3~20200217~8 1~20200211~9 1~20200210~8 2~20200214~3 2~20200215~4 目标:是将上面两个数据集进行连接,将气象站名称按照气象站id

Map/Reduce中分区和分组的问题

全篇结论 [分在同一组的<key,value>一定同属一个分区.在一个分区的<key,value>可重载"job.setGroupingComparatorClass(a.class);"中的a类的 compare方法重新定义分组规则,同一组的value做为reduce的输入.] 一.为什么写 分区和分组在排序中的作用是不一样的,今天早上看书,又有点心得体会,记录一下. 二.什么是分区 1.还是举书上的例子,在8.2.4章节的二次排序过程中,用气温举例,所以这里

Atitit &#160;数据存储的分组聚合 groupby的实现attilax总结

Atitit  数据存储的分组聚合 groupby的实现attilax总结 1. 聚合操作1 1.1. a.标量聚合 流聚合1 1.2. b.哈希聚合2 1.3. 所有的最优计划的选择都是基于现有统计信息来评估3 1.4. 参考资料3 1. 聚合操作 聚合也是我们在写T-SQL语句的时候经常遇到的,我们来分析一下一些常用的聚合操作运算符的特性和可优化项. 1.1. a.标量聚合 流聚合 标量聚合是一种常用的数据聚合方式,比如我们写的语句中利用的以下聚合函数:MAX().MIN().AVG().C

Elasticsearch分组聚合-查询每个A_logtype下有多少数据

Elasticsearch分组聚合 1.查询指定索引下每个A_logtype有多少数据 curl -XPOST 'localhost:19200/ylchou-0-2015-10-07/_search?pretty' -d ' { "size": 0, "aggs": { "group_by_state": { "terms": { "field": "A_logtype" } } }

crm使用FetchXml分组聚合查询

/* 创建者:菜刀居士的博客 * 创建日期:2014年07月09号 */ namespace Net.CRM.FetchXml { using System; using Microsoft.Xrm.Sdk; using Microsoft.Xrm.Sdk.Query; /// <summary> /// 使用FetchXml聚合查询,分组依据 /// </summary> public class FetchXmlExtension { /// <summary> /

mongodb 分组聚合查询

MongoDB,分组,聚合 使用聚合,db.集合名.aggregate- 而不是find 管道在Unix和Linux中一般用于将当前命令的输出结果作为下一个命令的参数.MongoDB的聚合管道将MongoDB文档在一个管道处理完毕后将结果传递给下一个管道处理.管道操作是可以重复的. 每一个操作符(集合)都会接受一连串的文档,对这些文档做一些类型转换,最后将转换后的文档作为结果传递给下一个操作符,对于最后一个操作符,是将结果返回给客户端 //分组(这里制定了分组字段 $+字段名)//这里可以理解为

浅析MySQL使用 GROUP BY 分组聚合与细分聚合

1. 聚合函数(Aggregate Function) MySQL(5.7 ) 官方文档中给出的聚合函数列表(图片)如下: 详情点击https://dev.mysql.com/doc/refman/5.7/en/group-by-functions.html . 除非另有说明,否则聚合函数都会忽略空值(NULL values). 2. 聚合函数的使用 聚合函数通常对 GROUP BY 语句进行分组后的每个分组起作用,即,如果在查询语句中不使用 GROUP BY 对结果集分组,则聚合函数就对结果集

窗口聚合函数与分组聚合函数的异同

窗口聚合函数与分组聚合函数的功能是相同的:唯一不同的是,分组聚合函数通过分组查询来进行,而窗口聚合函数通过OVER子句定义的窗口来进行. --<T-SQL性能调优秘笈---基于SQL Server2012窗口函数>2.1.1窗口聚合函数描述

分页,sql分组聚合

分页 SELECT TOP 页大小 * FROM    (        SELECT ROW_NUMBER() OVER (ORDER BY id) AS RowNumber,* FROM table1    )   as A  WHERE RowNumber > 页大小*(页数-1) 分组聚合 create table tb(id int, value varchar(10))insert into tb values(1, 'aa')insert into tb values(1, 'bb