hadoop mapreduce开发实践之HDFS压缩文件(-cacheArchive)

1、分发HDFS压缩文件(-cacheArchive)

需求:wordcount(只统计指定的单词【the,and,had...】),但是该文件存储在HDFS上的压缩文件,压缩文件内可能有多个文件,通过-cacheArchive的方式进行分发;

-cacheArchive hdfs://host:port/path/to/file.tar.gz#linkname.tar.gz #选项在计算节点上缓存文件,streaming程序通过./linkname.tar.gz的方式访问文件。

思路:reducer程序都不需要修改,mapper需要增加用来读取压缩文件的函数(或模块),运行streaming的时候需要使用-cacheArchive 指定hdfs上的文件;

1.1、 streaming命令格式(-cacheArchive)

$HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar     -jobconf mapred.job.name="streaming_cacheArchive_demo"     -jobconf mapred.job.priority=3     -jobconf mapred.compress.map.output=true     -jobconf mapred.map.output.compression_codec=org.apache.hadoop.io.compress.GzipCodec     -jobconf mapred.output.compress=true     -jobconf mapred.out.compression.codec=org.apache.hadoop.io.compress.GzipCodec     -input /input/     -output /output/     -mapper "python mapper.py whc.tar.gz"     -reducer "python reducer.py"     -cacheArchive "hdfs://master:9000/cache_file/wordwhite.tar.gz#whc.tar.gz"
    -file ./mapper.py     -file ./reducer.py 

1.2、mapper程序

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import os
import os.path
import sys

def getCachefile(filename):
    filelist = []
    if os.path.isdir(filename):
        for root, dirs, files, in os.walk(filename):
            for name in files:
                filepath = root + ‘/‘ + name
                filelist.append(filepath)
    return filelist

def readWordwhite(filename):
    wordset = set()

    for cachefile in getCachefile(filename):
        with open(cachefile, ‘r‘) as fd:
            for line in fd:
                word = line.strip()
                wordset.add(word)
    return wordset

def mapper(filename):
    wordset = readWordwhite(filename)

    for line in sys.stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            if word != "" and (word in wordset):
                print "%s\t%s" %(word, 1)

if __name__ == "__main__":
    if sys.argv[1]:
        file_fd = sys.argv[1]
        mapper(file_fd)

1.3、 reducer程序

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys

def reducer():
    currentword = None
    wordsum = 0

    for line in sys.stdin:
        wordlist = line.strip().split(‘\t‘)
        if len(wordlist) < 2:
            continue
        word = wordlist[0].strip()
        wordvalue = wordlist[1].strip()

        if currentword == None:
            currentword = word
        if currentword != word:
            print "%s\t%s" %(currentword, str(wordsum))
            currentword = word
            wordsum = 0
        wordsum += int(wordvalue)

    print "%s\t%s" %(currentword, str(wordsum))

if __name__ == "__main__":
    reducer()

1.4、上传wordwhite.tar.gz

$ ls -R wordwhite
wordwhite:
wordwhite01  wordwhite02  wordwhite03
$ cat wordwhite/wordwhite0*
have
and
had
the
in
this
or
this
to
$ tar zcf wordwhite.tar.gz wordwhite
$ hadoop fs -put wordwhite.tar.gz hdfs://localhost:9000/input/cachefile/

1.5、 run_streaming程序

#!/bin/bash

HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.13.0.jar"

INPUT_FILE_PATH="/input/The_Man_of_Property"
OUTPUT_FILE_PATH="/output/wordcount/WordwhiteCacheArchiveFiletest"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH                 -input $INPUT_FILE_PATH                 -output $OUTPUT_FILE_PATH                 -jobconf "mapred.job.name=wordcount_wordwhite_cacheArchivefile_demo"                 -mapper "python mapper.py WHF.gz"                 -reducer "python reducer.py"                 -cacheArchive "hdfs://localhost:9000/input/cachefile/wordwhite.tar.gz#WHF.gz"                 -file "./mapper.py"                 -file "./reducer.py"

1.6、执行程序

