SparkContext, map, flatMap, zip以及例程wordcount

  SparkContext

  通常作为入口函数,可以创建并返回一个RDD。

  如把Spark集群当作服务端那Spark Driver就是客户端,SparkContext则是客户端的核心;

  如注释所说 SparkContext用于连接Spark集群、创建RDD、累加器(accumlator)、广播变量(broadcast variables)

map操作:  会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;

flatMap操作:  “先映射后扁平化”      操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象      操作2:最后将所有对象合并为一个对象

zip函数

x = [1, 2, 3]
y = [4, 5, 6, 7]
xy = zip(x, y)
print xy  #[(1, 4), (2, 5), (3, 6)]

例程WordCount:

from pyspark import SparkContext

sc = SparkContext(‘local‘)
‘‘‘
在一个Spark程序的开始部分,有好多是用sparkContext的parallelize制作RDD的,
是ParallelCollectionRDD,创建一个并行集合。
doc这里包含2个task
‘‘‘
doc = sc.parallelize([[‘a‘, ‘b‘, ‘c‘], [‘b‘, ‘d‘, ‘d‘]])
print(doc.count())  # 2

‘‘‘
map操作:会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;
flatMap操作:“先映射后扁平化”
      操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象
      操作2:最后将所有对象合并为一个对象
‘‘‘
words = doc.map(lambda d: d).collect()
print(words)
words = doc.flatMap(lambda d: d).collect()
print(words)
words = doc.flatMap(lambda d: d).distinct().collect()
print(words)

‘‘‘
zip(list1, list2)把list1,list2变成一个list(e1(list1), e1(list2),e2...)
这里把字符给标号(0 : len(words))
‘‘‘
word_dict = {w: i for w, i in zip(words, range(len(words)))}

‘‘‘
broadcast将变量word_dict高效的传递给每一个子节点
word_dict_b就是word_dict在子节点处理函数中的别名,内容是一致的区别就是要是用.value来用里面的值
‘‘‘
word_dict_b = sc.broadcast(word_dict)

def word_count_per_doc(d):
    dict_tmp = {}
    wd = word_dict_b.value
    for w in d:
        dict_tmp[wd[w]] = dict_tmp.get(wd[w], 0) + 1
    return dict_tmp

‘‘‘
每一个doc都会调用一次 word_count_per_doc
‘‘‘
print(doc.map(word_count_per_doc).collect())
print("successful!")
时间: 2024-10-24 18:01:54

SparkContext, map, flatMap, zip以及例程wordcount的相关文章

Scala learning(2): map, flatMap, filter与For表达式

本文叙述Collections里最常见的三种操作map, flatMap, filter,与For表达式的关系. List对三种方法的实现 map在List的实现: abstract class List[+T] { def map[U](f: T => U): List[U] = this match { case x :: xs => f(x) :: xs.map(f) case Nil => Nil } } flatMap在List的实现: abstract class List[

python几个重要的函数(lambda,filter,reduce,map,zip)

一.匿名函数lambda lambda argument1,argument2,...argumentN :expression using arguments 1.lambda是一个表达式,而不是一个语句. 因为这一点,lambda可以出现在python语法不允许def出现的地方---例如,在一个列表常量中或者函数调用的参数中,此外,作为一个表达式,lambda返回一个值一个值(一个新的函数),可以选择性地值给一个变量名.相反,def语句总是得在头部将一个新的函数赋值给一个变量名,而不是将这个

Python---高级函数map, filter, zip, enumerate等的用法

今天看自然语言处理这本书的时候,被这里的高级函数的概念吸引了,因为我觉得所有的函数都只是函数而已,是为了实现特定功能而实现的,不应该有高级,低级之分啊!不过了解之后,发现这几个函数确实是有点高级,非常好用,所以在这里做一个简单的总结. 1. Haskell:之前以为它是一个函数,其实它是一个统称.Haskell 中的函数可以作为参数和回传值传来传去,这样的函数就被称作高阶函数. 2. map(function, list): 就是对list 中的每一个元素都调用function函数进行处理,返回

Map/Reduce简单样例----wordcount

1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapReduce就是"任务的分解与结果的汇总". 在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker:另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的.一个Hado

Swift --> Map & FlatMap

转载自:https://segmentfault.com/a/1190000004050907 Map map函数能够被数组调用,它接受一个闭包作为参数,作用于数组中的每个元素.闭包返回一个变换后的元素,接着将所有这些变换后的元素组成一个新的数组. 这听起来有些复杂,但它是相当简单的.想象你拥有一个string类型的数组: let testArray = ["test1","test1234","","test56"] map

python的reduce,map,zip和filter函数

一.    reduce(function,Iterable),它的形式和map()函数一样.不过参数function必须有两个参数. reduce()函数作用是:把结果继续和序列的下一个元素做累积计算. 例,        >>>def add(x, y) :            # 两数相加    ...     return x + y    ...     >>> reduce(add, [1,2,3,4,5])   # 计算列表和:1+2+3+4+5    

java8 map flatmap

map: 对于Stream中包含的元素使用给定的转换函数进行转换操作,新生成的Stream只包含转换生成的元素.这个方法有三个对于原始类型的变种方法,分别是:mapToInt,mapToLong和mapToDouble.这三个方法也比较好理解,比如mapToInt就是把原始Stream转换成一个新的Stream,这个新生成的Stream中的元素都是int类型.之所以会有这样三个变种方法,可以免除自动装箱/拆箱的额外消耗: map方法示意图: flatMap:和map类似,不同的是其每个元素转换得

RxJava 变换操作符 map flatMap concatMap buffer

demo地址:https://github.com/baiqiantao/RxJavaDemo.git 常用的变换操作符 map:[数据类型转换]将被观察者发送的事件转换为另一种类型的事件 flatMap:[化解循环嵌套和接口嵌套]将被观察者发送的事件序列进行拆分 & 转换 后合并成一个新的事件序列,最后再进行发送 concatMap:[有序]与 flatMap 的 区别在于,拆分 & 重新合并生成的事件序列 的顺序与被观察者旧序列生产的顺序一致 flatMapIterable:相当于对 

java1.8 新特性(五 如何使用filter,limit ,skip ,distinct map flatmap ,collect 操作 java集合)

使用filter 根据 条件筛选 出结果:例如 找出 user 中 age >=15 的用户 package lambda.stream; /** * @author 作者:cb * @version 创建时间:2019年1月4日 下午2:35:05 */ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; publi