# -*- coding: utf-8 -*-
"""Hourly sales / GMV statistics for beibei item snapshots (PySpark job).

The job reads "#&#"-separated item snapshot logs for one day (plus the next
day's file), derives per-item hourly sales deltas, aggregates them into a
24-hour report and stores the report as a single text file.

create time: 16/7/5 15:42
"""
from datetime import datetime, timedelta

__author__ = 'hash'

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
FIELD_SEP = "#&#"
NULL_MARK = "\\N"          # literal backslash-N marks a missing value
HOURS_PER_DAY = 24

# Reference date of the run; replace with datetime.today() for live runs.
today_str = '2016-07-05'
today = datetime.strptime(today_str, "%Y-%m-%d")
today_str = today.strftime("%Y-%m-%d")

# The day being analysed is the day before the reference date.
ytday = today - timedelta(days=1)
ytday_str = ytday.strftime("%Y-%m-%d")

base_path = "hdfs://master:9000/super_db/"
category = "beibei-com"

# NOTE(review): these two paths are hard-coded and are not derived from
# today_str / ytday_str -- keep them in sync with the dates above.
source_path_current = '/super_db/raw_db/beibei/beibei_item/2016/beibei_item_07_04.txt'
source_path_next = '/super_db/raw_db/beibei/beibei_item/2016/beibei_item_07_05.txt'

target_stat_path = (base_path + "analytics_db/" + category + "/" + category
                    + "_item/2016/" + ytday_str + "_stat2")


# ---------------------------------------------------------------------------
# Record parsing
# ---------------------------------------------------------------------------
def filter_log_item2(x):
    """Return True when *x* is a well-formed 28-field snapshot line.

    A line is accepted when it splits into exactly 28 "#&#"-separated
    fields and its last field parses as "%Y-%m-%d %H:%M:%S".
    """
    fields = x.split(FIELD_SEP)
    if len(fields) != 28:
        return False
    try:
        datetime.strptime(fields[-1], '%Y-%m-%d %H:%M:%S')
    except ValueError:
        return False
    return True


def get_some_data(x):
    """Parse a snapshot line using the short field layout.

    Reads id (0), category (2), brand (3), price (4), original price (5),
    sales (6) and the timestamp (8, truncated to the hour).  A value of -1
    in price / original price / sales is replaced by 0.

    Returns ``((id, hour), (hour, sales, price, cat, brand, orig_price))``.
    Not used by the current pipeline; presumably kept for an older log
    format -- confirm before removing.
    """
    sp1 = x.split(FIELD_SEP)
    item_id = sp1[0].strip()
    cat = sp1[2].strip()
    brand = sp1[3].strip()
    price = float(sp1[4])
    orig_price = float(sp1[5])
    sales = int(sp1[6])
    start_date = sp1[8].split(":")[0].strip()
    start_date1 = datetime.strptime(start_date, "%Y-%m-%d %H")

    price = 0 if price == -1 else price
    orig_price = 0 if orig_price == -1 else orig_price
    sales = 0 if sales == -1 else sales
    return ((item_id, start_date1),
            (start_date1, sales, price, cat, brand, orig_price))


def _number_or_default(raw, cast, default):
    """Convert *raw* with *cast*, mapping the NULL marker to *default*."""
    return default if raw == NULL_MARK else cast(raw)


def get_some_data2(x):
    """Parse a 28-field snapshot line.

    Reads id (3), price (6), sales (8), original price (9) and the
    timestamp (27, truncated to the hour).  The NULL marker (backslash-N)
    becomes 0 / 0.0.  Category and brand are left empty.

    Returns ``((id, hour), (hour, sales, price, cat, brand, orig_price))``.
    """
    sp1 = x.split(FIELD_SEP)
    item_id = sp1[3]
    start_date = sp1[27].split(":")[0].strip()
    start_date1 = datetime.strptime(start_date, "%Y-%m-%d %H")

    sales = _number_or_default(sp1[8], int, 0)
    price = _number_or_default(sp1[6], float, 0.0)
    orig_price = _number_or_default(sp1[9], float, 0.0)

    cat = ""
    brand = ""
    return ((item_id, start_date1),
            (start_date1, sales, price, cat, brand, orig_price))


