Spark Applications: PageView, UserView, HotChannel



1. PV (Page View)

The number of visits to a given page: every refresh of the page counts as one PV.

PV {p1, (u1,u2,u3,u1,u2,u4…)}: count all views of a single page; the same user may be counted repeatedly.
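All of the examples below read a tab-separated log file from f:/userLog. The original post does not show the file layout; judging from the field indices used in the code, a hypothetical record might look like the following (the values are invented purely for illustration):

    // Hypothetical record layout inferred from the indices used below, not from the original post:
    // field0 \t field1 \t userId \t pageId \t channel \t action
    String sample = "2016-03-01 12:00:00\tsession_1\tuser_1\tpage_1\tsport\tView";
    String userId  = sample.split("\t")[2];   // "user_1"
    String pageId  = sample.split("\t")[3];   // "page_1"
    String channel = sample.split("\t")[4];   // "sport"
    String action  = sample.split("\t")[5];   // "View"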



import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class PV_ANA {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("PV_ANA")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logRDD = sc.textFile("f:/userLog");
        String str = "View";
        final Broadcast<String> broadcast = sc.broadcast(str);
        pvAnalyze(logRDD, broadcast);
    }

    private static void pvAnalyze(JavaRDD<String> logRDD,
                                  final Broadcast<String> broadcast) {
        // Keep only the "View" actions (column 5 of the tab-separated log line).
        JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String s) throws Exception {
                String actionParam = broadcast.value();
                String action = s.split("\t")[5];
                return actionParam.equals(action);
            }
        });

        // Key each record by pageId; the value is irrelevant for a pure count.
        JavaPairRDD<String, String> pairLogRDD = filteredLogRDD.mapToPair(
                new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(String s) throws Exception {
                        String pageId = s.split("\t")[3];
                        return new Tuple2<String, String>(pageId, null);
                    }
                });

        // Group by pageId and count how many records each page received.
        pairLogRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                String pageId = tuple._1;
                Iterator<String> iterator = tuple._2.iterator();
                long count = 0L;
                while (iterator.hasNext()) {
                    iterator.next();
                    count++;
                }
                System.out.println("PAGEID:" + pageId + "\t PV_COUNT:" + count);
            }
        });
    }
}
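A note on the pattern above: groupByKey shuffles every record for a page only so the foreach can count them. As a sketch that is not part of the original post (same log-format assumptions, replacing the second half of pvAnalyze), the count can also be obtained directly with countByKey, or kept distributed with reduceByKey (which needs an extra org.apache.spark.api.java.function.Function2 import):

    // Sketch: PV per page without materializing the grouped value lists.
    JavaPairRDD<String, Integer> pageOnes = filteredLogRDD.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s.split("\t")[3], 1); // (pageId, 1)
                }
            });

    // Variant A: collect the per-page counts to the driver.
    Map<String, Object> pvByPage = pageOnes.countByKey();

    // Variant B: keep the counts as a distributed RDD.
    JavaPairRDD<String, Integer> pvRDD = pageOnes.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer a, Integer b) throws Exception {
                    return a + b;
                }
            });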

2. UV (User View)

UV {p1, (u1,u2,u3,u4,u5…)}: how many distinct users visited a page; each user is counted only once.

[Approach 1]

[Flow diagram]



import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class UV_ANA {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("UV_ANA")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logRDD = sc.textFile("f:/userLog");
        String str = "View";
        final Broadcast<String> broadcast = sc.broadcast(str);
        uvAnalyze(logRDD, broadcast);
    }

    private static void uvAnalyze(JavaRDD<String> logRDD,
                                  final Broadcast<String> broadcast) {
        // Keep only the "View" actions.
        JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String s) throws Exception {
                String actionParam = broadcast.value();
                String action = s.split("\t")[5];
                return actionParam.equals(action);
            }
        });

        // Key each record by pageId, keeping the userId as the value.
        JavaPairRDD<String, String> pairLogRDD = filteredLogRDD.mapToPair(
                new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(String s) throws Exception {
                        String pageId = s.split("\t")[3];
                        String userId = s.split("\t")[2];
                        return new Tuple2<String, String>(pageId, userId);
                    }
                });

        // Group by pageId and deduplicate the userIds with a HashSet.
        pairLogRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                String pageId = tuple._1;
                Iterator<String> iterator = tuple._2.iterator();
                Set<String> userSets = new HashSet<>();
                while (iterator.hasNext()) {
                    String userId = iterator.next();
                    userSets.add(userId);
                }
                System.out.println("PAGEID:" + pageId + "\t UV_COUNT:" + userSets.size());
            }
        });
    }
}

[Approach 2]

[Flow diagram]



import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class UV_ANAoptz {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("UV_ANAoptz")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logRDD = sc.textFile("f:/userLog");
        String str = "View";
        final Broadcast<String> broadcast = sc.broadcast(str);
        uvAnalyzeOptz(logRDD, broadcast);
    }

    private static void uvAnalyzeOptz(JavaRDD<String> logRDD,
                                      final Broadcast<String> broadcast) {
        // Keep only the "View" actions.
        JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String s) throws Exception {
                String actionParam = broadcast.value();
                String action = s.split("\t")[5];
                return actionParam.equals(action);
            }
        });

        // Use a composite "pageId_userId" key so the groupByKey itself collapses
        // repeated visits by the same user to the same page.
        JavaPairRDD<String, String> pairRDD = filteredLogRDD.mapToPair(
                new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(String s) throws Exception {
                        String pageId = s.split("\t")[3];
                        String userId = s.split("\t")[2];
                        return new Tuple2<String, String>(pageId + "_" + userId, null);
                    }
                });

        JavaPairRDD<String, Iterable<String>> groupUp2LogRDD = pairRDD.groupByKey();

        // Strip the userId back off the key and count the distinct (page, user) pairs per page.
        Map<String, Object> countByKey = groupUp2LogRDD.mapToPair(
                new PairFunction<Tuple2<String, Iterable<String>>, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(Tuple2<String, Iterable<String>> tuple)
                            throws Exception {
                        String pu = tuple._1;
                        String[] splited = pu.split("_");
                        String pageId = splited[0];
                        return new Tuple2<String, String>(pageId, null);
                    }
                }).countByKey();

        Set<String> keySet = countByKey.keySet();
        for (String key : keySet) {
            System.out.println("PAGEID:" + key + "\tUV_COUNT:" + countByKey.get(key));
        }
    }
}
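The composite pageId_userId key in approach 2 moves deduplication into the shuffle instead of building a per-page HashSet. As a sketch under the same assumptions (not from the original post), the same result can be written even more compactly with distinct() followed by countByKey():

    // Sketch: UV per page via distinct (pageId, userId) pairs.
    Map<String, Object> uvByPage = filteredLogRDD.mapToPair(
            new PairFunction<String, String, String>() {
                @Override
                public Tuple2<String, String> call(String s) throws Exception {
                    String[] fields = s.split("\t");
                    return new Tuple2<String, String>(fields[3], fields[2]); // (pageId, userId)
                }
            })
            .distinct()     // drop repeat visits by the same user to the same page
            .countByKey();  // remaining pairs per page = number of unique visitors

    for (Map.Entry<String, Object> entry : uvByPage.entrySet()) {
        System.out.println("PAGEID:" + entry.getKey() + "\tUV_COUNT:" + entry.getValue());
    }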

3. User activity in the hot channels

Find the three hottest channels (by page views) and, within each of them, the top 3 most active users.

[Approach 1]

[Flow diagram]



import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

// SortObj is assumed to be a simple (String key, int value) holder class defined elsewhere in the project.
public class HotChannel {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("HotChannel")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logRDD = sc.textFile("f:/userLog");
        String str = "View";
        final Broadcast<String> broadcast = sc.broadcast(str);
        hotChannel(sc, logRDD, broadcast);
    }

    private static void hotChannel(JavaSparkContext sc, JavaRDD<String> logRDD,
                                   final Broadcast<String> broadcast) {
        // Step 1: keep only the "View" actions.
        JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String v1) throws Exception {
                String actionParam = broadcast.value();
                String action = v1.split("\t")[5];
                return actionParam.equals(action);
            }
        });

        // Step 2: count PV per channel and pick the top 3 channels on the driver.
        JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair(
                new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(String s) throws Exception {
                        String channel = s.split("\t")[4];
                        return new Tuple2<String, String>(channel, null);
                    }
                });
        Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
        Set<String> keySet = channelPVMap.keySet();
        List<SortObj> channels = new ArrayList<>();
        for (String channel : keySet) {
            channels.add(new SortObj(channel, Integer.valueOf(channelPVMap.get(channel) + "")));
        }
        Collections.sort(channels, new Comparator<SortObj>() {
            @Override
            public int compare(SortObj o1, SortObj o2) {
                return o2.getValue() - o1.getValue();
            }
        });
        List<String> hotChannelList = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            hotChannelList.add(channels.get(i).getKey());
        }
        for (String channel : hotChannelList) {
            System.out.println("channel:" + channel);
        }

        // Step 3: broadcast the hot-channel list and keep only their records with a real userId.
        final Broadcast<List<String>> hotChannelListBroadcast = sc.broadcast(hotChannelList);
        JavaRDD<String> filterRDD = logRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                List<String> hostChannels = hotChannelListBroadcast.value();
                String channel = s.split("\t")[4];
                String userId = s.split("\t")[2];
                return hostChannels.contains(channel) && !"null".equals(userId);
            }
        });

        // Step 4: for each hot channel, count visits per user and print the top 3 users.
        JavaPairRDD<String, String> channel2UserRDD = filterRDD.mapToPair(
                new PairFunction<String, String, String>() {
                    @Override
                    public Tuple2<String, String> call(String s) throws Exception {
                        String[] splited = s.split("\t");
                        String channel = splited[4];
                        String userId = splited[2];
                        return new Tuple2<String, String>(channel, userId);
                    }
                });
        channel2UserRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            @Override
            public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                String channel = tuple._1;
                Iterator<String> iterator = tuple._2.iterator();
                Map<String, Integer> userNumMap = new HashMap<>();
                while (iterator.hasNext()) {
                    String userId = iterator.next();
                    Integer count = userNumMap.get(userId);
                    if (count == null) {
                        count = 1;
                    } else {
                        count++;
                    }
                    userNumMap.put(userId, count);
                }
                List<SortObj> lists = new ArrayList<>();
                Set<String> keys = userNumMap.keySet();
                for (String key : keys) {
                    lists.add(new SortObj(key, userNumMap.get(key)));
                }
                Collections.sort(lists, new Comparator<SortObj>() {
                    @Override
                    public int compare(SortObj o1, SortObj o2) {
                        return o2.getValue() - o1.getValue();
                    }
                });
                System.out.println("HOT_CHANNEL:" + channel);
                for (int i = 0; i < 3; i++) {
                    SortObj sortObj = lists.get(i);
                    System.out.println(sortObj.getKey() + "==" + sortObj.getValue());
                }
            }
        });
    }
}
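A possible refinement, again only a sketch and not from the original post: the per-user HashMap counting inside the foreach can be replaced by a reduceByKey over composite "channel_userId" keys, so that aggregation stays distributed (this reuses filterRDD from the code above and also needs the Function2 import):

    // Sketch: distributed count of visits per (channel, user) pair.
    JavaPairRDD<String, Integer> channelUserCounts = filterRDD.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    String[] fields = s.split("\t");
                    // composite key "channel_userId", counted once per log line
                    return new Tuple2<String, Integer>(fields[4] + "_" + fields[2], 1);
                }
            })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer a, Integer b) throws Exception {
                    return a + b;
                }
            });
    // The per-channel top-3 selection would then work on these pre-aggregated counts.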

[Approach 2]

[Flow diagram]



import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

// SortObj is assumed to be a simple (String key, int value) holder class defined elsewhere in the project.
public class HotChannelOpz {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("hotChannelOpz")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logRDD = sc.textFile("f:/userLog");
        String str = "View";
        final Broadcast<String> broadcast = sc.broadcast(str);
        hotChannelOpz(sc, logRDD, broadcast);
    }

    private static void hotChannelOpz(JavaSparkContext sc, JavaRDD<String> logRDD,
                                      final Broadcast<String> broadcast) {
        // Step 1: keep only the "View" actions.
        JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String v1) throws Exception {
                String actionParam = broadcast.value();
                String action = v1.split("\t")[5];
                return actionParam.equals(action);
            }
        });

        // Step 2: count PV per channel and pick the top 3 channels on the driver.
        JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair(
                new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(String val) throws Exception {
                        String channel = val.split("\t")[4];
                        return new Tuple2<String, String>(channel, null);
                    }
                });
        Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
        Set<String> keySet = channelPVMap.keySet();
        List<SortObj> channels = new ArrayList<>();
        for (String channel : keySet) {
            channels.add(new SortObj(channel, Integer.valueOf(channelPVMap.get(channel) + "")));
        }
        Collections.sort(channels, new Comparator<SortObj>() {
            @Override
            public int compare(SortObj o1, SortObj o2) {
                return o2.getValue() - o1.getValue();
            }
        });
        List<String> hotChannelList = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            hotChannelList.add(channels.get(i).getKey());
        }

        // Step 3: broadcast the hot-channel list and keep only their records with a real userId.
        final Broadcast<List<String>> hotChannelListBroadcast = sc.broadcast(hotChannelList);
        JavaRDD<String> filtedRDD = logRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                List<String> hostChannels = hotChannelListBroadcast.value();
                String channel = v1.split("\t")[4];
                String userId = v1.split("\t")[2];
                return hostChannels.contains(channel) && !"null".equals(userId);
            }
        });

        // Step 4: key the records by userId so each user's channel visits are grouped together.
        JavaPairRDD<String, String> user2ChannelRDD = filtedRDD.mapToPair(
                new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(String val) throws Exception {
                        String[] splited = val.split("\t");
                        String userId = splited[2];
                        String channel = splited[4];
                        return new Tuple2<String, String>(userId, channel);
                    }
                });

        // Step 5: per user, count visits per channel and re-emit (channel, "userId_count") pairs.
        JavaPairRDD<String, String> userVisitChannelsRDD = user2ChannelRDD.groupByKey()
                .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<Tuple2<String, String>> call(Tuple2<String, Iterable<String>> tuple)
                            throws Exception {
                        String userId = tuple._1;
                        Iterator<String> iterator = tuple._2.iterator();
                        Map<String, Integer> channelMap = new HashMap<>();
                        while (iterator.hasNext()) {
                            String channel = iterator.next();
                            Integer count = channelMap.get(channel);
                            if (count == null) {
                                count = 1;
                            } else {
                                count++;
                            }
                            channelMap.put(channel, count);
                        }
                        List<Tuple2<String, String>> list = new ArrayList<>();
                        Set<String> keys = channelMap.keySet();
                        for (String channel : keys) {
                            Integer channelNum = channelMap.get(channel);
                            list.add(new Tuple2<String, String>(channel, userId + "_" + channelNum));
                        }
                        return list;
                    }
                });

        // Step 6: per hot channel, sort users by their visit count and print the top 3.
        userVisitChannelsRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            @Override
            public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                String channel = tuple._1;
                Iterator<String> iterator = tuple._2.iterator();
                List<SortObj> list = new ArrayList<>();
                while (iterator.hasNext()) {
                    String ucs = iterator.next();
                    String[] splited = ucs.split("_");
                    String userId = splited[0];
                    Integer num = Integer.valueOf(splited[1]);
                    list.add(new SortObj(userId, num));
                }
                Collections.sort(list, new Comparator<SortObj>() {
                    @Override
                    public int compare(SortObj o1, SortObj o2) {
                        return o2.getValue() - o1.getValue();
                    }
                });
                System.out.println("HOT_CHANNEL:" + channel);
                for (int i = 0; i < 3; i++) {
                    SortObj sortObj = list.get(i);
                    System.out.println(sortObj.getKey() + "===" + sortObj.getValue());
                }
            }
        });
    }
}
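One compatibility note on the flatMapToPair step above: it is written against the Spark 1.x Java API, in which PairFlatMapFunction.call returns an Iterable. On Spark 2.x and later the same callback must return an Iterator instead, roughly:

    // Spark 2.x+ signature sketch: return an Iterator instead of an Iterable.
    @Override
    public Iterator<Tuple2<String, String>> call(Tuple2<String, Iterable<String>> tuple)
            throws Exception {
        List<Tuple2<String, String>> list = new ArrayList<>();
        // ... build the (channel, userId + "_" + count) tuples exactly as above ...
        return list.iterator();
    }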
