itemcf的hadoop实现优化(Python)

原始数据如下:

u1  a,d,b,c
u2  a,a,c
u3  b,d
u4  a,d,c
u5  a,b,c

计算公式使用:sim = U(i)∩U(j) / (U(i)∪U(j))

其中: (U(i)∪U(j)) = U(i) + U(j) -  U(i)∩U(j)

原始的Hadoop实现需要5轮MR,优化后只需要两轮就可以完成。

之前的轮数过多,主要在于计算(U(i)∪U(j)) 的时候,需要多次更改key,并非计算量大。只需要修改一下传递的key,就可以两轮实现。

mapper_1.py

#!/usr/bin/python
#-*-coding:utf-8-*-
import sys

for line in sys.stdin:
    user,item_str = line.strip().split()
    item_list = sorted(list(set(item_str.split(','))))
    print "item_str:",item_str,"item_list:",item_list
    for i in range(len(item_list)):
        i1 = item_list[i]
        print i1,1,'norm'
        for i2 in item_list[i+1:]:
            print i1,i2,1,'dot'

reducer_1.py

#!/usr/bin/python
#-*-coding:utf-8-*-
import sys

def PrintOut():
    i1 = old_key
    print i1,old_dict['norm'],'norm'
    for i2 in old_dict['dot']:
        print i1 + "-"  + i2,old_dict['dot'][i2],old_dict['norm'],'dot-norm_i1'

old_key = ""
old_dict = {'norm':0,'dot':{}}
for line in sys.stdin:
    sp = line.strip().split()
    if sp[-1] == 'norm':
        key,value = sp[:2]
        if key == old_key:
            old_dict['norm'] += int(value)
        else:
            if old_key != "":
                PrintOut()
            old_key = key
            # Notice: norm part should be int(value)
            old_dict = {'norm':int(value),'dot':{}}
    elif sp[-1] ==  'dot':
        key,i2,value = sp[:3]
        if key == old_key:
            if i2 not in old_dict['dot']:
                old_dict['dot'][i2] = 0
            old_dict['dot'][i2] += int(value)
        else:
            if old_dot_key != "":
                PrintOut()
            old_key = key
            old_dict = {'norm':int(value),'dot':{}}

if old_key != "":
    PrintOut()

mapper_2.py

#!/usr/bin/python
#-*-coding:utf-8-*-
import sys

for line in sys.stdin:
    sp = line.strip().split()
    if sp[-1] == 'norm':
        print line.strip()
    elif sp[-1] == "dot-norm_i1":
        key,dot,norm_i1 = sp[:3]
        i1,i2 = key.split('-')
        print i2,i1,dot,norm_i1,'dot-norm_i1'

reducer_2.py

#!/usr/bin/python
#-*-coding:utf-8-*-
import sys

def GenSim(norm_i1,norm_i2,dot):
    return float(dot) / (int(norm_i1) + int(norm_i2) - int(dot))

def PrintOut():
    i2 = old_key
    norm_i2 = old_dict['norm']
    for i1 in old_dict['dot']:
        dot,norm_i1 = old_dict['dot'][i1]
        sim = GenSim(norm_i1,norm_i2,dot)
        print i1+"-"+i2,dot,norm_i1,norm_i2,sim,'dot,norm_i1,norm_i2,sim'

old_key = ""
old_dict = {'norm':"",'dot':{}}
for line in sys.stdin:
    sp = line.strip().split()
    if sp[-1] == 'norm':
        key,value = sp[:2]
        if key == old_key:
            old_dict['norm'] = value
        else:
            if old_key != "":
                PrintOut()
            old_key = key
            old_dict = {'norm':value,"dot":{}}
    elif sp[-1] == 'dot-norm_i1':
        key,i1,dot,norm_i1 = sp[:4]  #key is i2.
        if key == old_key:
            if i1 not in old_dict['dot']:
                old_dict['dot'][i1] = (dot,norm_i1)
            else:
                if old_key != "":
                    PrintOut()
                old_key = key
                old_dict = {'norm':value,'dot':{i1:(dot,norm_i1)}}

if old_key != "":
    PrintOut()

执行脚本 t.sh:

#!/bin/bash

cat user_log.txt |./mapper_1.py |sort -k1 > d.m.1
cat d.m.1 |./reducer_1.py > d.r.1

cat d.r.1 |./mapper_2.py |sort -k1 > d.m.2
cat d.m.2 |./reducer_2.py > d.r.2
时间: 2024-08-26 01:30:27

itemcf的hadoop实现优化(Python)的相关文章

Hadoop性能优化点小结

