Spark-ML-01-小试spark分析离线商品信息

任务

一个在线商品购买记录数据集,约40M,格式如下:

Jack,iphone cover,9,99
Jack,iphone cover,9,99
Jack,iphone cover,9,99
Jack,iphone cover,9,99

完成统计

1.购买总次数

2.客户总个数

3.总收入

4.最畅销的商品

代码

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.commons.collections.comparators.ComparableComparator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 *
 * @author jinhang
 *
 */
public class JavaApp {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("ShopInfoAnalysis").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv").map(s -> s.split(","));
        /**
         * 统计
         */
        long numPurchases = data.count();
        long uniqueUsers = data.map(s->s[0]).distinct().count();
        double totalRevenue = data.mapToDouble(s -> Double.parseDouble(s[2])).sum();
        JavaPairRDD<String, Integer> product = data.mapToPair(s->new Tuple2(s[1],1));
        List<Tuple2<String, Integer>> pairs= product.reduceByKey((x,y)->(x+y)).sortByKey().collect();
        System.out.println(pairs);
        String mostPopular = pairs.get(pairs.size()-1)._1();
        int purchases = pairs.get(0)._2();
        System.out.println("Total purchases: " + numPurchases);
        System.out.println("Unique users: " + uniqueUsers);
        System.out.println("Total revenue: " + totalRevenue);
        System.out.println(String.format("Most popular product: %s with %d purchases",
                mostPopular, purchases));
        sc.stop();

    }

}

简单的RDD转换和执行就可以简单解决大数据的问题,java实现的代码方便和以前的hadoop代码结合执行。

时间: 2024-11-05 17:34:57

Spark-ML-01-小试spark分析离线商品信息的相关文章

spark ml 的例子

一.关于spark ml pipeline与机器学习 一个典型的机器学习构建包含若干个过程 1.源数据ETL 2.数据预处理 3.特征选取 4.模型训练与验证 以上四个步骤可以抽象为一个包括多个步骤的流水线式工作,从数据收集开始至输出我们需要的最终结果.因此,对以上多个步骤.进行抽象建模,简化为流水线式工作流程则存在着可行性,对利用spark进行机器学习的用户来说,流水线式机器学习比单个步骤独立建模更加高效.易用. 受 scikit-learn 项目的启发,并且总结了MLlib在处理复杂机器学习

Spark中决策树源码分析

1.Example 使用Spark MLlib中决策树分类器API,训练出一个决策树模型,使用Python开发. """ Decision Tree Classification Example. """from __future__ import print_functionfrom pyspark import SparkContextfrom pyspark.mllib.tree import DecisionTree, DecisionT

Spark ML下实现的多分类adaboost+naivebayes算法在文本分类上的应用

1. Naive Bayes算法 朴素贝叶斯算法算是生成模型中一个最经典的分类算法之一了,常用的有Bernoulli和Multinomial两种.在文本分类上经常会用到这两种方法.在词袋模型中,对于一篇文档$d$中出现的词$w_0,w_1,...,w_n$, 这篇文章被分类为$c$的概率为$$p(c|w_0,w_1,...,w_n) = \frac{p(c,w_0,w_1,...,w_n)}{p(w_0,w_1,...,w_n)} = \frac{p(w_0,w_1,...,w_n|c)*p(c

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种

Spark SQL Catalyst源码分析之TreeNode Library

前几篇文章介绍了Spark SQL的Catalyst的SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

【Spark 深入学习 01】 Spark是什么鬼?

经过一段时间的学习和测试,是时候给spark的学习经历做一个总结了,对于spark的了解相对晚了写.春节期间(预计是无大事),本博准备推出20篇左右spark系列原创文章(先把牛吹出去再说) ,尽量将枯燥无味的技术讲的通俗易懂- r.kelly 2013年的时候第一次听说spark这么个神器,那时候它还幼小,没什么人鸟它,但是它强大基因注定了它不是个凡夫俗子, 故事就是从那一小撮人群中开始的. 一.Spark何许人也 姓名:Spark 性别:未知 出生地:加州大学伯克利分校AMP实验室 出生年月

spark 启动job的流程分析

从WordCount开始分析 编写一个例子程序 编写一个从HDFS中读取并计算wordcount的例子程序: packageorg.apache.spark.examples importorg.apache.spark.SparkContext importorg.apache.spark.SparkContext._ objectWordCount{ defmain(args : Array[String]) { valsc = newSparkContext(args(0),"wordco

Extending sparklyr to Compute Cost for K-means on YARN Cluster with Spark ML Library

Machine and statistical learning wizards are becoming more eager to perform analysis with Spark MLlibrary if this is only possible. It’s trendy, posh, spicy and gives the feeling of doing state of the art machine learning and being up to date with th