spark出现task不能序列化错误的解决方法

应用场景:使用JavaHiveContext执行SQL之后,希望能得到其字段名及相应的值,但却出现"Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.StructField"的错误,代码如下:

JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
final StructField[] fields = schema.schema().getFields();
JavaRDD<String> result = schema.map(new Function<Row, String>() {
	private static final long serialVersionUID = 1L;
	@Override
	public String call(Row row) throws Exception {
		StringBuffer out = new StringBuffer();
		for (int i = 0; i < row.length(); i++) {
			out.append(fields[i].getName() + "->" + row.get(i) + ";");
		}
		return out.toString();
	}
});
System.out.println(result.collect());

在spark官网上查找序列化方面的内容,看到可以通过注册的方式自定义类的序列化方式,故在conf上添加以下设置:

conf.registerKryoClasses(new Class[] { org.apache.spark.sql.api.java.StructField.class });

测试执行后,还是报相同的错误:java.io.NotSerializableException: org.apache.spark.sql.api.java.StructField,不知道为什么,如果有朋友知道,可在下面留言。

上述方法测不通后,又再网上寻求方法,此时看到了下面的这篇文章,内容摘录见下:http://www.cnblogs.com/zwCHAN/p/4305156.html

按照第一种方法,将依赖的变量StructField[]放到map内部定义,代码见下:

JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
JavaRDD<String> result = schema.map(new Function<Row, String>() {
	private static final long serialVersionUID = 1L;
	@Override
	public String call(Row row) throws Exception {
                StructField[] fields = schema.schema().getFields();
		StringBuffer out = new StringBuffer();
		for (int i = 0; i < row.length(); i++) {
			out.append(fields[i].getName() + "->" + row.get(i) + ";");
		}
		return out.toString();
	}
});
System.out.println(result.collect());

  

测试通过,但考虑到每次map都需要从JavaSchemaRDD中获取一次schema信息,比较耗时,而在map中有只需要String类型的字段名就可以了,故在原有基础上对代码进行优化,见下:

JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
StructField[] fields = schema.schema().getFields();
final String[] fieldsName = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
	fieldsName[i] = fields[i].getName();
}
JavaRDD<String> result = schema.map(new Function<Row, String>() {
	private static final long serialVersionUID = 1L;
	@Override
	public String call(Row row) throws Exception {
		StringBuffer out = new StringBuffer();
		for (int i = 0; i < row.length(); i++) {
			out.append(fieldsName[i] + "->" + row.get(i) + ";");
		}
		return out.toString();
	}
});
System.out.println(result.collect());

以下内容摘录自:http://www.cnblogs.com/zwCHAN/p/4305156.html

出现“org.apache.spark.SparkException: Task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。特别是当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。解决这个问题最常用的方法有:

  1. 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类;
  2. 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率;
  3. 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。
  4. 将引用的类做成可序列化的。
  5. 以下这两个没试过。。
  • Make the NotSerializable object as a static and create it once per machine.
  • Call rdd.forEachPartition and create the NotSerializable object in there like this:

==================

ref[1]:<http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html>

If you see this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

The above error can be triggered when you intialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark Streaming will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");

rdd.map(s -> notSerializable.doSomething(s)).collect();

This will trigger that error. Here are some ideas to fix this error:

  • Serializable the class
  • Declare the instance only within the lambda function passed in map.
  • Make the NotSerializable object as a static and create it once per machine.
  • Call rdd.forEachPartition and create the NotSerializable object in there like this:
rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();

  // ...Now process iter
});

Pasted from: <http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html>

另外, stackoverflow上http://stackoverflow.com/questions/25914057/task-not-serializable-exception-while-running-apache-spark-job 这个答的也很简明易懂。

时间: 2024-11-04 21:55:01

spark出现task不能序列化错误的解决方法的相关文章

spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable

import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.common.math.MathUtils; // 自定义Partitioner class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner { protected var _numPartitions = -1; prote

WCF项目中出现常见错误的解决方法:基础连接已经关闭: 连接被意外关闭

