最近做的事情是用mrjob写mapreduce程序,从mongo读取数据。我的做法很容易也很好懂,因为mrjob可以支持sys.stdin的读取,所以我考虑用一个python程序读mongo中的数据,然后同时让mrjob脚本接受输入,处理,输出。
具体方式:
readInMongoDB.py:
#coding:UTF-8 ‘‘‘ Created on 2014年5月28日 @author: hao ‘‘‘ import pymongo pyconn = pymongo.Connection(host,port=27017) pycursor = pyconn.userid_cid_score.find().batch_size(30) for i in pycursor: userId = i[‘userId‘] cid = i[‘cid‘] score = i[‘score‘] # temp = list() # temp.append(userId) # temp.append(cid) # temp.append(score) print str(userId)+‘,‘+str(cid)+‘,‘+str(score)
step1.py:
#coding:UTF-8 ‘‘‘ Created on 2014年5月27日 @author: hao ‘‘‘ from mrjob.job import MRJob # from mrjob import protocol import pymongo import logging import simplejson as sj class step(MRJob): ‘‘‘ ‘‘‘ # logging.c def parseMatrix(self, _, line): ‘‘‘ input one stdin for pymongo onetime search output contentId, (userId, rating) ‘‘‘ line = (str(line)) line=line.split(‘,‘) userId = line[0] # print userId cid = line[1] # print cid score = float(line[2]) # print score yield cid, (userId, float(score)) def scoreCombine(self, cid, userRating): ‘‘‘ 将对同一个内容的(用户,评分)拼到一个list里 ‘‘‘ userRatings = list() for i in userRating: userRatings.append(i) yield cid, userRatings def userBehavior(self, cid, userRatings): ‘‘‘ ‘‘‘ scoreList = list() for doc in userRatings: # 每个combiner结果 for i in doc: scoreList.append(i) for user1 in scoreList: for user2 in scoreList: if user1[0] == user2[0]: continue yield (user1[0], user2[0]), (user1[1], user2[1]) def steps(self): return [self.mr(mapper = self.parseMatrix, reducer = self.scoreCombine), self.mr(reducer = self.userBehavior),] if __name__==‘__main__‘: fp = open(‘a.txt‘,‘w‘) fp.write(‘[‘) step.run() fp.write(‘]‘) fp.close()
然后执行脚本 python readInMongoDB.py | python step1.py >> out.txt
这个方式在本地执行的非常好,没有任何问题(除开mrjob速度的问题,其实在本次应用中影响不大)
原文:http://blog.csdn.net/whzhcahzxh/article/details/29587059
时间: 2024-11-08 04:49:13