最近一段时间看了许多Hadoop性能优化相关的资料,于是花了点时间整理了一下,希望给正在苦于Hadoop集群性能问题的博友们一点建议吧. 1.Hadoop在存储有输入数据的节点上运行map任务,可以获得最佳性能,称为"数据本地化优化",所以一般会设置最大分片的大小应该与块大小相同,如果分片跨越2个块的大小,必然要经过网络传输到不同的节点上读取数据. 2.适当的时候使用Combine函数.Combine的阶段是在map阶段到reduce阶段之间进行的,在某些Job中,设置Combine可

Hadoop企业优化

1.MapReduce跑的慢的原因 MapReduce程序效率的瓶颈主要在于两点: 1.机器性能不足(CPU.内存.磁盘健康.网络) 2.IO操作优化 数据倾斜 Map和Reduce数设置不合理 Map运行时间太长,导致Reduce等待的时间太久 小文件 大量不可切分的超大文件 spill溢写次数过多 merge次数过多 2.优化方法 我们可以从六个方面考虑优化问题 数据输入 Map阶段 Reduce阶段 IO传输 数据倾斜问题 常用参数调优 2.1 数据输入 合并小文件:在执行MR任务前将小文

Hadoop平台优化

一:概述 随着企业要处理的数据量越来越大,MapReduce思想越来越受到重视.Hadoop是MapReduce的一个开源实现,由于其良好的扩展性和容错性,已经得到越来越广泛的应用. 二:存在问题: Hadoop作为一个基础数据处理平台,虽然其应用价值已经得到大家认可,但仍然存在问题,以下是主要几个: 1:Namenode/jobtracker单点故障 Hadoop采用的是master/slaves架构,该架构管理起来比较简单,但存在致命的单点故障和空间容量不足等缺点,这已经严重影响了Hadoo

hadoop streaming anaconda python 计算平均值

原始Liunx 的python版本不带numpy ,安装了anaconda 之后,使用hadoop streaming 时无法调用anaconda python  , 后来发现是参数没设置好... 进入正题: 环境: 4台服务器:master slave1  slave2  slave3. 全部安装anaconda2与anaconda3, 主环境py2 .anaconda2与anaconda3共存见:Ubuntu16.04 Liunx下同时安装Anaconda2与Anaconda3 安装目录:/

Hadoop任务优化建议 - 【Dr.Elephant系列文章-6】

使用Dr.Elephant来分析我们的任务,可以知道有哪些地方可以进行优化. 加速你的任务流程 对于特定的任务,最好有特定的参数配置.对于很多的应用场景来说,默认的任务配置并不能保证每个任务都有最好的性能.尽管对这些任务进行调优会花费一些时间,但是这些调优带来的性能提升是非常可观的. 有几个任务参数需要特别注意:mapper数量,reducer数量,io.*的配置,内存使用设置以及生成的文件数量.对这几个参数进行配置,让参数更适合当前的任务,可以极大的提升任务的执行性能. Apache的官网中H

在Hadoop平台跑python脚本

1.开发IDE,我使用的是PyCharm. 2.运行原理       使用python写MapReduce的“诀窍”是利用Hadoop流的API,通过STDIN(标准输入).STDOUT(标准输出)在Map函数和Reduce函数之间传递数据.我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout.Hadoop流将会帮助我们处理别的任何事情. 3.Map阶段 [[email protected] ~]$ vim mapper.py #!/usr

Hadoop Streaming例子(python)

以前总是用java写一些MapReduce程序现举一个例子使用Python通过Hadoop Streaming来实现Mapreduce. 任务描述: HDFS上有两个目录/a和/b,里面数据均有3列,第一列都是id,第二列是各自的业务类型(这里假设/a对应a,/b对应b),第三列是一个json串.各举一例: /a的一行:1234567 a {"name":"jiufeng","age":"27","sex"

Hadoop参数优化

dfs.block.size 决定HDFS文件block数量的多少(文件个数),它会间接的影响Job Tracker的调度和内存的占用(更影响内存的使用), mapred.map.tasks.speculative.execution=true  mapred.reduce.tasks.speculative.execution=true 这是两个推测式执行的配置项,默认是true 所谓的推测执行,就是当所有task都开始运行之后,Job Tracker会统计所有任务的平均进度,如果某个task

优化python执行效率

开始优化前,写一个高级测试来证明原来代码很慢.你可能需要采用一些最小值数据集来复现它足够慢.通常一两个显示运行时秒的程序就足够处理一些改进的地方了. 有一些基础测试来保证你的优化没有改变原有代码的行为也是很必要的.你也能够在很多次运行测试来优化代码的时候稍微修改这些测试的基准. 那么现在,我们来来看看优化工具把. 简单的计时器 计时器很简单,这是一个最灵活的记录执行时间的方法.你可以把它放到任何地方并且副作用很小.运行你自己的计时器非常简单,并且你可以将其定制,使它以你期望的方式工作.例如,你个