大数据算法设计模式(1) - topN spark实现

topN算法,spark实现

package com.kangaroo.studio.algorithms.topn;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;

public class TopnSpark implements Serializable {

    private JavaSparkContext jsc;
    Broadcast<Integer> topNum;
    private String inputPath;

    /*
    *   构造函数
    *   1. 初始化JavaSparkContext
    *   2. 初始化广播变量topN个数, 可以被所有partition共享
    *   3. 初始化输入路径
    * */
    public TopnSpark(Integer Num, String path) {
        jsc = new JavaSparkContext();
        topNum = jsc.broadcast(Num);
        inputPath = path;
    }

    /*
    *   程序入口函数
    * */
    public void run() {
        /*
        *   读入inputPath中的数据
        * */
        JavaRDD<String> lines = jsc.textFile(inputPath, 1);

        /*
        *   将rdd规约到9个分区
        * */
        JavaRDD<String> rdd = lines.coalesce(9);

        /*
        *   将输入转化为kv格式
        *   key是规约的主键, value是排序参考的个数
        *   注: 这里的key并不唯一, 即相同的key可能有多条记录, 所以下面我们规约key成唯一键
        *   输入:line, 输出:kv
        * */
        JavaPairRDD<String, Integer> kv = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] tokens = s.split(",");
                return new Tuple2<String, Integer>(tokens[0], Integer.parseInt(tokens[1]));
            }
        });

        /*
        *   规约主键成为唯一键
        *   输入:kv, 输出:kv
        * */
        JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        /*
        *   计算各个分区的topN
        *   这里通过广播变量拿到了topN具体个数, 每个分区都保留topN, 所有分区总个数: partitionNum * topN
        *   输入:kv, 输出:SortMap, 长度topN
        * */
        JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
            public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> iter) throws Exception {
                final int N = topNum.getValue();
                SortedMap<Integer, String> topN = new TreeMap<Integer, String>();
                while (iter.hasNext()) {
                    Tuple2<String, Integer> tuple = iter.next();
                    topN.put(tuple._2, tuple._1);

                    if (topN.size() > N) {
                        topN.remove(topN.firstKey());
                    }
                }
                return Collections.singletonList(topN);
            }
        });

        /*
        *   规约所有分区的topN SortMap, 得到最终的SortMap, 长度topN
        *   reduce过后, 数据已经到了本地缓存, 这是最后结果
        *   输入: SortMap, 长度topN, 当然有partitionNum个, 输出:SortMap, 长度topN
        * */
        SortedMap<Integer, String> finalTopN = partitions.reduce(new Function2<SortedMap<Integer, String>, SortedMap<Integer, String>, SortedMap<Integer, String>>() {
            public SortedMap<Integer, String> call(SortedMap<Integer, String> m1, SortedMap<Integer, String> m2) throws Exception {
                final int N = topNum.getValue();
                SortedMap<Integer, String> topN = new TreeMap<Integer, String>();
                for (Map.Entry<Integer, String> entry : m1.entrySet()) {
                    topN.put(entry.getKey(), entry.getValue());
                    if (topN.size() > N) {
                        topN.remove(topN.firstKey());
                    }
                }
                for (Map.Entry<Integer, String> entry : m2.entrySet()) {
                    topN.put(entry.getKey(), entry.getValue());
                    if (topN.size() > N) {
                        topN.remove(topN.firstKey());
                    }
                }
                return topN;
            }
        });

        /*
        *   将本地缓存的最终结果打印出来
        * */
        for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
            System.out.println(entry.getKey() + " -- " + entry.getValue());
        }
    }

    public static void main(String[] args) {
        String inputPath = args[0];
        TopnSpark topnMapper = new TopnSpark(10, inputPath);
        topnMapper.run();

    }
}
时间: 2024-08-06 11:58:40

大数据算法设计模式(1) - topN spark实现的相关文章

决胜大数据时代:Hadoop&amp;Yarn&amp;Spark企业级最佳实践(8天完整版脱产式培训版本)

Hadoop.Yarn.Spark是企业构建生产环境下大数据中心的关键技术,也是大数据处理的核心技术,是每个云计算大数据工程师必修课. 课程简介 大数据时代的精髓技术在于Hadoop.Yarn.Spark,是大数据时代公司和个人必须掌握和使用的核心内容. Hadoop.Yarn.Spark是Yahoo!.阿里淘宝等公司公认的大数据时代的三大核心技术,是大数据处理的灵魂,是云计算大数据时代的技术命脉之所在,以Hadoop.Yarn.Spark为基石构建起来云计算大数据中心广泛运行于Yahoo!.阿

