使用MapReduce简单的数据清洗

package com.bank.service;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 将非结构化的数据处理为结构化数据
 * @author mengyao
 *
 */
import com.bank.entity.CNY;

public class CnyDataFormat extends Configured implements Tool {

static class CnyDataFormatMapper extends Mapper<LongWritable, Text, NullWritable, CNY>{
        
        CNY cny = new CNY();
        
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            if (fields.length == 42) {
                String gzh = fields[12] ;
                String currency = fields[9];
                String version = fields[10];
                String valuta = fields[11];
                long qfTime;
                try {
                    qfTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(fields[3]+" "+fields[4]).getTime();
                } catch (ParseException e) {
                    qfTime = System.currentTimeMillis();
                }
                int flag = Integer.parseInt(fields[5]);
                String machineID = fields[13];
                cny.set(gzh, currency, version, valuta, qfTime, flag, machineID);
                context.write(NullWritable.get(), cny);
            } else {
                System.err.println(" ERROR: data format failed!");
            }
        }
    }
    
    static class CnyDataFormatReduce extends Reducer<NullWritable, CNY, NullWritable, CNY>{
        @Override
        protected void reduce(NullWritable key, Iterable<CNY> value, Context context) throws IOException, InterruptedException {
            for (CNY cny : value) {
                context.write(NullWritable.get(), cny);
            }
        }
    }

@Override
    public int run(String[] arg0) throws Exception {
        Job job = Job.getInstance(getConf(), CnyDataFormat.class.getSimpleName());
        job.setJarByClass(CnyDataFormat.class);                                //设置main函数所在的类
        
        FileInputFormat.setInputPaths(job, new Path(arg0[0]));
        job.setMapperClass(CnyDataFormatMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(CNY.class);
        
        job.setReducerClass(CnyDataFormatReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(CNY.class);
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        
        return job.waitForCompletion(true) ? 0 : 1;                                //等待MapReduce执行完成并打印作业进度详情
    }

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (paths.length != 2) {
            System.err.println("Usage: " + CnyDataFormat.class.getName() + " <in> <out>");
            System.exit(2);
        }
        int status = ToolRunner.run(new CnyDataFormat(), args);
        System.exit(status);
        
    }
}

package com.bank.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * 实现Hadoop的序列化接口
 * @author mengyao
 *
 */
public class CNY implements Writable {

private String gzh;
    private String currency;
    private String version;
    private String valuta;
    private long qfTime;
    private int flag;
    private String machineID;
    
    @Override
    public void readFields(DataInput in) throws IOException {
        this.gzh = in.readUTF();
        this.currency = in.readUTF();
        this.version = in.readUTF();
        this.valuta = in.readUTF();
        this.qfTime = in.readLong();
        this.flag = in.readInt();
        this.machineID = in.readUTF();
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.gzh);
        out.writeUTF(this.currency);
        out.writeUTF(this.version);
        out.writeUTF(this.valuta);
        out.writeLong(this.qfTime);
        out.writeInt(this.flag);
        out.writeUTF(this.machineID);
    }

public void set(String gzh, String currency, String version,
            String valuta, long qfTime, int flag, String machineID) {
        this.gzh = gzh;
        this.currency = currency;
        this.version = version;
        this.valuta = valuta;
        this.qfTime = qfTime;
        this.flag = flag;
        this.machineID = machineID;
    }

@Override
    public String toString() {
        return this.gzh +"\t"+ this.currency +"\t"+ this.version +"\t"+ this.valuta +"\t"+ this.qfTime +"\t"+ this.flag +"\t"+ this.machineID;
    }

public String getGzh() {
        return gzh;
    }

public void setGzh(String gzh) {
        this.gzh = gzh;
    }

public String getCurrency() {
        return currency;
    }

public void setCurrency(String currnecy) {
        this.currency = "cny";
    }

public String getVersion() {
        return version;
    }

public void setVersion(String version) {
        this.version = version;
    }

public String getValuta() {
        return valuta;
    }

public void setValuta(String valuta) {
        this.valuta = valuta;
    }

public long getQfTime() {
        return qfTime;
    }

public void setQfTime(long qfTime) {
        this.qfTime = qfTime;
    }

public int getFlag() {
        return flag;
    }

public void setFlag(int flag) {
        this.flag = flag;
    }

public String getMachineID() {
        return machineID;
    }

public void setMachineID(String machineID) {
        this.machineID = machineID;
    }
   
    
}

