beibei_sum_spark(python)

# -*-coding:utf8-*-#
__author__ = ‘hash‘
"""
create time:16/7/5 15:42
"""
from datetime import datetime, timedelta
# os.environ[‘SPARK_HOME‘] = "/Users/play/software/spark"  # 绝对路径
# sys.path.append("/Users/play/software/spark/python")
# print os.environ[‘SPARK_HOME‘]
from pyspark import SparkContext, SparkConf
# Initialize SparkContext
# sc = SparkContext(‘local‘)
conf = SparkConf().setAppName("The GMV Sum BeiBei")
conf.set(‘spark.logConf‘, False)
sc = SparkContext(conf=conf)
today_str=‘2016-07-05‘
today = datetime.strptime(today_str, "%Y-%m-%d")
# today = datetime.today()
today_str = today.strftime("%Y-%m-%d")
# 指定日期,前一天
ytday = today - timedelta(days=1)
ytday_str = ytday.strftime("%Y-%m-%d")
base_path = "hdfs://master:9000/super_db/"
category = "beibei-com"
# source_path_current=‘/Users/play/TEMP/7-4/log/2016-07-03/beibei-com_2016-07-03*‘
# source_path_current = ‘/Users/play/TEMP/7-4/log/test/test.txt‘
# source_path_next = ‘/Users/play/TEMP/7-4/log/2016-07-04/beibei-com_2016-07-04-00*‘
# yesterday
# source_path_current = ‘/super_db/raw_db/‘ + category + ‘/‘ + category + ‘_item/2016/‘ + ytday_str + ‘/‘ + category + ‘_‘ + ytday_str + ‘*‘
# today
# source_path_next = ‘/super_db/raw_db/‘ + category + ‘/‘ + category + ‘_item/2016/‘ + today_str + ‘/‘ + category + ‘_‘ + today_str + ‘-00*‘
#
source_path_current = ‘/super_db/raw_db/beibei/beibei_item/2016/beibei_item_07_04.txt‘
source_path_next = ‘/super_db/raw_db/beibei/beibei_item/2016/beibei_item_07_05.txt‘
# target_data_path = ‘/Users/play/TEMP/7-4/data/rs.txt‘
# target_stat_path = ‘/Users/play/TEMP/7-4/stat/rs.txt‘
# target_cat_stat_path = ‘/Users/play/TEMP/7-4/stat_by_cat/rs.txt‘
target_stat_path = base_path + "analytics_db/" + category + "/" + category + "_item/2016/" + ytday_str + "_stat2"
print target_stat_path
# def filter_log_item(x):
#     sp1 = x.split("#&#")
#     if len(sp1) == 10:
#         try:
#             x1 = datetime.strptime(sp1[9], ‘%Y-%m-%d %H:%M:%S‘)
#             return True
#         except:
#             return False
#             # return True
#     else:
#         return False
def filter_log_item2(x):
    sp1 = x.split("#&#")
    if len(sp1) == 28:
        try:
            x1 = datetime.strptime(sp1[-1], ‘%Y-%m-%d %H:%M:%S‘)
            return True
        except:
            return False
            # return True
    else:
        return False
def get_some_data(x):
    sp1 = x.split("#&#")
    # if len(sp1)<10:return None
    id = sp1[0].strip()
    cat = sp1[2].strip()
    brand = sp1[3].strip()
    price = float(sp1[4])
    orig_price = float(sp1[5])
    sales = int(sp1[6])
    start_date = sp1[8].split(":")[0].strip()
    start_date1 = datetime.strptime(start_date, "%Y-%m-%d %H")
    price= 0 if price==-1 else price
    orig_price= 0 if orig_price==-1 else orig_price
    sales= 0 if sales==-1 else sales
    return ((id, start_date1), (start_date1, sales, price, cat, brand, orig_price))