# ---------------------------------------------------------------------------
# Pure aggregation helpers
# ---------------------------------------------------------------------------
def couter(x, y):
    """Merge two ``{key: count}`` dicts by adding counts of equal keys."""
    return {h: x.get(h, 0) + y.get(h, 0) for h in set(x) | set(y)}


def date_dict(lst):
    """Index snapshot tuples by their first element (the hour timestamp).

    Each tuple ``(hour, sales, price, ...)`` becomes ``hour -> (sales,
    price, ...)``; a later tuple with the same hour replaces an earlier one.
    """
    return {row[0]: row[1:] for row in lst}


def sale_count24(dd, day=None):
    """Compute the hourly ``(sales, money)`` deltas of one item for *day*.

    *dd* is an iterable of ``(hour, sales, price, ...)`` snapshots.  For
    every hour of *day* that has a snapshot, the delta against the closest
    earlier snapshot is computed; negative deltas are clipped to 0.  Hours
    without a snapshot, or whose snapshot has no predecessor, yield
    ``(0, 0.0)``.

    *day* defaults to the module-level ``ytday`` (midnight of the analysed
    day).  Returns a list of 24 tuples, index == hour of day.
    """
    if day is None:
        day = ytday
    snapshots = date_dict(dd)
    ordered = sorted(snapshots)
    position = {stamp: idx for idx, stamp in enumerate(ordered)}

    result = []
    for hour in range(HOURS_PER_DAY):
        idx = position.get(day + timedelta(hours=hour))
        # idx == 0 means there is no earlier snapshot to diff against;
        # indexing ordered[-1] would wrongly wrap around to the latest one.
        if idx is None or idx == 0:
            result.append((0, 0.0))
            continue
        cur = snapshots[ordered[idx]]
        prev = snapshots[ordered[idx - 1]]
        s1, p1 = int(cur[0]), float(cur[1])
        s2, p2 = int(prev[0]), float(prev[1])
        sale = s1 - s2
        money = s1 * p1 - s2 * p2
        result.append((sale if sale > 0 else 0,
                       money if money > 0 else 0.0))
    return result


def sum2(a, b):
    """Add two hourly ``[(sales, money), ...]`` lists element-wise.

    For backward compatibility either argument may also be an
    ``(id, hourly_list)`` pair, in which case the list part is used.
    """
    if len(a) == 2:
        a = a[1]
    if len(b) == 2:
        b = b[1]
    return [(x[0] + y[0], x[1] + y[1]) for x, y in zip(a, b)]


def sale_item_to_hour(x):
    """Map ``(id, hourly_list)`` to ``{hour: [id]}`` for hours with sales > 0."""
    item_id, hourly = x[0], x[1]
    return {h: [item_id] for h in range(HOURS_PER_DAY) if hourly[h][0] > 0}


def reduce_hour_dict_sum(x, y):
    """Concatenate the per-hour id lists of two ``{hour: [ids]}`` dicts.

    The result always has all 24 hour keys.
    """
    return {h: x.get(h, []) + y.get(h, []) for h in range(HOURS_PER_DAY)}


def _add_lists(a, b):
    """Element-wise sum of two equally long number lists."""
    return [x + y for x, y in zip(a, b)]


