云计算(一)——使用 Hadoop Mapreduce 进行数据处理

使用 Hadoop Mapreduce 进行数据处理

1. 综述

  使用HDP(下载: http://zh.hortonworks.com/products/releases/hdp-2-3/#install)搭建环境,进行分布式数据处理。

  项目文件下载,解压文件后将看到项目文件夹。该程序将读取 cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件, 文件中的每一行文本都是来自于 wikipedia 的一个标题, 读取每个标题,并使用 cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 中指定的特殊符号分割标题成独立单词,然后将单词转换为全小写,然后将出现在
cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 中的单词全部删除,最后统计剩余单词的出现次数,并输出。

  程序的编译过程需要在“~/.bashrc”文件内定义自己的环境变量“$hadoop_CLASSPATH”,在“~/.bashrc”文件中添加一行:  

export hadoop_CLASSPATH="/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/lib/*:/usr/hdp/2.3.2.0-2950/hadoop/.//*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/./:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/.//*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/.//*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/.//*:::/usr/share/java/mysql-connector-java-5.1.17.jar:/usr/share/java/mysql-connector-java-5.1.31-bin.jar:/usr/share/java/mysql-connector-java.jar:/usr/hdp/2.3.2.0-2950/tez/*:/usr/hdp/2.3.2.0-2950/tez/lib/*:/usr/hdp/2.3.2.0-2950/tez/conf:/usr/hdp/current/hadoop-yarn-client/.//*:/usr/hdp/current/hadoop-yarn-client/lib/*"

2. 运行过程

Step (1): 将项目文件夹放入HDP虚拟机,进入cloudMR文件夹,运行下列命令启动:

./start.sh

要求输入账号,随意输入10位数字即可。再运行下列命令检查 hadoop 是否正常运行:

hadoop version

Step (2): 编写 TitleCount.java 文件,完成相应功能。完成后的 TitleCount.java 如下:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.util.*;
/**
 * Classic "Word Count"
 */
public class TitleCount extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TitleCount(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), "Title Count");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setMapperClass(TitleCountMap.class);
        job.setReducerClass(TitleCountReduce.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(TitleCount.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static String readHDFSFile(String path, Configuration conf) throws IOException{
        Path pt=new Path(path);
        FileSystem fs = FileSystem.get(pt.toUri(), conf);
        FSDataInputStream file = fs.open(pt);
        BufferedReader buffIn=new BufferedReader(new InputStreamReader(file));

        StringBuilder everything = new StringBuilder();
        String line;
        while( (line = buffIn.readLine()) != null) {
            everything.append(line);
            everything.append("\n");
        }
        return everything.toString();
    }

    public static class TitleCountMap extends Mapper<Object, Text, Text, IntWritable> {
        Set<String> stopWords = new HashSet<String>();
        String delimiters;

        @Override
        protected void setup(Context context) throws IOException,InterruptedException {

            Configuration conf = context.getConfiguration();

            String delimitersPath = conf.get("delimiters");
            delimiters = readHDFSFile(delimitersPath, conf);

            String stopWordsPath = conf.get("stopwords");
            List<String> stopWordsList = Arrays.asList(readHDFSFile(stopWordsPath, conf).split("\n"));
            for(String e : stopWordsList){
                stopWords.add(e);
            }
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer stk = new StringTokenizer(value.toString(),delimiters);
            while(stk.hasMoreTokens()){
                String e = stk.nextToken().trim().toLowerCase();
                if(stopWords.contains(e) == false){
                    context.write(new Text(e),new IntWritable(1));
                }
            }
        }
    }

    public static class TitleCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for(IntWritable e : values){
                sum += e.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}

Step (3): 编译java源文件。为了方便,在cloudMR文件夹中新建output文件夹,用来存放编译生成的.class文件。使用以下命令(在cloudMR文件夹下执行):

mkdir output
javac -classpath $Hadoop_CLASSPATH -d output TitleCount.java

进入output文件夹会看到3个.class文件。

Step (4): 将编译生成的类文件打包。

首先在 cloudMR 文件夹下新建文本文件 manifest.mf,使用以下命令(在 cloudMR 文件夹下执行):

touch manifest.mf

编辑内容为

Main-Class: TitleCount.class

manifest.mf 内是一些关于这个包的信息,这里定义了主类。

再使用下面命令打包(在cloudMR文件夹下执行):

jar cvfM TitleCount.jar manifest.mf -C output/ .

这条命令的含义是;

jar        打包命令

cvfM       

TitleCount.jar   打成的包的名字

manifest.mf     将这个文件打进包里

-C        -C之后的文件夹内的所有文件打进包里

output/      将output文件夹内的文件全部打进包里

.          打成的包TitleCount.jar放在当前文件夹

注意:打包过程很重要而且易错,请务必按照上文所述步骤进行。

Step (5): 将 TitleCount.jar 发布。

在发布(yarn)之前,还要完成准备工作。

将相关文件:cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件、cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 、
cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 上传到hdfs.

在 hdfs 的 /user/root/ 文件夹内新建 data 文件夹,将 delimiters.txt、stopwords.txt 放入 data 文件夹,再在 data 文件夹中新建 titles 文件夹,将cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件放入 titles 文件夹。

介绍相关的命令:

hadoop fs -ls                                         列出hdfs目录,由于没有参数,列出的是当前用户的主目录

hadoop fs -ls /                                         列出hdfs根目录

hadoop fs -mkdir data                                       在默认目录下新建data目录

hadoop fs -mkdir data/titles                               在data目录中新建 titles目录

hadoop fs -copyFromLocal ./abc.txt  data           上传当前目录(本地)中的 abc.txt 到 hdfs 上的 data 目录

  之后便可以发布了,使用命令:  

yarn jar TitleCount.jar TitleCount -D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt" data/titles output

这条命令的含义是;

yarn                              发布内容

jar                                要发布的内容为jar包

TitleCount.jar                发布的内容

TitleCount                 TitleCount.jar的入口

-D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt"       -D后跟参数,这里定义了两个参数

data/titles                     输入文件夹,其内的文件作为Map 的输入

output                          输出文件存放的位置

  yarn 命令执行完毕后,即可查看运行结果。

时间: 2024-08-26 01:00:35

云计算(一)——使用 Hadoop Mapreduce 进行数据处理的相关文章

大数据云计算高级实战Hadoop,Flink,Spark,Kafka,Storm,Docker高级技术大数据和Hadoop技能

大数据和Hadoop技能可能意味着有你的梦想事业和被遗忘之间的差异.骰子引用:“技术专业人员应该志愿参与大数据项目,这使他们对目前的雇主更有价值,对其他雇主更有销路.” 1.与Hadoop的职业:根据福布斯2015年的一份报告,约有90%的全球性组织报告了中高级别的大数据分析投资,约三分之一的投资者称其投资“非常重要”.最重要的是,约三分之二的受访者表示,数据和分析计划对收入产生了重大的可衡量的影响. Hadoop技能是需求的 - 这是不可否认的事实!因此,IT专业人士迫切需要使用 Hadoop

Hadoop MapReduce原理及实例

MapReduce是用于数据处理的一种编程模型,简单但足够强大,专门为并行处理大数据而设计. 1. 通俗理解MapReduce MapReduce的处理过程分为两个步骤:map和reduce.每个阶段的输入输出都是key-value的形式,key和value的类型可以自行指定.map阶段对切分好的数据进行并行处理,处理结果传输给reduce,由reduce函数完成最后的汇总. 例如从大量历史数据中找出往年最高气温,NCDC公开了过去每一年的所有气温等天气数据的检测,每一行记录一条观测记录,格式如

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

不多说,直接上代码. 一共12列,我们只需提取有用的列:第二列(犯罪类型).第四列(一周的哪一天).第五列(具体时间)和第七列(犯罪场所). 思路分析 基于项目的需求,我们通过以下几步完成: 1.首先根据数据集,分别统计出不同犯罪类别在周时段内发生犯罪次数和不同区域在周时段内发生犯罪的次数. 2.然后根据第一步的输出结果,再按日期统计出每天每种犯罪类别在每个区域发生的犯罪次数. 3.将前两步的输出结果,按需求插入数据库,便于对犯罪数据的分析. 程序开发 我们要编写5个文件: 编写基类,MapRe

Hadoop MapReduce例子-新版API多表连接Join之模仿订单配货

文章为作者原创,未经许可,禁止转载.    -Sun Yat-sen University 冯兴伟 一.    项目简介: 电子商务的发展以及电商平台的多样化,类似于京东和天猫这种拥有过亿用户的在线购物网站,每天要处理的订单数堪称海量,更别提最近的双十一购物节,如此海量的订单数据阿里巴巴和京东是如何准确将用户信息和其订单匹配并配货的呢?答案是数据连接匹配.我的云计算项目idea也是来源于此.我们在做数据分析时常要连接从不同的数据源中获取到的数据,单机模式下的关系型数据库中我们会遇到这问题,同样在

Hadoop,MapReduce操作Mysql

前以前帖子介绍,怎样读取文本数据源和多个数据源的合并:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html 这一个博客介绍一下MapReduce怎样读取关系数据库的数据,选择的关系数据库为MySql,因为它是开源的软件,所以大家用的比较多.以前上学的时候就没有用过开源的软件,直接用盗版,也相当与免费,且比开源好用,例如向oracle,windows7等等.现在工作了,由于公司考虑成本的问题,所以都用成开源的,ubuntu,

Hadoop - MapReduce MRAppMaster-剖析

一 概述 MRv1主要由编程模型(MapReduce API).资源管理与作业控制块(由JobTracker和TaskTracker组成)和数据处理引擎(由MapTask和ReduceTask组成)三部分组成.而YARN出现之后,资源管理模块则交由YARN实现,这样为了让MapReduce框架运行在YARN上,仅需要一个ApplicationMaster组件完成作业控制模块功能即可,其它部分,包括编程模型和数据处理引擎等,可直接采用MRv1原有的部分. 二 MRAppMaster组成 MRApp

Hadoop MapReduce链式实践--ChainReducer

版本:CDH5.0.0,HDFS:2.3.0,Mapreduce:2.3.0,Yarn:2.3.0. 场景描述:求一组数据中按照不同类别的最大值,比如,如下的数据: data1: A,10 A,11 A,12 A,13 B,21 B,31 B,41 B,51 data2: A,20 A,21 A,22 A,23 B,201 B,301 B,401 B,501 最后输出为: A,23 B,501 假如这样的逻辑的mapreduce数据流如下: 假设C组数据比较多,同时假设集群有2个节点,那么这个任

【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现. MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1. Hadoop示例程序WordCount详解及实例 2. hadoop 学习笔

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co