根据PV统计出前三的热门板块,并统计出热门板块下的用户数--方式二


根据PV统计出前三的热门板块,并统计出热门板块下的用户数--方式二

测试数据

java代码

  1 package com.hzf.spark.study;
  2
  3 import java.util.ArrayList;
  4 import java.util.Collections;
  5 import java.util.Comparator;
  6 import java.util.HashMap;
  7 import java.util.Iterator;
  8 import java.util.List;
  9 import java.util.Map;
 10 import java.util.Set;
 11
 12 import org.apache.spark.SparkConf;
 13 import org.apache.spark.api.java.JavaPairRDD;
 14 import org.apache.spark.api.java.JavaRDD;
 15 import org.apache.spark.api.java.JavaSparkContext;
 16 import org.apache.spark.api.java.function.Function;
 17 import org.apache.spark.api.java.function.PairFlatMapFunction;
 18 import org.apache.spark.api.java.function.PairFunction;
 19 import org.apache.spark.api.java.function.VoidFunction;
 20 import org.apache.spark.broadcast.Broadcast;
 21
 22 import scala.Tuple2;
 23
 24 public class HotChannel02 {
 25     public static void main(String[] args) {
 26         SparkConf conf = new SparkConf()
 27                 .setAppName("HotChannel")
 28                 .setMaster("local")
 29                 .set("spark.testing.memory", "2147480000");
 30         JavaSparkContext sc = new JavaSparkContext(conf);
 31         JavaRDD<String> logRDD = sc.textFile("f:/userLog");
 32         String str = "View";
 33         final Broadcast<String> broadcast = sc.broadcast(str);
 34         hotChannel(sc, logRDD, broadcast);
 35     }
 36     private static void hotChannel(JavaSparkContext sc, JavaRDD<String> logRDD, final Broadcast<String> broadcast) {
 37         JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {
 38
 39             private static final long serialVersionUID = 1L;
 40
 41             @Override
 42             public Boolean call(String v1) throws Exception {
 43                 String actionParam = broadcast.value();
 44                 String action = v1.split("\t")[5];
 45                 return actionParam.equals(action);
 46             }
 47         });
 48
 49         JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair(new PairFunction<String, String,String>() {
 50
 51             private static final long serialVersionUID = 1L;
 52
 53             @Override
 54             public Tuple2<String, String> call(String val) throws Exception {
 55                 String channel = val.split("\t")[4];
 56
 57                 return new Tuple2<String, String>(channel,null);
 58             }
 59         });
 60         Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
 61         Set<String> keySet = channelPVMap.keySet();
 62         List<SortObj> channels  = new ArrayList<>();
 63         for(String channel : keySet){
 64             channels.add(new SortObj(channel, Integer.valueOf(channelPVMap.get(channel)+"")));
 65         }
 66         Collections.sort(channels, new Comparator<SortObj>() {
 67
 68             @Override
 69             public int compare(SortObj o1, SortObj o2) {
 70                 return o2.getValue() - o1.getValue();
 71             }
 72         });
 73
 74         List<String> hotChannelList = new ArrayList<>();
 75         for (int i = 0; i < 3; i++) {
 76             hotChannelList.add(channels.get(i).getKey());
 77         }
 78
 79
 80         final Broadcast<List<String>> hotChannelListBroadcast = sc.broadcast(hotChannelList);
 81
 82
 83         JavaRDD<String> filtedRDD = logRDD.filter(new Function<String, Boolean>() {
 84
 85             @Override
 86             public Boolean call(String v1) throws Exception {
 87                 List<String> hostChannels = hotChannelListBroadcast.value();
 88                 String channel = v1.split("\t")[4];
 89                  String userId = v1.split("\t")[2];
 90                 return hostChannels.contains(channel) && !"null".equals(userId);
 91             }
 92         });
 93
 94         JavaPairRDD<String, String> user2ChannelRDD = filtedRDD.mapToPair(new PairFunction<String, String,String>() {
 95
 96             private static final long serialVersionUID = 1L;
 97
 98             @Override
 99             public Tuple2<String, String> call(String val) throws Exception {
100                 String[] splited = val.split("\t");
101                 String userId = splited[2];
102                 String channel = splited[4];
103                 return new Tuple2<String, String>(userId,channel);
104             }
105         });
106
107         JavaPairRDD<String, String> userVistChannelsRDD = user2ChannelRDD.groupByKey().flatMapToPair(new PairFlatMapFunction<Tuple2<String,Iterable<String>>, String, String>() {
108
109             private static final long serialVersionUID = 1L;
110
111             @Override
112             public Iterable<Tuple2<String, String>> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
113                 String userId = tuple._1;
114                 Iterator<String> iterator = tuple._2.iterator();
115                 Map<String, Integer> channelMap = new HashMap<>();
116                 while (iterator.hasNext()) {
117                     String channel = iterator.next();
118                     Integer count = channelMap.get(channel);
119                     if(count == null)
120                         count = 1;
121                     else
122                         count++;
123                     channelMap.put(channel, count);
124                 }
125
126                 List<Tuple2<String, String>> list = new ArrayList<>();
127                 Set<String> keys = channelMap.keySet();
128                 for(String channel : keys){
129                      Integer channelNum  = channelMap.get(channel);
130                      list.add(new Tuple2<String, String>(channel, userId + "_" + channelNum));
131                 }
132                 return list;
133             }
134         });
135
136
137         userVistChannelsRDD.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() {
138
139             private static final long serialVersionUID = 1L;
140
141             @Override
142             public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
143                 String channel = tuple._1;
144                 Iterator<String> iterator = tuple._2.iterator();
145                 List<SortObj> list = new ArrayList<>();
146                 while (iterator.hasNext()) {
147                     String ucs = iterator.next();
148                     String[] splited = ucs.split("_");
149                     String userId = splited[0];
150                     Integer num = Integer.valueOf(splited[1]);
151                     list.add(new SortObj(userId, num));
152                 }
153
154                 Collections.sort(list,new Comparator<SortObj>() {
155
156                     @Override
157                     public int compare(SortObj o1, SortObj o2) {
158                         return o2.getValue() - o1.getValue();
159                     }
160                 });
161
162                 System.out.println("HOT_CHANNLE:"+channel);
163                 for(int i = 0 ; i < 3 ; i++){
164                     SortObj sortObj = list.get(i);
165                     System.out.println(sortObj.getKey() + "===" + sortObj.getValue());
166                 }
167             }
168         });
169     }
170 }

