YARN集群的mapreduce测试(五)

将user表计算后的结果分区存储

测试准备:

首先同步时间,然后master先开启hdfs集群,再开启yarn集群;用jps查看:

master上: 先有NameNode、SecondaryNameNode;再有ResourceManager;

slave上:   先有DataNode;再有NodeManager;

如果master启动hdfs和yarn成功,但是slave节点有的不成功,则可以使用如下命令手动启动:

hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager

然后在集群的主机本地环境创建myinfo.txt;内容如下:

然后将测试文件myinfo.txt上传到集群中:

测试目标:

hadoop集群分区及缓存:
1、分区是必须要经历Shuffle过程的,没有Shuffle过程无法完成分区操作
2、分区是通过MapTask输出的key来完成的,默认的分区算法是数组求模法:

数组求模法:
将Map的输出Key调用hashcode()函数得到的哈希吗(hashcode),此哈希吗是一个数值类型,将此哈希吗数值直接与整数的最大值(Integer.MAXVALUE)取按位与(&)操作,将与操作的结果与ReducerTask
的数量取余数,将此余数作为当前Key落入的Reduce节点的索引;
-------------------------
Integer mod = (Key.hashCode()&Integer.MAXVALUE)%NumReduceTask;
被除数=34567234
NumReduceTas=3
------结果:
0、1、2 这三个数作为Reduce节点的索引;
数组求模法是有HashPartitioner类来实现的,也是MapReduce分区的默认算法;

测试代码:

 1 package com.mmzs.bigdata.yarn.mapreduce;
 2
 3 import java.io.IOException;
 4
 5 import org.apache.hadoop.io.LongWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Mapper;
 8
 9 public class PartitionMapper extends Mapper<LongWritable, Text,LongWritable, Text>{
10     private LongWritable outKey;
11     private Text outValue;
12
13
14     @Override
15     protected void setup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
16             throws IOException, InterruptedException {
17         outKey = new LongWritable();
18         outValue= new Text();
19     }
20
21     @Override
22     protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
23             throws IOException, InterruptedException {
24         outKey=null;
25         outValue=null;
26     }
27
28     @Override
29     protected void map(LongWritable key, Text value,
30             Mapper<LongWritable, Text, LongWritable, Text>.Context context)
31             throws IOException, InterruptedException {
32         String[] fields=value.toString().split("\\s+");
33         Long userId=Long.parseLong(fields[0]);
34         outKey.set(userId);
35         outValue.set(new StringBuilder(fields[1]).append("\t").append(fields[2]).toString());
36         context.write(outKey, outValue);
37
38     }
39
40
41
42
43
44 }

PartitionMapper

 1 package com.mmzs.bigdata.yarn.mapreduce;
 2
 3 import org.apache.hadoop.io.LongWritable;
 4 import org.apache.hadoop.mapreduce.Partitioner;
 5
 6 public class MyPartitoner extends Partitioner {
 7
 8     @Override
 9     public int getPartition(Object key, Object value, int num) {
10         LongWritable userId=(LongWritable)key;
11         Long userCode=userId.get();
12         //分区的依据
13         if(userCode<6){
14             return 0;
15         }else if(userCode<10){
16             return 1;
17         }else{
18             return 2;
19         }
20     }
21
22 }

MyPartitoner

 1 package com.mmzs.bigdata.yarn.mapreduce;
 2
 3 import java.io.IOException;
 4 import java.util.Iterator;
 5 import org.apache.hadoop.io.LongWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Reducer;
 8
 9 public class PartitionReducer extends Reducer<LongWritable, Text, LongWritable, Text>{
10
11     @Override
12     protected void cleanup(Reducer<LongWritable, Text, LongWritable, Text>.Context context)
13             throws IOException, InterruptedException {
14         super.cleanup(context);
15     }
16
17     @Override
18     protected void reduce(LongWritable key, Iterable<Text> values,
19             Reducer<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
20             Iterator<Text> its= values.iterator();
21             if(its.hasNext()){
22                 context.write(key, its.next());
23             }
24     }
25
26     @Override
27     protected void setup(Reducer<LongWritable, Text, LongWritable, Text>.Context context)
28             throws IOException, InterruptedException {
29         super.setup(context);
30     }
31
32
33 }

PartitionReducer

 1 package com.mmzs.bigdata.yarn.mapreduce;
 2
 3 import java.io.IOException;
 4 import java.net.URI;
 5 import java.net.URISyntaxException;
 6
 7 import org.apache.hadoop.conf.Configuration;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15
16 public class ParititionDriver {
17 private static FileSystem fs;
18
19     private static Configuration conf;
20
21     static{
22         String uri="hdfs://master01:9000/";
23         conf=new Configuration();
24         try {
25             fs=FileSystem.get(new URI(uri),conf,"hadoop");
26         } catch (IOException e) {
27             // TODO Auto-generated catch block
28             e.printStackTrace();
29         } catch (InterruptedException e) {
30             // TODO Auto-generated catch block
31             e.printStackTrace();
32         } catch (URISyntaxException e) {
33             // TODO Auto-generated catch block
34             e.printStackTrace();
35         }
36     }
37
38
39
40     public static void main(String[] args) throws Exception{
41         Job wcJob =getJob(args);
42         if(null==wcJob)return;
43                 /*
44                  *提交Job到集群并等到Job运行完成,参数true表示将Job的运行是状态信息返回到
45                  *客户端控制台输出,返回值的布尔值代表Job是否运行成功
46                  */
47                 boolean flag=wcJob.waitForCompletion(true);
48                 System.exit(flag?0:1);
49
50     }
51     public static Job getJob(String[] args) throws Exception{
52         if(null==args||args.length<2)return null;
53         //放置需要处理的数据所在的HDFS路径
54         Path inputPath=new Path(args[0]);
55         //放置Job作业执行完成之后其处理结果的输出路径
56         Path ouputPath=new Path(args[1]);
57         //如果输出目录已经存在则将其删除并重建
58         if(!fs.exists(inputPath))return null;
59         if(fs.exists(ouputPath)){
60             fs.delete(ouputPath,true);
61         }
62         //获取Job实例
63         Job wcJob=Job.getInstance(conf, "PartitionerJob");
64         //设置运行此jar包的入口类
65         wcJob.setJarByClass(ParititionDriver.class);
66         //设置job调用的Mapper类
67         wcJob.setMapperClass(PartitionMapper.class);
68         //设置job调用的Reducer类(如果一个job没有ReduceTask则此条语句可以不掉用)
69         wcJob.setReducerClass(PartitionReducer.class);
70         //设置MapTask的输出值类型
71         wcJob.setMapOutputKeyClass(LongWritable.class);
72         //设置MapTask的输出键类型
73         wcJob.setMapOutputValueClass(Text.class);
74         //设置整个Job的输出键类型
75         wcJob.setOutputKeyClass(LongWritable.class);
76         //设置整个Job的输出值类型
77         wcJob.setOutputValueClass(Text.class);
78
79         //设置分区类
80         wcJob.setPartitionerClass(MyPartitoner.class);
81         wcJob.setNumReduceTasks(2);//这个数字和MyPartitioner类中的三种分区依据相对应
82         //如果将数字调整大了,那么只有分区依据的前三个文件有内容,多出任务对应的仅仅是个空分区、空文件;
83         //如果将数字调整小了,那么将得不到任何一个分区结果
84
85         //设置整个Job需要处理数据的输入路径
86         FileInputFormat.setInputPaths(wcJob, inputPath);
87         //设置整个Job需要计算结果的输出路径
88         FileOutputFormat.setOutputPath(wcJob, ouputPath);
89         return wcJob;
90     }
91
92
93 }

ParititionDriver

测试结果:

深入测试:(修改PartitionDriver类的如下代码)

//设置分区类
wcJob.setPartitionerClass(MyPartitoner.class);
wcJob.setNumReduceTasks(3);//这个数字和MyPartitioner类中的三种分区依据相对应

//如果将数字调整大了(比如调整为4),那么只有分区依据的前三个文件有内容,多出任务对应的仅仅是个空分区、空文件;

//如果将数字调整小了(比如调整为2),那么将得不到任何一个分区结果

小结:

1、数据量需要达到一定的数量级使用hadoop集群来处理才是划算的
2、集群的计算性能取决于任务数量的多少,设置任务数量必须充分考虑到集群的计算能力(比如:物理节点数量);
a、Map设置的任务数量作为最小值参考
b、Reduce的任务数默认是1(使用的也是默认的Partitioner类),如果设置了则启动设置的数量;
不管MapTask还是ReduceTask,只要任务数量越多则并发能力越强,处理效率会在一定程度上越高,但是设置的任务数量必须参考集群中的物理节点数量,如果设置的任务数量过多,会导致每个物理节点上分摊的任务数量越多,处理器并发每一个任务产生的计算开销越大,任务之间因处理负载导致相互之间的影响非常大,任务失败率上升(任务失败时会重新请求进行计算,最多重新请求3次),计算性能反而下降,因此在设计MapTask与ReduceTask任务数量时必须权衡利弊,折中考虑...

时间: 2024-08-27 12:59:10

YARN集群的mapreduce测试(五)的相关文章

YARN集群的mapreduce测试(四)

将手机用户使用流量的数据进行分组,排序: 测试准备: 首先同步时间,然后master先开启hdfs集群,再开启yarn集群:用jps查看: master上: 先有NameNode.SecondaryNameNode;再有ResourceManager; slave上:   先有DataNode:再有NodeManager: 如果master启动hdfs和yarn成功,但是slave节点有的不成功,则可以使用如下命令手动启动: hadoop-daemon.sh start datanode yar

十六:mapreduce程序在yarn集群中的调度过程

mapreduce程序在yarn集群中的调度过程: 1.客户端想ResouceManager提交一个job作业,申请运行一个MR的程序,RPC调用 2.ResourceManager返回一个由创建的jobid目录. 3.在HDFS该目录下有一个以jobid命名的目录并,写入job.xml和job分片数据,job.jar,jobConfinger 4.通知RM,job的资源文件提交完毕. 5.初始化一个任务 然后放到队列中去 6.nodemanager 和ResouceManager 保持心跳进行

Mapreduce提交YARN集群运行

Eclipse项目打包1.export2.通过maven打包,切入到项目目录下执行命令mvn clean package Mapreduce提交YARN集群运行 将jar包传到hadoop目录下运行格式:bin/hadoop jar  jar包名   包名(代码的包名).类名 +参数(输入路径输出路径)就可以在集群上运行了 原文地址:https://www.cnblogs.com/libin123/p/10330330.html

大数据学习之MapReduce基础与Yarn集群安装09

1大数据解决的问题? 海量数据的存储:hadoop->分布式文件系统HDFS 海量数据的计算:hadoop->分布式计算框架MapReduce 2什么是MapReduce? 分布式程序的编程框架,java->ssh ssm ,目的:简化开发! 是基于hadoop的数据分析应用的核心框架. mapreduce的功能:将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算程序,并发的运行在hadoop集群上. 3 MapReduce的优缺点 优点: (1)易于编程 (2)良好的拓

HDFS集群和YARN集群

Hadoop集群环境搭建(一) 1集群简介 HADOOP集群具体来说包含两个集群:HDFS集群和YARN集群,两者逻辑上分离,但物理上常在一起 HDFS集群: 负责海量数据的存储,集群中的角色主要有 NameNode / DataNode YARN集群: 负责海量数据运算时的资源调度,集群中的角色主要有 ResourceManager /NodeManager 本集群搭建案例,以3节点为例进行搭建,角色分配如下: hdp-node-01 NameNode SecondaryNameNode Re

Rabbitmq集群高可用测试

Rabbitmq集群高可用 RabbitMQ是用erlang开发的,集群非常方便,因为erlang天生就是一门分布式语言,但其本身并不支持负载均衡. Rabbit模式大概分为以下三种:单一模式.普通模式.镜像模式 单一模式:最简单的情况,非集群模式. 没什么好说的. 普通模式:默认的集群模式. 对于Queue来说,消息实体只存在于其中一个节点,A.B两个节点仅有相同的元数据,即队列结构. 当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A.B间进行消息传

大数据【三】YARN集群部署

一 概述 YARN是一个资源管理.任务调度的框架,采用master/slave架构,主要包含三大模块:ResourceManager(RM).NodeManager(NM).ApplicationMaster(AM). >ResourceManager负责所有资源的监控.分配和管理,运行在主节点: >NodeManager负责每一个节点的维护,运行在从节点: >ApplicationMaster负责每一个具体应用程序的调度和协调,只有在有任务正在执行时存在. 对于所有的applicati

5 weekend01、02、03、04、05、06、07的分布式集群的HA测试 + hdfs--动态增加节点和副本数量管理 + HA的java api访问要点

weekend01.02.03.04.05.06.07的分布式集群的HA测试 1)  weekend01.02的hdfs的HA测试 2)  weekend03.04的yarn的HA测试 1)  weekend01.02的hdfs的HA测试 首先,分布式集群都是正常的,且工作的 然后呢, 以上是,weekend01(active).weekend02(standby) 当weekend01给kill, 变成weekend01(standby).weekend02(active) 模拟weekend

一个痛苦的问题的解决,windows上eclipse提交yarn集群的错误

--------一个痛苦的解决问题的过程--------------------------------------    首先确保linux服务器上面的集群环境启动    集群启动     start-dfs.sh     stop-dfs.sh      start-yarn.sh      stop-yarn.sh               [[email protected] sbin]$ jps         3522 NameNode         4823 Jps