用python + hadoop streaming 编写分布式程序(三) -- 自定义功能

又是期末又是实训TA的事耽搁了好久……先把写好的放上博客吧

前文:

用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试

用python + hadoop streaming 编写分布式程序(二) -- 在集群上运行与监控

使用额外的文件

假如你跑的job除了输入以外还需要一些额外的文件(side data),有两种选择:

  1. 大文件

    所谓的大文件就是大小大于设置的local.cache.size的文件,默认是10GB。这个时候可以用-file来分发。除此之外代码本身也可以用file来分发。

    格式:假如我要加多一个sideData.txt给python脚本用:

    $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar     -input iputDir     -output outputDir     -mapper mapper.py     -file mapper.py     -reducer reduer.py     -file reducer.py     -file sideDate.txt
    

    在python脚本里,只要把这个文件当成自己同一目录下的本地文件来打开就可以了。比如:

    f = open("sideData.txt")
    

    注意这个file是只读的,不可以写。

  2. 小文件

    如果是比较小的文件,想要提高读写速度可以将它放在distributed cache里(也就是每台机器都有自己的一份copy,不需要网络IO就可以拿到数据)。这里要用到的参数是-cachefile,写法和用法上一个一样,就是-file改成-cachefile而已。

控制partitioner

partitioning指的是数据经过mapper处理后,被分发到reducer上的过程。partitioner控制的,就是“怎样的mapper输出会被分发到哪一个reducer上”。

Hadoop有几个自带的partitioner,解释可以看这里。默认的是HashPartitioner,也就是把第一个tab前的key做hash之后用于分配partition。写Hadoop Streaming程序是可以选择其他partitioner的,你可以选择自带的其他几种里的一种,也可以自己写一个继承Partitioner的java类然后编译成jar,在运行参数里指定为你用的partitioner。

官方自带的partitioner里最常用的是KeyFieldBasedPartitioner(源代码可以看这里)。它会按照key的一部分来做partition,而不是用整个key来做partition。

在学会用KeyFieldBasedPartitioner之前,必然要先学怎么控制key-value的分割。分割key的步骤可以分为两步,用python来描述一下大约是

fields = output.split(seperator)
key = fields[:numKeyfields]
  1. 选择用什么符号来分割key,也就是选择seperator

    map.output.key.field.separator可以指定用于分隔key的符号。比如指定为一点的话,就要加上参数

    -D stream.map.output.field.separator=.
    

    假设你的mapper输出是

    11.22.33.44
    

    这时会先看准[11, 22, 33, 44]这里的其中一个或几个作为key

  2. 选择key的范围,也就是选择numKeyfields

    控制key的范围的参数是这个,假设我要设置被分割出的前2个元素为key:

    -D stream.num.map.output.key.fields=2
        

    那么key就是上面的 1122。值得注意的是假如这个数字设置到覆盖整个输出,在这个例子里是4的话,那么整一行都会变成key。

上面分割出key之后, KeyFieldBasedPartitioner还需要知道你想要用key里的哪部分作为partition的依据。它进行配置的过程可以看源代码来理解。

假设在上一步我们通过使用

-D stream.map.output.field.separator=. -D stream.num.map.output.key.fields=4     

将11.22.33.44的整个字符串都设置成了key,下一步就是在这个key的内部再进行一次分割。map.output.key.field.separator可以用来设置第二次分割用的分割符,mapred.text.key.partitioner.options可以接受参数来划分被分割出来的partition key,比如:

-D map.output.key.field.separator=. -D mapred.text.key.partitioner.options=-k1,2     

指的就是在key的内部里,将第1到第2个被点分割的元素作为partition key,这个例子里也就是1122。这里的值-ki,j表示从i到j个元素(inclusive)会作为partition key。如果终点省略不写,像-ki的话,那么i和i之后的元素都会作为partition key。

partition key相同的输出会保证分到同一个reducer上,也就是所有11.22.xx.xx的输出都会到同一个partitioner,11.22换成其他各种组合也是一样。

实例说明一下,就是这样的:

  1. mapper的输出是

    11.12.1.2
    11.14.2.3
    11.11.4.1
    11.12.1.1
    11.14.2.2
    
  2. 指定前4个元素做key,key里的前两个元素做partition key,分成3个partition的话,就会被分成
    11.11.4.1
    -----------
    11.12.1.2
    11.12.1.1
    -----------
    11.14.2.3 11.14.2.2
    
  3. 下一步reducer会对自己得到的每个partition内进行排序,结果就是
    11.11.4.1
    -----------
    11.12.1.1
    11.12.1.2
    -----------
    11.14.2.2
    11.14.2.3 

命令格式大约就是长这样

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar         -D stream.map.output.field.separator=.         -D stream.num.map.output.key.fields=4         -D map.output.key.field.separator=.         -D mapred.text.key.partitioner.options=-k1,2         -input inputDir         -output outputDir         -mapper mapper.py -file mapper.py         -reducer reducer.py -file reducer.py         -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
    

注意-D参数放在前面,指定用KeyFieldBasedPartitioner的-partitioner要放在下面。

控制comparator与自定义排序

