用Map-Reduce的思维处理数据

  在很多人的眼里,Map-Reduce等于Hadoop,没有Hadoop谈Map-Reduce犹如自上谈兵,实则不然,Map-Reduce是一种计算模型,只是非常适合在并行的环境下运行,Hadoop是Map-Reduce的一种实现,没有Hadoop照样可以跑Map-Reduce程序。python就内置有map()和reduce方法(虽然与hadoop的map-reduce有区别)。

  这篇文章主要介绍如何用python在linux的管道进行map-reduce编程,本文写的所有map-reduce程序都可以原封不动的放在Hadoop下运行,关于用Hadoop
Streaming调用python的map-reduce程序,可以参考这篇文章(非常详细,推荐看看),本文主要通过几个实例来讲解如何用map-reduce来处理具体的问题。

一、python的内置函数map reduce

  首先来看看python的两个内置函数map()和reduce()

1、map  

map(function, iterable, ...)

  map对迭代器的每个元素调用function函数,在Python2中,将得到的结果组成一个list,而在python3中结果是一个迭代器。function的参数只有一个,即为迭代器的元素,看一个实例:

>>> d = [1,2,3,4,5]
>>> m = map(lambda x:x*2,d)
>>> print m
[2, 4, 6, 8, 10]

  这里的map可以理解为一种映射,将迭代器中的元素经过function映射到另一个值。

2、reduce

reduce(function, iterable[, initializer])

  reduce可以理解为规约,它对迭代器从左到右累计调用function,function有两个参数,第一个是之前的累积值,第二个当前规约的元素,实例:

>>> d = [1,2,3,4,5]
>>> r = reduce(lambda x,y:x+y,d)
>>> r
15

  利用reduce定义一个求前n个整数之和的函数


>>> def accuPlus(n):
return reduce(lambda x,y:x+y,range(1,n+1))

>>> print accuPlus(10)
55
>>> print accuPlus(500)
125250
>>>

  上面介绍的python的两个内置函数与我接下来要介绍的map-reduce还是有一些区别,适用于分布式下的map-redece主要是以键值对的形式处理数据,map阶段发射很多键值对出去,然后按键排序,而reduce则对键相同的键值对进行处理,也可认为是一种规约。

二、用map-reduce做词频统计

  词频统计可以说是map-reduce的Hello
World程序,它简单明了,却描述了map-reduce的基本原理,统计中文的词频得先要分词,目前有一些免费的分词软件,中科院和哈工大的都还不错,现在我们有下面这样一篇已经分好词的文章(data.txt)需要进行词频统计:

?





1

双子座 的 你 是 这样 的 吗 1 手机 不 离 身 睡觉 不 关机 2 对待 不同 的 人 有 不同 的 性格 3 从 小 懂得 很多 道理 但 知 行 往往 难以 合 一 4 有 时候 很 神经 有时候 很 镇静 5 会 因为 别人 一 句 话 伤心 但 不 会 被 发现 6 很 会 安慰 别人 却 不 会 安慰 自己 7 会 经常 怀念 从 前

  定义map的处理(mapper.py)如下:


 1 #!encoding=utf-8
2
3 from sys import stdin
4
5 def mapprint(word):
6 ‘‘‘定义map函数对元素的处理方式,这里直接打印‘‘‘
7 print word
8
9 #对每行进行统计
10 for line in stdin:
11 words = line.strip().split(‘ ‘)
12 map(mapprint,words)

  mapper.py从标准输入流中对每一行进行处理,然后将每个单词打印到标准输出,输出之后需要对词进行排序,才能被reduce处理,利用管道我们来看看mapper.py对data.txt进行处理:

$ cat data.txt |python mapper.py |sort

  输出的部分结果如下:


  • 安慰
    安慰

    别人
    别人




    不同
    不同




    道理

  接下来我们看看reducer.py,mapper的输出排序后,相同的元素会出现在一起,我们可以从上往下扫描所有的元素,并维持一个计数器,遇到相同的词就把计数器加1,遇到不相同的词,就说明上一个词已经统计完毕了,可以将这个词及其词频输出到标准输出:


 1 #!encoding=utf-8
2 from sys import stdin
3
4 last = None
5 count = 0
6 for line in stdin:
7 word = line.strip()
8 if word != last:#遇到不同的词
9 if last:
10 print ‘%s\t%d‘ % (last,count)
11 last = word
12 count = 0
13 count += 1
14 #输出最后一个词
15 if last:
16 print ‘%s\t%d‘ % (last,count)
17

  结合map和reduce,整个词频统计执行如下:

