MapReduce 编程模板编写【分析网站基本指标UV】程序

1.网站基本指标的几个概念

PV: page view 浏览量

页面的浏览次数,用户每打开一次页面就记录一次。

UV:unique visitor 独立访客数

一天内访问某站点的人数(以cookie为例) 但是如果用户把浏览器cookie给删了之后再次访问会影响记录。

VV: visit view 访客的访问次数

记录所有访客一天内访问了多少次网站,访客完成访问直到浏览器关闭算一次。

IP:独立ip数

指一天内使用不同ip地址的用户访问网站的数量。

2.编写MapReduce编程模板

Driver

package mapreduce;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
?
public class MRDriver extends Configured implements Tool {
?
    public int run(String[] args) throws Exception {
        //创建job
        Job job = Job.getInstance(this.getConf(),"mr-demo");
        job.setJarByClass(MRDriver.class);
?
        //input 默认从hdfs读取数据 将每一行转换成key-value
        Path inPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job,inPath);
?
        //map 一行调用一次Map方法  对每一行数据进行分割
        job.setMapperClass(null);
        job.setMapOutputKeyClass(null);
        job.setMapOutputValueClass(null);
?
        //shuffle
        job.setPartitionerClass(null);//分组
        job.setGroupingComparatorClass(null);//分区
        job.setSortComparatorClass(null);//排序
?
        //reduce 每有一条key value调用一次reduce方法
        job.setReducerClass(null);
        job.setOutputKeyClass(null);
        job.setOutputValueClass(null);
?
        //output
        Path outPath = new Path(args[1]);
        //this.getConf()来自父类 内容为空可以自己set配置信息
        FileSystem fileSystem = FileSystem.get(this.getConf());
        //如果目录已经存在则删除
        if(fileSystem.exists(outPath)){
            //if path is a directory and set to true
            fileSystem.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        //submit
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0:1;
    }
?
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            int status = ToolRunner.run(configuration, new MRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
?

Mapper

public class MRModelMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /**
         * 实现自己的业务逻辑
         */
    }
}

Reduce

public class MRModelReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
?
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        /**
         * 根据业务需求自己实现
         */
    }
}

3. 统计每个城市的UV数

分析需求:

UV:unique view 唯一访问数,一个用户记一次

map:

key: CityId (城市id) 数据类型: Text

value: guid (用户id) 数据类型:Text

shuffle:

key: CityId

value: {guid guid guid..}

reduce:

key: CityId

value: 访问数 即shuffle输出value的集合大小

output:

key : CityId

value : 访问数

MRDriver.java mapreduce执行过程



package mapreduce;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
?
public class MRDriver extends Configured implements Tool {
?
    public int run(String[] args) throws Exception {
        //创建job
        Job job = Job.getInstance(this.getConf(),"mr-demo");
        job.setJarByClass(MRDriver.class);
?
        //input 默认从hdfs读取数据 将每一行转换成key-value
        Path inPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job,inPath);
?
        //map 一行调用一次Map方法  对每一行数据进行分割
        job.setMapperClass(MRMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
?
       /* //shuffle
        job.setPartitionerClass(null);//分组
        job.setGroupingComparatorClass(null);//分区
        job.setSortComparatorClass();//排序
*/
        //reduce
        job.setReducerClass(MRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
?
        //output
        Path outPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(this.getConf());
        if(fileSystem.exists(outPath)){
            //if path is a directory and set to true
            fileSystem.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        //submit
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0:1;
    }
?
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            int status = ToolRunner.run(configuration, new MRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MRMapper.java

package mapreduce;
?
import java.io.IOException;
?
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
?
public class MRMapper extends Mapper<LongWritable,Text,Text,Text> {
    private Text mapOutKey = new Text();
    private Text mapOutKey1 = new Text();

    //一行调用一次Map方法  对每一行数据进行分割
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        //获得每行的值
        String str = value.toString();
        //按空格得到每个item
        String[] items = str.split("\t");

        if (items[24]!=null) {
            this.mapOutKey.set(items[24]);
            if (items[5]!=null) {
                this.mapOutKey1.set(items[5]);
            }
        }
        context.write(mapOutKey, mapOutKey1);
    }

}

MPReducer.java

package mapreduce;
?
import java.io.IOException;
import java.util.HashSet;
?
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
?
public class MRReducer extends Reducer<Text, Text, Text, IntWritable>{
?
    //每有一个key value数据 就执行一次reduce方法
    @Override
    protected void reduce(Text key, Iterable<Text> texts, Reducer<Text, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

        HashSet<String> set = new HashSet<String>();

        for (Text text : texts) {
            set.add(text.toString());
        }

        context.write(key,new IntWritable(set.size()));

    }
}
 

4.MapReduce执行wordcount过程理解

input:默认从HDFS读取数据

 Path inPath = new Path(args[0]);
 FileInputFormat.setInputPaths(job,inPath);

将每一行数据转换为key-value(分割),这一步由MapReduce框架自动完成。

输出行的偏移量和行的内容

mapper: 分词输出

数据过滤,数据补全,字段格式化

输入:input的输出

将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对。

一行调用一次map方法。

统计word中的map:

shuffle: 分区,分组,排序

输出:

<Bye,1>

<Hello,1>

<World,1,1>

得到map输出的<key,value>对,Mapper会将他们按照key进行排序,得到mapper的最终输出结果。

Reduce :每一条Keyvalue调用一次reduce方法

将相同Key的List<value>,进行相加求和

output:将reduce输出写入hdfs

原文地址:https://www.cnblogs.com/whcwkw1314/p/8971760.html

时间: 2024-10-14 09:21:04

MapReduce 编程模板编写【分析网站基本指标UV】程序的相关文章

JNI编程(一) —— 编写一个最简单的JNI程序

来自:http://chnic.iteye.com/blog/198745 忙了好一段时间,总算得了几天的空闲.貌似很久没更新blog了,实在罪过.其实之前一直想把JNI的相关东西整理一下的,就从今天开始吧.Here we go. JNI其实是Java Native Interface的简称,也就是java本地接口.它提供了若干的API实现了和Java和其他语言的通信(主要是C&C++).也许不少人觉得Java已经足够强大,为什么要需要JNI这种东西呢?我们知道Java是一种平台无关性的语言,平

JNI编程(一) —— 编写一个最简单的JNI程序(转载)

转自:http://chnic.iteye.com/blog/198745 忙了好一段时间,总算得了几天的空闲.貌似很久没更新blog了,实在罪过.其实之前一直想把JNI的相关东西整理一下的,就从今天开始吧.Here we go. JNI其实是Java Native Interface的简称,也就是java本地接口.它提供了若干的API实现了和Java和其他语言的通信(主要是C&C++).也许不少人觉 得Java已经足够强大,为什么要需要JNI这种东西呢?我们知道Java是一种平台无关性的语言,

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

MapReduce编程实战之“高级特性”

本篇介绍MapReduce的一些高级特性,如计数器.数据集的排序和连接.计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的""连接(join)操作. 计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计.计数器还可用于辅助诊断系统故障.对于大型分布式系统来说,获取计数器比分析日志文件容易的多. 示例一:气温缺失及不规则数据计数器 import java.io.IOException; import

MapReduce 编程 系列五 MapReduce 主要过程梳理

前面4篇文章介绍了如何编写一个简单的日志提取程序,读取HDFS share/logs目录下的所有csv日志文件,然后提取数据后,最终输出到share/output目录下. 本篇停留一下,梳理一下主要过程,然后提出新的改进目标. 首先声明一下,所有的代码都是maven工程的,没有使用任何IDE.  这是我一贯的编程风格,用Emacs + JDEE开发.需要使用IDE的只需要学习如何在IDE中使用maven即可. 可比较的序列化 第一个是序列化,这是各种编程技术中常用的.MapReduce的特别之处

MapReduce编程模型及优化技巧

(一)MapReduce 编程模型 (备注:如果你已经了解MapReduce 编程模型请直接进入第二部分MapReduce 的优化讲解) 在学习MapReduce 优化之前我们先来了解一下MapReduce 编程模型是怎样的? 下图中红色的标注表示没有加入Combiner和Partitioner来进行优化. 上图的流程大概分为以下几步. 第一步:假设一个文件有三行英文单词作为 MapReduce 的Input(输入),这里经过 Splitting 过程把文件分割为3块.分割后的3块数据就可以并行

MapReduce编程模型及其在Hadoop上的实现

转自:https://www.zybuluo.com/frank-shaw/note/206604 MapReduce基本过程 关于MapReduce中数据流的传输过程,下图是一个经典演示:  关于上图,可以做出以下逐步分析: 输入数据(待处理)首先会被切割分片,每一个分片都会复制多份到HDFS中.上图默认的是分片已经存在于HDFS中. Hadoop会在存储有输入数据分片(HDFS中的数据)的节点上运行map任务,可以获得最佳性能(数据TaskTracker优化,节省带宽). 在运行完map任务

[Hadoop入门] - 1 Ubuntu系统 Hadoop介绍 MapReduce编程思想

Ubuntu系统 (我用到版本号是140.4) ubuntu系统是一个以桌面应用为主的Linux操作系统,Ubuntu基于Debian发行版和GNOME桌面环境.Ubuntu的目标在于为一般用户提供一个最新的.同时又相当稳定的主要由自由软件构建而成的操作系统,它可免费使用,并带有社团及专业的支持应. 作为Hadoop大数据开发测试环境, 建议大家不要在windows上安装CgyWin来学习或研究, 直接用Vmware+ubuntu来学习. 下载 www.vmware.com这里下载vmware,

MapReduce编程(六) 从HDFS导入数据到Elasticsearch

一.Elasticsearch for Hadoop安装 Elasticsearch for Hadoop并不像logstash.kibana一样是一个独立的软件,而是Hadoop和Elasticsearch交互所需要的jar包.所以,有直接下载和maven导入2种方式.安装之前确保JDK版本不要低于1.8,Elasticsearch版本不能低于1.0. 官网对声明是对Hadoop 1.1.x.1.2.x.2.2.x.2.4.x.2.6.x.2.7.x测试通过,支持较好,其它版本的也并不是不能用