原文:WCF项目中出现常见错误的解决方法:基础连接已经关闭: 连接被意外关闭 在我们开发WCF项目的时候,常常会碰到一些莫名其妙的错误,有时候如果根据它的错误提示信息,一般很难定位到具体的问题所在,而由于WCF服务的特殊性,调试起来也不是那么方便,因此往往会花费不少时间来进行跟踪处理.本文介绍我在我在我的框架里面使用WCF服务的时候,出现的一个常见错误的处理方法,它的提示信息是:基础连接已经关闭: 连接被意外关闭.这种情况我碰到的有两种,一种是返回DataTable的时候出现的,一种是返回实体类

fedora25 安装virtualbox5.1 出现提示Kernel driver not installed (rc=-1908) 错误的解决方法

fedora25 安装virtualbox5.1 出现提示Kernel driver not installed (rc=-1908) 错误的解决方法: $ sudo /usr/lib/virtualbox/vboxdrv.sh setup        vboxdrv.sh: Building VirtualBox kernel modules.             vboxdrv.sh: Starting VirtualBox services.

Windows 7 64bit上安装Oracle Database 12c [INS-30131] 错误的解决方法

Windows 7 64bit上安装Oracle Database 12c,出现以下错误: 解决方法: 第一步:控制面板>所有控制面板项>管理工具>服务>SERVER  启动 第二步:控制面板>所有控制面板项>管理工具>计算机管理>系统工具>共享文件夹>共享   右键单击“共享”>新建共享> 点击“下一步”>   单击“浏览”> 选择"本地磁盘(C:)">确定   单击“下一步”:     单击“

飞鸽传书 bind() error=10048错误的解决方法

提示  bind() 错误=10048 原因:其他程序占用飞鸽的 2425 端口 比如:飞秋也是使用2425端口 解决:用netstat命令查看是哪个进程占用了该端口 格式:netstat -ano | find "2425" 结果:  UDP    0.0.0.0:2425     *:*        1716 最后的1716就是占用2425进程的进程ID,看看是什么,可以结束的直接结束该进程,再打开飞鸽就可以了. 关于netstat命令可以输入命令 netstat /? 查看更多

运维实战案例之“Argument list too long”错误与解决方法

作为一名运维人员来说,这个错误并不陌生,在执行rm.cp.mv等命令时,如果要操作的文件数很多,可能会使用通配符批量处理大量文件,这时就可能会出现"Argument list too long"这个问题了. 1.错误现象 这是一台Mysql数据库服务器,在系统中运行了很多定时任务,今天通过crontab命令又添加了一个计划任务,退出时发生了如下报错: #crontab -e 编辑完成后,保存退出,就出现下面如下图所示错误: 2.解决思路 根据上面报错的提示信息,基本判定是磁盘空间满了,

idea调试SpringMvc, 出现:”Can&#39;t find catalina.jar&quot;错误的解决方法

用gradle构建的项目,点击运行出现以下错误提示: Error running PraticeWeb: Can't find catalina.jar 21:54 Error running PraticeWeb Invalid arguments : Already listening [timeout, port, localAddress] 检查tomcat配置发现: File -> Setting -> Build,Execution,Deployment -> Applica

Laravel中常见的错误与解决方法小结

一.报错: 「Can't swap PDO instance while within transaction」 通过查询 Laravel 源代码,可以确认异常是在 setPdo 方法中抛出的: ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <?php public function setPdo($pdo) {   if ($this->transactions >= 1) {     throw new RuntimeException("

关于spring4和hibernate4整合,配置事务报“Cannot unwrap to requested type [javax.sql.DataSource]”错误的解决方法及心得

Cannot unwrap to requested type [javax.sql.DataSource] 配置hibernate4和spring4时,出现错误,解决方法: 1.我去了spring4中的事务配置,数据库可以正常执行并访问.如下: <!-- 配置Hibernate  数据事务 --> <bean id="transactionManager" class="org.springframework.orm.hibernate4.Hibernat