大数据学习之七——MapReduce简单代码实例

1.关于MapReduce

MapReduce是一种可用于数据处理的编程模型,能够支持java、Python、C++等语言。MapReduce程序本质上是并行运行的,因此可以处理大规模数据集,这也是它的优势。

2.使用hadoop分析数据

hadoop提供了并行处理,我们将查询表示成MapReduce作业。

MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值作为输入和输出,并选择它们的类型。程序员还需要定义两个函数:map函数和reduce函数。

Java  MapReduce

我们需要三个东西:一个map函数,一个reduce函数和一些用来运行作业的代码。map函数由mapper接口实现。

Mapper接口是一个泛型类型,有四个形参,分别指定map函数的输入键、输入值、输出键和输出值的类型。这些类型均可在org.apache.hadoop.io包中找到。其中,LongWritable类型相当于java中的Long类型、Text类型相当于java中的String类型、IntWritable类型相当于java中的Integer类型。

在主函数中经常使用的类有:

FileOutputFormat类中的静态函数setOutputPath()来指定输出路径,该函数指定了reduce函数输出文件的写入目录。在运行任务前该目录不应该存在。接着通过setMapperClass()和setReducerClass()指定map和reduce类型。setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型。输入的类型通过InputFormat类来控制,在设置定义map和reduce函数的类之后,JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台。

3.统计单词数量代码实例

package mapreduce01;  //MapReduce工程名字

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//单词计数

public class mytest {

static String INPUT_PATH="hdfs://master:9000/input/mr.txt";   //待统计的文件路径

static String OUTPUT_PATH="hdfs://master:9000/output/mr.txt";    //统计结果存放的路径

static class MyMapper extends Mapper <Object,Object,Text,IntWritable> {     //定义继承mapper类

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    //定义map方法

String[] arr=value.toString().split(",");      //文件中的单词是以“,”分割的,并将每一行定义为一个数组

for(int i=0;i<arr.length;i++){      //遍历循环每一行,统计单词出现的数量

context.write(new Text(arr[i]),new IntWritable(1));

}

}

}

static class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable>{     //定义继承reducer类

protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{      //定义reduce方法

int count=0;

for(IntWritable c:values){     //统计同一个单词的数量

count+=c.get();

}

IntWritable outValue=new IntWritable(count);

context.write(key,outValue);

}

}

public static void main(String[] args) throws Exception{    //main函数

Path outputpath=new Path(OUTPUT_PATH);    //输出路径

Configuration conf=new Configuration();

Job job=Job.getInstance(conf);     //定义一个job,启动任务

FileInputFormat.setInputPaths(job, INPUT_PATH);

FileOutputFormat.setOutputPath(job,outputpath);

job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReduce.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.waitForCompletion(true);

}

}

4.统计去重代码实例

package mapreduce01;  //MapReduce工程名字

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//单词去重

public class testquchong {

static String INPUT_PATH="hdfs://master:9000/quchong";   //待统计的文件

static String OUTPUT_PATH="hdfs://master:9000/quchong/qc";    //统计结果存放的路径

static class MyMapper extends Mapper<Object,Text,Text,Text>{

private static Text line=new Text();      //text相当于string

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{

line=value;

context.write(line,new Text(","));     //以“,”规定格式,空格不容易控制,统计key,因为key值是唯一的

}

}

static class MyReduce extends Reducer<Text,Text,Text,Text>{

protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{

context.write(key,new Text(""));

}

}

public static void main(String[] args) throws Exception{

Path outputpath=new Path(OUTPUT_PATH);

Configuration conf=new Configuration();

Job job=Job.getInstance(conf);

job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReduce.class);

job.setCombinerClass(MyReduce.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job, INPUT_PATH);

FileOutputFormat.setOutputPath(job,outputpath);

job.waitForCompletion(true);

}

}

原文地址:https://www.cnblogs.com/m-study/p/8366698.html

时间: 2024-11-10 19:10:53

大数据学习之七——MapReduce简单代码实例的相关文章

大数据学习之一——了解简单概念

1.大数据是什么?特点 大数据:是一种规模非常大的,在分析.管理.存储和获取等方面都超出了传统的数据库软件所具有的功能处理范围的巨大数据的调集. 特征:1.海量的数据规模(Volume) 2.数据类型多种多样(Variety) 3.快速的数据流转和动态的数据体系(Velocity) 4.巨大的数据价值(Value) 2.数据仓库是什么?Datawarehouse 数据仓库,英文名称Data Warehouse,是面向主题的.集成的.稳定的.面向时间的数据集合.是单个数据存储. 数据仓库中有OLT

大数据学习之MapReduce基础与Yarn集群安装09

1大数据解决的问题? 海量数据的存储:hadoop->分布式文件系统HDFS 海量数据的计算:hadoop->分布式计算框架MapReduce 2什么是MapReduce? 分布式程序的编程框架,java->ssh ssm ,目的:简化开发! 是基于hadoop的数据分析应用的核心框架. mapreduce的功能:将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算程序,并发的运行在hadoop集群上. 3 MapReduce的优缺点 优点: (1)易于编程 (2)良好的拓

大数据学习之八——MapReduce工作机制

1.MapReduce的特点 软件框架.并行处理.可靠且容错.大规模集群.海量数据集 2.mapper和reducer mapper负责"分":把复杂的任务分解为若干个"简单的任务"来处理.简单的任务包含三层含义: (1)数据或计算的规模相对原任务要大大缩小: (2)就近计算原则,任务会分配到存放着所需数据的节点上进行计算: (3)这些小任务可以并行计算,彼此间几乎没有依赖关系. reducer负责对map阶段的结果进行汇总. 3.MapReduce的工作机制 (1

大数据学习之MapReduce编程案例一单词计数 10

一:单词计数 1:单词计数总流程图 2:代码实现 1:Map阶段 package it.dawn.YARNPra.wc_hdfs; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapp

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭

好程序员大数据学习路线分享MAPREDUCE

好程序员大数据学习路线分享MAPREDUCE,需求:统计大量的文本文件中的单词出现的次数 1)整个运算需要分阶段 阶段一:并行局部运算 阶段二 :汇总处理,不同的阶段需要开发不同的程序 2)阶段之间的调用 3)业务程序(task程序)如何并发到集群并启动程序 4)如何监控task程序的运行状态,如何处理异常 ::这些问题是开发分布式程序都会面临的问题,完全可以封装成框架::MR 的结构 一个完整的MapReduce运行时有三类实例进程: 1)MRAppMaster : 负责整个程序的过程调度和状

大数据学习路线分享MapReduce全过程解析

大数据学习路线分享MapReduce全过程解析,移动数据与移动计算 在学习大数据的时候接触了移动数据和移动计算这两种联系紧密而又有很大不同的概念,其中移动计算也叫做本地计算. 在以前的数据处理中时使用的移动数据,其实就是将需要处理的数据传输到存放不同处理数据方式逻辑的各个节点上.这样做的效率很低,特别是大数据中的数据量是很大的,至少都是GB以上,更大的是TB.PB甚至更大,而且磁盘I/O.网络I/O的效率是很低的,这样处理起来就需要很长的时间,远远不能满足我们的要求.而移动计算就出现了. 移动计

史上最全“大数据”学习资源整理

史上最全"大数据"学习资源整理 当前,整个互联网正在从IT时代向DT时代演进,大数据技术也正在助力企业和公众敲开DT世界大门.当今"大数据"一词的重点其实已经不仅在于数据规模的定义,它更代表着信息技术发展进入了一个新的时代,代表着爆炸性的数据信息给传统的计算技术和信息技术带来的技术挑战和困难,代表着大数据处理所需的新的技术和方法,也代表着大数据分析和应用所带来的新发明.新服务和新的发展机遇. 为了帮助大家更好深入了解大数据,云栖社区组织翻译了GitHub Aweso