Spark应用_PageView_UserView_HotChannel
一、PV
PV(Page View)指某一个页面的访问量:页面每被打开或刷新一次,就记为一次 PV。
PV {p1, (u1,u2,u3,u1,u2,u4…)}:统计同一个页面被浏览的总次数,同一用户的多次访问重复计数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
public class PV_ANA { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("PV_ANA") .setMaster("local") .set("spark.testing.memory", "2147480000"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> logRDD = sc.textFile("f:/userLog"); String str = "View"; final Broadcast<String> broadcast = sc.broadcast(str); pvAnalyze(logRDD, broadcast); } private static void pvAnalyze(JavaRDD<String> logRDD, final Broadcast<String> broadcast) { JavaRDD<String> filteredLogRDD = logRDD.filter (new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(String s) throws Exception { String actionParam = broadcast.value(); String action = s.split("\t")[5]; return actionParam.equals(action); } }); JavaPairRDD<String, String> pariLogRDD = filteredLogRDD.mapToPair (new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String s) throws Exception { String pageId = s.split("\t")[3]; return new Tuple2<String, String>(pageId, null); } }); pariLogRDD.groupByKey().foreach(new VoidFunction <Tuple2<String, Iterable<String>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<String>> tuple) throws Exception { String pageId = tuple._1; Iterator<String> iterator = tuple._2.iterator(); long count = 0L; while (iterator.hasNext()) { iterator.next(); count++; } System.out.println("PAGEID:" + pageId + "\t PV_COUNT:" + count); } }); } } |
二、UV
UV {p1, (u1,u2,u3,u4,u5…)}:统计访问某一个页面的不同用户数,同一用户多次访问只计一次(需要去重)。
【方式一】
【流程图】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
public class UV_ANA { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("UV_ANA") .setMaster("local") .set("spark.testing.memory", "2147480000"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> logRDD = sc.textFile("f:/userLog"); String str = "View"; final Broadcast<String> broadcast = sc.broadcast(str); uvAnalyze(logRDD, broadcast); } private static void uvAnalyze(JavaRDD<String> logRDD, final Broadcast<String> broadcast) { JavaRDD<String> filteredLogRDD = logRDD.filter (new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(String s) throws Exception { String actionParam = broadcast.value(); String action = s.split("\t")[5]; return actionParam.equals(action); } }); JavaPairRDD<String, String> pairLogRDD = filteredLogRDD.mapToPair (new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String s) throws Exception { String pageId = s.split("\t")[3]; String userId = s.split("\t")[2]; return new Tuple2<String, String>(pageId, userId); } }); pairLogRDD.groupByKey().foreach(new VoidFunction <Tuple2<String, Iterable<String>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<String>> tuple) throws Exception { String pageId = tuple._1; Iterator<String> iterator = tuple._2.iterator(); Set<String> userSets = new HashSet<>(); while (iterator.hasNext()) { String userId = iterator.next(); userSets.add(userId); } System.out.println("PAGEID:" + pageId + "\t " + "UV_COUNT:" + userSets.size()); } }); } } |
【方式二】
【流程图】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
public class UV_ANAoptz { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("UV_ANAoptz") .setMaster("local") .set("spark.testing.memory", "2147480000"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> logRDD = sc.textFile("f:/userLog"); String str = "View"; final Broadcast<String> broadcast = sc.broadcast(str); uvAnalyzeOptz(logRDD, broadcast); } private static void uvAnalyzeOptz(JavaRDD<String> logRDD, final Broadcast<String> broadcast) { JavaRDD<String> filteredLogRDD = logRDD.filter (new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(String s) throws Exception { String actionParam = broadcast.value(); String action = s.split("\t")[5]; return actionParam.equals(action); } }); JavaPairRDD<String, String> pairRDD = filteredLogRDD.mapToPair (new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String s) throws Exception { String pageId = s.split("\t")[3]; String userId = s.split("\t")[2]; return new Tuple2<String, String>(pageId + "_" + userId, null); } }); JavaPairRDD<String, Iterable<String>> groupUp2LogRDD = pairRDD.groupByKey(); Map<String, Object> countByKey = groupUp2LogRDD.mapToPair (new PairFunction<Tuple2<String, Iterable<String>>, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception { String pu = tuple._1; String[] spilted = pu.split("_"); String pageId = spilted[0]; return new Tuple2<String, String>(pageId, null); } }).countByKey(); Set<String> keySet = countByKey.keySet(); for (String key : keySet) { System.out.println("PAGEID:" + key + "\tUV_COUNT:" + countByKey.get(key)); } } } |
三、热门版块下用户访问的数量
先按访问量找出最热门的 3 个版块,再统计出每个热门版块中最活跃的 top3 用户。
【方式一】
【流程图】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
public class HotChannel { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("HotChannel") .setMaster("local") .set("spark.testing.memory", "2147480000"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> logRDD = sc.textFile("f:/userLog"); String str = "View"; final Broadcast<String> broadcast = sc.broadcast(str); hotChannel(sc, logRDD, broadcast); } private static void hotChannel(JavaSparkContext sc, JavaRDD<String> logRDD, final Broadcast<String> broadcast) { JavaRDD<String> filteredLogRDD = logRDD.filter (new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(String v1) throws Exception { String actionParam = broadcast.value(); String action = v1.split("\t")[5]; return actionParam.equals(action); } }); JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair (new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String s) throws Exception { String channel = s.split("\t")[4]; return new Tuple2<String, String>(channel, null); } }); Map<String, Object> channelPVMap = channel2nullRDD.countByKey(); Set<String> keySet = channelPVMap.keySet(); List<SortObj> channels = new ArrayList<>(); for (String channel : keySet) { channels.add(new SortObj(channel, Integer.valueOf (channelPVMap.get(channel) + ""))); } Collections.sort(channels, new Comparator<SortObj>() { @Override public int compare(SortObj o1, SortObj o2) { return o2.getValue() - o1.getValue(); } }); List<String> hotChannelList = new ArrayList<>(); for (int i = 0; i < 3; i++) { hotChannelList.add(channels.get(i).getKey()); } for (String channel : hotChannelList) { System.out.println("channel:" + channel); } final Broadcast<List<String>> hotChannelListBroadcast = sc.broadcast(hotChannelList); JavaRDD<String> filterRDD = logRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) 
throws Exception { List<String> hostChannels = hotChannelListBroadcast.value(); String channel = s.split("\t")[4]; String userId = s.split("\t")[2]; return hostChannels.contains(channel) && !"null".equals(userId); } }); JavaPairRDD<String, String> channel2UserRDD = filterRDD.mapToPair (new PairFunction<String, String, String>() { @Override public Tuple2<String, String> call(String s) throws Exception { String[] splited = s.split("\t"); String channel = splited[4]; String userId = splited[2]; return new Tuple2<String, String>(channel, userId); } }); channel2UserRDD.groupByKey().foreach(new VoidFunction <Tuple2<String, Iterable<String>>>() { @Override public void call(Tuple2<String, Iterable<String>> tuple) throws Exception { String channel = tuple._1; Iterator<String> iterator = tuple._2.iterator(); Map<String, Integer> userNumMap = new HashMap<>(); while (iterator.hasNext()) { String userId = iterator.next(); Integer count = userNumMap.get(userId); if (count == null) { count = 1; } else { count++; } userNumMap.put(userId, count); } List<SortObj> lists = new ArrayList<>(); Set<String> keys = userNumMap.keySet(); for (String key : keys) { lists.add(new SortObj(key, userNumMap.get(key))); } Collections.sort(lists, new Comparator<SortObj>() { @Override public int compare(SortObj O1, SortObj O2) { return O2.getValue() - O1.getValue(); } }); System.out.println("HOT_CHANNEL:" + channel); for (int i = 0; i < 3; i++) { SortObj sortObj = lists.get(i); System.out.println(sortObj.getKey() + "==" + sortObj.getValue()); } } }); } } |
【方式二】
【流程图】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
public class HotChannelOpz { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("hotChannelOpz") .setMaster("local") .set("spark.testing.memory", "2147480000"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> logRDD = sc.textFile("f:/userLog"); String str = "View"; final Broadcast<String> broadcast = sc.broadcast(str); hotChannelOpz(sc, logRDD, broadcast); } private static void hotChannelOpz(JavaSparkContext sc, JavaRDD<String> logRDD, final Broadcast<String> broadcast) { JavaRDD<String> filteredLogRDD = logRDD.filter (new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(String v1) throws Exception { String actionParam = broadcast.value(); String action = v1.split("\t")[5]; return actionParam.equals(action); } }); JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair (new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String val) throws Exception { String channel = val.split("\t")[4]; return new Tuple2<String, String>(channel, null); } }); Map<String, Object> channelPVMap = channel2nullRDD.countByKey(); Set<String> keySet = channelPVMap.keySet(); List<SortObj> channels = new ArrayList<>(); for (String channel : keySet) { channels.add(new SortObj(channel, Integer.valueOf (channelPVMap.get(channel) + ""))); } Collections.sort(channels, new Comparator<SortObj>() { @Override public int compare(SortObj o1, SortObj o2) { return o2.getValue() - o1.getValue(); } }); List<String> hotChannelList = new ArrayList<>(); for (int i = 0; i < 3; i++) { hotChannelList.add(channels.get(i).getKey()); } final Broadcast<List<String>> hotChannelListBroadcast = sc.broadcast(hotChannelList); JavaRDD<String> filtedRDD = logRDD.filter (new Function<String, Boolean>() { @Override public Boolean call(String v1) throws Exception { List<String> hostChannels = 
hotChannelListBroadcast.value(); String channel = v1.split("\t")[4]; String userId = v1.split("\t")[2]; return hostChannels.contains(channel) && !"null".equals(userId); } }); JavaPairRDD<String, String> user2ChannelRDD = filtedRDD.mapToPair (new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String val) throws Exception { String[] splited = val.split("\t"); String userId = splited[2]; String channel = splited[4]; return new Tuple2<String, String>(userId, channel); } }); JavaPairRDD<String, String> userVistChannelsRDD = user2ChannelRDD.groupByKey(). flatMapToPair(new PairFlatMapFunction <Tuple2<String, Iterable<String>>, String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<Tuple2<String, String>> call (Tuple2<String, Iterable<String>> tuple) throws Exception { String userId = tuple._1; Iterator<String> iterator = tuple._2.iterator(); Map<String, Integer> channelMap = new HashMap<>(); while (iterator.hasNext()) { String channel = iterator.next(); Integer count = channelMap.get(channel); if (count == null) count = 1; else count++; channelMap.put(channel, count); } List<Tuple2<String, String>> list = new ArrayList<>(); Set<String> keys = channelMap.keySet(); for (String channel : keys) { Integer channelNum = channelMap.get(channel); list.add(new Tuple2<String, String>(channel, userId + "_" + channelNum)); } return list; } }); userVistChannelsRDD.groupByKey().foreach(new VoidFunction <Tuple2<String, Iterable<String>>>() { @Override public void call(Tuple2<String, Iterable<String>> tuple) throws Exception { String channel = tuple._1; Iterator<String> iterator = tuple._2.iterator(); List<SortObj> list = new ArrayList<>(); while (iterator.hasNext()) { String ucs = iterator.next(); String[] splited = ucs.split("_"); String userId = splited[0]; Integer num = Integer.valueOf(splited[1]); list.add(new SortObj(userId, num)); } Collections.sort(list, 
new Comparator<SortObj>() { @Override public int compare(SortObj o1, SortObj o2) { return o2.getValue() - o1.getValue(); } }); System.out.println("HOT_CHANNLE:" + channel); for (int i = 0; i < 3; i++) { SortObj sortObj = list.get(i); System.out.println(sortObj.getKey() + "===" + sortObj.getValue()); } } }); } } |