Traffic summarization (custom jar run on a Hadoop cluster: statistics, sorting, grouping) - Statistics

Quick notes:

halt: shuts the machine down

YARN web UI port: 8088

Delete an HDFS directory: hadoop fs -rm -r /wc/output

If both NameNodes show up as standby, the usual cause is that ZooKeeper was not started before HDFS.

Now let's work through a traffic-statistics example.

The input data is a table; see the attachment.

Statistics (code):

1. FlowBean:

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // Deserialization uses reflection and needs a no-arg constructor, so we define one explicitly.
    public FlowBean() {}

    // A constructor with arguments makes it convenient to initialize the object's data.
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        super();
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }

    // Deserialize the object's fields from the data stream.
    // Fields must be read in exactly the same order in which they were serialized.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    // Serialize the object's fields into the data stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Comparison: the sort order (descending by total flow) is implemented here.
    @Override
    public int compareTo(FlowBean o) {
        return s_flow > o.getS_flow() ? -1 : 1;
    }
}
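Because readFields must read the fields in exactly the order write wrote them, it can be worth checking the round trip locally before running on the cluster. Below is a minimal sketch of such a check; the class name and the sample values are made up for illustration and are not part of the original project.

package cn.itcast.hadoop.mr.flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Hypothetical helper: serialize a FlowBean to bytes and read it back,
// verifying that write() and readFields() use the same field order.
public class FlowBeanRoundTrip {

    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // serialize
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize into a fresh bean (this is why the no-arg constructor is required)
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // prints the same up/down/total line twice if the round trip is consistent
        System.out.println(original);
        System.out.println(copy);
    }
}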

2. FlowSumMapper:

package cn.itcast.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author yw.wang
 * FlowBean is our own data type. Because it has to be shipped between Hadoop nodes,
 * it must follow Hadoop's serialization mechanism, i.e. implement Hadoop's serialization interface.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Take one line of the log, split out its fields, extract the ones we need
    // (phone number, upstream flow, downstream flow), wrap them as a k/v pair
    // and send them on to the reducer.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // get one line
        String line = value.toString();

        // split it into fields
        String[] fields = StringUtils.split(line, "\t");

        // pick out the fields we need
        String phoneNB = fields[0];
        long u_flow = Long.parseLong(fields[7]);
        long d_flow = Long.parseLong(fields[8]);

        // wrap the data as a k/v pair and emit it
        context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
    }
}

3. FlowSumReducer:

package cn.itcast.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // The framework hands in one group at a time: <1237435262, {flowbean, flowbean, flowbean, ...}>.
    // The reduce logic is simply to iterate over the values, accumulate the sums, and write out the result.
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {

        long up_flow_counter = 0;
        long d_flow_counter = 0;

        for (FlowBean bean : values) {
            up_flow_counter += bean.getUp_flow();
            d_flow_counter += bean.getD_flow();
        }

        context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
    }
}

4. FlowSumRunner:

package cn.itcast.hadoop.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// This is the standard pattern for a job-description-and-submission class.
public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        // use the configuration injected by ToolRunner
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumRunner.class);

        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(res);
    }
}
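One optional tweak, not present in the original runner: because the per-phone sum is associative, the same reducer class could also be registered as a combiner to shrink the shuffle. A hedged sketch of the single extra line that would go inside run():

// Optional, illustrative only: reuse FlowSumReducer as a combiner.
// Safe here because addition is associative and the combiner's input/output
// types (Text, FlowBean) match the map output types.
job.setCombinerClass(FlowSumReducer.class);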

Package the classes into a jar and run it on the cluster with:

hadoop jar /root/Documents/sum.jar cn.itcast.hadoop.mr.flowsum.FlowSumRunner /wc/data/ /wc/sumoutput

Explanation: the arguments are, in order, the path to the jar, the fully qualified main class, the HDFS input directory, and the HDFS output directory (which must not exist yet). With the default single reducer, the summed results end up in part-r-00000 under /wc/sumoutput.

Sorting:

Code:

package cn.itcast.hadoop.mr.flowsort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class SortMR {

    public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        // Take one line, split out the fields, wrap them into a FlowBean and emit it as the key.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            String phoneNB = fields[0];
            long u_flow = Long.parseLong(fields[1]);
            long d_flow = Long.parseLong(fields[2]);

            context.write(new FlowBean(phoneNB, u_flow, d_flow), NullWritable.get());
        }
    }

    public static class SortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            String phoneNB = key.getPhoneNB();
            context.write(new Text(phoneNB), key);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // the class that contains the main method -- here, this class itself
        job.setJarByClass(SortMR.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        // setOutputKeyClass/setOutputValueClass also cover the map output by default;
        // when map and reduce output types differ, declare the map output types explicitly, as here.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // these two arguments are exactly the last two arguments of the "hadoop jar ..." command
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // exit with the job's status code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The sort runs over the results of the statistics job, so its input is the part-r-00000 result file produced in the previous step (for example under /wc/sumoutput), not the empty _SUCCESS marker. Note also that FlowBean.compareTo never returns 0, so two phone numbers that happen to have the same total flow are still treated as distinct keys rather than being merged into a single reduce group.

Grouping (partitioning by area):

FlowSumArea:

package cn.itcast.hadoop.mr.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

/**
 * Run the traffic statistics over the raw traffic log and write the results for users
 * of different provinces into different output files.
 * Two mechanisms have to be customized:
 * 1. the partitioning logic -- provide a custom Partitioner;
 * 2. the number of concurrent reduce tasks.
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // get one line
            String line = value.toString();

            // split it into fields
            String[] fields = StringUtils.split(line, "\t");

            // pick out the fields we need
            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);

            // wrap the data as a k/v pair and emit it
            context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
        }
    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {

            long up_flow_counter = 0;
            long d_flow_counter = 0;

            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }

            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);

        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // register our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the number of concurrent reduce tasks; it should match the number of partitions.
        // If it is larger, the extra output files are empty; if it is smaller, the job fails
        // with an illegal-partition error; if it is 1, no partitioning takes effect at all.
        job.setNumReduceTasks(6);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

AreaPartitioner:

package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone number from the key, look up its prefix in the area dictionary,
        // and return a different partition number per area (5 is the catch-all partition).
        int areaCoder = areaMap.get(key.toString().substring(0, 3)) == null
                ? 5
                : areaMap.get(key.toString().substring(0, 3));
        return areaCoder;
    }
}
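The partitioning logic is easy to sanity-check locally by calling getPartition directly. The small driver below is an illustrative sketch (the class name and sample phone numbers are invented, not part of the original project); prefixes 135 through 139 should land in partitions 0 through 4, and anything else in the catch-all partition 5.

package cn.itcast.hadoop.mr.areapartition;

import org.apache.hadoop.io.Text;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

// Hypothetical local check for AreaPartitioner: feed in a few sample keys
// and print which partition each one is assigned to.
public class AreaPartitionerCheck {

    public static void main(String[] args) {
        AreaPartitioner<Text, FlowBean> partitioner = new AreaPartitioner<>();

        String[] samples = {"13502468823", "13925057413", "15013685858"};
        for (String phone : samples) {
            int partition = partitioner.getPartition(new Text(phone), new FlowBean(phone, 0, 0), 6);
            System.out.println(phone + " -> partition " + partition);
        }
        // expected: 13502468823 -> 0, 13925057413 -> 4, 15013685858 -> 5
    }
}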

Run it:

hadoop jar /root/Documents/area.jar cn.itcast.hadoop.mr.areapartition.FlowSumArea /wc/data /wc/areasoutput

Because the job runs six reduce tasks, the output directory contains six files, part-r-00000 through part-r-00005, one per partition.

With that, the MapReduce traffic statistics, grouping, and sorting are complete.