$ cat data.txt |python mapper.py | sort |python reducer.py

  输出的词频结果:


1    1
2 1
3 1
4 1
5 1
6 1
7 1
安慰 2
被 1
别人 2
不 4
不同 2
从 2
但 2
道理 1
的 4
懂得 1
对待 1
发现 1
关机 1
合 1
很 3
很多 1
话 1
怀念 1
会 5
经常 1
句 1
离 1
吗 1
难以 1
你 1
前 1
却 1
人 1
伤心 1
身 1
神经 1
时候 1
是 1
手机 1
双子座 1
睡觉 1
往往 1
小 1
行 1
性格 1
一 2
因为 1
有 2
有时候 1
这样 1
镇静 1
知 1
自己 1

  为什么要把这个问题复杂化,完全可以用一个程序来处理统计词频这事,看下面的count.py


 1 from collections import defaultdict
2 from sys import stdin
3
4 m = defaultdict(int)
5 for line in stdin:
6 words = line.strip().split(‘ ‘)
7 for word in words:
8 m[word] += 1
9
10 for word,count in m.items():
11 print ‘%s\t%s‘ % (word,count)

  有两个理由让我偏向于使用map-reduce作数据处理:

  • 首先map-reduce的思路清晰,它将一个问题分成两步,map阶段我们只需要把词打印到标准输出,再经过排序进入reduce阶段,reduce对相同的词进行累加;

  • map-reduce的程序可以原封不动的在Hadoop上跑起来,大规模的数据可以完爆count.py,现在的编程应该倾向于人的易理解性与扩展性,而不是在小数据上提升那么一丁点性能。

二、用map-reduce做多表等值连接

  假设我们有这样一份某app的日志文件app.log,包含appId、IMEI、userInfo这三个字段(这里的讲述表的等值连接不牵涉到userInfo字段,所以该字段都置为U):


appId      IMEI         userInfo
8111111 I86733062 U1
8111112 I86733010 U2
8111113 I86733048 U3
8111114 I86733012 U4
8111115 I86733020 U5
8111116 I86733063 U6
8111117 I86733042 U7
8111118 I86733022 U8
8111119 I86733016 U9
8111120 I86733027 U10

  现在我们想挖掘更多的用户信息,得到了一份微博的数据,微博的数据weibo.log包含weiboId、IMEI、weiboInfo(同意我们将weibInfo置为W):


weiboId    IMEI    weiboInfo
1287680 I86733017 W1
1287681 I86733048 W2
1287682 I86733015 W3
1287683 I86733047 W4
1287684 I86733020 W5
1287685 I86733051 W6
1287686 I86733022 W7
1287687 I86733036 W8

  两个表都有一个共同的字段,即手机的IMEI序列号,通过微博的IMEI与我们app日志文件IMEI合并,就可以找出一些app用户的微博信息,进行进一步的分析,现在我们需要的是将既在app.log又在weibo.log中的用户找出来。

  合并app.log和weibo.log到aw.log,合并了之后是方便于map的处理,但我们需要标记每条记录来自于哪个表,我们用A标准来自于app.log,用W标志weibo.log:

$ awk ‘NR > 1 {printf "A\t%s\n",$0}‘ app.log > aw.log

$ awk ‘NR > 1 {printf "W\t%s\n",$0}‘ weibo.log >> aw.log

  这样合并之后的aw.log如下:


A       8111111 I86733062       U1
A 8111112 I86733010 U2
A 8111113 I86733048 U3
A 8111114 I86733012 U4
A 8111115 I86733020 U5
A 8111116 I86733063 U6
A 8111117 I86733042 U7
A 8111118 I86733022 U8
A 8111119 I86733016 U9
A 8111120 I86733027 U10
W 1287680 I86733017 W1
W 1287681 I86733048 W2
W 1287682 I86733015 W3
W 1287683 I86733047 W4
W 1287684 I86733020 W5
W 1287685 I86733051 W6
W 1287686 I86733022 W7
W 1287687 I86733036 W8

  现在我们来写map的mapper.py,map的输出应该以IMEI作为key,其他字段作为value,这样经过排序后,才能保证在reduce阶段,IMEI相同的app.log记录和weibo.log记录能够连续出现,mapper.py实现如下:


