python 实现Hadoop的partitioner和二次排序

我们知道,一个典型的Map-Reduce过程包 括:Input->Map->Patition->Reduce->Output。Pation负责把Map任务输出的中间结果 按key分发给不同的Reduce任务进行处理。Hadoop 提供了一个非常实用的partitioner类KeyFieldBasedPartitioner,通过配置相应的参数就可以使用。通过 KeyFieldBasedPartitioner可以方便地实现二次排序。

使用方法:

      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

一般配合:

-D map.output.key.field.separator及-D num.key.fields.for.partition使用。

map.output.key.field.separator指定key内部的分隔符

num.key.fields.for.partition指定对key分出来的前几部分做partition而不是整个key

示例:

1. 编写map程序mapper.sh;reduce程序reducer.sh; 测试数据test.txt

view plain

mapper.sh:

#!/bin/sh  cat

reducer.sh:

#!/bin/sh  sort

test.txt内容:

1,2,1,1,1

1,2,2,1,1

1,3,1,1,1

1,3,2,1,1

1,3,3,1,1

1,2,3,1,1

1,3,1,1,1

1,3,2,1,1

1,3,3,1,1

2. 测试数据test.txt放入hdfs,运行map-reduce程序

view plain

$ hadoop streaming /

-D stream.map.output.field.separator=, /

-D stream.num.map.output.key.fields=4 /

-D map.output.key.field.separator=, /

-D num.key.fields.for.partition=2 /

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner /

-input /app/test/test.txt  /

-output /app/test/test_result /

-mapper ./mapper.sh  /

-reducer ./reducer.sh /

-file mapper.sh /

-file reducer.sh /

-jobconf mapre.job.name="sep_test"

$ hadoop fs –cat /app/test/test_result/part-00003

1,2,1,1     1

1,2,2,1     1

1,2,3,1     1

$ hadoop fs –cat /app/test/test_result/part-00004

1,3,1,1     1

1,3,1,1     1

1,3,2,1     1

1,3,2,1     1

1,3,3,1     1

1,3,3,1     1

通过这种方式,就做到前4个字段是key,但是通过前两个字段进行partition的目的

注意:

-D map.output.key.field.separator=, /

这个分隔符使用TAB键貌似不管用

Hadoop Streaming 是一个工具, 代替编写Java的实现类,而利用可执行程序来完成map-reduce过程

工作流程 : 

InputFile --> mappers --> [Partitioner] --> reducers -->
outputFiles

理解 : 

1 输入文件,可以是指定远程文件系统内的文件夹下的 *

2 通过集群自己分解到各个PC上,每个mapper是一个可执行文件,相应的启动一个进程,来实现你的逻辑

3 mapper 的输入为标准输入,所以,任何能够支持标准输入的可执行的东西,c,c++(编译出来的可执行文件),python,......都可以作
为mapper 和 reducer mapper的输出为标准输出,如果有Partitioner,就给它,如果没有,它的输出将作为reducer的输入

4 Partitioner 为可选的项,二次排序,可以对结果进行分类打到结果文件里面,它的输入是mapper的标准输出,它的输出,将作为reducer的标准输入

5 reducer 同 mapper

6 输出文件夹,在远端文件不能重名

Hadoop Streaming

1 : hadoop-streaming.jar 的位置 : $HADOOP_HOME/contrib/streaming

官方上面关于hadoop-streaming 的介绍已经很详细了,而且也有了关于python的例子,我就不说了,这里总结下自己的经验

1 指定 mapper or reducer 的 task 官方上说要用 -jobconf 但是这个参数已经过时,不可以用了,官方说要用
-D, 注意这个-D是要作为最开始的配置出现的,因为是在maper 和 reducer 执行之前,就需要硬性指定好的,所以要出现在参数的最前面 ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D .........-input ........ 类似这样,这样,即使你程序最后只指定了一个输出管道,但是还是会有你指定的task数量的结果文件,只不过多余的就是空的 实验以下 就知道了

2 关于二次排序,由于是用的streaming 所以,在可执行文件内,只能够处理逻辑,还有就是输出,当然我们也可以指定二次排序,但是由于是全部参数化,不是很灵活。比如:

10.2.3.40    1

11.22.33.33    1

www.renren.com 1

www.baidu.com    1

10.2.3.40    1

这样一个很规整的输入文件,需求是要把记录独立的ip和url的count 但是输出文件要分分割出来。

官方网站的例子,是指定 key 然后对key 指定 主-key 和 key 用来排序,而 主-key 用来二次排序,这样会输出你想要的东西, 但是对于上面最简单的需求,对于传递参数,我们如何做呢?

其实我们还是可以利用这一点,在我们mapper 里面,还是按照/t来分割key value 但是我们要给key指定一个主-key 用来给Partitioner
来实现二次排序,所以我们可以稍微处理下这个KEY,我们可以简单的判断出来ip 和 url 的区别,这样,我们就人为的加上一个主-key 我们在mapper里面,给每个key人为的加上一个"标签",用来给partitioner做 二次排序用,比如我们的mapper的输出是这样

D&10.2.3.40    1

D&11.22.33.33    1