时间: 2025-01-02 04:53:26

使用MapReduce简单的数据清洗的相关文章

MapReduce简单使用

1.启动hadoop工程 2.MapReduce统计文本单词数量 public class WordCount { private static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWrita

MapReduce 简单的全文搜索2

上一个全文搜索实现了模糊查找,这个主要实现了精确查找,就是比如你查找mapreduce is simple那么他就只查找有这个句子的文章,而不是查找有这三个单词的文章. 这个版本需要重写反向索引,因为需要查找句子,所以需要定位每个单词的在文章中的位置,所以我们的反向索引需要加上单词所在的位置,即我们希望的输出是: MapReduce file1.txt:<1,2,3>;file2.txt:<5,3,1>;这种格式的. 其实这一步比较简单.我们在map的时候输出为 “filename

MapReduce 简单的全文搜索

上一个已经实现了反向索引,那么为什么不尝试下全文搜索呢.例如有了 Hello     file3.txt:1; MapReduce     file3.txt:2;fil1.txt:1;fil2.txt:1; bye     file3.txt:1;  is     fil1.txt:1;fil2.txt:2; powerful     fil2.txt:1; simple     fil2.txt:1;fil1.txt:1; 那么我要找MapReduce is simple,那么就有file1

大数据学习之七——MapReduce简单代码实例

1.关于MapReduce MapReduce是一种可用于数据处理的编程模型,能够支持java.Python.C++等语言.MapReduce程序本质上是并行运行的,因此可以处理大规模数据集,这也是它的优势. 2.使用hadoop分析数据 hadoop提供了并行处理,我们将查询表示成MapReduce作业. MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段.每个阶段都以键/值作为输入和输出,并选择它们的类型.程序员还需要定义两个函数:map函数和reduce函数. Jav

MapReduce简单实例:wordcount--大数据纪录片第五记

不知道为啥不是很想学习MapReduce方面的知识,不过现在这么想可能过段时间还是免不了去学,这边先记录下一个MapReduce的实例wordcount代码. 1. pom.xml: <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> <

python简单的数据清洗,数据筛选方法归类

创建数组有两种方式,1.直接赋值 2.随机变量生成随机生成包括4种:np.arange(20),np.linspace(0,10,5),np.logspace(0,2,5),np.random.random(3,2,3)np.arange(10,20,2) ##左闭右开区间,起始值,终止值,步长np.linspace(0,10,5) ##闭区间,起始值,终止值,元素个数 等差数列np.logspace(0,2,5) ##闭区间,起始值(以指数形式存在),终止值(以指数形式存在, 以10为底,2的

大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例

[TOC] 1 大数据处理的常用方法 大数据处理目前比较流行的是两种方法,一种是离线处理,一种是在线处理,基本处理架构如下: 在互联网应用中,不管是哪一种处理方式,其基本的数据来源都是日志数据,例如对于web应用来说,则可能是用户的访问日志.用户的点击日志等. 如果对于数据的分析结果在时间上有比较严格的要求,则可以采用在线处理的方式来对数据进行分析,如使用Spark.Storm等进行处理.比较贴切的一个例子是天猫双十一的成交额,在其展板上,我们看到交易额是实时动态进行更新的,对于这种情况,则需要

Google MapReduce 中文版

摘要 MapReduce是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现.用户首先创建一个Map函数处理一个基于key/value pair的数据集合,输出中间的基于key/value pair的数据集合:然后再创建一个Reduce函数用来合并所有的具有相同中间key值的中间value值.现实世界中有很多满足上述处理模型的例子,本论文将详细描述这个模型. MapReduce架构的程序能够在大量的普通配置的计算机上实现并行化处理.这个系统在运行时只关心:如何分割输入数据,在大量计算

数据挖掘|朴素贝叶斯算法

作者:张一 链接:https://zhuanlan.zhihu.com/p/21571692 来源:知乎 著作权归作者所有.商业转载请联系作者获得授权,非商业转载请注明出处. 因为后期的项目将涉及到各种各样的价格数据处理问题,所以我们现在开始学习一些简单的数据清洗与算法的知识.关于算法,以前听起来觉得好高大上,现在开始学,觉得书上的描述并不是很通俗易懂,所以用自己的语言来简要写一下这些算法~ 注:非商业转载注明作者即可,商业转载请联系作者授权并支付稿费.本人已授权"维权骑士"网站(ht