上面说到mapper的输出被partition到各个reducer之后,会有一步排序。这个排序的标准也是可以通过设置comparator控制的。和上面一样,要先设置分割出key用的分隔符、key的范围,key内部分割用的分隔符

-D stream.map.output.field.separator=. -D stream.num.map.output.key.fields=4 -D map.output.key.field.separator=. \

这里要控制的就是key内部的哪些元素用来做排序依据,是排字典序还是数字序,倒序还是正序。用来控制的参数是mapred.text.key.comparator.options,接受的值格式类似于unix sort。比如我要按第二个元素的数字序(默认字典序)+倒序来排元素的话,就用

-D mapred.text.key.comparator.options=-k2,2nr

n表示数字序,r表示倒序。这样一来

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

就会被排成

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1

用python + hadoop streaming 编写分布式程序(三) -- 自定义功能

时间: 2024-12-16 06:52:26

用python + hadoop streaming 编写分布式程序(三) -- 自定义功能的相关文章

用python + hadoop streaming 编写分布式程序(二) -- 在集群上运行与监控

写在前面 前文:用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试 为了方便,这篇文章里的例子均为伪分布式运行,一般来说只要集群配置得当,在伪分布式下能够运行的程序,在真实集群上也不会有什么问题. 为了更好地模拟集群环境,我们可以在mapred-site.xml中增设reducer和mapper的最大数目(默认为2,实际可用数目大约是CPU核数-1). 假设你为Hadoop安装路径添加的环境变量叫$HADOOP_HOME(如果是$HAD

自制 python hadoop streaming 数据分析工具

https://github.com/zhuyi10/hadoop_data_analysis跟大家交流一下我写的数据分析工具用hadoop streaming执行python写的mapper, reducer目前只实现了一些简单的分析功能希望大家多提意见

python练习:编写一个程序,检查3个变量x,y,z,输出其中最大的奇数。如果其中没有奇数,就输出一个消息进行说明。

笔者是只使用条件语句实行的.(if-else) 重难点:先把三个数进行由小到大的排序,然后再从最大数进行判断,如果是奇数就输出,如果不是就判断下一个数. 1 print("----------------------------") 2 x,y,z=1,4,6 3 if x>y: 4 x,y=y,x#交换两个变量的值 5 if y>z:#这里注意单独写一个if而不是elif 6 y,z=z,y 7 if z%2==1: 8 print(z) 9 elif y%2==1: 1

python练习:编写一个程序,要求用户输入10个整数,然后输出其中最大的奇数,如果用户没有输入奇数,则输出一个消息进行说明。

重难点:通过input函数输入的行消息为字符串格式,必须转换为整型,否则不能进行排序交换位置.通过索引的方式可以查看字符串中的每一个字符,双层for循环进行冒泡排序.for循环的倒序输出方式:for z in range(9,1,-1):.break直接跳出循环.通过标志位判断是否输出过奇数. 1 print("----------------------------") 2 num=input('请输入十个整数:')#通过input函数输入,这时返回值为行信息的字符串 3 for m

一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现

1:首先搞好实体类对象: write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法 1 package com.areapartition; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 im

Hadoop_21_编写MapReduce程序实现Join功能

1.序列化与Writable接口 1.1.hadoop的序列化格式 序列化和反序列化就是结构化对象和字节流之间的转换,主要用在内部进程的通讯和持久化存储方面 hadoop在节点间的内部通讯使用的是RPC,RPC协议把消息翻译成二进制字节流发送到远程节点,远程节点再通过反序 列化把二进制流转成原始的信息 hadoop自身的序列化存储格式实现了Writable接口的类,他只实现了前面压缩和快速.但是不容易扩展也不跨语言 我们先来看下Writable接口,Writable接口定义了两个方法: 1.将数

用Python编写WordCount程序任务

1. 用Python编写WordCount程序并提交任务 程序 WordCount 输入 一个包含大量单词的文本文件 输出 文件中每个单词及其出现次数(频数),并按照单词字母顺序排序,每个单词和其频数占一行,单词和频数之间有间隔 2.编写map函数,reduce函数 import sys for line in sys.stdin: line=line.strip() words=line.split() for word in words: print '%s\t%s' % (word,1)

hadoop streaming anaconda python 计算平均值

原始Liunx 的python版本不带numpy ,安装了anaconda 之后,使用hadoop streaming 时无法调用anaconda python  , 后来发现是参数没设置好... 进入正题: 环境: 4台服务器:master slave1  slave2  slave3. 全部安装anaconda2与anaconda3, 主环境py2 .anaconda2与anaconda3共存见:Ubuntu16.04 Liunx下同时安装Anaconda2与Anaconda3 安装目录:/

Hadoop Streaming例子(python)

以前总是用java写一些MapReduce程序现举一个例子使用Python通过Hadoop Streaming来实现Mapreduce. 任务描述: HDFS上有两个目录/a和/b,里面数据均有3列,第一列都是id,第二列是各自的业务类型(这里假设/a对应a,/b对应b),第三列是一个json串.各举一例: /a的一行:1234567 a {"name":"jiufeng","age":"27","sex"