$ chmod +x run_streaming.sh
$ ./run_streaming.sh
rmr: DEPRECATED: Please use ‘rm -r‘ instead.
Deleted /output/wordcount/WordwhiteCacheArchiveFiletest
18/02/01 17:57:00 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/02/01 17:57:00 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
18/02/01 17:57:00 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
18/02/01 17:57:00 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./mapper.py, ./reducer.py, /tmp/hadoop-unjar211766205758273068/] [] /tmp/streamjob9043244899616176268.jar tmpDir=null
18/02/01 17:57:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/02/01 17:57:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/02/01 17:57:03 INFO mapred.FileInputFormat: Total input paths to process : 1
18/02/01 17:57:03 INFO mapreduce.JobSubmitter: number of splits:2
18/02/01 17:57:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516345010544_0030
18/02/01 17:57:04 INFO impl.YarnClientImpl: Submitted application application_1516345010544_0030
18/02/01 17:57:04 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1516345010544_0030/
18/02/01 17:57:04 INFO mapreduce.Job: Running job: job_1516345010544_0030
18/02/01 17:57:11 INFO mapreduce.Job: Job job_1516345010544_0030 running in uber mode : false
18/02/01 17:57:11 INFO mapreduce.Job:  map 0% reduce 0%
18/02/01 17:57:20 INFO mapreduce.Job:  map 50% reduce 0%
18/02/01 17:57:21 INFO mapreduce.Job:  map 100% reduce 0%
18/02/01 17:57:27 INFO mapreduce.Job:  map 100% reduce 100%
18/02/01 17:57:28 INFO mapreduce.Job: Job job_1516345010544_0030 completed successfully
18/02/01 17:57:28 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=113911
        FILE: Number of bytes written=664972
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636501
        HDFS: Number of bytes written=68
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=12584
        Total time spent by all reduces in occupied slots (ms)=4425
        Total time spent by all map tasks (ms)=12584
        Total time spent by all reduce tasks (ms)=4425
        Total vcore-milliseconds taken by all map tasks=12584
        Total vcore-milliseconds taken by all reduce tasks=4425
        Total megabyte-milliseconds taken by all map tasks=12886016
        Total megabyte-milliseconds taken by all reduce tasks=4531200
    Map-Reduce Framework
        Map input records=2866
        Map output records=14734
        Map output bytes=84437
        Map output materialized bytes=113917
        Input split bytes=198
        Combine input records=0
        Combine output records=0
        Reduce input groups=8
        Reduce shuffle bytes=113917
        Reduce input records=14734
        Reduce output records=8
        Spilled Records=29468
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=390
        CPU time spent (ms)=3660
        Physical memory (bytes) snapshot=713809920
        Virtual memory (bytes) snapshot=8331399168
        Total committed heap usage (bytes)=594018304
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=636303
    File Output Format Counters
        Bytes Written=68
18/02/01 17:57:28 INFO streaming.StreamJob: Output directory: /output/wordcount/WordwhiteCacheArchiveFiletest

1.7、 查看结果

$ hadoop fs -ls /output/wordcount/WordwhiteCacheArchiveFiletest
Found 2 items
-rw-r--r--   1 centos supergroup          0 2018-02-01 17:57 /output/wordcount/WordwhiteCacheArchiveFiletest/_SUCCESS
-rw-r--r--   1 centos supergroup         68 2018-02-01 17:57 /output/wordcount/WordwhiteCacheArchiveFiletest/part-00000
[[email protected] 3]$ hadoop fs -text /output/wordcount/WordwhiteCacheArchiveFiletest/part-00000
and 2573
had 1526
have    350
in  1694
or  253
the 5144
this    412
to  2782

以上就完成了分发HDFS上的压缩文件并指定单词的wordcount.

2、hadoop streaming 语法参考

原文地址:http://blog.51cto.com/balich/2067858

时间: 2024-08-28 11:55:54

hadoop mapreduce开发实践之HDFS压缩文件(-cacheArchive)的相关文章

hadoop mapreduce开发实践之HDFS文件分发by streaming

