A small Hadoop Streaming demo (Python version)

The end of the year is almost here, my business line got axed, and I'm left a sorry mess. Enough said.

I've moved to a new business line, working on big-data quality evaluation. The automated quality-check and monitoring platform is built on Django, and the MR jobs are implemented in Python as well. (We later ran into an ORC compression issue that we couldn't work around in Python, so those jobs are being rewritten in Java.)

Here is an example of writing an MR job in Python.

To quote a line: Hadoop Streaming is a programming tool shipped with Hadoop that lets users plug in any executable or script file as the Mapper and Reducer.
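
As a minimal illustration of the contract Streaming imposes (this word-count pair is my own sketch, not from the post, and the file names wc_mapper.py / wc_reducer.py are made up): the mapper reads raw lines from stdin and prints key<TAB>value lines, and the reducer reads those lines back from stdin, already sorted by key.

# wc_mapper.py -- emit "<word><TAB>1" for every word read from stdin
import sys
for line in sys.stdin:
    for word in line.split():
        print '%s\t%s' % (word, 1)

# wc_reducer.py -- sum the counts per word coming out of the shuffle
import sys
counts = {}
for line in sys.stdin:
    word, cnt = line.rstrip('\n').split('\t', 1)
    counts[word] = counts.get(word, 0) + int(cnt)
for word, cnt in counts.items():
    print '%s\t%s' % (word, cnt)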

1. First, some background: our data lives in Hive. The table structure is shown below (output of desc).

We will parse the table metadata and merge it with the data on HDFS to make processing easier. The partition keys here are year/month/day.

hive (gulfstream_ods)> desc g_order;
OK
col_name        data_type       comment
order_id                bigint                  order id
driver_id               bigint                  driver id; 0 before a driver grabs the order
driver_phone            string                  driver phone number
passenger_id            bigint                  passenger id
passenger_phone         string                  passenger phone number
car_id                  int                     pickup vehicle id
area                    int                     city id
district                string                  city district code
type                    int                     order type: 0 real-time, 1 reservation
current_lng             decimal(19,6)           longitude where the passenger placed the order
current_lat             decimal(19,6)           latitude where the passenger placed the order
starting_name           string                  origin name
starting_lng            decimal(19,6)           origin longitude
starting_lat            decimal(19,6)           origin latitude
dest_name               string                  destination name
dest_lng                decimal(19,6)           destination longitude
dest_lat                decimal(19,6)           destination latitude
driver_start_distance   int                     road distance from driver to origin, in meters
start_dest_distance     int                     road distance from origin to destination, in meters
departure_time          string                  departure time (reservation time for booked orders, order-placing time for real-time orders)
strive_time             string                  time the driver successfully grabbed the order
consult_time            string                  negotiation time
arrive_time             string                  time the driver tapped "I have arrived"
setoncar_time           string                  boarding time (not used for now)
begin_charge_time       string                  time the driver tapped "start charging"
finish_time             string                  completion time
year                    string
month                   string
day                     string                                      

# Partition Information
# col_name              data_type               comment             

year                    string
month                   string
day                     string              

2. Parsing the metadata

This is the metadata-parsing step. We then serialize the metadata and store it in the file desc.gulfstream_ods.g_order; this configuration file is uploaded to the Hadoop cluster together with the MR scripts.

import subprocess
from subprocess import Popen

def desc_table(db, table):
    process = Popen('hive -e "desc %s.%s"' % (db, table),
            shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    is_column = True
    structure_list = list()
    column_list = list()
    for line in stdout.split('\n'):
        value_list = list()
        if not line or len(line.split()) < 2:
            break
        if is_column:
            column_list = line.split()
            is_column = False
            continue
        else:
            value_list = line.split()
        structure_dict = dict(zip(column_list, value_list))
        structure_list.append(structure_dict)

    return structure_list
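
The post stores the serialized metadata in desc.gulfstream_ods.g_order but does not show that step; here is a minimal sketch, assuming JSON as the serialization format (to match the deserialize() call in the mapper below) and using a helper name dump_table_desc of my own:

import json

def dump_table_desc(db, table):
    # desc_table() is the function defined above
    structure_list = desc_table(db, table)
    with open('desc.%s.%s' % (db, table), 'w') as fw:
        fw.write(json.dumps(structure_list))

if __name__ == '__main__':
    dump_table_desc('gulfstream_ods', 'g_order')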

3. Below is the Hadoop Streaming launch script.

#!/bin/bash
source /etc/profile
source ~/.bash_profile

# Hadoop home directory
echo "HADOOP_HOME: "$HADOOP_HOME
HADOOP="$HADOOP_HOME/bin/hadoop"

DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo $DB--$TABLE--$YEAR--$MONTH--$DAY

if [ "$DB" = "gulfstream_ods" ]
then
    DB_NAME="gulfstream"
else
    DB_NAME=$DB
fi
TABLE_NAME=$TABLE

# Input path
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
# Suffix of the marker file
input_mark="_SUCCESS"
echo $input_path
# Output path
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path
# Performance / capacity constraints
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"
# Job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"

$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -jobconf mapred.job.name="$job_name" \
    -jobconf mapred.job.queue.name=$queue_name \
    -jobconf mapred.map.tasks=$map_num \
    -jobconf mapred.reduce.tasks=$reducer_num \
    -jobconf mapred.map.capacity=$capacity_mapper \
    -jobconf mapred.reduce.capacity=$capacity_reducer \
    -input $input_path \
    -output $output_path \
    -file ./mapper.py \
    -file ./reducer.py \
    -file ./utils.py \
    -file ./"desc.${DB}.${TABLE_NAME}" \
    -mapper "$mapper" \
    -reducer "$reducer"
if [ $? -ne 0 ]; then
    echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run failed"
fi
$HADOOP fs -touchz "${output_path}/$output_mark"
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
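
Assuming the launch script is saved as run.sh (the post does not give its file name), it is invoked with the database, table and date parts as positional arguments, e.g. sh run.sh gulfstream_ods g_order 2016 12 01. The desc.gulfstream_ods.g_order file produced in step 2 has to sit next to the scripts, since it is shipped to the cluster via -file.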

4. Here is an advanced version of word count: the first function counts orders per area, and the second counts orders per hour across the day.

The mapper script:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')

# Match the fields against the table metadata and return an iterator of records
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # The last 3 columns (year/month/day) are the partition key and are not needed
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item

def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]

# Map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)

def mapper_plugin_1(item, mapper_result):
    # In practice the key could be different app keys; it decides which reducer a record
    # goes to, and records with the same route key land on the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    mapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})

def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})
    except Exception, ex:
        pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None

def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)

if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)
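
To make the mapper's output format concrete: for a made-up input record whose columns are joined by '||' and whose area, district, order_id and strive_time fields are 1, 010, 123456 and 2016-12-01 10:03:22, the mapper prints two lines, one per route key (the JSON key order may vary, since it comes from an unordered Python 2 dict):

route1<TAB>{"count": 1, "order_id": "123456", "district": "010", "area": "1"}
route2<TAB>{"count": 1, "order_id": "123456", "strive_time": "2016-12-01 10:03:22", "day_hour": "2016-12-01 10"}

Hadoop Streaming sorts these lines on the part before the first tab, so each reducer sees all records for a given route key as one contiguous group, which is what the groupby call in the reducer below relies on.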

The reducer script:

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter

def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)

def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route1':
            reducer_plugin_1(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])
        if route_key == 'route2':
            reducer_plugin_2(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])

def reducer_plugin_1(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('area') or not data.get('district') or not data.get('count'):
            continue
        key = '_'.join([data.get('area'), data.get('district')])
        reducer_result[route_key].setdefault(key, 0)
        reducer_result[route_key][key] += int(data.get('count'))
        # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])

def reducer_plugin_2(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):
            continue
        key = data.get('day_hour')
        reducer_result[route_key].setdefault(key, {})
        reducer_result[route_key][key].setdefault('count', 0)
        reducer_result[route_key][key].setdefault('order_list', [])
        reducer_result[route_key][key]['count'] += int(data.get('count'))
        if len(reducer_result[route_key][key]['order_list']) < 100:
            reducer_result[route_key][key]['order_list'].append(data.get('order_id'))
        # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]

def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)

if __name__ == '__main__':
    main()
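
A convenient way to debug the pair locally (a standard Streaming trick, not part of the original post) is to pipe a small sample of the '||'-separated data through the two scripts by hand, with sort standing in for the shuffle phase; sample_data is a hypothetical file, and desc.gulfstream_ods.g_order must be in the working directory because mapper.py reads it:

cat sample_data | python mapper.py gulfstream_ods g_order | sort | python reducer.py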

5. The previous version suffered from slow reduce performance, for two reasons. First, because of how the route keys work, every record with the same route goes to a single reducer, so that one reducer is under heavy load and performance drops. Second, the cluster runs on virtual machines, whose baseline performance is poor. The improved version below addresses this by pre-aggregating the data in the mapper phase, which takes computational pressure off the reducer.

The mapper script:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')

# Match the fields against the table metadata and return an iterator of records
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # The last 3 columns (year/month/day) are the partition key and are not needed
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item

def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]

# Map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)

    for route_key, route_value in mapper_result.iteritems():
        for key, value in route_value.iteritems():
            ret_dict = dict()
            ret_dict['route_key'] = route_key
            ret_dict['key'] = key
            ret_dict.update(value)
            mapper_output('route_total', ret_dict)

def mapper_plugin_1(item, mapper_result):
    # In practice the key could be different app keys; it decides which reducer a record
    # goes to, and records with the same route key land on the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    try:
        # Aggregate totals inside the mapper
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault('_'.join([area, district]), {})
        mapper_result[key]['_'.join([area, district])].setdefault('count', 0)
        mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])
        mapper_result[key]['_'.join([area, district])]['count'] += 1
        if len(mapper_result[key]['_'.join([area, district])]['order_id']) < 10:
            mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)
    except Exception, ex:
        pass

def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        # Aggregate totals inside the mapper
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault(day_hour, {})
        mapper_result[key][day_hour].setdefault('count', 0)
        mapper_result[key][day_hour].setdefault('order_id', [])
        mapper_result[key][day_hour]['count'] += 1
        if len(mapper_result[key][day_hour]['order_id']) < 10:
            mapper_result[key][day_hour]['order_id'].append(order_id)
    except Exception, ex:
        pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None

def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)

if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)

The reducer script:

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter

def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)

def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route_total':
            reducer_total(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])

def reducer_total(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if data.get('route_key') == 'route1':
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        elif data.get('route_key') == 'route2':
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        else:
            pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]

def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)

if __name__ == '__main__':
    main()