result

时间: 2024-08-24 00:34:34

根据PV统计出前三的热门板块,并统计出热门板块下的用户数--方式二的相关文章

如何构建日均千万PV Web站点 (三) Sharding

     其实国内许多大型网站为了应对日益复杂的业务场景,通过使用分而治之的手段将整个网站业务分成不同的产品线,比如说国内那些大型购物交易网站它们都将自己的网站首页.商铺.订单.买家.卖家等拆分不同的产品线,分归不同的业务团队负责: 集体到技术,也会根据产品线划分,将一个网站拆分成许多不同的应用,每个应用用独立部署维护.应用之间可以通过一个超链接建立关系(在首页上的导航链接每个都指向不同的应用地址),也可以通过消息队列进行数据分发,当然最多的还是通过访问同一个数据库存储系统来构成一个关联的完整系

日均百万PV架构第三弹(分布内容为王)

接续接上篇 缓存时代来临 为蓝本,继续改造我们的百万级站点架构,这次我们 拿之前存储静态内容的 nfs 开刀,众所周知 nfs 的多台集群节点下可能由于多重 原因(磁盘io , 网络带宽, 并发场景),不适合做文件共享系统的基础结构. 互联网站点中,存在大量图片或其他静态内容,并且这些内容一般在1M之内,对于 海量小文件,我们将采用mogilefs分布式文件系统来完成.其中概念自行google. # mogilefs分布式文件系统工作流程 架构已经愈发复杂,我们需要从新梳理一下.从下表中应该很容

构建之法(前三章)读后感

第一章:软件工程的定义是:选定合适的开发模型,然后根据客户的需求分析,在给定成本的前提下开发可靠性,可维护性的软件.但这个过程的实现需要团队的共同合作,软件完成后还要根据客户的要求进行修改,发布后的维护,所以说软件工程是一个长久性的工作. 第二章:讲的是软件制作后期所遇到的许多测试,黑盒测试,百盒测试等.测试能力也属于个人能力的一种.而里面所说到的单元测试就是为了提高程序的健壮性,提高程序的可靠性和稳定性. 第三章:成为一个出色的软件工程师需要不断的接触各类软件硬件,吸收更多的知识 ,所谓孰能生

