pyspark的RDD代码纪录

pyspark rdd.py文件代码纪录

代码版本为 spark 2.2.0

1.RDD及常见算子

class RDD(): #这里简单介绍几个典型的算子,其余的算子代码可以自己去看一看

    def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
        """
        _jrdd是个非常重要的属性,这个属性会在pyspark的计算过程中被全程传递
        pyspark里被第一个建立出来的RDD往往都是通过jvm调用建立起来的数据源RDD
        这个_jrdd的值就是这个jvm里对应的数据源RDD
        这里需要记住,这个rdd最终在执行任务的时候被jvm执行,将数据源数据传递给python进程
        """
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer
        self._id = jrdd.id()
        self.partitioner = None

    #最重要也是也是最基本的action
    #其它action都是最终调用此action实现
    def collect(self):
        """
        返回的是一个list,所有分区的结果集
        调用的是scala中对应的PythonRDD对象的collectAndServer方法触发任务的执行
        collect是所有其它action动作的基础跟入口,也就是说collectAndServer是统一执行入口
        """
        with SCCallSiteSync(self.context) as css:
            #提交任务的时候给了一个参数,就是_jrdd对应的rdd
            #这个是最初的数据源rdd或者是PythonRDD
            #这里需要记住,因为当转到scala里的PythonRDD的时候就看出此处的作用了
            port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
        return list(_load_from_socket(port, self._jrdd_deserializer))

    #reduce action
    def reduce(self, f):
        """
        可以看见此次最终调用的也是collect()
        """
        def func(iterator):
            iterator = iter(iterator)
            try:
                initial = next(iterator)
            except StopIteration:
                return
            yield reduce(f, iterator, initial)

        vals = self.mapPartitions(func).collect() #这里
        if vals:
            return reduce(f, vals)
        raise ValueError("Can not reduce() empty RDD")

    #这个函数是其它几个action的基础,也是调用collect实现的
    def fold(self, zeroValue, op):
        """
        这个函数最终调用的也是collect()来提交任务
        这个函数被foreach,sum,count等action调用
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(acc, obj)
            yield acc

        vals = self.mapPartitions(func).collect() #这里
        return reduce(op, vals, zeroValue)

    def union(self, other):
        """
        这个算子pyspark本地并未做过多处理,直接使用的jvm中对应的union

        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            rdd = RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                      self.ctx.serializer)
        if (self.partitioner == other.partitioner and
                self.getNumPartitions() == rdd.getNumPartitions()):
            rdd.partitioner = self.partitioner
        return rdd

    #这个函数也很重要,如果说所有action的基础是collect
    #那么所有transform的基础是这个
    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        这个函数被上层其它的转换算子调用
        map,flatMap,mapPartitions,reduceByKey,combinerByKey等等
        PipelinedRDD 是pyspark中第二个RDD类型,所有转换操作返回的类型都是这个类型
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        可以看见也是调用了mapPartitionsWithIndex实现的
        这里定义的func是个关键,封装了用户的方法,在PipelinedRDD中函数被嵌套封装起来
        """
        def func(s, iterator):
            return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        可以看见同上一个函数类似
        """
        def func(s, iterator):
            return chain.from_iterable(map(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def join(self, other, numPartitions=None):
        """
        join是通过调用python_join实现的,这个函数在pyspark join.py文件中实现的
        join.py中的实现代码将在其它部分说明
        此处只做简略说明,底层是用union和groupByKey实现的
        """
        return python_join(self, other, numPartitions)

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
        """
        调用的combineByKey实现的
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None, partitionFunc=portable_hash):
        """
        这个函数实现逻辑是
        1.用mapPartitions把本分区相同的key聚合到一起
        2.然后再用partitionBy重新分区,把相同的key分到相同的分区
        3.再来一次步骤1
        """
        if numPartitions is None:
            numPartitions = self._defaultReducePartitions()

        serializer = self.ctx.serializer
        memory = self._memory_limit()
        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

        def combineLocally(iterator):
            merger = ExternalMerger(agg, memory * 0.9, serializer)
            merger.mergeValues(iterator)
            return merger.items()

        locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

        def _mergeCombiners(iterator):
            merger = ExternalMerger(agg, memory, serializer)
            merger.mergeCombiners(iterator)
            return merger.items()

        return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)

2.PipelinedRDD

class PipelinedRDD(RDD):

    """
    这个类是所有转换操作返回回去的RDD类型,这个类继承了RDD类
    这个类重写了_jrdd属性,返回的jrdd是一个PythonRDD
    PythonRDD的父rdd是最初生成的rdd中的_jrdd
    也就是说,用户使用pyspark代码的时候,执行的jvm代码都是从PythonRDD开始
    """

    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # 上一个rdd不是PipelinedRDD的话就把原始rdd._jrdd传递下去
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func

            #这个函数就是把上一个rdd的逻辑和当前的处理逻辑嵌套起来
            #prev_func是上一次转换时指定的函数
            #func是这一次转换时指定的函数
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning =                 prev.preservesPartitioning and preservesPartitioning
            #上一个rdd是PipelinedRDD的话就把从最初rdd得到的_jrdd传递下去
            self._prev_jrdd = prev._prev_jrdd
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._id = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False
        self.partitioner = prev.partitioner if self.preservesPartitioning else None

    def getNumPartitions(self):
        return self._prev_jrdd.partitions().size()

    @property
    def _jrdd(self):
        """
        这里构造PythonRDD
        """
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            self._jrdd_deserializer = NoOpSerializer()

        if self.ctx.profiler_collector:
            profiler = self.ctx.profiler_collector.new_profiler(self.ctx)
        else:
            profiler = None

        #把用户的python代码序列化
        wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
                                      self._jrdd_deserializer, profiler)
        #构造一个新的_jrdd 类型是PythonRDD,此rdd的父rdd是最初的数据源对应的_jrdd
        #当在此rdd的基础上调用action的时候,传递进去的_jrdd就是这里返回的东西
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
                                             self.preservesPartitioning)
        self._jrdd_val = python_rdd.asJavaRDD()

        if profiler:
            self._id = self._jrdd_val.id()
            self.ctx.profiler_collector.add_profiler(self._id, profiler)
        return self._jrdd_val

    def id(self):
        if self._id is None:
            self._id = self._jrdd.id()
        return self._id

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)

原文地址:https://www.cnblogs.com/cloud-zhao/p/9046218.html

时间: 2024-10-31 17:01:03

pyspark的RDD代码纪录的相关文章

PySpark之RDD操作

一.什么是RDD A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. 弹性分布式数据集(RDD),Spark中的基本抽象.表示可以并行操作的元素的不变分区集合. 弹性:可以存储在磁盘或内存中(多种存储级别) 分布:分

(2)pyspark建立RDD以及读取文件成dataframe

1.启动spark 2.建立RDD: 3.从text中读取,read.text 4.从csv中读取:read.csv 5.从json中读取:read.json 7.RDD与Dataframe的转换 (1)dataframe转换成rdd: 法一:datardd = dataDataframe.rdd 法二:datardd = sc.parallelize(_) (2)rdd转换成dataframe: dataDataFrame = spark.createDataFrame(datardd) 原文

pyspark对应的scala代码PythonRDD类

pyspark jvm端的scala代码PythonRDD 代码版本为 spark 2.2.0 1.PythonRDD.class 这个rdd类型是python能接入spark的关键 //这是一个标准的RDD实现,实现对应的compute,partitioner,getPartitions等方法 //这个PythonRDD就是pyspark里PipelinedRDD里_jrdd属性方法返回的东西 //parent就是PipelinedRDD里传递进来的_prev_jrdd,是最初构建的数据源RD

Spark RDD Union

示例 Spark多个RDD(数据格式相同)“组合”为一个RDD 代码   from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName("spark_app_union") sc = SparkContext(conf=conf) rdd1 = sc.parallelize(["line1", "line2", "line3"]) rdd2

RDD 创建

第15课:RDD创建内幕 RDD的创建方式 Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法 Spark中的基本方式: 1)       使用程序中的集合创建 这种方式的实际意义主要用于测试. 2)       使用本地文件系统创建 这种方式的实际意义主要用于测试大量数据的文件 3)       使用HDFS创建RDD 这种方式为生产环境中最常用的创建RDD的方式 4)      

强者联盟——Python语言结合Spark框架

引言:Spark由AMPLab实验室开发,其本质是基于内存的高速迭代框架,"迭代"是机器学习最大的特点,因此很适合做机器学习. 得益于在数据科学中强大的表现,Python语言的粉丝遍布天下,现在又遇上强大的分布式内存计算框架Spark,两个领域的强者走到一起,自然能碰出更加强大的火花(Spark能够翻译为火花).因此本文主要讲述了PySpark. 本文选自<全栈数据之门>. 全栈框架 Spark由AMPLab实验室开发,其本质是基于内存的高速迭代框架,"迭代&qu

基于spark的plsa实现

PLSA.py 1 # coding:utf8 2 from pyspark import SparkContext 3 from pyspark import RDD 4 import numpy as np 5 from numpy.random import RandomState 6 7 import sys 8 reload(sys) 9 #设置默认编码为utf8,从spark rdd中取出中文词汇时需要编码为中文编码,否则不能保存成功 10 sys.setdefaultencodin

一次优化列表页卡顿的经历

写下这篇文章的日期是2016年4月初.当时来到公司,项目之前是外包出去的,代码乱糟糟的,需要重构掉, 摆在面前的问题不是重构项目,而是一些列表页的紧急的性能优化. 1.先优化item的层级 其实层级只要不是太深的话,比如5层,6层,对性能的差别在中等性能的机器上几乎看不出来的,但是想要做到 极致, 我就得死扣细节,原来代码是有4层的,其实有一点点接近可优化的范围了,我把原来的4层降到1层. 1层的话在item的话,在cpu进行计算测量的时候就速度很快了. 下面是我用DDMS去查看某台和我台的列表

Python3实战Spark大数据分析及调度 (网盘分享)

Python3实战Spark大数据分析及调度 搜索QQ号直接加群获取其它学习资料:517432778 部分课程截图: 链接:https://pan.baidu.com/s/1YMmswv47fOUlt-z2A6691A 提取码:z5xv PS:免费分享,若点击链接无法获取到资料,若如若链接失效请加群 其它资源在群里,私聊管理员即可免费领取:群——517432778,点击加群,或扫描二维码   第1章 课程介绍 课程介绍 1-1 PySpark导学试看 1-2 OOTB环境演示 第2章 实战环境搭