sparkStreaming向hbase写数据

在SparkStreaming中统计了数据之后,我们需要将结果写入外部文件系统。

本文,以向Hbase中写数据,为例,说一下,SparkStreaming怎么向Hbase中写数据。

首先,需要说一下,下面的这个方法。

foreachRDD(func)

最通用的输出操作,把func作用于从stream生成的每一个RDD。

注意:这个函数是在 运行streaming程序的driver进程 中执行的。

下面跟着思路,看一下,怎么优雅的向Hbase中写入数据

向外部写数据 常见的错误:

向外部数据库写数据,通常会建立连接,使用连接发送数据(也就是保存数据)。

开发者可能 在driver中创建连接,而在spark worker 中保存数据

例如:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 这个会在driver中执行
  rdd.foreach { record =>
    connection.send(record) //这个会在 worker中执行
  }
}

上面这种写法是错误的!上面的写法,需要connection 对象被序列化,然后从driver发送到worker。

这样的connection是很少在机器之间传输的。知道这个问题后,我们可以写出以下的,修改后的代码:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

这种写法也是不对的。这会导致,对于每条数据,都创建一个connection(创建connection是消耗资源的)。

下面的方法会好一些:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

上面的方法,使用 rdd.foreachPartition 创建一个connection 对象, 一个RDD分区中的所有数据,都使用这一个connection。

更优的方法

在多个RDD之间,connection对象是可以重用的,所以可以创建一个连接池。如下

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool是一个静态的,延迟初始化的连接池
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 返回到池中 以便别人使用  }
}

连接池中的连接应该是,应需求而延迟创建,并且,如果一段时间没用,就超时了(也就是关闭该连接)

时间: 2024-10-15 03:46:49

sparkStreaming向hbase写数据的相关文章

Hbase写数据,存数据,读数据的详细过程

转自:http://www.aboutyun.com/thread-10886-1-1.html 附HBase 0.94之后Split策略: http://www.aboutyun.com/thread-11211-1-1.html 1.Client写入需要哪些过程?2.Hbase是如何读取数据的? Client写入 -> 存入MemStore,一直到MemStore满 -> Flush成一个StoreFile,直至增长到一定阈值 -> 出发Compact合并操作 -> 多个Sto

Java往hbase写数据

接上篇读HDFS 上面读完了HDFS,当然还有写了. 先上代码: WriteHBase public class WriteHBase { public static void writeHbase(String content){ // HDFS 数据是一行一条记录 String[] lines = content.split("\n"); int userSize = 0; List<Put> puts = new ArrayList<Put>(); Put

线上问题排查-HBase写数据出现NotServingRegionException(Region ... is not online)异常

今天线上遇到一个问题:有一台服务器的cpu持续冲高,排查发现是我们的一个java应用进程造成的,该进程在向hbase中写入数据时,日志不断地打印下面的异常: org.apache.hadoop.hbase.NotServingRegionException: Region iot_flow_cdr_201811,4379692584601-2101152593-20181115072326-355,1536703383699.82804f639798d0502dd64e6e47d75d84. i

mapreduce 只使用Mapper往多个hbase表中写数据

只使用Mapper不使用reduce会大大减少mapreduce程序的运行时间. 有时候程序会往多张hbase表写数据. 所以有如题的需求. 下面给出的代码,不是可以运行的代码,只是展示driver中需要进行的必要项设置,mapper类需要实现的接口,map函数需要的参数以及函数内部的处理方式. 实现过程比较曲折,只贴代码: class Qos2HbaseDriver extends Configured implements Tool { private static Logger logge

【hbase】——HBase 写优化之 BulkLoad 实现数据快速入库

1.为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题? 我们先看下 HBase 的写流程: 通常 MapReduce 在写HBase时使用的是 TableOutputFormat 方式,在reduce中直接生成put对象写入HBase,该方式在大数据量写入时效率低下(HBase会block写入,频繁进行flush,split,compact等大量IO操作),并对HBase节点的稳定性造成一定的影响(GC时间过长,响应变慢,导致节点超时退出,

Spark 批量写数据入HBase

介绍 ??工作中常常会遇到这种情形,需要将hdfs中的大批量数据导入HBase.本文使用Spark+HBase的方式将RDD中的数据导入HBase中.没有使用官网提供的newAPIHadoopRDD接口的方式.使用本文的方式将数据导入HBase, 7000W条数据,花费时间大概20分钟左右,本文Spark可用核数量为20. 本文使用spark版本为1.3.0,hbase版本为0.98.1 hbase表结构为:表名table,列族Family,列为qualifier. 代码如下: val read

【原创】问题定位分享(16)spark写数据到hive外部表报错ClassCastException: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat

spark 2.1.1 spark在写数据到hive外部表(底层数据在hbase中)时会报错 Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat at org.apache.spark.sql.hive.SparkHiveWrit

HBase写的初步测试中的表现

底 第四年HBase.在上线的机HBase集群做一个初步的测试写入性能.下面具体说明做测试内容. 说明 HBase周围环境 0.96版本号,8台region server.默认配置 写数据说明 单column family.两个column qualifier的值为字符串+随机8位正整数,Row Key为两个quailifer值相连后串上随机Long 比方:val1 = dd1977285, val2 =cc6549921, rowkey = rondom.nextLong() + val1 +

优雅的将hbase的数据导入hive表

v\:* {behavior:url(#default#VML);} o\:* {behavior:url(#default#VML);} w\:* {behavior:url(#default#VML);} .shape {behavior:url(#default#VML);} wgx wgx 2 67 2016-04-02T15:15:00Z 2016-04-02T15:15:00Z 1 233 1332 11 3 1562 15.00 Clean Clean false 7.8 磅 0