06、action操作开发实战

1、reduce:

2、collect:

3、count:

4、take:

5、saveAsTextFile:

6、countByKey:

7、foreach:

package sparkcore.java;

import java.util.Arrays;

import java.util.List;

import java.util.Map;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

/**

* action操作实战

*/

public class ActionOperation {

public static void main(String[] args) {

// reduce();

// collect();

// count();

// take();

// saveAsTextFile();

countByKey();

}

public static void reduce() {

// 创建SparkConf和JavaSparkContext

SparkConf conf = new SparkConf().setAppName("reduce").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 使用reduce操作对集合中的数字进行累加

// reduce操作的原理:

// 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3

// 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6

// 以此类推

// 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素

int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;

}

});

System.out.println(sum);

// 关闭JavaSparkContext

sc.close();

}

public static void collect() {

// 创建SparkConf和JavaSparkContext

SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 使用map操作将集合中所有数字乘以2

JavaRDD<Integer> doubleNumbers = numbers.map(

new Function<Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1) throws Exception {

return v1 * 2;

}

});

// collect操作,将分布在远程集群上的doubleNumbers RDD的数据拉取到本地

// 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条

// 那么性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地

// 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出

// 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理

List<Integer> doubleNumberList = doubleNumbers.collect();

for (Integer num : doubleNumberList) {

System.out.println(num);

}

// 关闭JavaSparkContext

sc.close();

}

public static void count() {

// 创建SparkConf和JavaSparkContext

SparkConf conf = new SparkConf().setAppName("count").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 对rdd使用count操作,统计它有多少个元素

long count = numbers.count();

System.out.println(count);

// 关闭JavaSparkContext

sc.close();

}

public static void take() {

// 创建SparkConf和JavaSparkContext

SparkConf conf = new SparkConf().setAppName("take").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 对rdd使用count操作,统计它有多少个元素

// take操作,与collect类似,也是从远程集群上,获取rdd的数据

// 但是collect是获取rdd的所有数据,take只是获取前n个数据

List<Integer> top3Numbers = numbers.take(3);

for (Integer num : top3Numbers) {

System.out.println(num);

}

// 关闭JavaSparkContext

sc.close();

}

public static void saveAsTextFile() {

// 创建SparkConf和JavaSparkContext

SparkConf conf = new SparkConf().setAppName("saveAsTextFile");

JavaSparkContext sc = new JavaSparkContext(conf);

// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 使用map操作将集合中所有数字乘以2

JavaRDD<Integer> doubleNumbers = numbers.map(

new Function<Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1) throws Exception {

return v1 * 2;

}

});

// 直接将rdd中的数据,保存在HFDS文件中

// 但是要注意,我们这里只能指定文件夹,也就是目录

// 那么实际上,会保存为目录中的/double_number.txt/part-00000文件

doubleNumbers.saveAsTextFile("hdfs://node1:8020/double_number.txt");

// 关闭JavaSparkContext

sc.close();

}

public static void countByKey() {

// 创建SparkConf

SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");

// 创建JavaSparkContext

JavaSparkContext sc = new JavaSparkContext(conf);

// 模拟集合

List<Tuple2<String, String>> scoreList = Arrays.asList(new Tuple2<String, String>("class1", "leo"),

new Tuple2<String, String>("class2", "jack"), new Tuple2<String, String>("class1", "marry"),

new Tuple2<String, String>("class2", "tom"), new Tuple2<String, String>("class2", "david"));

// 并行化集合,创建JavaPairRDD

JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);

// 对rdd应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数

// 这就是countByKey的作用

// countByKey返回的类型,直接就是Map<String, Object>

Map<String, Long> studentCounts = students.countByKey();

for (Map.Entry<String, Long> studentCount : studentCounts.entrySet()) {

System.out.println(studentCount.getKey() + ": " + studentCount.getValue());

}

// 关闭JavaSparkContext

sc.close();

}

}

package sparkcore.scala

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object ActionOperation {

def main(args: Array[String]) {

// reduce()

// collect()

// count()

// take()

countByKey()

}

def reduce() {

val conf = new SparkConf()

.setAppName("reduce")

.setMaster("local")

val sc = new SparkContext(conf)

val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val numbers = sc.parallelize(numberArray, 1)

val sum = numbers.reduce(_ + _)

println(sum)

}

def collect() {

val conf = new SparkConf()

.setAppName("collect")

.setMaster("local")

val sc = new SparkContext(conf)

val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val numbers = sc.parallelize(numberArray, 1)

val doubleNumbers = numbers.map { num => num * 2 }

val doubleNumberArray = doubleNumbers.collect()

for (num <- doubleNumberArray) {

println(num)

}

}

def count() {

val conf = new SparkConf()

.setAppName("count")

.setMaster("local")

val sc = new SparkContext(conf)

val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val numbers = sc.parallelize(numberArray, 1)

val count = numbers.count()

println(count)

}

def take() {

val conf = new SparkConf()

.setAppName("take")

.setMaster("local")

val sc = new SparkContext(conf)

val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val numbers = sc.parallelize(numberArray, 1)

val top3Numbers = numbers.take(3)

for (num <- top3Numbers) {

println(num)

}

}

def saveAsTextFile() {

}

def countByKey() {

val conf = new SparkConf()

.setAppName("countByKey")

.setMaster("local")

val sc = new SparkContext(conf)

val studentList = Array(Tuple2("class1", "leo"), Tuple2("class2", "jack"),

Tuple2("class1", "tom"), Tuple2("class2", "jen"), Tuple2("class2", "marry"))

val students = sc.parallelize(studentList, 1)

val studentCounts = students.countByKey()

println(studentCounts)

}

}

