第17课:RDD案例(join、cogroup等实战)

本节课通过代码实战演示RDD中最重要的两个算子,join和cogroup

join算子代码实战:

//通过代码演示join算子
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)

val rdd3 = rdd1.join(rdd2)
rdd3.collect().foreach(println)

运行结果:

(1,(Spark,100))

(3,(Tachyon,90))

(2,(Hadoop,70))

cogroup算子代码实战:

首先通过java的方式编写:

SparkConf conf = new SparkConf().setMaster("local").setAppName("Cogroup");

JavaSparkContext sc = new JavaSparkContext(conf);

List<Tuple2<Integer, String>> nameList = Arrays.asList(new Tuple2<Integer, String>(1, "Spark"),

new Tuple2<Integer, String>(2, "Tachyon"), new Tuple2<Integer, String>(3, "Hadoop"));

List<Tuple2<Integer, Integer>> ScoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100),

new Tuple2<Integer, Integer>(2, 95), new Tuple2<Integer, Integer>(3, 80),

new Tuple2<Integer, Integer>(1, 80), new Tuple2<Integer, Integer>(2, 110),

new Tuple2<Integer, Integer>(2, 90));

JavaPairRDD<Integer, String> names = sc.parallelizePairs(nameList);

JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(ScoreList);

JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> nameAndScores = names.cogroup(scores);

nameAndScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {

public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {

System.out.println("ID:" + t._1);

System.out.println("Name:" + t._2._1);

System.out.println("Score:" + t._2._2);

}

});

sc.close();

运行结果:

ID:1

Name:[Spark]

Score:[100, 80]

ID:3

Name:[Hadoop]

Score:[80]

ID:2

Name:[Tachyon]

Score:[95, 110, 90]

通过Scala的方式:

val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90), Tuple2(1, 95), Tuple2(2, 65), Tuple2(1, 110))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)

val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect().foreach(println)
sc.stop()

运行结果:

(1,(CompactBuffer(Spark),CompactBuffer(100, 95, 110)))

(3,(CompactBuffer(Tachyon),CompactBuffer(90)))

(2,(CompactBuffer(Hadoop),CompactBuffer(70, 65)))

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

时间: 2025-01-03 01:40:11

第17课:RDD案例(join、cogroup等实战)的相关文章

Spark IMF传奇行动第17课Transformations实战总结

今晚听了王家林老师的Spark IMF传奇行动第17课Transformations实战,作业是用SCALA写cogroup: def main(args: Array[String]): Unit = { val sc = sparkContext("Transformations") cogroupTrans(sc) sc.stop() } def cogroupTrans(sc:SparkContext): Unit ={ val stuNames = Array( Tuple2

Spark3000门徒第15课RDD创建内幕彻底解密总结

今晚听了王家林老师的第15课RDD创建内幕彻底解密,课堂笔记如下: Spark driver中第一个RDD:代表了Spark应用程序输入数据的来源.后续通过Transformation来对RDD进行各种算子的转换实现算法 创建RDD的方法:1,使用程序中的集合创建RDD;2,使用本地文件系统创建RDD:3,使用HDFS创建RDD 4,基于DB创建RDD5,基于NoSQL,例如HBase 6,基于S3创建RDD 7,基于数据流创建RDD 不指定并行度,有多少core就用多少core,所以需要资源管

第17课 - 对象的构造

第17课 - 对象的构造(上) 0. 问题 对象中成员变量的初始值是什么? 下面的类定义中成员变量 i 和 j 的初始值是什么?  对象定义在 全局空间.栈上.堆上,具有不同的属性. 1 #include <stdio.h> 2 3 class Test 4 { 5 private: 6 int i; 7 int j; 8 public: 9 int getI() { return i; } // 类成员函数,直接访问 10 int getJ() { return j; } 11 }; 12

第17课-数据库开发及ado.net 聚合函数,模糊查询like,通配符.空值处理.order by排序.分组group by-having.类型转换-cast,Convert.union all; Select 列 into 新表;字符串函数;日期函数

第17课-数据库开发及ado.net 聚合函数,模糊查询like,通配符.空值处理.order by排序.分组group by-having.类型转换-cast,Convert.union all;  Select 列 into 新表;字符串函数;日期函数 SQL聚合函数 MAX(最大值).MIN(最小值).AVG(平均值).SUM(和).COUNT(数量:记录的条数) 聚合函数对null不计算.如果一行数据都是null,count(*)包含对空值行.重复行的统计. --聚合函数演示 selec

Spark IMF传奇行动第16课RDD实战总结

今晚听了王家林老师的Spark IMF传奇行动第16课RDD实战,课堂笔记如下: RDD操作类型:Transformation.Action.Contoller reduce要符合交换律和结合律 val textLines = lineCount.reduceByKey(_+_,1) textLines.collect.foreach(pair=> println(pair._1 + "="+pair._2)) def collect(): Array[T] = withScop

3月3日完成第17课,准备开始第18课

昨天,也就是3月3日终于完成word文档"lvs+keepalived集群架构服务应用指南.doc"第17课的部分.之前,又完成1节51cto学院里,oldboy关于职业发展的视频笔记"linux运维人员需要具备的技能说明". 本来的计划是春节前完成第17课,没想到又拖成2个月. 第18课估计也不会好到那里去-- 我现在又开始纠结到底要不要进行MySQL的最后1课了,在公司根本就用不到MySQL.是不是要在换工作之前,先把4节shell课程完成再说呢? 先学能用的,

1月5日完成第22课,准备开始第17课

今天总算把第22课,nagios的第二部分视频看完了.3个月的时间,实在超出我的预期:之前的21课.28课,可以说是一课比一课超出我的预期.22课还比21课少半小时的视频时长,可是,学习的进度实在太慢了. 不断"挑战"自己的容忍限度,可是对自己又没有什么办法. 后天下班回来,计划开始第17课(如果明天能早起,说不定可以从明天开始.对了,后天下班回来,我还想完成51cto学院里,oldboy组织的一次运维职场发展的课程里的1节视频:"到底做运维还是做开发-听听专家的意见&quo

Python Codecademy 函数17课

def shut_down(s): if s=="yes": return "Shutting down" elif s=="no": return "Shutdown aborted" else: return"Sorry 17课这个小程序,改了好几遍,错误原因 1.关键字return打错了.(⊙﹏⊙)b 2.return后加了括号 3.用了print

数据-第17课-栈课后练习

第17课-栈课后练习 1. 分析顺序栈和链式栈各个操作的算法时间复杂度. 2. 我们在创建顺序栈时将队尾定义为栈顶,而在创建链式栈时将队头定义为栈顶.那么我们反过来可行吗(即:顺序栈操作队头,链式栈操作队尾)?为什么? 3. 将后缀表达式的转换和计算合并为一个完整的程序,当用户输入合法的s四则运算表达式时直接给出结果. 提示: l  示例中只能处理每个数字为0—9的情况,如何扩展? l  如何判断表达式的输入是否合法? 原文地址:https://www.cnblogs.com/free-1122