在spark streaming的文档里,有这么一段:
def sendPartition(iter): # ConnectionPool is a static, lazily initialized pool of connections connection = ConnectionPool.getConnection() for record in iter: connection.send(record) # return to the pool for future reuse ConnectionPool.returnConnection(connection) dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
但是怎么让worker得到一个ConectionPool呢?简单的想法是在使用static变量指向一个ConnectionPool。但这里有一个讲究:怎么保证这个ConnectionPool是worker上的,而不是driver上的?
用pyhton为例:
在ConnectionPool.py里实现一个pool
#/usr/bin/python#connection_pool.pyimport psycopg2 import settings from DBUtils.PooledDB import PooledDB pool = PooledDB(psycopg2, settings.connection_pool_size, host=settings.db_host, database=settings.database, user=settings.db_user, password=settings.db_password)def getConnection(): return pool.connection()
假设stream的主代码在main.py里,提交spark
spark-submit --py-files connection_pool.py main.py
这样connection_pool.py将被发送到worker执行,main.py里的 sendPartition 在worker节点上执行的时候就可以获得ConnectionPool.getConnection()调用。
这里的关键是明白哪些代码在driver上跑,哪些在worker上跑。
时间: 2024-10-15 01:57:20