"""Cookie-frequency analysis over bz2-compressed logs using a tiny map/reduce.

Two map/reduce passes are chained:

1. ``mapper_match`` / ``reduce_match`` count how many matching log lines each
   cookie has.
2. ``mapper_count`` / ``reducer_count`` count how many cookies share each
   per-cookie total.

The resulting ``(total, number_of_cookies)`` pairs are printed, most common
first.
"""

import bz2
import collections
import glob
import itertools
import multiprocessing
import operator


class MapReduce(object):
    """Minimal map/reduce driver backed by a ``multiprocessing.Pool``.

    ``map_func`` receives one input item and must return an iterable of
    ``(key, value)`` pairs. ``reduce_func`` receives one ``(key, values)``
    pair, where ``values`` is the list of every value emitted for that key,
    and returns the reduced result for that key.

    Both callables are sent to worker processes, so they must be picklable
    (for example, module-level functions).
    """

    def __init__(self, map_func, reduce_func, num_workers=None):
        """Store the callables and start the worker pool.

        ``num_workers`` is passed straight to ``multiprocessing.Pool``;
        ``None`` lets the pool pick its own default size.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Stop accepting work and wait for the worker processes to exit."""
        self.pool.close()
        self.pool.join()

    def partition(self, mapped_values):
        """Group ``(key, value)`` pairs by key.

        Returns the ``items()`` view of a mapping from each key to the list of
        its values, in the order the values were encountered.
        """
        partition_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partition_data[key].append(value)
        return partition_data.items()

    def __call__(self, inputs, chunksize=1):
        """Run the map phase over ``inputs``, group by key, then reduce.

        ``chunksize`` is forwarded to the map-phase ``Pool.map`` call only.
        Returns the list of ``reduce_func`` results, one per distinct key.
        """
        map_responses = self.pool.map(
            self.map_func, inputs, chunksize=chunksize
        )
        # Each mapper returns its own list of pairs; flatten them lazily
        # instead of unpacking every list as a separate argument.
        partitioned_data = self.partition(
            itertools.chain.from_iterable(map_responses)
        )
        return self.pool.map(self.reduce_func, partitioned_data)


def mapper_match(one_file):
    """Emit ``(field_4, 1)`` for every matching line of a bz2 log file.

    A line matches when, after splitting on whitespace, field 3 is ``"web"``
    and field 5 is ``"0"``. Lines with fewer than six fields are skipped
    rather than raising ``IndexError``.

    NOTE(review): the file is decoded as UTF-8 with undecodable bytes
    replaced; confirm this matches the real log encoding.
    """
    output = []
    # Text mode yields ``str`` lines, so the comparisons below can match on
    # Python 3 (binary mode would compare ``bytes`` against ``str``).
    with bz2.open(one_file, "rt", encoding="utf-8", errors="replace") as log:
        for line in log:
            fields = line.split()
            if len(fields) > 5 and fields[3] == "web" and fields[5] == "0":
                output.append((fields[4], 1))
    return output


def reduce_match(item):
    """Collapse ``(cookie, [1, 1, ...])`` into ``(cookie, total)``."""
    cookie, occurrences = item
    return (cookie, sum(occurrences))


def mapper_count(item):
    """Turn ``(cookie, count)`` into ``[(count, 1)]``, dropping the cookie."""
    _, count = item
    return [(count, 1)]


def reducer_count(item):
    """Collapse ``(freq, [1, 1, ...])`` into ``(freq, total)``."""
    freq, occurrences = item
    return (freq, sum(occurrences))


# Placeholder carried over from the original script.
# NOTE(review): treated here as a glob pattern for the input files because
# the script imports ``glob`` and the mapper takes one file path at a time;
# confirm and replace with the real pattern.
INPUT_PATTERN = "sssssss"


def main():
    """Run both map/reduce passes and print the frequency table."""
    input_files = glob.glob(INPUT_PATTERN)

    # Pass 1: number of matching lines per cookie.
    with MapReduce(mapper_match, reduce_match) as per_cookie_job:
        cookie_freq = per_cookie_job(input_files)

    # Pass 2: number of cookies per line count.
    with MapReduce(mapper_count, reducer_count) as per_count_job:
        freq_counts = per_count_job(cookie_freq)

    freq_counts.sort(key=operator.itemgetter(1), reverse=True)
    for key, value in freq_counts:
        print(key, value)


# Required for multiprocessing: worker processes may re-import this module,
# and must not re-run the job when they do.
if __name__ == "__main__":
    main()
# Original article: https://www.cnblogs.com/1204guo/p/9166823.html
# Retrieved: 2024-10-17 05:35:51