PySpark调用自定义jar包

在开发PySpark程序时通常会需要用到Java的对象,而PySpark本身也是建立在Java API之上,通过Py4j来创建JavaSparkContext

这里有几点是需要注意的

1. Py4j只运行在driver

也就是说worker目前来说引入不了第三方的jar包。因为worker结点的PySpark是没有启动Py4j的通信进程的,相应的jar包自然也加载不了。之前没有详细看这部分文档,系统设计时企图在worker结点利用client模式直连Hbase来获取部分数据,从而避免对整个表的JOIN操作,当然对于python来说这样的操作只有通过引入jar包来实现(不考虑thrift方式)。但是测试的jar写好之后,一直不成功,最后只有修改方案,后来才去查了官方文档。

2. PythonRDD 的原型是 JavaRDD[String]

所有的经过PythonRDD传递的数据都通过BASE64编码

3. PySpark 中的方法和匿名函数是通过cloudpickle序列化

为何函数需要被序列化,因为做map或者flatMap时,此时的函数或者lambda表达式是需要传递到各个worder的,如果函数里有用到闭包,cloudpickle也能巧妙的序列化。但是,需要传递的函数里请不要是用self关键字,因为传递过去后,self的指代关系已经不明确了。

文档还提到PythonRDD的序列化是可定制的了,但是目前没这个需求,所有没测试

代码示例

java 测试代码, 编译生成 pyspark-test.jar

package org.valux.py4j;
public class Calculate {
    public int sqAdd(int x){
        return x * x + 1;
    }
}

Python 测试代码,放在文件 driver.py

from pyspark import SparkContext
from py4j.java_gateway import java_import

sc = SparkContext(appName="Py4jTesting")
java_import(sc._jvm, "org.valux.py4j.Calculate")
func = sc._jvm.Calculate()
print func.sqAdd(5)
"""
[OUTPUT] > 26
"""
"""
 !!![错误用法]
 这里是想在每个work上调用自定义的方法,
 前面已经提到过PySpark目前是不支持的
"""
rdd = sc.parallelize([1, 2, 3])
result = rdd.map(func.sqAdd).collect()

"""
 !!![错误用法]
 之前还有个错误的思路是想在work单独 import 相应的 jar
"""
def foo(x):
    java_import(sc._jvm, "org.valux.py4j.Calculate")
    func = sc._jvm.Calculate()
    func.sqAdd(x)
rdd = sc.parallelize([1, 2, 3])
rdd.map(foo).collect()
测试时,提交程序需要记得带上jar包
> bin/spar-submit --driver-class-path pyspark-test.jar driver.py

这里又有一个坑,之前提交为了方便,一直都用的是 --jars 参数

--driver-class-path 附加的 jar 只会在 driver引入 --jars 附加的jar会在所有worker引入

帮助文档里面还提到

--jars Comma-separated list of local jars to include on the driver and executor classpaths.

所有就偷个懒用了 --jars ,结果一直报如下错误:

py4j.protocol.Py4JError: Trying to call a package.

测试了好久终于解决了

参考文档

https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

时间: 2024-08-07 18:02:04

PySpark调用自定义jar包的相关文章

Kettle自定义jar包供javascript使用

我们都知道 Kettle 是用 Java 语言开发,并且可以在 JavaScript 里面直接调用 java 类方法.所以有些时候,我们可以自定义一些方法,来供 JavaScript 使用. 本篇文章有参考自:https://www.xiaominfo.com/2019/08/13/kettle-12/ 一.在 java 项目中创建工具类 在项目中,创建 utils 工具类,比如 计算总页码 的一个方法.代码如下: public class PaginationUtils { /** * 计算得

java自定义jar包读取Excel(包含2003和2007)数据,并举例说明

用java自定义jar包读取excel数据支持excel2007和excel2003 在http://download.csdn.net/detail/u010792467/8079355下载所需要的包 如果需要excel2003和excel2007文件可以去 http://download.csdn.net/detail/u010792467/8072009下载 在http://download.csdn.net/detail/u010792467/8079345下载工程 自定义jar包运用举

map函数或reduce函数中如何调用第三方jar包

一般我们在mapreduce程序中调用第三方jar包时会出现找不到jar包的问题,检查发现jar包就在相应路径,mapreduce任务就是找不到.仔细想想会发现,这个jar包是放在执行mapreduce主程序机器上的内存中,一般为客户端机器.而我们在map或者reduce函数中调用该jar包时是在集群的机器上的内存中调用,这样怎么可以调用.可以使用以下方法: 1 把jar包提前放在集群每天机器上. 2 和集群调用mysql驱动程序一样,先将jar包放入hdfs,然后通过mysql的distrib

user libary 自定义jar包库

这里讲解如何来建立一个user  libary 下面实际的进行引入jar包! 点击项目右键点击properties在左侧的导航中点击Java build path 在上面的导航条中有一个libary点击后中间的面板会出现一些你没有eclipse中自带的jar包库,点击右侧出现的按钮add libary 出现另外一个框 点击user libary 点击下面的next按钮出现 这里值讲述一下自定义jar包库 这里我们用的是 user libary,是将我们的jar包引入放在我们的myeclipse中

流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计

小知识点: half:关机 yarn端口:8088 删除hdfs目录:hadoop fs -rm -r /wc/output namenode两个状态都是standby原因:zookeeper没有比hdfs先启动 现在来做一个流量统计的例子: 首先数据是这样一张表:见附件 统计:(代码) 1,flowbean: package cn.itcast.hadoop.mr.flowsum; import java.io.DataInput; import java.io.DataOutput; imp

Android Studio生成自定义jar包(by 星空武哥)

    转载请标注原创地址:http://blog.csdn.net/lsyz0021/article/details/52162414 众所周知android studio会在library所依赖的app运行或者build之后自动生成jar,路径为build/intermediates/bundles/debug or release/classes.jar,这样生成的jar是可以用的,但不是我们可以控制的,所以我们需要通过其他方式来解决这个问题. 首先我们新建一个project然后再新建一个

关于自定义jar包(tomcat)的添加

1 鼠标右击工程 选择 properties 或者 Ait + Enter 2 选择Libraries 3 点击Add Library... 4 选择User Library  点击 Next 5 如果有(曾近导入过)Tomcat jar 包 那就直接打钩选上 finish 如果没有,那么就点击右侧的User Libraries 这里继续第6步 6 点击右侧的New 这里只是新建个Library的名字 然后Add JARs... 把tomcat目录里的jar文件夹里的jar包都添加进去 7 点击

java自定义jar包让jmeter使用---给java参数化

上一篇文章中,提到怎么生成jar包让jmeter使用,这次我们来试试做参数,因为发现调包的时候其实更多还是参数化,那么开始改造吧 1.在httpclientpost这个类中替换参数,且打印参数 import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; impo

maven安装自定义jar包到本地仓库并下载

本地仓库找到安装的位置: <?xml version="1.0" encoding="UTF-8"?> <metadata> <groupId>cn.yxyu.app</groupId> <artifactId>my-app</artifactId> <versioning> <release>1.0</release> <versions> &l