spark 写入 redis 和 org.apache.spark.SparkException: Task not serializable

spark将数据写入redis时调用以下代码会报  org.apache.spark.SparkException: Task not serializable

import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = rdd.map(x => {
    val arr = x.split(" ")
    val k = arr(0).toInt
    val v = arr(1).toInt
    r.rpush(k, v)
    (k, v)
 })

原因是:在spark,rdd的方法里比如这里的map,方法里的数据会被序列化,并且分发到executors 去执行。这就需要rdd方法里的所有元素是可被序列化的这里的redis连接是不可被序列化的,所以会报Task not serializable异常

解决这个问题的方法是在executors中创建连接对象,这里介绍两种方法

1)rdd.mapPartitions 这个方法允许一次处理整个partitons的数据,在此方法中创建连接:

 val rdd = rdd.mapPartitions{partition =>
    val r = new RedisClient("192.168.1.101", 6379)
    val res = partition.map{ x =>
        ...
        val refStr = r.rpush(...)
    }
    r.close
    res
}

2)用可序列化的单例模式来管理连接,让连接用lazy的方式创建

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

val rdd = rdd.map{x =>
    ... ...
    val refStr = RedisConnection.conn.rpush(...)
}

这里主要是给出在处理rdd数据时,获得redis连接的方法,同样的,操作其他数据库道理是一样的,这里是以redis为例

时间: 2024-11-05 13:33:24

spark 写入 redis 和 org.apache.spark.SparkException: Task not serializable的相关文章

spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable

import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.common.math.MathUtils; // 自定义Partitioner class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner { protected var _numPartitions = -1; prote

spark 插入数据到mysql时遇到的问题 org.apache.spark.SparkException: Task not serializable

报错问题:Exception in thread "main" org.apache.spark.SparkException: Task not serializableCaused by: java.io.NotSerializableException: org.apache.commons.dbcp2.PoolingDataSource$PoolGuardConnectionWrapper 出错的代码: def saveMonthToMysql(everymonth_avg:R

Apache Spark源码走读之5 -- DStream处理的容错性分析

欢迎转载,转载请注明出处,徽沪一郎,谢谢. 在流数据的处理过程中,为了保证处理结果的可信度(不能多算,也不能漏算),需要做到对所有的输入数据有且仅有一次处理.在Spark Streaming的处理机制中,不能多算,比较容易理解.那么它又是如何作到即使数据处理结点被重启,在重启之后这些数据也会被再次处理呢? 环境搭建 为了有一个感性的认识,先运行一下简单的Spark Streaming示例.首先确认已经安装了openbsd-netcat. 运行netcatnc -lk 9999 运行spark-s

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Apache Spark技术实战之4 -- 利用Spark将json文件导入Cassandra

欢迎转载,转载请注明出处. 概要 本文简要介绍如何使用spark-cassandra-connector将json文件导入到cassandra数据库,这是一个使用spark的综合性示例. 前提条件 假设已经阅读技术实战之3,并安装了如下软件 jdk scala sbt cassandra spark-cassandra-connector 实验目的 将存在于json文件中的数据导入到cassandra数据库,目前由cassandra提供的官方工具是json2sstable,由于对cassandr

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

2018年前100名Apache Spark面试问题和解答(上)

我们知道Apache Spark现在是一项蓬勃发展的技术.因此,了解Apache Spark的各个方面以及Spark面试问题非常重要.我将介绍Spark的每个方面,这也可能是经常被问到的Spark面试问题.此外,我将尽力提供每个问题,从现在开始,您搜索最佳和所有Spark面试问题将在此结束. Apache Spark面试问题答案 一,什么是Apache Spark? Apache Spark是一个功能强大的开源灵活数据处理框架,围绕速度,易用性和复杂的分析而构建.Apache Spark在集群计

Apache Spark技术实战之3 -- Spark Cassandra Connector的安装和使用

欢迎转载,转载请注明出处,徽沪一郎. 概要 前提 假设当前已经安装好如下软件 jdk sbt git scala 安装cassandra 以archlinux为例,使用如下指令来安装cassandra yaourt -S cassandra 启动cassandra cassandra -f 创建keyspace和table, 运行/usr/bin/cqlsh进入cql console,然后执行下述语句创建keyspace和table CREATE KEYSPACE test WITH repli

Apache Spark技术实战之8:Standalone部署模式下的临时文件清理

未经本人同意严禁转载,徽沪一郎. 概要 在Standalone部署模式下,Spark运行过程中会创建哪些临时性目录及文件,这些临时目录和文件又是在什么时候被清理,本文将就这些问题做深入细致的解答. 从资源使用的方面来看,一个进程运行期间会利用到这四个方面的资源,分别是CPU,内存,磁盘和网络.进程退出之后,CPU,内存和网络都会由操作系统负责释放掉,但是运行过程中产生临时文件如果进程自己不在退出之前有效清除,就会留下一地鸡毛,浪费有效的存储空间. 部署时的第三方依赖 再提出具体的疑问之前,先回顾