Spark PairRDDFunctions flatMapValues

package com.latrobe.spark

import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by spark on 15-1-18.
 */
object FlatMapValues {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    import org.apache.spark.SparkContext._
    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle") , 2)
    //b会是这个样子:(3,dog)(5,tiger)(4,lion)(3,cat)(7,panther)(5,eagle)
    val b = a.map(x => (x.length , x))

    /**
     * 1 针对b的所有Value执行"x" + _ + "x",例如 dog => xdogx
     * 2 将1计算获得的Value压扁,Key保持不变。例如 (3,xdogx) => (3,x),(3,d),(3,o),(3,g),(3,x)
     * 3 flatMapValues 是PairRDDFunctions中的一个函数,所以前面需要隐式转换
     */
    val c = b.flatMapValues("x" + _ + "x")
    c.collect().foreach(print)
  }
}
时间: 2024-11-06 15:55:07

Spark PairRDDFunctions flatMapValues的相关文章

Spark调优

转载:http://www.oschina.net/translate/spark-tuning 因为大部分Spark程序都具有“内存计算”的天性,所以集群中的所有资源:CPU.网络带宽或者是内存都有可能成为Spark程序的瓶颈.通常情况下,如果数据完全加载到内存那么网络带宽就会成为瓶颈,但是你仍然需要对程序进行优化,例如采用序列化的方式保存RDD数据(Resilient Distributed Datasets),以便减少内存使用.该文章主要包含两个议题:数据序列化和内存优化,数据序列化不但能

【Spark学习】Apache Spark调优

Spark调优 本文系根据官方文档翻译而来,转载请注明本文链接 http://www.oschina.net/translate/spark-tuning?print 数据序列化 内存优化 确定内存用量 调整数据结构 序列化RDD存储 垃圾收集调整 其他考虑因素 并行化水平 Reduce任务的内存用量 Broadcasting large variables 总结 因为大部分Spark程序都具有“内存计算”的天性,所以集群中的所有资源:CPU.网络带宽或者是内存都有可能成为Spark程序的瓶颈.

Parallelism , Partitioner

转:spark通过合理设置spark.default.parallelism参数提高执行效率 spark中有partition的概念(和slice是同一个概念,在spark1.2中官网已经做出了说明),一般每个partition对应一个task.在我的测试过程中,如果没有设置spark.default.parallelism参数,spark计算出来的partition非常巨大,与我的cores非常不搭.我在两台机器上(8cores *2 +6g * 2)上,spark计算出来的partition

Spark常用的算子总结(3)—— flatMapValues

flatmapValues就是和flatmap差不多,但是人家本身就是个key value了,所以一个pair怎么变成多个pair呢,就是根据关于values的函数 val a = sc.parallelize(List((1,2),(3,4),(5,6))) val b = a.flatMapValues(x=>1 to x) b.collect.foreach(println(_)) /*结果 (1,1) (1,2) (3,1) (3,2) (3,3) (3,4) (5,1) (5,2) (

spark总结——转载

转载自:http://smallx.me/2016/06/07/spark%E4%BD%BF%E7%94%A8%E6%80%BB%E7%BB%93/ 第一个Spark程序 /** * 功能:用spark实现的单词计数程序 * 环境:spark 1.6.1, scala 2.10.4 */ // 导入相关类库import org.apache.spark._ object WordCount { def main(args: Array[String]) { // 建立spark运行上下文 val

Spark Streaming源码解读之Job详解

一:Spark Streaming Job生成深度思考 1. 做大数据例如Hadoop,Spark等,如果不是流处理的话,一般会有定时任务.例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理. 2. JobGenerator构造的时候有一个核心的参数是jobScheduler, jobScheduler是整个作业的生成和提交给集群的核心,JobGenerator会基于DStream生

Spark操作Hbase

Spark 下操作 HBase(1.0.0 新 API) HBase经过七年发展,终于在今年2月底,发布了 1.0.0 版本.这个版本提供了一些让人激动的功能,并且,在不牺牲稳定性的前提下,引入了新的API.虽然 1.0.0 兼容旧版本的 API,不过还是应该尽早地来熟悉下新版API.并且了解下如何与当下正红的 Spark 结合,进行数据的写入与读取.鉴于国内外有关 HBase 1.0.0 新 API 的资料甚少,故作此文. 本文将分两部分介绍,第一部分讲解使用 HBase 新版 API 进行

Introduction to Monoids and Semigroups with Spark

在地球上什么是Monoid?? 定义: monoid(幺半群 译注:参考附注1翻译,下文中继续使用英文名)是一个带有二元运算(+)和一个单位元(译注:原文为identity element)i使得对于任意x,x+i=i+x=x.注意它不像群(译注:group,数学上翻译为群),它不带有逆元素.也可以说是带有单位元的半群(semigroup) Wow,没什么用.那先看一些例子然后重新看个简单定义.. https://blog.safaribooksonline.com/2013/05/15/mon

Spark编程实现SQL查询的实例

1.Oracle中的SQL select count(1) from a_V_PWYZL_CUSTACCT_PSMIS t where not exists (select 1 from tb_show_multi_question q WHERE q.dqmp_rule_code = '仅比对系统有' and q.dqmp_role_id = '105754659' and q.DQMP_target_id = t.dqmp_mrid) AND NOT EXISTS (select /*+ i