Hadoop Streaming例子(python)

  以前总是用java写一些MapReduce程序现举一个例子使用Python通过Hadoop Streaming来实现Mapreduce。

  任务描述:

  HDFS上有两个目录/a和/b,里面数据均有3列,第一列都是id,第二列是各自的业务类型(这里假设/a对应a,/b对应b),第三列是一个json串。各举一例:

  /a的一行:1234567  a  {"name":"jiufeng","age":"27","sex":"male","school":"","status":["111","000","001"],...}

  /b的一行:12345  b  {"a":"abc","b":"adr","xxoo":"e",...}

  要查找在/a中出现"status"且有"111"状态,而且要再/b中有这个id的所有id列表。

  那么来吧,首先需要mapper来提取/a中满足"status"有"111"状态的id和第二列"a"、/b中所有行的前两列,python代码如下,mapper.py:

 1 #!/usr/bin/env python
 2 #coding = utf-8
 3
 4 import json
 5 import sys
 6 import traceback
 7 import datetime,time
 8
 9 def mapper():
10     for line in sys.stdin:
11         line = line.strip()
12         id,tag,content = line.split(‘\t‘)
13         if tag == ‘a‘:
14             jstr = json.loads(content)
15             active = jstr.get(‘status‘,[])
16             if "111" in active:
17                 print ‘%s\t%s‘ %(id,tag)
18         if tag == ‘b‘:
19             print ‘%s\t%s‘ % ( id,tag)
20
21 if __name__ == ‘__main__‘:
22     mapper()

  这个mapper是从表中输入中提取数据,然后将满足条件的数据通过标准输出。然后是reducer.py:

 1 #!/usr/bin/env python
 2 #coding = utf-8
 3
 4 import sys
 5 import json
 6
 7 def reducer():
 8     tag_a = 0
 9     tag_b = 0
10     pre_id = ‘‘
11     for line in sys.stdin:
12         line = line.strip()
13         current_id,tag = line.split(‘\t‘)
14         if current_id != pre_id:
15             if tag_a==1 and tag_b==1:
16                 tag_a = 0
17                 tag_b = 0
18                 print ‘%s‘ % pre_id
19             else :
20                 tag_a = 0
21                 tag_b = 0
22         pre_id = current_id
23         if tag == ‘a‘:
24             if tag_a == 0:
25                 tag_a = 1
26         if tag == ‘b‘:
27             if tag_b == 0:
28                 tag_b = 1
29     if tag_b==1 and tag_b==1:
30         print ‘%s‘ % pre_id
31
32 if __name__ == ‘__main__‘:
33     reducer()

  一个reducer可以接受N多行数据,不像java那样的一行对应一个key然后多个value,而是一个key对应一个value,但好在相同key的行都是连续的,只要在key变化的时候做一下处理就行。

  然后安排让hadoop执行,schedule.py:

 1 #!/usr/bin/env python
 2 #coding = utf-8
 3
 4 import subprocess, os
 5 import datetime
 6
 7
 8 def mr_job():
 9     mypath = os.path.dirname(os.path.abspath(__file__))
