Reducejoin sample

示例文件同sample join analysis

之前的示例是使用map端的join.这次使用reduce端的join.

根据源的类别写不同的mapper,处理不同的文件,输出的key都是studentno.value是其他的信息同时加上类别信息。

然后使用multipleinputs不同的路径注册不同的mapper.

reduce端相同的studentno的学生信息和考试成绩分配给同一个reduce,而且value中包含了这些信息,

把这些信息抽取出来,再做笛卡尔积即可。

下面的示例代码中,我没有使用multipleinputs来处理,自己修改了TextInputFormat的一些信息,使用返回文件名和当前行的信息。

根据文件名我在mapper中处理两个不同文件的信息,加上不同的类别送出去。

下面的代码中还有很多可以优化的地方,以后再更新。

package myexamples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.LineReader;

public class reducejoin {

    public static class MyTextInputFormat extends FileInputFormat<Text, Text> {

        @Override
        public MyLineRecordReader createRecordReader(InputSplit split,
                TaskAttemptContext context) {
            return new MyLineRecordReader();
        }

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            CompressionCodec codec = new CompressionCodecFactory(
                    context.getConfiguration()).getCodec(file);
            return codec == null;
        }

    }

    public static class MyLineRecordReader extends RecordReader<Text, Text> {
        private static final Log LOG = LogFactory
                .getLog(LineRecordReader.class);

        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private long pos;
        private long end;
        private LineReader in;
        private int maxLineLength;
        private Text key = null;
        private Text value = null;

        Text filename = null;

        public void initialize(InputSplit genericSplit,
                TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            this.maxLineLength = job.getInt(
                    "mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();
            key = new Text(file.getName());
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            boolean skipFirstLine = false;
            if (codec != null) {
                in = new LineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            } else {
                if (start != 0) {
                    skipFirstLine = true;
                    --start;
                    fileIn.seek(start);
                }
                in = new LineReader(fileIn, job);
            }
            if (skipFirstLine) { // skip first line and re-establish "start".
                start += in.readLine(new Text(), 0,
                        (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        public boolean nextKeyValue() throws IOException {
            if (key == null) {

            }

            if (value == null) {
                value = new Text();
            }
            int newSize = 0;
            while (pos < end) {
                newSize = in.readLine(value, maxLineLength, Math.max(
                        (int) Math.min(Integer.MAX_VALUE, end - pos),
                        maxLineLength));
                if (newSize == 0) {
                    break;
                }
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;
                }

                // line too long. try again
                LOG.info("Skipped line of size " + newSize + " at pos "
                        + (pos - newSize));
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            if (start == end) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }

        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }

    public static class studentMapper extends Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            Text newvalue = null;
            String strv = value.toString().substring(
                    value.toString().indexOf(","));
            if (key.toString().contains("student")) // student file
                newvalue = new Text("student" + strv);
            else
                newvalue = new Text("score" + strv);
            Text newkey = new Text(value.toString().substring(0,
                    value.toString().indexOf(",")));
            context.write(newkey, newvalue);
        }
    }

    public static class studentReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> students = new ArrayList<String>();
            List<String> scores = new ArrayList<String>();
            for (Text value : values)
                if (value.toString().startsWith("student"))
                    students.add(value.toString().substring(8));
                else
                    scores.add(value.toString().substring(6));
            // split real results
            for (String student : students)
                for (String score : scores)
                    context.write(key, new Text(student + "," + score));
        }
    }

    public static void main(String[] args) throws Exception {
        args = "hdfs://namenode:9000/user/hadoop/student/ hdfs://namenode:9000/user/hadoop/reducejoinout"
                .split(" ");

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        myUtils.myUtils.DeleteFolder(conf, otherArgs[1]);
        conf.set("io.sort.mb", "10");
        Job job = new Job(conf, "reduce join");
        job.setInputFormatClass(MyTextInputFormat.class);
        // job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setJarByClass(reducejoin.class);
        job.setMapperClass(studentMapper.class);
        job.setReducerClass(studentReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
时间: 2024-08-25 23:42:44

Reducejoin sample的相关文章

随机抽样一致算法(Random sample consensus,RANSAC)

作者:桂. 时间:2017-04-25  21:05:07 链接:http://www.cnblogs.com/xingshansi/p/6763668.html 前言 仍然是昨天的问题,别人问到最小二乘.霍夫变换.RANSAC在直线拟合上的区别.昨天梳理了霍夫变换,今天打算抽空梳理一下RANSAC算法,主要包括: 1)RANSAC理论介绍 2)RANSAC应用简介: 内容为自己的学习记录,其中很多地方借鉴了别人,最后一起给出链接. 一.RANSAC理论介绍 普通最小二乘是保守派:在现有数据下,

Sample Testlink API client in python

""" Testlink API Sample Python Client implementation """ import xmlrpclib class TestlinkAPIClient: # substitute your server URL Here SERVER_URL = "http://localhost/testlink/lib/api/xmlrpc.php" def __init__(self, dev

Sample Apps by Android Team -- Amazed

Sample Apps by Android Team 代码下载:http://pan.baidu.com/s/1eSNmdUE 本次是项目Amazed代码学习记录. 一.创建自定义View @.在onSizeChanged中,通过如参w(宽)和h(高)的比较来判断手机是处于横向(Landscape)还是纵向(Portrait). @.在onDraw中进行自定义View的界面绘制. @.绘制界面需要Canvas和Paint: 1.Cnavas:用来控制画什么,比如画直线(drawLine).画矩

openstack4j a java sample demo

This is  A sample Demo package edu.hnu.lost.openstack.test; import java.util.List; import javax.ws.rs.client.Entity; import org.openstack.common.client.AbstractOpenStackClient;import org.openstack.keystone.KeystoneClient;import org.openstack.keystone

hbase java sample

通过HBaseAdmin维护表(创建,删除表) import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.

solrcloud sample

在solrcloud出来之前,如果通过solrj连接solrserver,需要程序自己实现一致性hash.新版本的solr支持cloud的部署方式,可以自动实现lb和sharding的功能(通过CloudSolrServer类连接cloud),可以用下面代码做测试需要的jar包如下: apache-solr-solrj.jar apache-solr-core.jar zookeeper.jar    commons-logging.jar  apache-logging-log4j.jar 

如何将经纬度利用Google Map API显示C# VS2005 Sample Code

原文 如何将经纬度利用Google Map API显示C# VS2005 Sample Code 日前写了一篇如何用GPS抓取目前所在,并回传至资料库储存,这篇将会利用这些回报的资料,将它显示在地图上,这个做法有两种,最简单的就是直接传值到Google Maps上. 举例来说,当我们知道经纬度后,只要将数据套到以下网址即可. http://maps.google.com/maps?q=25.048346%2c121.516396 在参数q=后面,就可以加上经纬度了. 25.048346是Lati

解决新建Support7Demos的sample时出现编译错误和运行报错出现的问题

右键New->Other->Android Sample Project->选择Android 4.4.2->选择Support7Demos ,finish. 会出现编译错误 解决如下: 1.把这三个文件导入到Eclipse里, 这三个是库文件,并且勾选COPY到工作空间里选项 如果没有这三个文件,就从这里下载 2.导入三个library工程后,mediarouter会无法编译,打开工程属性在anroid里将api-level改成17,添加依赖工程appcompat 3.就是将刚才

Python 对不均衡数据进行Over sample(重抽样)

需要重采样的数据文件(Libsvm format),如heart_scale +1 1:0.708333 2:1 3:1 4:-0.320755 5:-0.105023 6:-1 7:1 8:-0.419847 9:-1 10:-0.225806 12:1 13:-1 -1 1:0.583333 2:-1 3:0.333333 4:-0.603774 5:1 6:-1 7:1 8:0.358779 9:-1 10:-0.483871 12:-1 13:1 .... 重采样后的数据保存文件(Lib