&lt;记录学习&gt;(前三天)京东页面各种注意点

培训学校第1到3天先学习HTML现在流行的是HTML5,目前学习的是HTML5规范.(给有基础一定的人学习)前三天学习的是京东页面的编写,和以前写的不同,页面看上去和自己写的一样,但老师讲的还是有很多不同,更加详细和细节.1.程序员的规范写法很重要,要方便别人的观看,因此注释十分重要.<注意点>2.页面的布局先要看好,分成几块,头部和尾部有很多通用的,可以写在一个css文件里.3.在用户主要访问的地方,可以写的尽可能美观,如"|"号,首部标签之间可以用<li>&

HBase in Action前三章笔记

近期接触HBase,看了HBase In Action的英文版.開始认为还行,做了些笔记.可是兴许看下去,越来越感觉到实战这本书比較偏使用上的细节,对于HBase的具体设计涉及得很少.把前三章的一些笔记帖一下.后面几章内容不打算整理了.并非说书内容不好. key-value存储.强一致性,多个RegionServer节点对client端是不暴露细节的 使用场景:典型的web-search, capture incremental data, ad. click stream, content s

我心中的前三佳作品

1.必趣 内容:  一个提供电影丰富多彩的预告片.花絮.特辑还有精彩片段,美轮美奂的高清海报和养眼剧照,还提供购票. 链接:https://itunes.apple.com/cn/app/wei-shi-fa-xian-wei-zhi-shi/id1008086406 优势:提供了电影的更多相关资讯,不仅仅是预告片. 不足:功能上没看出与猫眼电影等显示出来的不足,但是宣传没比上像猫眼电影等的app,所以较少人知道. 2.高考小秘书 内容:提供志愿分析,志愿指导,等高考资讯. 链接:http://

读《构建之法-现代软件工程》前三张后感

刚开学的第一天,学校发下了书本,有六本书,当时会在上这门课的头一天晚上去看看书本的内容,去大概了解这门课程.其中有一门课程的书吸引了我,感觉这本书和其他的教科书有所不同,它不像其他书一样那么的死板,除了公式,就是理论,没什么奇特的地方.而这本书却很奇特,它不但吸引了我,还让我觉得很有趣,这本书是邹欣老师写的一本书,一本胜似小说的书,平时爱看小说的我,就开始喜欢上了这本书,这本有关软件工程的小说书. 到现在,老师也上了好几次课,我们也对软件工程有了一些初步的了解,而软件工程这本书,我也大概的看完了

《增长黑客》阅读内容摘要(前三章)

<增长黑客>阅读内容摘要(前三章) 寒假无聊,偶然间看到<增长黑客>这本名气很大的书,顺便拿来读读.读到后来根本停不下来,这本书真的比电影还精彩.作者提倡的一种新的软件工程,令人叫绝. 以下是这本书前三章的内容摘要: 一.第一章 通常采用的手段包括A/B测试.搜索引擎优化.电子邮件召回.病毒营销等,而页面加载速度.注册转化率.E-mail到达水平.病毒因子这些指标成为他们日常关注的对象. 增长黑客:以数据驱动营销.以市场指导产品,通过技术化手段贯彻增长目标的人. 五个环节:1. 获

对编程语言的需求总结为四个:效率,灵活,抽象,生产率(C++玩的是前三个,Java和C#玩的是后两个)

Why C++ ? 王者归来(转载) 因为又有人邀请我去Quora的C2C网站去回答问题去了,这回是 关于 @laiyonghao 的这篇有点争议的博文<2012 不宜进入的三个技术点>ActionScript,Thread 和 C++,C++争议的争议最大.(要我说,.NET比C++更需要慎重进入,呵).我就在这里回复一下这个问题吧. 正好我一个月前看到一个视频,这个演讲视频还比较著名,这个演讲者是Exceptional C++ 和 C++ Coding Standards 的作者,还是IS