def get_some_data2(x):
    sp1 = x.split("#&#")
    # if len(sp1)<10:return None
    # id = sp1[0].strip()
    # cat = sp1[2].strip()
    # brand = sp1[3].strip()
    # price = float(sp1[4])
    # orig_price = float(sp1[5])
    # # sales = int(sp1[6])
    # start_date = sp1[8].split(":")[0].strip()
    # start_date1 = datetime.strptime(start_date, "%Y-%m-%d %H")
    id = sp1[3]
    start_date = sp1[27].split(":")[0].strip()
    start_date1 = datetime.strptime(start_date, "%Y-%m-%d %H")
    sales = sp1[8]
    price = sp1[6]
    orig_price = sp1[9]
    # stock = int(sp1[17])
    if "\N" == sales:sales = 0
    else:sales = int(sales)
    if "\N" == price:
        price = 0.0
    else:
        price = float(price)
    if "\N" == orig_price:
        orig_price = 0.0
    else:
        orig_price = float(orig_price)
    # return (id, (start_date, sales, stock, price))
    cat=""
    brand=""
    return ((id, start_date1), (start_date1, sales, price, cat, brand, orig_price))
# 昨天销售数据
yesterday_sales_rdd = sc.textFile(source_path_current).filter(filter_log_item2).map(get_some_data2)
# ys1 = yesterday_sales_rdd.collect()
# print ys1
# print get_some_data(ys1)
# 今天0点的数据
today_sales_rdd = sc.textFile(source_path_next).filter(filter_log_item2).map(get_some_data2)
# print today_sales_rdd.collect()
# 每小时的商品数,只要有日志记录
gbk = yesterday_sales_rdd.groupByKey()
# from collections import Counter
def couter(x, y):
    k1 = x.keys()
    k2 = y.keys()
    s1 = set(k1 + k2)
    d1 = dict()
    for h in s1:
        d1[h] = x.get(h, 0) + y.get(h, 0)
    return d1
# hour_sale_num = gbk.keys().map(lambda dd: {dd[1].hour: 1}).reduce(lambda x, y: dict(Counter(x) + Counter(y)))
hour_sale_num = gbk.keys().map(lambda dd: {dd[1].hour: 1}).reduce(lambda x, y: couter(x, y))
print hour_sale_num
# 在售商品个数
spu_sum = yesterday_sales_rdd.count()
#
# s = ‘2016-07-04 00:15:02‘
# datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
# 销售汇总,昨天和今天
all_data = yesterday_sales_rdd.union(today_sales_rdd)
# print ‘all_data.first()‘,all_data.collect()
sales_rdd = all_data.map(lambda x: (x[0][0], x[1]))
print sales_rdd.first()
grouped_sales_rdd = sales_rdd.groupByKey().mapValues(list)
print grouped_sales_rdd.first()
# start_date, sales, price, cat, brand, orig_price
def date_dict(lst):
    d2 = dict()
    for i, x in enumerate(lst):
        #     print x
        #     print x[0].hour
        d2[x[0]] = x[1:]
    return d2
# date_dict(sale_1[1])
def sale_count24(dd):
    sale_list = date_dict(dd)
    #         print sale_list
    sort = sorted(sale_list)
    sales = []
    for x in reversed(range(24)):
        #     print x
        td1 = ytday + timedelta(hours=x)
        td2 = ytday + timedelta(hours=x - 1)  # 前1小时
        try:
            indx = sort.index(td1)
            td2 = sort[indx - 1]  # 列表的前一项
            s1 = int(sale_list[td1][0])
            p1 = float(sale_list[td1][1])
            s2 = int(sale_list[td2][0])
            p2 = float(sale_list[td2][1])
            sale = s1 - s2
            sale = sale if sale > 0 else 0
            money = s1 * p1 - s2 * p2
            money = money if money > 0 else 0
            sales.append((sale, money))
            continue
        except:
            # 不在列表内
            sale = 0
            money = 0.0
            sales.append((sale, money))
            continue
    return list(reversed(sales))  # 要倒置一下
# gsr=grouped_sales_rdd.collect()
# example1=gsr[1][1]
# for x in example1:
#     print x
# print sale_count24(example1)
hour_sales_rdd = grouped_sales_rdd.map(lambda (a, b): (a, sale_count24(b)))
def sum2(a, b):
    if len(a) == 2:
        a = a[1]
    if len(b) == 2:
        b = b[1]
    su = []
    for x, y in zip(a, b):
        su.append((x[0] + y[0], x[1] + y[1]))
    return su
hour_sales = hour_sales_rdd.reduce(lambda a, b: sum2(a, b))
# print hour_sales
# 每小时,累积销量,累积销售额
num = [x[0] for x in hour_sales]
money = [x[1] for x in hour_sales]
sale_num_sum = sum(num)
# sum(money) / float(len(money))
sale_money_sum = sum(money)
# SPU 每小时销售额>0的商品个数
spu_sale = hour_sales_rdd.map(lambda a: [1 if x[0] > 0 else 0 for x in a]).reduce(
    lambda a, b: [x + y for x, y in zip(a, b)])
print ‘spu_sale‘, spu_sale, len(spu_sale)
# 平均价格
# yesterday_sales_rdd.first()
# 取出现价
now_price_rdd = yesterday_sales_rdd.map(lambda x: x[1][2])
# now_price_rdd.first()
# 取出原价
org_price_rdd = yesterday_sales_rdd.map(lambda x: x[1][-1])
# org_price_rdd.first()
avg_price = now_price_rdd.reduce(lambda a, b: (a + b) / 2.0)
avg_org_price = org_price_rdd.reduce(lambda a, b: (a + b) / 2.0)
# 折扣
discount = avg_price / avg_org_price
# discount
# SPU 每小时销售额>0的商品个数
def sale_item_to_hour(x):
    idd = x[0]
    salist = x[1]
    hd = dict()
    for h in range(24):
        if salist[h][0] > 0:  # 有销量
            hd[h] = [idd]
    return hd
def reduce_hour_dict_sum(x, y):
    #     print Counter(x),Counter(y)
    nd = dict()
    for h in range(24):
        nd[h] = x.get(h, []) + y.get(h, [])
    return nd
# reduce_hour_dict_sum(ssc[6],ssc[10])
spu_sale = hour_sales_rdd.map(sale_item_to_hour).reduce(lambda x, y: reduce_hour_dict_sum(x, y))
##每小时销售商品的个数
spu_sale_num = dict()
for h, lst in spu_sale.items():
    spu_sale_num[h] = len(lst)
set1 = reduce(set.union, [set(x) for x in spu_sale.values()])
spu_sale_sum = len(set1)  # 商品去重后的总数
# json
js = dict()
for hour in range(24):
    sale_num = hour_sales[hour][0]
    sale_money = hour_sales[hour][1]
    spu = hour_sale_num.get(hour, 0)
    spu_sale_hour = spu_sale_num.get(hour, 0)
    js[hour] = {‘sales‘: sale_num, ‘gmv‘: sale_money, ‘spu‘: spu, ‘spu_saled‘: spu_sale_hour}
# spu_sale_sum ,set 去重
js[‘sum‘] = {‘sales_sum‘: sale_num_sum, ‘gmv_sum‘: sale_money_sum,
             ‘spu_sum‘: spu_sum, ‘spu_saled_sum‘: spu_sale_sum,
             ‘avg_price‘: avg_price, ‘avg_org_price‘: avg_org_price, ‘discount‘: discount
             }
print ‘----------------------------‘
print js
"""
js={0: {‘spu‘: 3, ‘spu_saled‘: 2, ‘sales‘: 4394, ‘gmv‘: 1739302.0}, 1: {‘spu‘: 3, ‘spu_saled‘: 2, ‘sales‘: 8048, ‘gmv‘: 716863.0}, 2: {‘spu‘: 3, ‘spu_saled‘: 3, ‘sales‘: 3, ‘gmv‘: 798.0}, 3: {‘spu‘: 3, ‘spu_saled‘: 2, ‘sales‘: -8046, ‘gmv‘: -715563.0}, 4: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 5: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 6: {‘spu‘: 3, ‘spu_saled‘: 3, ‘sales‘: 8057, ‘gmv‘: 718666.0}, 7: {‘spu‘: 3, ‘spu_saled‘: 2, ‘sales‘: -2480, ‘gmv‘: -1687642.0}, 8: {‘spu‘: 3, ‘spu_saled‘: 2, ‘sales‘: 560, ‘gmv‘: 1632733.0}, 9: {‘spu‘: 3, ‘spu_saled‘: 1, ‘sales‘: -8611, ‘gmv‘: -2349332.0}, 10: {‘spu‘: 3, ‘spu_saled‘: 2, ‘sales‘: 8055, ‘gmv‘: 716835.0}, 11: {‘spu‘: 3, ‘spu_saled‘: 1, ‘sales‘: -7496, ‘gmv‘: 916460.0}, 12: {‘spu‘: 3, ‘spu_saled‘: 1, ‘sales‘: 5571, ‘gmv‘: -972225.0}, 13: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 14: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 15: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 16: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 17: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 18: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 19: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 20: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 21: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, 22: {‘spu‘: 3, ‘spu_saled‘: 0, ‘sales‘: -8055, ‘gmv‘: -716895.0}, 23: {‘spu‘: 0, ‘spu_saled‘: 0, ‘sales‘: 0, ‘gmv‘: 0.0}, ‘sum‘: {‘discount‘: 0.639766081882126, ‘gmv_sum‘: 0.0, ‘avg_org_price‘: 244.28571429350995, ‘sales_sum‘: 0, ‘avg_price‘: 156.28571429333533, ‘spu_sum‘: 36, ‘spu_saled_sum‘: 3}}
"""
# 保存
# stat_rdd = sc.parallelize(json.dumps(str(js),ensure_ascii=False),numSlices=1)
stat_rdd = sc.parallelize([str(js)])
# stat_rdd.saveAsTextFile(target_stat_path)
# stat_rdd.repartition(1).saveAsTextFile(target_stat_path)
# stat_rdd.map(json.dumps).saveAsPickleFile(target_stat_path)
stat_rdd.repartition(1).saveAsTextFile(target_stat_path)
print  ‘Saved:‘, target_stat_path
# from pyspark import SQLContext
# languagesDF= SQLContext.load(js)
# languagesDF.write.json(target_stat_path)
时间: 2024-10-17 19:11:04

beibei_sum_spark(python)的相关文章

Python学习1-Python和Pycharm的下载与安装

本文主要介绍Python的下载安装和Python编辑器Pycharm的下载与安装. 一.Python的下载与安装 1.下载 到Python官网上下载Python的安装文件,进入网站后显示如下图: 网速访问慢的话可直接在这里下载:python-2.7.11.amd64 在Downloads中有对应的支持的平台,这里我们是在Windows平台下运行,所以点击Windows,出现如下: 在这里显示了Python更新的所有版本,其中最上面两行分别是Python2.X和Python3.X对应的最后更新版本

Python——深入理解urllib、urllib2及requests(requests不建议使用?)

深入理解urllib.urllib2及requests            python Python 是一种面向对象.解释型计算机程序设计语言,由Guido van Rossum于1989年底发明,第一个公开发行版发行于1991年,Python 源代码同样遵循 GPL(GNU General Public License)协议[1] .Python语法简洁而清晰,具有丰富和强大的类库. urllib and urllib2 区别 urllib和urllib2模块都做与请求URL相关的操作,但

python学习_day26_面向对象之封装

1.私有属性 (1)动态属性 在python中用双下划线开头的方式将属性隐藏起来.类中所有双下划线开头的名称,如__x都会自动变形成:_类名__x的形式.这种自动变形的特点是: a.类中定义的__x只能在内部使用,如self.__x,引用的就是变形的结果.b.这种变形其实正是针对外部的变形,在外部是无法通过__x这个名字访问到的.c.在子类定义的__x不会覆盖在父类定义的__x,因为子类中变形成了:_子类名__x,而父类中变形成了:_父类名__x,即双下滑线开头的属性在继承给子类时,子类是无法覆

python面向对象知识点疏理

面向对象技术简介 类: 用来描述具有相同的属性和方法的对象的集合.它定义了该集合中每个对象所共有的属性和方法.对象是类的实例.class 类变量:类变量在整个实例化的对象中是公用的.类变量定义在类中且在函数体之外.类变量通常不作为实例变量使用. 数据成员:类变量或者实例变量用于处理类及其实例对象的相关的数据. 方法重写:如果从父类继承的方法不能满足子类的需求,可以对其进行改写,这个过程叫方法的覆盖,也称为方法的重写. 实例变量:定义在方法中的变量,只作用于当前实例的类. 继承:即一个派生类(de

python实现网页登录时的rsa加密流程

对某些网站的登录包进行抓包时发现,客户端对用户名进行了加密,然后传给服务器进行校验. 使用chrome调试功能断点调试,发现网站用javascript对用户名做了rsa加密. 为了实现网站的自动登录,需要模拟这个加密过程. 网上搜了下关于rsa加密的最简明的解释: rsa加密是非对称加密算法,该算法基于一个十分简单的数论事实:将两个大素数相乘十分容易,但那时想要对其乘积进行因式分解却极其困难,因此可以将乘积公开作为加密密钥,即公钥,而两个大素数组合成私钥.公钥是可发布的供任何人使用,私钥则为自己

Python中编码的详细讲解

看这篇文章前,你应该已经知道了为什么有编码,以及编码的种类情况 ASCII 占1个字节,只支持英文 GB2312 占2个字节,支持6700+汉字 GBK GB2312的升级版,支持21000+汉字 Shift-JIS 日本字符 ks_c_5601-1987 韩国编码 TIS-620 泰国编码 由于每个国家都有自己的字符,所以其对应关系也涵盖了自己国家的字符,但是以上编码都存在局限性,即:仅涵盖本国字符,无其他国家字符的对应关系.应运而生出现了万国码,他涵盖了全球所有的文字和二进制的对应关系, U

Python练习(一)

Python练习(一): 给一个不超过5位的正整数,判断其有几位,依次打印出个位.十位.百位.千位.万位的数字: num = int(input('please enter a number: '))   lst = [] for i in str(num):      lst.append(i) lenlst = len(lst) if num >= 1000:      if num >= 10000:          print('too big')     else:        

菜鸟学python之对象类型及运算

Python 中的变量不需要声明.每个变量在使用前都必须赋值,变量赋值以后该变量才会被创建. 在 Python 中,变量就是变量,它没有类型,我们所说的"类型"是变量所指的内存中对象的类型. 等号(=)用来给变量赋值. 1 变量赋值 1.1 单个变量赋值 >>> name="python" >>> print(name) python 1.2 多个变量赋值 >>> name=names="python&

开始我的Python爬虫学习之路

因为工作需要经常收集一些数据,我就想通过学爬虫来实现自动化完成比较重复的任务. 目前我Python的状况,跟着敲了几个教程,也算是懂点基础,具体比较深入的知识,是打算从做项目中慢慢去了解学习. 我是觉得如果一开始就钻细节的话,是很容易受到打击而放弃的,做点小项目让自己获得点成就感路才更容易更有信心走下去. 反正遇到不懂的就多查多问就对了. 知乎上看了很多关于入门Python爬虫的问答,给自己总结出了大概的学习方向. 基础: HTML&CSS,JOSN,HTTP协议(这些要了解,不太需要精通) R