# ---------------------------------------------------------------------------
# Spark pipeline
# ---------------------------------------------------------------------------
def _run(sc):
    """Execute the whole pipeline on SparkContext *sc* and save the report."""
    print(target_stat_path)

    # Snapshots of the analysed day (cached: reused by several actions).
    yesterday_sales_rdd = (sc.textFile(source_path_current)
                           .filter(filter_log_item2)
                           .map(get_some_data2)
                           .cache())
    # Snapshots from the following day's file.
    today_sales_rdd = (sc.textFile(source_path_next)
                       .filter(filter_log_item2)
                       .map(get_some_data2))

    # Items seen per hour: one count per distinct (id, hour) key.
    hour_sale_num = (yesterday_sales_rdd.keys().distinct()
                     .map(lambda key: {key[1].hour: 1})
                     .fold({}, couter))
    print(hour_sale_num)

    record_count = yesterday_sales_rdd.count()
    # Number of items on sale: distinct ids, not the number of snapshot
    # records (an item has up to one record per hour).
    spu_sum = yesterday_sales_rdd.map(lambda rec: rec[0][0]).distinct().count()

    # Per-item snapshot lists over both days.
    all_data = yesterday_sales_rdd.union(today_sales_rdd)
    sales_rdd = all_data.map(lambda rec: (rec[0][0], rec[1]))
    print(sales_rdd.take(1))
    grouped_sales_rdd = sales_rdd.groupByKey().mapValues(list)
    print(grouped_sales_rdd.take(1))

    # (id, [24 x (sales, money)]), reused by three aggregations below.
    hour_sales_rdd = grouped_sales_rdd.mapValues(sale_count24).cache()

    # Hourly totals over all items; fold keeps this safe for 0 or 1 items.
    hour_sales = hour_sales_rdd.values().fold(
        [(0, 0.0)] * HOURS_PER_DAY, sum2)
    sale_num_sum = sum(entry[0] for entry in hour_sales)
    sale_money_sum = sum(entry[1] for entry in hour_sales)

    # Per hour: number of items with sales > 0.
    spu_sale_flags = (hour_sales_rdd.values()
                      .map(lambda hours: [1 if h[0] > 0 else 0 for h in hours])
                      .fold([0] * HOURS_PER_DAY, _add_lists))
    print('spu_sale %s %s' % (spu_sale_flags, len(spu_sale_flags)))

    # Average current / original price over all snapshot records.
    if record_count:
        avg_price = (yesterday_sales_rdd.map(lambda rec: rec[1][2]).sum()
                     / float(record_count))
        avg_org_price = (yesterday_sales_rdd.map(lambda rec: rec[1][-1]).sum()
                         / float(record_count))
    else:
        avg_price = 0.0
        avg_org_price = 0.0
    discount = avg_price / avg_org_price if avg_org_price else 0.0

    # Per hour: ids of the items that sold, then counts and distinct total.
    spu_sale = hour_sales_rdd.map(sale_item_to_hour).fold(
        {}, reduce_hour_dict_sum)
    spu_sale_num = {h: len(ids) for h, ids in spu_sale.items()}
    spu_sale_sum = len(set().union(*spu_sale.values()))

    # Report: keys 0..23 hold the hourly figures, key 'sum' the day totals.
    js = dict()
    for hour in range(HOURS_PER_DAY):
        js[hour] = {'sales': hour_sales[hour][0],
                    'gmv': hour_sales[hour][1],
                    'spu': hour_sale_num.get(hour, 0),
                    'spu_saled': spu_sale_num.get(hour, 0)}
    js['sum'] = {'sales_sum': sale_num_sum,
                 'gmv_sum': sale_money_sum,
                 'spu_sum': spu_sum,
                 'spu_saled_sum': spu_sale_sum,
                 'avg_price': avg_price,
                 'avg_org_price': avg_org_price,
                 'discount': discount}
    print('----------------------------')
    print(js)

    # Save the report as one text file (a single partition).
    stat_rdd = sc.parallelize([str(js)])
    stat_rdd.repartition(1).saveAsTextFile(target_stat_path)
    print('Saved: %s' % target_stat_path)


def main():
    """Create the SparkContext, run the job and always stop the context."""
    # Imported here so the pure helpers above stay importable without Spark.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("The GMV Sum BeiBei")
    conf.set('spark.logConf', False)
    sc = SparkContext(conf=conf)
    try:
        _run(sc)
    finally:
        sc.stop()


if __name__ == "__main__":
    main()
# Time: 2024-10-17 19:11:04