在SparkStreaming中统计了数据之后,我们需要将结果写入外部文件系统。
本文,以向Hbase中写数据,为例,说一下,SparkStreaming怎么向Hbase中写数据。
首先,需要说一下,下面的这个方法。
foreachRDD(func)
最通用的输出操作,把func作用于从stream生成的每一个RDD。
注意:这个函数是在 运行streaming程序的driver进程 中执行的。
下面跟着思路,看一下,怎么优雅的向Hbase中写入数据
向外部写数据 常见的错误:
向外部数据库写数据,通常会建立连接,使用连接发送数据(也就是保存数据)。
开发者可能 在driver中创建连接,而在spark worker 中保存数据
例如:
dstream.foreachRDD { rdd => val connection = createNewConnection() // 这个会在driver中执行 rdd.foreach { record => connection.send(record) //这个会在 worker中执行 } }
上面这种写法是错误的!上面的写法,需要connection 对象被序列化,然后从driver发送到worker。
这样的connection是很少在机器之间传输的。知道这个问题后,我们可以写出以下的,修改后的代码:
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
这种写法也是不对的。这会导致,对于每条数据,都创建一个connection(创建connection是消耗资源的)。
下面的方法会好一些:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
上面的方法,使用 rdd.foreachPartition 创建一个connection 对象, 一个RDD分区中的所有数据,都使用这一个connection。
更优的方法
在多个RDD之间,connection对象是可以重用的,所以可以创建一个连接池。如下
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool是一个静态的,延迟初始化的连接池 val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 返回到池中 以便别人使用 } }
连接池中的连接应该是,应需求而延迟创建,并且,如果一段时间没用,就超时了(也就是关闭该连接)
时间: 2024-10-15 03:46:49