大数据时代到底Hadoop和Spark谁是王者!

在现在这个大数据时代,Hadoop和Spark是最潮流的两个词汇,Hadoop是一种分布式计算框架,由Google提出,主要用于搜索领域,解决海量数据的计算问题,Hadoop中的MapReduce包括两个阶段:Mapper阶段和Reducer阶段,用户只需要实现map函数和reduce函数即可实现分布式计算,非常简单.而近几年Spark新兴框架的产生,以不可挡之势席卷中国,其核心内部结构RDD以超强的弹性机制更加的引人注目!越来越多的人认为Spark终有一天要取代Hadoop,但是事实究竟如何呢

大数据算法摘录

大数据算法的摘抄 预处理 抽取 清洗 分析方法 聚合: 聚类类似于分类,但与分类的目的不同,是针对数据的相似性和差异性将一组数据分为几个类别.属于同一类别的数据间的相似性很大,但不同类别之间数据的相似性很小,跨类的数据关联性很低. 分类: 分类是找出数据库中的一组数据对象的共同特点并按照分类模式将其划分为不同的类,其目的是通过分类模型,将数据库中的数据项映射到摸个给定的类别中. 回归分析: 回归分析反映了数据库中数据的属性值的特性,通过函数表达数据映射的关系来发现属性值之间的依赖关系.它可以应用

大数据算法-&gt;推荐系统常用算法之基于内容的推荐系统算法

港真,自己一直非常希望做算法工程师,所以自己现在开始对现在常用的大数据算法进行不断地学习,今天了解到的算法,就是我们生活中无处不在的推荐系统算法. 其实,向别人推荐商品是一个很常见的现象,比如我用了一个好的商品,向朋友安利之类的.在以前广告系统不发达的时候,我们也是靠口口相传来进行商品的推广.那么为什么,现在推荐系统变的非常重要了呢?,在以前,我们的商品不像现在的物品一样琳琅满目,我们有时间,可以把商品都浏览一遍在进行选择,因为我们都想选择所有商品中最好的,而现在,由于资源的众多,我们不会用大把

SVM4TS (SVM for Time Series) 企业级SVM大数据算法运算平台

SVM4TS(SVM for Time Series)是一款基于SVM的大数据运行平台,支持大数据训练.模型产生.模型实例化.模型发布与上线.目前,SVM4TS是最优秀的商业大数据算法运行平台之一.STV4TS具有如下优点:(1)STV4TS更适合针对时间序列的机器学习计算,包括连续时间序列(如传感器数据)以及离散时间序列(如股票数据):(2)SVM4TS提供了数据分析.模型产生.模型升级.模型上线的全套解决方案,使用相对容易:(3)算法表现好,其算法的综合测评在众多框架中均名列前茅:(4)价格

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭

大数据算法(一)亚线性算法

来源:大数据算法 王宏志 一.概述 大数据定义:在给定的资源约束下,以大数据为输入,在给定时间约束内可以生成满足给定约束结果的算法. 大数据特点:4V 大数据算法可以不是: 精确算法 内存算法 串行算法 仅在电子计算机上运行的算法 大数据算法不仅是: 云计算 MapReduce 大数据分析和挖掘的算法 难度: 访问全部数据时间过长 读取部分数据 亚线性算法 数据难以放入内存 将数据存储到磁盘上 外存算法 仅基于少量数据进行计算 空间亚线性算法 单个计算机难以保存全部数据 并行处理 并行算法 计算

决胜大数据时代:Hadoop&amp;Yarn&amp;Spark企业级最佳实践(3天)

Hadoop是云计算的事实标准软件框架,是云计算理念.机制和商业化的具体实现,是整个云计算技术学习中公认的核心和最具有价值内容. Yarn是目前公认的最佳的分布式集群资源管理框架: Mahout是目前数据挖掘领域的王者:        工业和信息化部电信研究院于2014年5月发布的“大数据白皮书”中指出: “2012 年美国联邦政府就在全球率先推出“大数据行动计划(Big data initiative)”,重点在基础技术研究和公共部门应用上加大投入.在该计划支持下,加州大学伯克利分校开发了完整