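Spark: cumulative sums over history + grand totals

Cumulative sums over history in Spark rely on window functions, while totals across all groups are produced with the rollup function.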

1 Use cases:



2 Raw data:

product_code |event_date |duration |
1438         |2016-05-13 |165      |
1438         |2016-05-14 |595      |
1438         |2016-05-15 |105      |
1629         |2016-05-13 |12340    |
1629         |2016-05-14 |13850    |
1629         |2016-05-15 |227      |

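3 Implementing the business scenarios

3.1 Scenario 1: cumulative sum over history:

3.1.1 spark-sql implementation

//spark sql: accumulate historical data with a window function
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration
  from userlogs_date
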
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|

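3.1.2 DataFrame implementation

//Use the over function provided by Column, passing in a window specification
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
//Apply the window to the aggregate (df_userlogs_date is the DataFrame used in section 3.2.2)
df_userlogs_date.select($"pcode", $"event_date", sum($"duration").over(first_2_now_window).as("sum_duration"))
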
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
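
To try the DataFrame form end to end, the following is a minimal sketch. It assumes Spark 2.x or later (it uses SparkSession, which the original snippets do not) running in local mode. The object name CumulativeDurationSketch, the helper cumulativeDurations, and the in-memory copy of the sample rows are illustrative and not part of the original post.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

object CumulativeDurationSketch {
  // Builds the sample data in memory and returns (pcode, event_date, running sum) rows.
  def cumulativeDurations(spark: SparkSession): Seq[(String, String, Long)] = {
    import spark.implicits._

    // Assumption: the rows from section 2, with the product code column named pcode.
    val df = Seq(
      ("1438", "2016-05-13", 165L),
      ("1438", "2016-05-14", 595L),
      ("1438", "2016-05-15", 105L),
      ("1629", "2016-05-13", 12340L),
      ("1629", "2016-05-14", 13850L),
      ("1629", "2016-05-15", 227L)
    ).toDF("pcode", "event_date", "duration")

    // Same window specification as in section 3.1.2.
    val firstToNow = Window.partitionBy("pcode").orderBy("event_date")

    df.select($"pcode", $"event_date", sum($"duration").over(firstToNow).as("sum_duration"))
      .orderBy($"pcode", $"event_date")
      .collect()
      .map(r => (r.getString(0), r.getString(1), r.getLong(2)))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cumulative-duration-sketch")
      .getOrCreate()
    try cumulativeDurations(spark).foreach(println)
    finally spark.stop()
  }
}
```

The helper returns plain tuples rather than calling show(), which makes the result easy to compare with the table above.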

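3.1.3 Extension: accumulating over a range

Real-world accumulation logic is usually more involved than the above: for example, summing the previous N days, or everything from N days back to M days ahead. The variants below cover these cases.

Accumulate all history:

  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
  Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
  Window.partitionBy("pcode").orderBy("event_date")

These four forms give the same result on this data. Strictly speaking, when order by is given without an explicit frame, the default frame is range between unbounded preceding and current row, so the forms without an explicit rows frame can differ from the others if several rows in a partition share the same event_date.

Accumulate the current day and the N days before it (one row per day), assuming N=3:

  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and current row) as sum_duration from userlogs_date
  Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0)

Accumulate the N days before and the M days after, assuming N=3 and M=5:

  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and 5 following ) as sum_duration from userlogs_date
  Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)

Accumulate all rows in the partition:

  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date

preceding: accumulates the N rows before the current row (within the partition). To start from the first row of the partition, use unbounded. N is the offset counted backwards from the current row.
following: the opposite of preceding; accumulates the N rows after the current row (within the partition). To run to the end of the partition, use unbounded. N is the offset counted forwards from the current row.
current row: as the name suggests, the current row, at offset 0.

Note: the frame bounds are inclusive. The row at the preceding-N offset, the row at the following-M offset, and the current row are all included in the sum.

3.1.3.4 Measured results
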
Cumulative history (the current day and everything before it in the partition), form 1:
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date

| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|

Cumulative history (the current day and everything before it in the partition), form 2:
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date

| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|

Current day plus the previous day:
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and current row) as sum_duration from userlogs_date

| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       14077|

Current day, previous day, and next day:
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and 1 following ) as sum_duration from userlogs_date

| 1438|2016-05-13|         760|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       26190|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       14077|

Everything in the partition (the current day plus all days before and after it):
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date

| 1438|2016-05-13|         865|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       26417|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       26417|
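
The measured results above can be checked without a cluster. The following is a plain-Scala sketch (no Spark dependency) that emulates a rows-based frame over the sample data. The names RowsFrameSketch and rowsFrameSum are hypothetical, and None is used here as a stand-in for unbounded; it is meant only to illustrate the inclusive frame arithmetic, not how Spark evaluates windows internally.

```scala
object RowsFrameSketch {
  // (pcode, event_date, duration) rows copied from the sample data in section 2.
  val logs: Seq[(String, String, Long)] = Seq(
    ("1438", "2016-05-13", 165L),
    ("1438", "2016-05-14", 595L),
    ("1438", "2016-05-15", 105L),
    ("1629", "2016-05-13", 12340L),
    ("1629", "2016-05-14", 13850L),
    ("1629", "2016-05-15", 227L)
  )

  /** Emulates
    *   sum(duration) over (partition by pcode order by event_date
    *                       rows between <preceding> preceding and <following> following)
    * where None means "unbounded" on that side and Some(0) means "current row".
    */
  def rowsFrameSum(rows: Seq[(String, String, Long)],
                   preceding: Option[Int],
                   following: Option[Int]): Seq[(String, String, Long)] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (_, partition) =>
      // ISO dates sort correctly as strings.
      val sorted = partition.sortBy(_._2)
      sorted.indices.map { i =>
        // Both bounds are inclusive and clipped to the partition.
        val from = preceding.map(p => math.max(0, i - p)).getOrElse(0)
        val until = following.map(f => math.min(sorted.length, i + f + 1)).getOrElse(sorted.length)
        val (pcode, date, _) = sorted(i)
        (pcode, date, sorted.slice(from, until).map(_._3).sum)
      }
    }

  def main(args: Array[String]): Unit = {
    println("unbounded preceding .. current row")
    rowsFrameSum(logs, None, Some(0)).foreach(println)
    println("1 preceding .. current row")
    rowsFrameSum(logs, Some(1), Some(0)).foreach(println)
    println("1 preceding .. 1 following")
    rowsFrameSum(logs, Some(1), Some(1)).foreach(println)
    println("unbounded preceding .. unbounded following")
    rowsFrameSum(logs, None, None).foreach(println)
  }
}
```

Each call corresponds to one of the queries in 3.1.3.4; for example, rowsFrameSum(logs, Some(1), Some(0)) mirrors rows between 1 preceding and current row.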

3.2 Scenario 2: totals across all groups

3.2.1 spark sql implementation

//spark sql: use rollup to add the "all" summary rows
  select pcode,event_date,sum(duration) as sum_duration
  from userlogs_date_1
  group by pcode,event_date with rollup
  order by pcode,event_date

| null|      null|       27282|
| 1438|      null|         865|
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         595|
| 1438|2016-05-15|         105|
| 1629|      null|       26417|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       13850|
| 1629|2016-05-15|         227|

3.2.2 DataFrame function implementation

df_userlogs_date.rollup($"pcode", $"event_date").agg(sum($"duration")).orderBy($"pcode", $"event_date")

| null|      null|        27282|
| 1438|      null|          865|
| 1438|2016-05-13|          165|
| 1438|2016-05-14|          595|
| 1438|2016-05-15|          105|
| 1629|      null|        26417|
| 1629|2016-05-13|        12340|
| 1629|2016-05-14|        13850|
| 1629|2016-05-15|          227|
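
To make the meaning of the null rows concrete, here is a plain-Scala sketch (no Spark dependency) of what rollup over (pcode, event_date) computes on the sample data: the detail rows, one subtotal per pcode, and one grand total. The names RollupSketch and rollupSum are hypothetical, and None plays the role of the null that rollup places in a rolled-up column.

```scala
object RollupSketch {
  // (pcode, event_date, duration) rows copied from the sample data in section 2.
  val logs: Seq[(String, String, Long)] = Seq(
    ("1438", "2016-05-13", 165L),
    ("1438", "2016-05-14", 595L),
    ("1438", "2016-05-15", 105L),
    ("1629", "2016-05-13", 12340L),
    ("1629", "2016-05-14", 13850L),
    ("1629", "2016-05-15", 227L)
  )

  /** Emulates: select pcode, event_date, sum(duration)
    *           group by pcode, event_date with rollup
    *           order by pcode, event_date
    */
  def rollupSum(rows: Seq[(String, String, Long)]): Seq[(Option[String], Option[String], Long)] = {
    // Level 1: group by (pcode, event_date).
    val detail = rows.groupBy(r => (r._1, r._2)).toSeq.map {
      case ((pcode, date), rs) => (Option(pcode), Option(date), rs.map(_._3).sum)
    }
    // Level 2: group by pcode only; event_date is rolled up.
    val perCode = rows.groupBy(_._1).toSeq.map {
      case (pcode, rs) => (Option(pcode), Option.empty[String], rs.map(_._3).sum)
    }
    // Level 3: grand total; both columns are rolled up.
    val grandTotal = Seq((Option.empty[String], Option.empty[String], rows.map(_._3).sum))

    // Rolled-up (None) values sort first, matching the order of the output shown above.
    (detail ++ perCode ++ grandTotal).sortBy { case (p, d, _) => (p.getOrElse(""), d.getOrElse("")) }
  }

  def main(args: Array[String]): Unit =
    rollupSum(logs).foreach(println)
}
```

In this sketch a None in event_date marks a per-product subtotal, and None in both columns marks the grand total, which is how the null rows in the tables above should be read.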



4 Related API reference

def rollup(col1: String, cols: String*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).

  // Compute the average for all numeric columns rolluped by department and group.
  df.rollup("department", "group").avg()

  // Compute the max age and average salary, rolluped by department and gender.
  df.rollup($"department", $"gender").agg(Map(
    "salary" -> "avg",
    "age" -> "max"
  ))

def rollup(cols: Column*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

  // Compute the average for all numeric columns rolluped by department and group.
  df.rollup($"department", $"group").avg()

  // Compute the max age and average salary, rolluped by department and gender.
  df.rollup($"department", $"gender").agg(Map(
    "salary" -> "avg",
    "age" -> "max"
  ))

def over(window: WindowSpec): Column
Define a windowing column.

  val w = Window.partitionBy("name").orderBy("id")
  df.select(
    sum("price").over(w.rangeBetween(Long.MinValue, 2)),
    avg("price").over(w.rowsBetween(0, 4))
  )