1 #!encoding=utf-8
2 from sys import stdin
3
4 for line in stdin:
5 data = line.strip().split(‘\t‘)
6 if len(data) != 4:#过滤掉错误行
7 continue
8 #把IMEI放在最前面,以便以IMEI排序
9 print ‘%s\t%s\t%s\t%s‘ % (data[2],data[0],data[1],data[3])

  mapper输出并排序下:


$ cat aw.log |python mapper.py |sort -k1
I86733010 A 8111112 U2
I86733012 A 8111114 U4
I86733015 W 1287682 W3
I86733016 A 8111119 U9
I86733017 W 1287680 W1
I86733020 A 8111115 U5
I86733020 W 1287684 W5
I86733022 A 8111118 U8
I86733022 W 1287686 W7
I86733027 A 8111120 U10
I86733036 W 1287687 W8
I86733042 A 8111117 U7
I86733047 W 1287683 W4
I86733048 A 8111113 U3
I86733048 W 1287681 W2
I86733051 W 1287685 W6
I86733062 A 8111111 U1
I86733063 A 8111116 U6

  接下来处理reduce了,reduce从流中不断扫描行,当遇到相同的IMEI,并且一个来自weibo.log一个来自app.log,就把这两条记录拼接起来,reducer.py:


 1 #!encoding=utf-8
2 from sys import stdin
3
4 wIMEI = None#记录来自为微博的IMEI
5 weibo = None
6
7 aIMEI = None#记录来自app的IMEI
8 app = None
9
10 for line in stdin:
11 data = line.strip().split(‘\t‘,2)
12 if len(data) != 3:#过滤错误的数据行
13 continue
14 if data[1] == ‘A‘:
15 aIMEI = data[0]
16 app = data[2]
17 elif data[1] == ‘W‘:
18 wIMEI = data[0]
19 weibo = data[2]
20 if wIMEI == aIMEI and wIMEI is not None:#两个IMEI相等时连接两行
21 print ‘%s\t%s\t%s‘ % (wIMEI,app,weibo)
22 aIMEI = wIMEI = None#重置
23
24 if wIMEI == aIMEI and wIMEI is not None:#最后的记录不要忘记输出
25 print ‘%s\t%s\t%s‘ % (wIMEI,app,weibo)

  连接map-reduce,整个等值连接结果如下:

$ cat aw.log |python mapper.py |sort -k1 | python reducer.py
I86733020 8111115 U5 1287684 W5
I86733022 8111118 U8 1287686 W7
I86733048 8111113 U3 1287681 W2

  上述的map-reduce阶段依然可以用一个程序跑,但日志文件往往比较大,几GB到几十GB也很正常,但我们的map-reduce总是能伸缩自如,用Hadoop不怕数据量大。

 三、用map-reduce做矩阵的迭代运算

  我们来看最后一个例子,一个矩阵的一下轮的元素等于其周围四面八方所有元素以及自己的之和的平均值,比如下面矩阵:

1    2    3

4 5 6

7 8 9

  下一轮矩阵为:

(1+2+4+5)/4        (1+2+3+4+5+6)/6                (2+3+5+6)/4

(1+2+4+5+7+8)/6   (1+2+3+4+5+6+7+8+9)/9    (2+3+5+6+8+9)/6

(4+5+7+8)/5 (4+5+6+7+8+9)/6 (5+6+8+9)/4

  当矩阵非常大的时候,我们需要把矩阵转为三元组的形式<x,y,value>,这样方便map-reduce的处理,否则,矩阵非常大,矩阵的一行就能爆内存。比如上面的矩阵转化为三元组后变为:


0,0,1
0,1,2
0,2,3
1,0,4
1,1,5
1,2,6
2,0,7
2,1,8
2,2,9

  现在我们处理map,先理清map阶段需要发射的key和value是什么,map扫描每一行得到的是矩阵的一个元素,而这个元素会参与周围所有元素的均值计算,因此我们对每一个<x,y,value>输出的键值对是(<xi,yj,>,<x,y,value>),<xi,yj,>是<x,y>周围的坐标,map输出后,按xi,yj顺序排序,对所有key值相同的元素累加并求平均值,mapper.py:


 1 #!encoding=utf-8