10     inputpath1 = ‘/b/*‘
11     inputpath2 = ‘/a/*‘
12     outputpath = ‘/out/‘
13     mapper = mypath + ‘/mapper.py‘
14     reducer = mypath + ‘/reducer.py‘
15     cmds = [‘$HADOOP_HOME/bin/hadoop‘, ‘jar‘, ‘$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar‘,
16             ‘-numReduceTasks‘, ‘40‘,
17             ‘-input‘, inputpath1,
18             ‘-input‘, inputpath2,
19             ‘-output‘, outputpath,
20             ‘-mapper‘, mapper,
21             ‘-reducer‘, reducer,
22             ‘-file‘, mapper,
23             ‘-file‘, reducer,]
24     for f in os.listdir(mypath):
25         cmds.append(mypath + ‘/‘ + f)
26     cmd = [‘$HADOOP_HOME/bin/hadoop‘, ‘fs‘, ‘-rmr‘, outputpath]
27     subprocess.call(cmd)
28     subprocess.call(cmds)
29
30
31 def main():
32     mr_job()
33
34 if __name__ == ‘__main__‘:
35     main()

  schedule.py就是执行MapReduce的地方通过调用hadoop-streamingXXX.jar会通过调用shell命令来提交job,另外可以配置一下参数,shell命令会将制定的文件上传到hdfs然后分发到各个节点执行。。。$HADOOP_HOME就是hadoop的安装目录。。。mapper和reducer的python脚本的名字无所谓,方法名无所谓因为在配置shell执行命令时已经指定了

  上述是一个很简单的python_hadoop-streamingXXX例子。。。。

时间: 2024-11-07 19:41:45

Hadoop Streaming例子(python)的相关文章

hadoop streaming anaconda python 计算平均值

原始Liunx 的python版本不带numpy ,安装了anaconda 之后,使用hadoop streaming 时无法调用anaconda python  , 后来发现是参数没设置好... 进入正题: 环境: 4台服务器:master slave1  slave2  slave3. 全部安装anaconda2与anaconda3, 主环境py2 .anaconda2与anaconda3共存见:Ubuntu16.04 Liunx下同时安装Anaconda2与Anaconda3 安装目录:/

用python + hadoop streaming 编写分布式程序(二) -- 在集群上运行与监控

写在前面 前文:用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试 为了方便,这篇文章里的例子均为伪分布式运行,一般来说只要集群配置得当,在伪分布式下能够运行的程序,在真实集群上也不会有什么问题. 为了更好地模拟集群环境,我们可以在mapred-site.xml中增设reducer和mapper的最大数目(默认为2,实际可用数目大约是CPU核数-1). 假设你为Hadoop安装路径添加的环境变量叫$HADOOP_HOME(如果是$HAD

自制 python hadoop streaming 数据分析工具

https://github.com/zhuyi10/hadoop_data_analysis跟大家交流一下我写的数据分析工具用hadoop streaming执行python写的mapper, reducer目前只实现了一些简单的分析功能希望大家多提意见

Hadoop Streaming

什么是Hadoop Streaming ? ? Hadoop提供的一个编程工具,允许用户使用任何可执行文件或脚本作为mapper和Reducer ? ? 比如shell中的cat作为mapper,wc作为reducer ? ? $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar -input myInputDirs -output myOutputDir -mapper cat -re

Hadoop Streaming 使用及参数设置

1. MapReduce 与 HDFS 简介 什么是 Hadoop ? Google 为自己的业务需要提出了编程模型 MapReduce 和分布式文件系统 Google File System,并发布了相关论文(可在 Google Research 的网站上获得:GFS.MapReduce).Doug Cutting 和 Mike Cafarella 在开发搜索引擎 Nutch 时对这两篇论文进行了自己的实现,即同名的 MapReduce 和 HDFS,合起来就是 Hadoop. MapRedu

hadoop streaming编程小demo(python版)

都到了年根底下了,业务线黄了,成了惨兮兮的茶几.不说了. 换到了新的业务线,搞大数据质量评估.自动化质检和监控平台是用django,MR也是通过python实现的.(后来发现有odc压缩问题,python不知道怎么解决,正在改成java版本) 这里展示一个python编写MR的例子吧. 抄一句话:Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer. 1.首先,先介绍一下背景,我们的数据是存放在hive里的.h

用python + hadoop streaming 编写分布式程序(三) -- 自定义功能

又是期末又是实训TA的事耽搁了好久……先把写好的放上博客吧 前文: 用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试 用python + hadoop streaming 编写分布式程序(二) -- 在集群上运行与监控 使用额外的文件 假如你跑的job除了输入以外还需要一些额外的文件(side data),有两种选择: 大文件 所谓的大文件就是大小大于设置的local.cache.size的文件,默认是10GB.这个时候可以用-fil

Hadoop Streaming 编程

1.概述 Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如: 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer) $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \ -input myInputDirs \ -outpu

《Hadoop权威指南》笔记 第二章 Hadoop Streaming

什么是Hadoop Streaming ? ? Hadoop提供的一个编程工具,允许用户使用任何可执行文件或脚本作为mapper和Reducer ? ? 一个例子(shell简洁版本) ? ? $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar -input myInputDirs -output myOutputDir -mapper cat -reducer wc ? ? 解析: