使用Hadoop的MapReduce与HDFS处理数据

hadoop是一个分布式的基础架构,利用分布式实现高效的计算与储存,最核心的设计在于HDFS与MapReduce,HDFS提供了大量数据的存储,mapReduce提供了大量数据计算的实现,通过Java项目实现hadoop job处理海量数据解决复杂的需求。

一、基本环境及相关软件的配置

详细配置说明:基本环境配置及权限申请

二、hadoop项目开发流程

hadoop主要的开发为job的初始化与分布式处理流程的开发。

1、任务基本配置

首相根据业务需求,需要在代码中配置job在每台机器上需要的java虚拟机使用的内存与执行过程需要的最大内存。

Configuration configuration =
new Configuration();

configuration.set("mapreduce.map.java.opts",
"-Xmx2048m");

configuration.set("mapreduce.map.memory.mb",
"3072");

configuration.set("mapreduce.reduce.java.opts",
"-Xmx2048m");

configuration.set("mapreduce.reduce.memory.mb",
"3072");

Job job = new
Job(configuration, "miuilite-dailyRetain-"+arg[4]);

2、运行参数配置

job.setJarByClass(MiuiliteRetainJob.class);

MultipleOutputs.addNamedOutput(job, MIUIDanfaGeneralMapReduce.MULTI_OUTPUT_NAME_STATUS, SequenceFileOutputFormat.class,Text.class,
Text.
class);

job.setOutputFormatClass(SequenceFileOutputFormat.class);

MultipleInputs.addInputPath(job,
new Path(arg[0]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);

MultipleInputs .addInputPath(job,
new Path(arg[1]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.StatusLogMapper.class);

FileOutputFormat.setOutputPath(job,
new Path(arg[2]));

job.setReducerClass(MiuiliteRetainMapReduce.RetainReducer.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

job.setNumReduceTasks(40);//配置节点数量

hadoop任务处理过程中,各个分布式机器读取操作数据都是通过分布式储存文件系统hdfs,并且分布式计算将中间结果或者最终结果都是保存到hdfs上的,所以在job开发过程中需要的配置有:

1)相关的地址:数据hdfs地址,中间状态缓存保存HDFS地址,以及生成的结果hdfs保存地址,(如需要本地进一步处理结果,还需要本地地址,需要将hdfs的结果地址拉取到本 地进行处理),本地服务器地址:

MultipleInputs.addInputPath(job, new Path(arg[0]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);

        注意:对于要写入数据的地址要具有写的权限,详细权限配置请看基本配置介绍。

2)各种数据格式:

一种是文件的读取格式,可以使用基于行的日志文件,也可以使用二进制格式,多行输入记录或其他的格式,Hadoop有自带的几种格式:


输入格式

解释

key

value
TextInputFormat 默认格式,按照行读取 行的字节偏移量 行的内容
KeyValueInputFormat 解析每一行的数据 第一个Tab前的字符 剩下的内容
SequenceFileInputFormat 具有高性能的二进制格式 自定义 自定义

所以在读取输入文件格式中,需要选择自己合适的格式来初始化  MultipleInputs.addInputPath(job, new Path(arg[0]),
SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);

对于自定义的SequenceFileInputFormat,它会读取特殊的特定于Hadoop的二进制文件,会让Hadoop的mapper快速读取数据。Sequence文件是块压缩的,并提供了对几种数据类型(不仅仅是文本类型)直接的序列化与反序列化操作。

其次文件读取key 与value的格式,以及输出到文件的格式:BooleanWritable:标准布尔型数值,ByteWritable:单字节数值,DoubleWritable:双字节数,FloatWritable:浮点数,IntWritable:整型数,LongWritable:长整型数,Text:使用UTF8格式存储的文本,NullWritable:当<key,value>中的key或value为空时使用,需要在初始化job的过程中初始化对应输入输出的格式。

3)配置数据处理类,一般分为两个阶段,

第一步叫做mapping,会对数据作为mapper函数的输入数据,每条数据对应一个,mapper会吧每次map处理后的结果可以根据相同的key单独传到一个输出数据元素里面。例子: MultipleInputs.addInputPath(job, new Path(arg[0]), SequenceFileInputFormat.class,
MiuiliteRetainMapReduce.NewLogMapper.class);。 

        注意:可以同时使用多个数据输入处理的mapper,但输出key与value格式必须保持一致。

第二步叫做reducer,会接收mapping的输出作为输入列表的迭代器,会将同一key的值聚合在一起,并做一定的处理而返回处理结果。例子:job.setReducerClass(MiuiliteRetainMapReduce.RetainReducer.class);

3、数据处理流程

在所有配置好了之后调用job.waitForCompletion(true);提交任务执行任务并等待结束。

Mapper阶段:

public
static
class NewLogMapper
extends Mapper<Object, BytesWritable, Text, Text> {

        @Override

        public
void map(Object key, BytesWritable value, Context context) {

        //..........省略中间处理原始数据过程,比如解密,生成OutPutKey等

        context.write(new
Text(OutPutKey), new
Text(OutPutValue);

    }

}

Redecer阶段:

public
static
  class RetainReducer
extends Reducer<Text, Text, Text, Text> {

        @Override

        public
void setup(Context context)
throws IOException, InterruptedException {

            super.setup(context);

            //数据初始化过程,初始化相关的计数工具

        }

        @Override

        public
void reduce(Text key, Iterable<Text> values, Context context) {

          //对应同一个key进行相关的统计处理阶段,并将数据计入到相关的计数工具中。

        }

        @Override

        protected
void cleanup(Reducer.Context context)
throws IOException, InterruptedException {

                stringCounter.output(context);

                super.cleanup(context);

         //执行完毕的后续阶段,将没台分布式计算的机器的结果输入到hdfs上,清理context,

        }

reducer完成后需要统一将处理结果写入到HDFS中,所以在统计工具中应带有最后的输出函数:

public
void
output(Reducer.Context context, int
longTailBar) throws
IOException, InterruptedException {

        for
(Iterator<String> iterator = stringCountMap.keySet().iterator(); iterator.hasNext();) {

            String key = iterator.next();

            long
value = stringCountMap.get(key);

            if
(value < longTailBar)

                continue;

            key = key.replace(‘\r‘,
‘ ‘);

            key = key.replace(‘\n‘,
‘ ‘);

            context.write(new
Text(key), new
LongWritable(value));

        }

    }

4、处理结果本地

hadoop处理后的结果都是保存在hdfs上的,可以将对应的结果作为行的任务的输入进一步精确处理,如果需要进一步本地处理,通过调用本地shell命令将结果复制到本地:

private
void
copyToLocal(String hdfsPath, String localPath)
throws IOException, InterruptedException {

        String[] cmd = {"/bin/sh",
"-c", "hadoop fs -cat "
+ hdfsPath + "/part* > "
+ localPath};

        String tmpDic = loalPath.substring(0,localPath.lastIndexOf("/"));

        if(!new
File(tmpDic).exists()){

            new
File(tmpDic).mkdirs();

        }

        if(!new
File(localPath).exists()){

            new
File(localPath).createNewFile();

        }

        Process pid = Runtime.getRuntime().exec(cmd);

        if
(pid != null) {

            pid.waitFor();

        }

    }

三、运行流程

运行shell命令配置

hadoop项目运行方式通过shell文件执行指定的jar包,并指定对应的入口函数,依据项目的需求传入不同的参数。

hadoop jar  miuiapp-logs.jar com.xiaomi.miui.logs.danfa.MiuiMihomeGeneralJob XXX-param-1 XXX-param-2 XXX-param-3

注意:如果通过crontab -e定时指定相关的命令运行,需要在运行的shell文件中添加 jdk,hadoop的地址到环境变量中。

注意:在配置pom过程中需要将jar包打成大包,将所有依赖的jar包都应该打进去,所以在pom中应该加入下列配置:

<plugin>

    <groupId>org.apache.maven.plugins</groupId>

    <artifactId>maven-dependency-plugin</artifactId>

</plugin>

<plugin>

    <groupId>org.apache.maven.plugins</groupId>

        <artifactId>maven-assembly-plugin</artifactId>

        <version>2.3</version>

        <configuration>

            <appendAssemblyId>false</appendAssemblyId>

            <descriptorRefs>

              <descriptorRef>jar-with-dependencies</descriptorRef>

            </descriptorRefs>

        </configuration>

            <executions>

               <execution>

                    <id>make-assembly</id>

                    <phase>package</phase>

                    <goals>

                        <goal>assembly</goal>

                    </goals>

                </execution>

            </executions>

</plugin>

时间: 2024-10-09 12:24:15

使用Hadoop的MapReduce与HDFS处理数据的相关文章

Hadoop(三)HDFS写数据的基本流程

HDFS写数据的流程 HDFS shell上传文件a.txt,300M 对文件分块,默认每块128M. shell向NameNode发送上传文件请求 NameNode检测文件系统目录树,看能否上传 NameNode向shell发送允许上传通知 shell向NameNode发送上传block1,备份为3的通知. NameNode检测DataNode信息池,查找的3台DataNode的IP,查找的IP有以下机制: 网络拓扑距离最近(经历交换机最少) 如果shell本身就是一个DataNode,本地会

MapReduce编程(六) 从HDFS导入数据到Elasticsearch

一.Elasticsearch for Hadoop安装 Elasticsearch for Hadoop并不像logstash.kibana一样是一个独立的软件,而是Hadoop和Elasticsearch交互所需要的jar包.所以,有直接下载和maven导入2种方式.安装之前确保JDK版本不要低于1.8,Elasticsearch版本不能低于1.0. 官网对声明是对Hadoop 1.1.x.1.2.x.2.2.x.2.4.x.2.6.x.2.7.x测试通过,支持较好,其它版本的也并不是不能用

用mapreduce实现将mysql数据导出到HDFS上

因为业务需要,需要将一批mysql数据导入到HBASE,现在先将数据从Mysql导出到HDFS. 版本:hadoop CDH4.5,Hbase-0.946 1.实体类 YqBean 是我的实体类,请根据自己需要修改,实体类需要 implements Writable, DBWritable. 2.MR实现 import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configurati

使用MapReduce将HDFS数据导入到HBase(二)

package com.bank.service; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapred

使用MapReduce将HDFS数据导入到HBase(一)

package com.bank.service; import java.io.IOException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hba

Hadoop新MapReduce框架Yarn详解

简介 本文介绍了Hadoop自0.23.0版本后新的MapReduce框架(Yarn)原理,优势,运行机制和配置方法等,着重介绍新的yarn框架相对于原框架的差异及改进,并通过Demo示例详细介绍了在新的Yarn框架下搭建和开发Hadoop程序的方法.读者通过本文中新旧Hadoop MapReduce框架的对比,更深刻理解新的yarn框架技术与那里和设计思想,文中的Demo代码经过微小修改既可用于用户基于Hadoop新框架的实际生产环境. Hadoop MapReduceV2(Yarn)框架简介

Hadoop之MapReduce

http://blog.csdn.net/wangloveall/article/details/21407531 摘要:MapReduce是Hadoop的又一核心模块,从MapReduce是什么,MapReduce能做什么以及MapReduce的工作机制三方面认识MapReduce. 关键词:Hadoop   MapReduce     分布式处理 面对大数据,大数据的存储和处理,就好比一个人的左右手,显得尤为重要.Hadoop比较适合解决大数据问题,很大程度上依赖其大数据存储系统,即HDFS

Hadoop 新 MapReduce 框架 Yarn 详解

原 Hadoop MapReduce 框架的问题 对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储及处理框架,对于 Hadoop 框架的介绍在此不再累述,读者可参考 Hadoop 官方简介.使用和学习过老 Hadoop 框架(0.20.0 及之前版本)的同仁应该很熟悉如下的原 MapReduce 框架图: 图 1.Hadoop 原 MapReduce 架构 从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路: 首先用户程序 (JobCli

对于Hadoop的MapReduce编程makefile

根据近期需要hadoop的MapReduce程序集成到一个大的应用C/C++书面框架.在需求make当自己主动MapReduce编译和打包的应用. 在这里,一个简单的WordCount1一个例子详细的实施细则,注意:hadoop版本号2.4.0. 源码包括两个文件.一个是WordCount1.java是详细的对单词计数实现的逻辑.第二个是CounterThread.java.当中简单的当前处理的行数做一个统计和打印.代码分别见附1. 编写makefile的关键是将hadoop提供的jar包的路径