2 from sys import stdin
3
4 M = 3#矩阵的行数
5 N = 3#矩阵的列数
6
7 for line in stdin:
8 data = line.strip().split(‘,‘)
9 x,y = map(int,data[0:2])
10 value = float(data[2])
11 for i in xrange(-1,2):
12 if x + i < 0 or x + i >= M:#超出上下边界
13 continue
14 for j in xrange(-1,2):
15 if y + j < 0 or y + j >= N:#超出左右边界
16 continue
17 print ‘%d,%d\t%f‘ % (x+i,y+j,value)#为周围的每个值计算提供贡献

  执行mapper.py,排序时注意一下,字段分割符是制表符‘\t‘,实际上hadoop的默认分割符就是制表符:

?




$ cat data.txt |python mapper.py |sort -t$‘\t‘

 mapper.py输出的中间过程比较长,我们看一下部分结果:


0,0    1.000000
0,0 2.000000
0,0 4.000000
0,0 5.000000
0,1 1.000000
0,1 2.000000
0,1 3.000000
0,1 4.000000
0,1 5.000000
0,1 6.000000
0,2 2.000000
0,2 3.000000
0,2 5.000000
0,2 6.000000

  接下来写reducer.py,在reduce阶段依旧是扫描每一行,并判断是否与上一行的key相同,如果相同就累加value,如果不同,则计算平均值,并输出到标准输出:


 1 from sys import stdin
2 last = None
3 count = 0
4 s = 0
5 for line in stdin:
6 p,v = line.strip().split(‘\t‘)
7 value = float(v)
8 if last != p:
9 if last:
10 print ‘%5s,%10f‘ % (last,s/count)
11 last = p
12 s = 0
13 count = 0
14 s += value
15 count += 1
16 if last:
17 print ‘%5s,%10f‘ % (last,s/count)

  连接map-reduce执行:

?




$ cat data.txt |python mapper.py |sort -t$‘\t‘|python reducer.py

  输出如下:  


  0,0,  3.000000
0,1, 3.500000
0,2, 4.000000
1,0, 4.500000
1,1, 5.000000
1,2, 5.500000
2,0, 6.000000
2,1, 6.500000
2,2, 7.000000

  其实这里,矩阵连续迭代到max(M,N)+3次之后,矩阵的每个元素都将趋向于整个矩阵的平均值,因为经过max(M,N)+3次迭代后,最左边的元素已经传递到了最右边,最上面的元素也已经传递到了最下边,作为一个单机版的测试,我写了迭代测试:


 1 from os import system
2
3 lastMatrix = ‘data.txt‘
4 matrix = None
5 for i in xrange(1,4+3):
6 if matrix != None:
7 lastMatrix = matrix
8 matrix = ‘data.‘+str(i)
9 print i
10 cmd = ‘cat ‘+lastMatrix+"|python mapper.py |sort -t‘\t‘|python reducer.py >"+matrix
11 system(cmd)
12
13 system(‘cat ‘+matrix)

  迭代6次后输出: 


  0,0,  4.937500
0,1, 4.953125
0,2, 4.968750
1,0, 4.984375
1,1, 5.000000
1,2, 5.015625
2,0, 5.031250
2,1, 5.046875
2,2, 5.062500

  可以看到已经基本接近矩阵均值5了。

四、总结

  上面介绍了三个实例,用map-reduce来处理数据,可以看到,按照这种思路写代码,不仅思路清晰,而且扩展性强,因为背后有hadoop在支撑。需要注意的是,上面的map-reduce代码在hadoop中运行时,应该对每一行的处理加上一个try...except,过滤掉那些异常数据,因为海量数据里总是有一些噪声数据,如果没有try-except,hadoop虽然在失败时会重试,但重试多次后任然失败,那整个任务就会失败了。

  最后,完整的代码在github上可以查看。

转载请注明出处:http://www.cnblogs.com/fengfenggirl/

用Map-Reduce的思维处理数据,布布扣,bubuko.com

时间: 2025-01-16 01:16:59

用Map-Reduce的思维处理数据的相关文章

Node.js结合使用MongDb的Map.reduce功能进行大量数据简化处理办法

一年前,准备使用mongDb自带的map,reduce功能模拟hadoop,换个思路做一个简易的大数据分拆再结合存储的办法: 这个功能可以用于数据日志或者游戏数据之类,进行周期性归纳和按照自己需求重组数据; 以下代码实现了将每日数据collecttion:gameLog日期的数据统计出不同的collection.具体不详述,只讲思路和遇到的问题: 主要代码如下: var MongoClient = require('mongodb').MongoClient;MongoClient.connec

