hadoop-temperature (custom value data type)

This example uses the Hadoop MapReduce model to find the highest and lowest temperature for each year (assuming the temperature readings are integers).
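The mapper below tokenizes each line into groups of three fields, so the input is assumed to look like the following (year, daily low, daily high, whitespace-separated; this layout is inferred from the parsing code, not stated explicitly in the original):

1990 -3 28
1990 5 35
1991 -10 22
1991 0 18

For this input the job would emit one line per year with the overall minimum and maximum:

1990	-3	35
1991	-10	22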

1. The MapReduce program

package com.zhangdan.count;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TopTemperature {

    public static class TopMap extends Mapper<LongWritable, Text, Text, TemperBeansWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer str = new StringTokenizer(line);
            // Each record is assumed to be three whitespace-separated tokens:
            // year, low temperature, high temperature.
            while (str.hasMoreTokens()) {
                String thekey = str.nextToken();
                int value1 = Integer.parseInt(str.nextToken());
                int value2 = Integer.parseInt(str.nextToken());
                context.write(new Text(thekey), new TemperBeansWritable(value1, value2));
            }
        }
    }

    public static class TopReduce extends Reducer<Text, TemperBeansWritable, Text, TemperBeansWritable> {
        @Override
        public void reduce(Text key, Iterable<TemperBeansWritable> value, Context context)
                throws IOException, InterruptedException {
            // Seed with the widest possible range so any real reading replaces the seeds.
            int min = Integer.MAX_VALUE;
            int max = Integer.MIN_VALUE;
            for (TemperBeansWritable temper : value) {
                if (min > temper.getLowvalue()) min = temper.getLowvalue();
                if (max < temper.getHighvalue()) max = temper.getHighvalue();
            }
            context.write(key, new TemperBeansWritable(min, max));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TopTemperature.class);
        job.setJobName("temperature");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TemperBeansWritable.class);

        job.setMapperClass(TopMap.class);
        job.setReducerClass(TopReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
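A minimal sketch of how the job might be packaged and launched (the jar name and HDFS paths here are placeholders, not taken from the original):

hadoop jar temperature.jar com.zhangdan.count.TopTemperature /input/temperature /output/temperature

Note that the output directory must not already exist; FileOutputFormat refuses to overwrite an existing path.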

2. The custom value data type

package com.zhangdan.count;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * @author vlab
 *
 * A custom type like this is only usable as a value. A type used as a key
 * must implement the WritableComparable interface instead.
 */
public class TemperBeansWritable implements Writable {
    private int lowvalue;
    private int highvalue;

    public TemperBeansWritable() {
        // Nullary constructor, required so Hadoop can instantiate the type during deserialization.
    }

    public TemperBeansWritable(int lowvalue, int highvalue) {
        this.lowvalue = lowvalue;
        this.highvalue = highvalue;
    }

    /** Convenience factory: reads a new instance from the given stream. */
    public static TemperBeansWritable read(DataInput in) throws IOException {
        TemperBeansWritable w = new TemperBeansWritable();
        w.readFields(in);
        return w;
    }

    public int getLowvalue() {
        return lowvalue;
    }

    public int getHighvalue() {
        return highvalue;
    }

    public void setLowvalue(int lowvalue) {
        this.lowvalue = lowvalue;
    }

    public void setHighvalue(int highvalue) {
        this.highvalue = highvalue;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialize the fields in the same order they were written.
        this.lowvalue = in.readInt();
        this.highvalue = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize both fields; readFields must consume them in the same order.
        out.writeInt(lowvalue);
        out.writeInt(highvalue);
    }

    @Override
    public String toString() {
        // TextOutputFormat calls toString() to render the value in the output file.
        return this.lowvalue + "\t" + this.highvalue;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + highvalue;
        result = prime * result + lowvalue;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TemperBeansWritable other = (TemperBeansWritable) obj;
        if (highvalue != other.highvalue)
            return false;
        if (lowvalue != other.lowvalue)
            return false;
        return true;
    }

}
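As the class comment above notes, this type can only serve as a value. If the same pair were ever needed as a key, it would have to implement WritableComparable. A hypothetical sketch of what that could look like (not part of the original code; ordering by low value, then high value, is an arbitrary choice):

package com.zhangdan.count;

import org.apache.hadoop.io.WritableComparable;

public class TemperBeansComparable extends TemperBeansWritable
        implements WritableComparable<TemperBeansComparable> {

    public TemperBeansComparable() {
        // Nullary constructor, required for deserialization.
    }

    public TemperBeansComparable(int lowvalue, int highvalue) {
        super(lowvalue, highvalue);
    }

    @Override
    public int compareTo(TemperBeansComparable other) {
        // Sort ascending by low value, breaking ties with the high value.
        int cmp = Integer.compare(getLowvalue(), other.getLowvalue());
        return cmp != 0 ? cmp : Integer.compare(getHighvalue(), other.getHighvalue());
    }
}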