1.分发HDFS文件(-cacheFile) 需求:wordcount(只统计指定的单词),但是该文件非常大,可以先将该文件上传到hdfs,通过-cacheFile的方式进行分发: -cachefile hdfs://host:port/path/to/file#linkname #选项在计算节点上缓存文件,streaming程序通过./linkname的方式访问文件. 思路:mapper和reducer程序都不需要修改,只是在运行streaming的时候需要使用-cacheFile 指定hdf

Hadoop MapReduce开发最佳实践(上篇)

body{ font-family: "Microsoft YaHei UI","Microsoft YaHei",SimSun,"Segoe UI",Tahoma,Helvetica,Sans-Serif,"Microsoft YaHei", Georgia,Helvetica,Arial,sans-serif,宋体, PMingLiU,serif; font-size: 10.5pt; line-height: 1.5;}

在Windows上使用Eclipse配置Hadoop MapReduce开发环境

在Windows上使用Eclipse配置Hadoop MapReduce开发环境 1. 系统环境及所需文件 windows 8.1 64bit Eclipse (Version: Luna Release 4.4.0) hadoop-eclipse-plugin-2.7.0.jar hadoop.dll & winutils.exe 2. 修改Master节点的hdfs-site.xml 添加如下内容 <property> <name>dfs.permissions<

intellij idea hadoop mapreduce 开发调试

在idea中的hadoop程序开发(MAC或Linux) hadoop的安装(自己查) 新建一个java project 3.配置项目结构与依赖(project structure) 4.配置构件(artifacts):名称(name),类型(Type),构件时重新编译打包(Build on make),输出目录(Output directory),输出规划(Output Layout),选择当前模块的输出构件 5.编写代码(WordCount.java) 6.配置调试运行方式 7.新建一个运行

Hadoop第4周练习—HDFS读写文件操作

1    运行环境说明... 3 1.1     硬软件环境... 3 1.2     机器网络环境... 3 2    书面作业1:编译并运行<权威指南>中的例3.2. 3 2.1     书面作业1内容... 3 2.2     运行代码... 3 2.3     实现过程... 4 2.3.1   创建代码目录... 4 2.3.2   建立例子文件上传到hdfs中... 4 2.3.3   配置本地环境... 5 2.3.4   编写代码... 5 2.3.5   编译代码... 6

【hadoop】使用javaAPI对hdfs进行文件操作

前提:1.搭建好hadoop伪分布式环境:2.安装好eclipse: 注:修改 /etc/hosts 添加 “本机IP hadoop01” , 那么代码中创建hdfs文件系统的时候的URI hdfs://hadoop01:9000 相当于  hdfs://hadoop服务器ip(例如:192.168.1.1XX):9000 import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOExcep

Hadoop Mapreduce 中的FileInputFormat类的文件切分算法和host选择算法

文件切分算法 文件切分算法主要用于确定InputSplit的个数以及每个InputSplit对应的数据段. FileInputFormat以文件为单位切分成InputSplit.对于每个文件,由以下三个属性值确定其对应的InputSplit的个数. goalSize:根据用户期望的InputSplit数据计算,即totalSize/numSplit.totalSize为文件总大小:numSplit为用户设定的Map Task个数,默认情况下是1. minSize:InputSplit的最小值,由

MapReduce(十五): 从HDFS读取文件的源码分析

以Map任务读取文本数据为例: 1)   LineRecordReader负责对文件分割的定位,以及对读取每一行内容的封装供用户Map任务使用.每次在定位在文件中不为0的位置时,多读取一行,因为前一个处理该位置之前的数据时,会完整把该一行已经读取并处理. 2)   LineReader负责对所要访问文件输入流的数据进行每一行读取,只实现对每一行读取的逻辑. 3)   DFSDataInputStream封装了DFSInputStream的实现,直接调用DFSInputStream接口完成. 4)

深入浅出Hadoop实战开发(HDFS实战图片、MapReduce、HBase实战微博、Hive应用)

Hadoop是什么,为什么要学习Hadoop?     Hadoop是一个分布式系统基础架构,由Apache基金会开发.用户可以在不了解分布式底层细节的情况下,开发分布式程序.充分利用集群的威力高速运算和存储.Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS.HDFS有着高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上.而且它提供高传输率(high throughput)来访问应用程序的数据,适合那些有着超大数据