Map/Reduce 工作机制分析 --- 数据的流向分析

前言 在MapReduce程序中,待处理的数据最开始是放在HDFS上的,这点无异议. 接下来,数据被会被送往一个个Map节点中去,这也无异议. 下面问题来了:数据在被Map节点处理完后,再何去何从呢? 这就是本文探讨的话题. Shuffle 在Map进行完计算后,将会让数据经过一个名为Shuffle的过程交给Reduce节点: 然后Reduce节点在收到了数据并完成了自己的计算后,会将结果输出到Hdfs. 那么,什么是Shuffle阶段,它具体做什么事情? 需要知道,这可是Hadoop最为核心的

第九篇:Map/Reduce 工作机制分析 - 数据的流向分析

前言 在MapReduce程序中,待处理的数据最开始是放在HDFS上的,这点无异议. 接下来,数据被会被送往一个个Map节点中去,这也无异议. 下面问题来了:数据在被Map节点处理完后,再何去何从呢? 这就是本文探讨的话题. Shuffle 在Map进行完计算后,将会让数据经过一个名为Shuffle的过程交给Reduce节点: 然后Reduce节点在收到了数据并完成了自己的计算后,会将结果输出到Hdfs. 那么,什么是Shuffle阶段,它具体做什么事情? 需要知道,这可是Hadoop最为核心的

ng机器学习视频笔记(十五) ——大数据机器学习(随机梯度下降与map reduce)

ng机器学习视频笔记(十五) --大数据机器学习(随机梯度下降与map reduce) (转载请附上本文链接--linhxx) 一.概述 1.存在问题 当样本集非常大的时候,例如m=1亿,此时如果使用原来的梯度下降算法(也成为批量梯度下降算法(batch gradient descent),下同),则速度会非常慢,因为其每次遍历整个数据集,才完成1次的梯度下降的优化.即计算机执行1亿次的计算,仅仅完成1次的优化,因此速度非常慢. 2.数据量考虑 在使用全量数据,而不是摘取一部分数据来做机器学习,

记一次MongoDB Map&amp;Reduce入门操作

需求说明 用Map&Reduce计算几个班级中,每个班级10岁和20岁之间学生的数量: 需求分析 学生表的字段: db.students.insert({classid:1, age:14, name:'Tom'}) 将classid随机1和2.age在8-25岁之间随机,name在3-7个字符之间随机. 数据写入 数据写入java脚本 往mrtask库中students写入1000万条数据: package org.test; import java.util.ArrayList; impor

js数组中indexOf/filter/forEach/map/reduce详解

今天在网上看到一篇帖子,如题: 出处:前端开发博客 (http://caibaojian.com/5-array-methods.html) 在ES5中一共有9个Array方法,分别是: Array.prototype.indexOf Array.prototype.lastIndexOf Array.prototype.every Array.prototype.some Array.prototype.forEach Array.prototype.map Array.prototype.f

map reduce

作者:Coldwings链接:https://www.zhihu.com/question/29936822/answer/48586327来源:知乎著作权归作者所有,转载请联系作者获得授权. 简单的说就是问题可以划分成若干单元,每个单元的计算互不相关,单元计算结果可以在可以承受的时间内合成为总结果的计算.再说直白一点:所有分治模型都可交由hadoop解决.可以说spark是功能更全面的hadoop,支持一些诸如filter.group之类的操作,但是原本思想仍是map reduce,差别不太大

MapReduce核心map reduce shuffle (spill sort partition merge)详解

Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce, Shuffle是必须要了解的.Shuffle的正常意思是洗牌或弄乱,可能大家更熟悉的是Java API里Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序.如果你不知道MapReduce里 Shuffle是什么,那么请看这张图: 这张是官方对Shuffle过程的描述.但我可以肯定的 是,单从这张图你基本不可能明白Shuffle的过程,因为它与事实相差挺多

如何在MAP/REDUCE中不检查输出路径?

前言 如果在REDUCE中并没有涉及到生成HDFS文件,比如只是将一些数据写入REDIS,那么每次都要提供一个不存在的OUTPUT,真是挺麻烦的,有没有机制可以让MAP/REDUCE作业不要检查输出路径? 检查输出路径的机制 经常,我们的输出格式是这样的: job.setOutputFormatClass(TextOutputFormat.class); 跟踪下TextOutputFormat,扫描下其中的方法,发现没有检查输出路径的方法. 向上检查TextOutputFormat的父类File