W&www.renren.com 1

W&www.baidu.com    1

D&10.2.3.40    1

然后通过传递命令参数

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner //指定要求二次排序
-jobconf map.output.key.field.separator=‘&‘ //这里如果不加两个单引号的话我的命令会死掉
-jobconf num.key.fields.for.partition=1 //这里指第一个 & 符号来分割,保证不会出错

这样我们就可以通过 partitioner 来实现二次排序了

在reducer里面,我们再把"标签"摘掉(不费吹灰之力)就可以做到悄无声息的完成二次排序了。

3: 关于模块化

(强调 : 没有在集群上测试,只在单机上做测试)

程序员最悲剧的就是不能代码复用,做这个也一样,用hadoop-streaming 也一样,要做到代码重用,是我第一个考虑的问题
当我看到 -file(详细可以看官方网站上的讲解) 的时候,我就想到利用这个东西,果然,我的在本机上建立了一个py模块,简单的一个函数
然后在我的mapper里面import 它,本地测试通过后,利用-file 把模块所在的问价夹用 -file moudle/* 这个参数,传入streaming
执行的结果毫无错误,这样,我们就可以抽象出来一些模块的东西,来实现我们模块化的需求

注 : 不要忘记 chmod +x *.py  将py 变成可执行的,不然不可以运行

代码 : 

1: 模块代码 mg.py 用来给 mapper 贴标签

def mgFunction(line):

if(line[0] >= ‘0‘ and line[0] <= ‘9‘):

return "D&" + line

return "W&" + line

2: mapper.py 

#!/usr/bin/env python

import sys

sys.path.append(‘/home/liuguoqing/Desktop/hadoop-0.19.2/moudle‘)

import mg

for line in sys.stdin:

line = mg.mgFunction(line)

line = line.strip()

#       print line

words = line.split()

print ‘%s\t%s‘ % (words[0], words[1])


3: reducer.py

#!/usr/bin/env python

import sys

user_login_day = {}

for line in sys.stdin:

line = line[2:]//去掉帽子

line = line.strip()

userid, day = line.split(‘\t‘, 1)

user_login_day[userid] = user_login_day.get(userid, 0) + 1

for uid in user_login_day.keys():

print ‘%s\t%d‘ % (uid, user_login_day[uid])

这样就实现了模块化的可以二次排序的hadoop-streaming

命令 

./bin/hadoop jar hadoop-0.19.2-streaming.jar \

#streaming jar

-D mapred.reduce.tasks=2  \

#指定2个reduce来处理

-input user_login_day-input2/*  \

#指定输入文件 可以用 dir/* 方式

-output user_login_day-output102

#指定输出文件夹

-mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py  \

#指定mapper 可执行文件 我用全路径,好像用相对路径会出错...

-reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py
\

#指定reducer 可执行文件 

-file ~/Desktop/hadoop-0.19.2/moudle/* \

#指定模块化的库文件 dir/* 模式

-partitioner
org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \此处报错-partitioner:
command not found

#指定 partitioner 参数为class

-jobconf map.output.key.field.separator=‘&‘ \

#指定 主-key 的分割符号为 ‘&‘

-jobconf num.key.fields.for.partition=1

#指定为第一个‘&’

[email protected]:~/Desktop/hadoop-0.19.2$
./bin/hadoop jar hadoop-0.19.2-streaming.jar -D mapred.reduce.tasks=2 -input user_login_day-input2/* -output user_login_day-output102 -mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py -reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py
-file ~/Desktop/hadoop-0.19.2/moudle/* -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -jobconf map.output.key.field.separator=‘&‘ -jobconf num.key.fields.for.partition=1

10/01/24 03:19:15 WARN streaming.StreamJob: -jobconf option
is deprecated, please use -D instead.

packageJobJar: [/home/liuguoqing/Desktop/hadoop-0.19.2/moudle/mg.py,
/home/liuguoqing/Desktop/hadoop-0.19.2/moudle/mg.pyc, /tmp/hadoop-liuguoqing/hadoop-unjar6780057097425964518/] [] /tmp/streamjob3100401358387519950.jar tmpDir=null

10/01/24 03:19:15 INFO mapred.FileInputFormat: Total input
paths to process : 2

10/01/24 03:19:15 INFO streaming.StreamJob: getLocalDirs():
[/tmp/hadoop-liuguoqing/mapred/local]

10/01/24 03:19:15 INFO streaming.StreamJob: Running job:
job_201001221008_0065

10/01/24 03:19:15 INFO streaming.StreamJob: To kill this
job, run:

10/01/24 03:19:15 INFO streaming.StreamJob: /home/liuguoqing/Desktop/hadoop-0.19.2/bin/../bin/hadoop
job  -Dmapred.job.tracker=hdfs://localhost:9881 -kill job_201001221008_0065

10/01/24 03:19:15 INFO streaming.StreamJob: Tracking URL:
http://localhost:50030/jobdetails.jsp?jobid=job_201001221008_0065

10/01/24 03:19:16 INFO streaming.StreamJob:  map 0%  reduce
0%

10/01/24 03:19:17 INFO streaming.StreamJob:  map 33%  reduce
0%

10/01/24 03:19:18 INFO streaming.StreamJob:  map 67%  reduce
0%

10/01/24 03:19:19 INFO streaming.StreamJob:  map 100%  reduce
0%

10/01/24 03:19:27 INFO streaming.StreamJob:  map 100%  reduce
50%

10/01/24 03:19:32 INFO streaming.StreamJob:  map 100%  reduce
100%

10/01/24 03:19:32 INFO streaming.StreamJob: Job complete:
job_201001221008_0065

10/01/24 03:19:32 INFO streaming.StreamJob: Output: user_login_day-output102

[email protected]:~/Desktop/hadoop-0.19.2$
./bin/hadoop dfs -ls user_login_day-output102

Found 3 items

drwxr-xr-x   - liuguoqing supergroup          0
2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/_logs

-rw-r--r--   1 liuguoqing supergroup         25
2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/part-00000

-rw-r--r--   1 liuguoqing supergroup         47
2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/part-00001

[email protected]:~/Desktop/hadoop-0.19.2$
./bin/hadoop dfs -cat user_login_day-output102/part-00000

54321    2

99999    1

12345    12

[email protected]:~/Desktop/hadoop-0.19.2$
./bin/hadoop dfs -cat user_login_day-output102/part-00001

http://www.renren.com    3

http://www.baidu.com    3

以上为操作结果显示

python 实现Hadoop的partitioner和二次排序,布布扣,bubuko.com

时间: 2024-11-18 13:50:03

python 实现Hadoop的partitioner和二次排序的相关文章

Hadoop学习之自定义二次排序

一.概述    MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往 往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,本文将通过一个实际的MapReduce二次排序例子讲述 二次排序的实现和其MapReduce的整个处理流程,并且通过结果和map.reduce端的日志来验证所描述的处理流程的正确性. 二.需求描述 1.输入数据: sort1    1 sort2    3 sort2 

hadoop编程小技巧(9)---二次排序(值排序)

代码测试环境:Hadoop2.4 应用场景:在Reducer端一般是key排序,而没有value排序,如果想对value进行排序,则可以使用此技巧. 应用实例描述: 比如针对下面的数据: a,5 b,7 c,2 c,9 a,3 a,1 b,10 b,3 c,1 如果使用一般的MR的话,其输出可能是这样的: a 1 a 3 a 5 b 3 b 10 b 7 c 1 c 9 c 2 从数据中可以看到其键是排序的,但是其值不是.通过此篇介绍的技巧可以做到下面的输出: a 1 a 3 a 5 b 3 b

Hadoop---mapreduce排序和二次排序以及全排序

自己学习排序和二次排序的知识整理如下. 1.Hadoop的序列化格式介绍:Writable 2.Hadoop的key排序逻辑 3.全排序 4.如何自定义自己的Writable类型 5.如何实现二次排序 1.Hadoop的序列化格式介绍:Writable 要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式.更多的可能是要关注他的Subinterfaces:WritableComparable<T>.他是继承Writable和Co

Hadoop二次排序及MapReduce处理流程实例详解

一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的,在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现原理及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的.本文将通过一个实际的MapReduce二次排序的例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和Map.

Hadoop.2.x_高级应用_二次排序及MapReduce端join

一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 ===> b,-3 c,2 b,-2 b,-2 b,1 a,100 b,6 b,-3 c,-7 c,-7 c,2 2. 分析[MapRedice过程] 1> 分析数据传入通过input()传入map() 2> map()对数据进行层层过滤,以达到我们想要的数据源, 3> 过滤方法中可添加自

Hadoop二次排序的其他写法

二次排序原理 在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现. 本例子中使用的是TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value. 这就是自定义Map的输入是<LongWritable, Text>的原因.然后调用自定义Map的map方法,将一个个<LongWrit

结合手机上网流量业务来说明Hadoop中的二次排序机制,分区机制

本篇博客将结合手机上网流量业务来详细介绍Hadoop的二次排序机制.分区机制,先介绍一下业务场景: 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 首先我们先通过mapreduce程序实现上面的业务逻辑: 代码实现: package FlowSum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOE

Hadoop Mapreduce分区、分组、二次排序

1.MapReduce中数据流动   (1)最简单的过程:  map - reduce   (2)定制了partitioner以将map的结果送往指定reducer的过程: map - partition - reduce   (3)增加了在本地先进性一次reduce(优化)过程: map - combin(本地reduce) - partition -reduce2.Mapreduce中Partition的概念以及使用.(1)Partition的原理和作用        得到map给的记录后,

一起学Hadoop——二次排序算法的实现

二次排序,从字面上可以理解为在对key排序的基础上对key所对应的值value排序,也叫辅助排序.一般情况下,MapReduce框架只对key排序,而不对key所对应的值排序,因此value的排序经常是不固定的.但是我们经常会遇到同时对key和value排序的需求,例如Hadoop权威指南中的求一年的高高气温,key为年份,value为最高气温,年份按照降序排列,气温按照降序排列.还有水果电商网站经常会有按天统计水果销售排行榜的需求等等,这些都是需要对key和value同时进行排序.如下图所示: