用sparkR, 分析上亿条订单数据的脚本。(增长黑客实践)

上周我们这个10人的小团队开发的推荐拉新系统, 日拉新人数已接近4万人。过去几个月这个系统从无到有, 拉新从日增几千稳步增长到日增几万, 同事们几个月来,每天工作13个小时以上,洗澡时间都没有, 有时就住在公司, 回家怕吵到家人,只能睡客厅地板, 周日也不能保证休息。 大家的全力投入,不懈努力才能有这个结果。 非常感慨团队产生的的化学反应, 和惊人的生产效率。 产品稳定后,最近全面转入大数据分析, 和机器学习阶段, 开始做真正的增长黑客实践。 spark, R, scala都是刚刚开始深入地学习,没几天, 还好有数据, 学的快!, 不休息, 连做梦都是在做分析数据的工作, 日进千里啊。

刚开始用spark-sql的时候, 如果做一个复杂的查询,写一长串sql, 谁都看不懂,拆成小sql, 就要保存中间结果, 效率低下。 用了几天后, 开始切入sparkR和Scala , 发现效率比直接用spark-sql高太多了, 代码可读性也强太多。此外善用cahe,也可以有效提高效率。

下面都是干货。废话不多少, 只希望帮到你。

工作目标: 分析一下新手券分享的拉新效果和人数,需要对最近15日的订单大概2亿多条订单纪录, 以及300万左右的领券纪录, 几十万笔的返利信息做全库查询 , 这在msql上是不可能完成的任务。 对spark+hive来说, 也很耗时, 但一个小时内可以搞定。

用R写了一下查询脚本, 稍后准备改成scala的。 两者都是调用spark api, 区别应该只在语法上。

用15个节点的spark跑这个查询脚本, 大概需要半个多小时才能出来结果。代码是最完整,最准确的文档, 提纲挈领的总结以后得空再总结。

############################statistics.R################################

#领券日期参数, 修改统计日参数
date_parameter <- "2016-07-11"
dayCount_parameter = 1

hiveContext <- sparkRHive.init(sc)
sql(hiveContext, "use honeycomb_bh_db")

#通过hiveSql 获得想要的并集集合并且缓存下来 sql date_add
##程序执行阶段1: 数据准备。。。。。
acquired_users_sql <-"select * from sc_t_acquire_record where sc_t_acquire_record.year=2016 and sc_t_acquire_record.month=07 and to_date(ct_time)=‘STARTDATE‘"
all_order_sql <- "select * from sc_t_order_all_info As a where a.year=2016 and a.month=07 and to_date(a.create_time)>=‘STARTDATE‘ and to_date(a.create_time)<=date_add(date(‘STARTDATE‘),14) and product_id=210"
rebate_order_sql <- "select * from sc_t_order_rebate_info As a where a.year=2016 and a.month=07 and to_date(a.create_time)>=‘STARTDATE‘ and to_date(a.create_time)<=date_add(date(‘STARTDATE‘),7) and product_id=210"

acquired_users_sql<-sub(pattern=‘STARTDATE‘, replacement=date_parameter, acquired_users_sql)
all_order_sql<-gsub(pattern=‘STARTDATE‘, replacement=date_parameter, all_order_sql)
rebate_order_sql<-gsub(pattern=‘STARTDATE‘, replacement=date_parameter, rebate_order_sql)

#当天领券绑定的用户集合
acquired_users <-sql(hiveContext,acquired_users_sql)
cache(acquired_users)

#15日内的全订单集合
all_orders <-sql(hiveContext,all_order_sql)

#7日内返利的订单集合
rebated_orders <- sql(hiveContext,rebate_order_sql)

#第0日领券后到14日结束前, 有打车纪录的
acquired_users_with_orders<-join(acquired_users,all_orders, acquired_users$presentee_mobile==all_orders$passenger_phone, "left_outer")
acquired_users_with_orders <- filter(acquired_users_with_orders, "passenger_phone is not null")

mobiles_acquired_users <-distinct(select(acquired_users_with_orders, "presentee_mobile"))
#write.json(acquired_users_with_orders, "file:///home/rd/spark/bin/20160711_users_convertion.json")

#第0日领券后~第7日结束前,被返利的领券用户
orders_rebated_within_8days <- join(acquired_users,rebated_orders, acquired_users$presentee_mobile==rebated_orders$passenger_phone, "left_outer")
orders_rebated_within_8days <- filter(orders_rebated_within_8days, "passenger_phone is not null")

cache(orders_rebated_within_8days)
results <- data.frame("name" = c("frist"), "value" = c(0),stringsAsFactors=FALSE)

##程序执行阶段2: 开始利用spark进行集合运算。。。。。

#第0日到第7日结束前, 券有效期内打过车的领券用户订单数据
rules<- "to_date(a.create_time)>=‘STARTDATE‘ and to_date(a.create_time)<=date_add(date(‘STARTDATE‘),7)"
rules<-gsub(pattern=‘STARTDATE‘, replacement=date_parameter, rules)
orders_within_8days = filter(acquired_users_with_orders, rules)
mobiles_with_orders_within_8days <- distinct(select(orders_within_8days, "presentee_mobile"))

#第8日到第14日结束前, 券过期后, 打过车的领券用户订单数据
rules<- "to_date(a.create_time)>=date_add(date(‘STARTDATE‘),8) and to_date(a.create_time)<=date_add(date(‘STARTDATE‘),15)"
rules<-gsub(pattern=‘STARTDATE‘, replacement=date_parameter, rules)
orders_after_8days = filter(acquired_users_with_orders, rules)
mobiles_with_orders_after_8days <- distinct(select(orders_after_8days, "presentee_mobile"))

#第0日到第7日结束前, 被返利信息纪录的领券用户
mobiles_user_reabted <-distinct(select(orders_rebated_within_8days, "presentee_mobile"))

#券0~7天有效期内首单后未被返利的用户
mobiles_my_team_losted <- except(mobiles_with_orders_within_8days, mobiles_user_reabted)

#第8日券有效期过后, 14日内, 有成交纪录被sic统计方法, 统计进来的用户
mobiles_after_7days_countedBySicheng <-except(mobiles_with_orders_after_8days, mobiles_user_reabted)

#券0~7天有效期内首单后未被返利的用户, 第8日到第14日成单, 被sic统计转化的用户
mobiles_my_team_losted_countedBySicheng <-intersect(mobiles_my_team_losted, mobiles_with_orders_after_8days)

#第8日券有效期过后, 14日内, 思成没有统计的首单用户
mobiles_both_losted <- except(mobiles_my_team_losted, mobiles_after_7days_countedBySicheng)

#券0~7天有效期内首单后未被返利, 后7天没打车的用户
mobile_first_order_withno_coupon_no_futher_order_after_7days <- except(mobiles_my_team_losted, mobiles_with_orders_after_8days)

#7日内没打车, 后7日打车的用户
mobiles_with_order_invoked_coupon <- except(mobiles_with_orders_after_8days, mobiles_with_orders_within_8days)

#领券后15天里打车的用户, 由于业务特性,可以重复领券 这个存在重复统计。
mobiles_converted = acquired_users_with_orders

#程序运行阶段: 输出结果。。。
results<-rbind(results, c("领新手券的用户数量", nrow(distinct(select(acquired_users, "presentee_mobile")))))
results<-rbind(results, c("领新手券后15日转化的用户数量", nrow(mobiles_acquired_users)))
results<-rbind(results, c("领新手券7日内打车用券转化的用户数量", nrow(mobiles_user_reabted)))
results<-rbind(results, c("新手券有效期过期后7日内打车转化用户", nrow(mobiles_after_7days_countedBySicheng)))
results<-rbind(results, c("sic统计方法统计的转化用户数", nrow(mobiles_user_reabted)+nrow(mobiles_after_7days_countedBySicheng)))
results<-rbind(results, c("7日内首单未用新手券的人数", nrow(mobiles_my_team_losted)))
results<-rbind(results, c("7日内首单未用新手券, 后7日内没打车的人数", nrow(mobiles_both_losted)))
results<-rbind(results, c("7日内首单未用新手券, 后7日内有打车的人数", nrow(mobiles_my_team_losted_countedBySicheng)))

results<-rbind(results, c("领新手券后7日内未打车, 后7日又打车的人数", nrow(mobiles_with_order_invoked_coupon)))
results

时间: 2024-12-12 12:39:07

用sparkR, 分析上亿条订单数据的脚本。(增长黑客实践)的相关文章

DBA_在执行1亿条资料插入长脚本如何判断需耗时多久(案例)

1 create table bxj_xla_journals(  ledger_name varchar2(30),  account varchar2(240),  account_description varchar2(240),  accounting_date date,  accounted_dr number,  accounted_cr number,  gl_transfer_status varchar2(30),  period_name varchar2(15),  j

每天近百亿条用户数据,携程大数据高并发应用架构涅槃

互联网二次革命的移动互联网时代,如何吸引用户.留住用户并深入挖掘用户价值,在激烈的竞争中脱颖而出,是各大电商的重要课题.通过各类大数据对用户进行研究,以数据驱动产品是解决这个课题的主要手段,携程的大数据团队也由此应运而生;经过几年的努力,大数据的相关技术为业务带来了惊人的提升与帮助. 以基础大数据的用户意图服务为例,通过将广告和栏位的"千人一面"变为"千人千面",在提升用户便捷性,可用性,降低费力度的同时,其转化率也得到了数倍的提升,体现了大数据服务的真正价值. 在

服务上亿用户,中国结算新一代数据集市技术实践

作者介绍: 卢向澄 金融科技领域十余年工作经验,目前在中国证券登记结算公司从事技术架构工作,专注于技术中台.云平台.大数据平台等领域. 1. 背景介绍 我国股市约有1.2亿散户,直接关乎上亿家庭.数亿人切身利益,保护好投资者尤其是中小投资者的合法权益,是资本市场工作人民性的具体体现,也是服务实体经济的应有之义.党的十九大明确提出"必须坚持以人民为中心的发展思想".中国证监会有关负责人表示,要认真贯彻落实十九大精神和党中央.国务院关于资本市场建设的一系列决策部署,加快推动形成融资功能完备

海量大数据大屏分析展示一步到位:DataWorks数据服务对接DataV最佳实践

概述数据服务(https://ds-cn-shanghai.data.aliyun.com) 是DataWorks产品家族的一员,提供了快速将数据表生成API的能力,通过可视化的向导,一分钟"零代码"就可以生成API,让API开发从未有过如此便捷!同时支持自定义API查询SQL功能,对您的个性化复杂查询逻辑支持照样不在话下. DataWorks数据服务提供HTTP API服务,采用Serverless架构,您只需关注API本身的查询逻辑,无需关心运行环境等基础设施,零运维成本. Dat

PLSQL_在执行1亿条资料插入长脚本如何判断需耗时多久v$sql / v$sqltext / v$sqlarea / v$sql_plan(案例)(监控SQL效率)

2014-08-27 BaoXinjian 一.摘要 当执行耗时时间较长的PLSQL时,有时需要查看程式运行的进度,目前已经处理了多少资料,还需处理多上资料 如果程式中专门的Log Module管控这一块,问题就不太大 如果没有这个这块的管控,可能就需要通过跟踪session,并查询动态性能视图,大概猜测出系统的运行情况,特别是undo表空间的变化 二.案例 案例: Step1. 创建测试表bxj_test create table bxj_test ( invoice_id number, i

实战:上亿数据如何秒查(转)

最近在忙着优化集团公司的一个报表.优化完成后,报表查询速度有从半小时以上(甚至查不出)到秒查的质变.从修改SQL查询语句逻辑到决定创建存储 过程实现,花了我3天多的时间,在此总结一下,希望对朋友们有帮助. 数据背景 首先项目是西门子中国在我司实施部署的MES项目,由于项目是在产线上运作(3 years+),数据累积很大.在项目的数据库中,大概上亿条数据的表有5个以上,千万级数据的表10个以上,百万级数据的表,很多... (历史问题,当初实施无人监管,无人监控数据库这块的性能问题.ps:我刚入职不

实战:上亿数据如何秒查

最近在忙着优化集团公司的一个报表.优化完成后,报表查询速度有从半小时以上(甚至查不出)到秒查的质变.从修改SQL查询语句逻辑到决定创建存储过程实现,花了我3天多的时间,在此总结一下,希望对朋友们有帮助. 数据背景 首先项目是西门子中国在我司实施部署的MES项目,由于项目是在产线上运作(3 years+),数据累积很大.在项目的数据库中,大概上亿条数据的表有5个以上,千万级数据的表10个以上,百万级数据的表,很多- (历史问题,当初实施无人监管,无人监控数据库这块的性能问题.ps:我刚入职不久-)

hadoop、spark、hive、solr、es与YDB在车辆即席分析上的对比分析

自2012年以来,公安部交通管理局在全国范围内推广了机动车缉查布控系统(简称卡口系统),通过整合共享各地车辆智能监测记录等信息资源,建立了横向联网.纵向贯通的全国机动车缉查布控系统,实现了大范围车辆缉查布控和预警拦截.车辆轨迹.交通流量分析研判.重点车辆布控.交通违法行为甄别查处及侦破涉车案件等应用.在侦破肇事逃逸案件.查处涉车违法行为.治安防控以及反恐维稳等方面发挥着重要作用. 随着联网单位和接入卡口的不断增加,各省市区部署的机动车缉查布控系统积聚了海量的过车数据.截至目前,全国32个省(区.

MySQL 如何准备一亿条记录的表来测试

曾经一个朋友问我如何快速的在线往一个大表里面添加一个字段或者修改一个字段的长度,mysql版本是5.6,所以就准备在测试环境准备一个一亿条记录的表,然后来实际测试下到底哪种方式比较快,先来开始准备一亿条记录的表.   我线上有上亿条记录的表,但是很多网上朋友都没有,那么我这里就实践了一条办法,来实现自己构造一亿条数据记录的表.实现思路就是先建一张通用的20个字段左右的用户表,然后写一个存储过程,不停的往这个表里面写数据,while循环写上一亿次,这样就形成了一张一亿条记录的表出来.     1.