时间: 2024-10-30 21:14:43

06、action操作开发实战的相关文章

05、transformation操作开发实战

1.map:将集合中每个元素乘以2 2.filter:过滤出集合中的偶数 3.flatMap:将行拆分为单词 4.groupByKey:将每个班级的成绩进行分组 5.reduceByKey:统计每个班级的总分 6.sortByKey.sortBy:将学生分数进行排序 7.join:打印每个学生的成绩 8.cogroup:打印每个学生的成绩 package sparkcore.java; import java.util.Arrays; import java.util.Iterator; imp

微信公众平台网页开发实战--2.从手机相册中选照片然后分享

通过对需求的了解,可以将其分解为: (1)微信端手机用户,可以使用微信的JSSDK. (2)选取图片,使用JSSDK的“chooseImage”,由于分享图片时本地地址无法分享,因此还需要JSSDK的“uploadImage”. (3)分享到朋友圈,需要JSSDK的“onMenuShareTimeline”. 综合起来,业务逻辑如图4.5所示. 图4.5  业务逻辑结构图 首先将JSSDK的环境复制一份到本节目录下,创建index.html文件.imageSharing.js文件,目录结构如图4

Cocos2d-x+3.x游戏开发实战pdf

下载地址:网盘下载 内容简介  · · · · · · <Cocos2d-x 3.x游戏开发实战>是一本介绍Cocos2d-x游戏引擎的实用图书,全面介绍了最新的Cocos2d-x 3.2游戏引擎各方面的知识. <Cocos2d-x 3.x游戏开发实战>从内容层次上可分为四个部分.第一部分介绍了游戏开发的基础知识.游戏引擎概念.Cocos2d-x的下载与安装,以及跨平台开发环境的搭建.第二部分介绍了Cocos2d-x中的核心类.动作.动画.3D特效.文字.字体.菜单.事件处理.UI

微信公众平台网页开发实战--3.利用JSSDK在网页中获取地理位置(HTML5+jQuery)

复制一份JSSDK环境,创建一份index.html文件,结构如图7.1所示. 图7.1  7.1节文件结构 在location.js中,封装“getLocation”接口,如下: 01 wxJSSDK.location = function(locationApi){ 02 if(wxJSSDK.isReady){ //wxJSSDK.isReady 查看微信JSSDK是否初始化完毕 03 if(locationApi){ 04 locationApi.getLocation && wx

chrome拓展开发实战

chrome拓展开发实战:页面脚本的拦截注入 时间 2015-07-24 11:15:00  博客园精华区 原文  http://www.cnblogs.com/horve/p/4672890.html 主题 Chrome 原文请访问个人博客: chrome拓展开发实战:页面脚本的拦截注入 目前公司产品的无线站点已经实现了业务平台组件化,所有业务组件的转场都是通过路由来完成,而各个模块是通过 requirejs 进行统一管理,在灰度测试时会通过grunt进行打包操作,虽然工程化的开发流程已经大大

企业级业务系统开发实战

通过一个系列讲述一个真实企业的ERP系统开发全过程.其中包括需求分析.设计建模.开发.测试全生命周期过程,其中会详细讲方法论与技术实践.涉及到的方法包括敏捷软件开发.四色原型.领域驱动设计.业务架构.技术架构与具体的EF.WF.EasyUI等技术在项目中的使用. 领域驱动设计案例之领域层框架搭建 摘要: 根据前面对领域驱动设计概念以及一些最佳实践的理解,领域模型是系统最核心的部分,我们还是采用前面销售订单的例子,这个案例系统的核心构建就从领域层开始.领域层框架搭建主要完成两个任务:1.领域模型的

iOS开发实战——CollectionView点击事件与键盘隐藏结合案例(二)

我在前一篇博客中<iOS开发实战--CollectionView点击事件与键盘隐藏结合案例>详细实现了CollectionView与键盘组合操作中出现的多种情况,并解决了交互体验上的一些问题.在实际项目中也的确可以采用这种方法来操作.但是问题来了,原来的界面我们是使用UIView来操作的,也就是界面是不可滚动的.然而更为常见的场景是一个ScrollView,界面可以进行上下滚动.所以,这篇博客主要是对前一个案例进行优化.还有一个问题是,在自动布局Masonry结合ScrollView中,会碰到

新书《Jfinal极速开发实战》正式发布

JfinalUIB学习交流QQ群 :309647612 书籍整个创作周期从2015年8月1日开始到2015年9月10日,时间仓促,难免有瑕疵,希望大家能够指出存在的问题,我会不断的更新纠正,谢谢大家! 前四章免费阅读,内容会持续定期更新,紧随Jfinal的发展,希望大家看看做出评价,谢谢!   百度阅读地址 :http://yuedu.baidu.com/ebook/3fc54b55d5bbfd0a7956739f 目录 Jfinal极速开发实战... 为什么要写这本书... 3 读者对象...

《Python开发实战》

<Python开发实战> 基本信息 作者: (日)BePROUD股份有限公司 译者: 盛荣 丛书名: 图灵程序设计丛书 出版社:人民邮电出版社 ISBN:9787115320896 上架时间:2014-5-6 出版日期:2014 年5月 开本:16开 页码:1 版次:1-1 所属分类:计算机 > 软件与程序设计 > Python 更多关于>>><Python开发实战> 编辑推荐 真实项目中的经验总结 行业精英们的智慧结晶 团队开发环境的搭建与管理 /