How to implement connection pool in spark streaming

在spark streaming的文档里,有这么一段:

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

但是怎么让worker得到一个ConectionPool呢?简单的想法是在使用static变量指向一个ConnectionPool。但这里有一个讲究:怎么保证这个ConnectionPool是worker上的,而不是driver上的?

用pyhton为例:

在ConnectionPool.py里实现一个pool

#/usr/bin/python#connection_pool.pyimport psycopg2
import settings

from DBUtils.PooledDB import PooledDB

pool = PooledDB(psycopg2, settings.connection_pool_size,
                         host=settings.db_host,
                         database=settings.database,
                         user=settings.db_user,
                         password=settings.db_password)def getConnection():    return pool.connection()

假设stream的主代码在main.py里,提交spark

spark-submit --py-files connection_pool.py main.py

这样connection_pool.py将被发送到worker执行,main.py里的 sendPartition 在worker节点上执行的时候就可以获得ConnectionPool.getConnection()调用。

这里的关键是明白哪些代码在driver上跑,哪些在worker上跑。

 
时间: 2024-10-15 01:57:20

How to implement connection pool in spark streaming的相关文章

Spark1.1.0 Spark Streaming Programming Guide

Spark Streaming Programming Guide Overview A Quick Example Basic Concepts Linking Initializing StreamingContext Discretized Streams (DStreams) Input DStreams Transformations on DStreams Output Operations on DStreams Caching / Persistence Checkpointin

Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

一:Receiver启动的方式设想 1. Spark Streaming通过Receiver持续不断的从外部数据源接收数据,并把数据汇报给Driver端,由此每个Batch Durations就可以根据汇报的数据生成不同的Job. 2. Receiver属于Spark Streaming应用程序启动阶段,那么我们找Receiver在哪里启动就应该去找Spark Streaming的启动. 3. Receivers和InputDStreams是一一对应的,默认情况下一般只有一个Receiver.

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一.案例代码 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等 package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.

3.spark streaming Job 架构和容错解析

一.Spark streaming Job 架构 SparkStreaming框架会自动启动Job并每隔BatchDuration时间会自动触发Job的调用. Spark Streaming的Job 分为两大类: 每隔BatchInterval时间片就会产生的一个个Job,这里的Job并不是Spark Core中的Job,它只是基于DStreamGraph而生成的RDD的DAG而已:从Java角度讲相当于Runnable接口的实现类,要想运行Job需要将Job提交给JobScheduler,在J

第9课:Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

一:Receiver启动的方式设想 1. Spark Streaming通过Receiver持续不断的从外部数据源接收数据,并把数据汇报给Driver端,由此每个Batch Durations就可以根据汇报的数据生成不同的Job. 2. Receiver属于Spark Streaming应用程序启动阶段,那么我们找Receiver在哪里启动就应该去找Spark Streaming的启动. 3. Receivers和InputDStreams是一一对应的,默认情况下一般只有一个Receiver.

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

第96课: 通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统中

本期内容 技术实现解析 实现实战 SparkStreaming的DStream提供了一个dstream.foreachRDD方法,该方法是一个功能强大的原始的API,它允许将数据发送到外部系统.然而,重要的是要了解如何正确有效地使用这种原始方法.一些常见的错误,以避免如下: 写数据到外部系统,需要建立一个数据连接对象(例如TCP连接到远程的服务器),使用它将数据发送到外部存储系统.为此开发者可能会在Driver中尝试创建一个连接,然后在worker中使用它来保存记录到外部数据.例如如下scala

通过案例对 spark streaming 透彻理解三板斧之三:spark streaming运行机制与架构

本期内容: 1. Spark Streaming Job架构与运行机制 2. Spark Streaming 容错架构与运行机制 事实上时间是不存在的,是由人的感官系统感觉时间的存在而已,是一种虚幻的存在,任何时候宇宙中的事情一直在发生着的. Spark Streaming好比时间,一直遵循其运行机制和架构在不停的在运行,无论你写多或者少的应用程序都跳不出这个范围. 一.   通过案例透视Job执行过程的Spark Streaming机制解析,案例代码如下: import org.apache.