HBase连接的几种方式(二)

1. HBase连接的方式概况

主要分为:

  1. 纯Java API连接HBase的方式;
  2. Spark连接HBase的方式;
  3. Flink连接HBase的方式;
  4. HBase通过Phoenix连接的方式;

第一种方式是HBase自身提供的比较原始的高效操作方式,而第二、第三则分别是Spark、Flink集成HBase的方式,最后一种是第三方插件Phoenix集成的JDBC方式,Phoenix集成的JDBC操作方式也能在Spark、Flink中调用。

注意:

这里我们使用HBase2.1.2版本,以下代码都是基于该版本开发的。

2. Spark上连接HBase

Spark上读写HBase主要分为新旧两种API,另外还有批量插入HBase的,通过Phoenix操作HBase的。

2.1 spark读写HBase的新旧API

2.1.1 spark写数据到HBase

使用旧版本saveAsHadoopDataset保存数据到HBase上。

/**
 * saveAsHadoopDataset
 */
def writeToHBase(): Unit ={
  // 屏蔽不必要的日志显示在终端上
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  /* spark2.0以前的写法
  val conf = new SparkConf().setAppName("SparkToHBase").setMaster("local")
  val sc = new SparkContext(conf)
  */
  val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()
  val sc = sparkSession.sparkContext

  val tableName = "test"

  //创建HBase配置
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201") //设置zookeeper集群,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
  hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181") //设置zookeeper连接端口,默认2181
  hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

  //初始化job,设置输出格式,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
  val jobConf = new JobConf(hbaseConf)
  jobConf.setOutputFormat(classOf[TableOutputFormat])

  val dataRDD = sc.makeRDD(Array("12,jack,16", "11,Lucy,15", "15,mike,17", "13,Lily,14"))

  val data = dataRDD.map{ item =>
      val Array(key, name, age) = item.split(",")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      /*一个Put对象就是一行记录,在构造方法中指定主键
       * 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换
       * Put.addColumn 方法接收三个参数:列族,列名,数据*/
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
      (new ImmutableBytesWritable(), put)
  }
  //保存到HBase表
  data.saveAsHadoopDataset(jobConf)
  sparkSession.stop()
}

使用新版本saveAsNewAPIHadoopDataset保存数据到HBase上

a.txt文件内容为:

100,hello,20
101,nice,24
102,beautiful,26
/**
 * saveAsNewAPIHadoopDataset
 */
 def writeToHBaseNewAPI(): Unit ={
   // 屏蔽不必要的日志显示在终端上
   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
   val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()
   val sc = sparkSession.sparkContext

   val tableName = "test"
   val hbaseConf = HBaseConfiguration.create()
   hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
   hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
   hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)

   val jobConf = new JobConf(hbaseConf)
   //设置job的输出格式
   val job = Job.getInstance(jobConf)
   job.setOutputKeyClass(classOf[ImmutableBytesWritable])
   job.setOutputValueClass(classOf[Result])
   job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])

   val input = sc.textFile("v2120/a.txt")

   val data = input.map{item =>
   val Array(key, name, age) = item.split(",")
   val rowKey = key.reverse
   val put = new Put(Bytes.toBytes(rowKey))
   put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
   put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
   (new ImmutableBytesWritable, put)
   }
   //保存到HBase表
   data.saveAsNewAPIHadoopDataset(job.getConfiguration)
   sparkSession.stop()
}

2.1.2 spark从HBase读取数据

使用newAPIHadoopRDD从hbase中读取数据,可以通过scan过滤数据

/**
 * scan
 */
 def readFromHBaseWithHBaseNewAPIScan(): Unit ={
   //屏蔽不必要的日志显示在终端上
   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
   val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local").getOrCreate()
   val sc = sparkSession.sparkContext

   val tableName = "test"
   val hbaseConf = HBaseConfiguration.create()
   hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
   hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
   hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)

   val scan = new Scan()
   scan.addFamily(Bytes.toBytes("cf1"))
   val proto = ProtobufUtil.toScan(scan)
   val scanToString = new String(Base64.getEncoder.encode(proto.toByteArray))
   hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString)

   //读取数据并转化成rdd TableInputFormat是org.apache.hadoop.hbase.mapreduce包下的
   val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

   val dataRDD = hbaseRDD
     .map(x => x._2)
     .map{result =>
       (result.getRow, result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")), result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))
     }.map(row => (new String(row._1), new String(row._2), new String(row._3)))
     .collect()
     .foreach(r => (println("rowKey:"+r._1 + ", name:" + r._2 + ", age:" + r._3)))
}

2.2 spark利用BulkLoad往HBase批量插入数据

BulkLoad原理是先利用mapreduce在hdfs上生成相应的HFlie文件,然后再把HFile文件导入到HBase中,以此来达到高效批量插入数据。

/**
 * 批量插入 多列
 */
 def insertWithBulkLoadWithMulti(): Unit ={

   val sparkSession = SparkSession.builder().appName("insertWithBulkLoad").master("local[4]").getOrCreate()
   val sc = sparkSession.sparkContext

   val tableName = "test"
   val hbaseConf = HBaseConfiguration.create()
   hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
   hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
   hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

   val conn = ConnectionFactory.createConnection(hbaseConf)
   val admin = conn.getAdmin
   val table = conn.getTable(TableName.valueOf(tableName))

   val job = Job.getInstance(hbaseConf)
   //设置job的输出格式
   job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
   job.setMapOutputValueClass(classOf[KeyValue])
   job.setOutputFormatClass(classOf[HFileOutputFormat2])
   HFileOutputFormat2.configureIncrementalLoad(job, table, conn.getRegionLocator(TableName.valueOf(tableName)))

   val rdd = sc.textFile("v2120/a.txt")
     .map(_.split(","))
     .map(x => (DigestUtils.md5Hex(x(0)).substring(0, 3) + x(0), x(1), x(2)))
     .sortBy(_._1)
     .flatMap(x =>
       {
         val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]
         val kv1: KeyValue = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(x._2 + ""))
         val kv2: KeyValue = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(x._3 + ""))
         listBuffer.append((new ImmutableBytesWritable, kv2))
         listBuffer.append((new ImmutableBytesWritable, kv1))
         listBuffer
       }
     )
   //多列的排序,要按照列名字母表大小来

   isFileExist("hdfs://node1:9000/test", sc)

   rdd.saveAsNewAPIHadoopFile("hdfs://node1:9000/test", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
   val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
   bulkLoader.doBulkLoad(new Path("hdfs://node1:9000/test"), admin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
}

/**
 * 判断hdfs上文件是否存在,存在则删除
 */
def isFileExist(filePath: String, sc: SparkContext): Unit ={
  val output = new Path(filePath)
  val hdfs = FileSystem.get(new URI(filePath), new Configuration)
  if (hdfs.exists(output)){
    hdfs.delete(output, true)
  }
}

2.3 spark利用Phoenix往HBase读写数据

利用Phoenix,就如同msyql等关系型数据库的写法,需要写jdbc

def readFromHBaseWithPhoenix: Unit ={
   //屏蔽不必要的日志显示在终端上
   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

   val sparkSession = SparkSession.builder().appName("SparkHBaseDataFrame").master("local[4]").getOrCreate()

   //表小写,需要加双引号,否则报错
   val dbTable = "\"test\""

   //spark 读取 phoenix 返回 DataFrame的第一种方式
   val rdf = sparkSession.read
     .format("jdbc")
     .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
     .option("url", "jdbc:phoenix:192.168.187.201:2181")
     .option("dbtable", dbTable)
     .load()

   val rdfList = rdf.collect()
   for (i <- rdfList){
     println(i.getString(0) + " " + i.getString(1) + " " + i.getString(2))
   }
   rdf.printSchema()

   //spark 读取 phoenix 返回 DataFrame的第二种方式
   val df = sparkSession.read
     .format("org.apache.phoenix.spark")
     .options(Map("table" -> dbTable, "zkUrl" -> "192.168.187.201:2181"))
     .load()
   df.printSchema()
   val dfList = df.collect()
   for (i <- dfList){
      println(i.getString(0) + " " + i.getString(1) + " " + i.getString(2))
   }
   //spark DataFrame 写入 phoenix,需要先建好表
   /*df.write
     .format("org.apache.phoenix.spark")
     .mode(SaveMode.Overwrite)
     .options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> "jdbc:phoenix:192.168.187.201:2181"))
     .save()
*/
   sparkSession.stop()
}

3. 总结

github地址:

https://github.com/qiushangwenyue/HBaseDemo.git

参考资料:

https://my.oschina.net/uchihamadara/blog/2032481

https://www.cnblogs.com/simple-focus/p/6879971.html

https://www.cnblogs.com/MOBIN/p/5559575.html

https://blog.csdn.net/Suubyy/article/details/80892023

https://www.jianshu.com/p/b09283b14d84

https://www.jianshu.com/p/8e3fdf70dc06

https://www.cnblogs.com/wumingcong/p/6044038.html

https://blog.csdn.net/zhuyu_deng/article/details/43192271

https://www.jianshu.com/p/4c908e419b60

https://blog.csdn.net/Colton_Null/article/details/83387995

原文地址:https://www.cnblogs.com/swordfall/p/10517177.html

时间: 2024-11-07 07:59:45

HBase连接的几种方式(二)的相关文章

python字符串连接的N种方式

python中有很多字符串连接方式,今天在写代码,顺便总结一下: 最原始的字符串连接方式:str1 + str2 python 新字符串连接语法:str1, str2 奇怪的字符串方式:str1 str2 % 连接字符串:‘name:%s; sex: ’ % ('tom', 'male') 字符串列表连接:str.join(some_list) 第一种,想必只要是有编程经验的人,估计都知道,直接用 “+” 来连接两个字符串: 'Jim' + 'Green' = 'JimGreen' 第二种比较特

Java字符串连接的几种方式

Java字符串连接的几种方式 字符串表现的几种方式 StringBuffer和StringBuilder及String的继承关系 字符串的连接 1.String的连接方法 可以看出连接方式是新建了一个包含两个长度的字符数组,然后进行连接. 2.StringBuilder中存储字符串其实用的是一个char数组,capacity其实就是指定这个char数组的大小,StringBuilder的连接方法是继承AbstractStringBuilder的方法的,线程不安全的 在append(str)函数调

多表连接的三种方式详解 HASH JOIN MERGE JOIN NESTED LOOP

在多表联合查询的时候,如果我们查看它的执行计划,就会发现里面有多表之间的连接方式. 之前打算在sqlplus中用执行计划的,但是格式看起来有点乱,就用Toad 做了3个截图. 从3张图里我们看到了几点信息: 1.       CBO 使用的ALL_ROWS模式 Oracle Optimizer CBO RBO http://blog.csdn.NET/tianlesoftware/archive/2010/08/19/5824886.aspx 2.       表之间的连接用了hash Join

sql表连接的几种方式

这里有两张表TableA和TableB,分别是姓名表和年龄表,用于我们例子的测试数据 TableA id name 1 t1 2 t2 4 t4 TableB id age 1 18 2 20 3 19 在开发中我们的业务需求有时候是复杂的,多张表联合查询的时候是有多种方式的,面对不同的需求, 灵活使用不同的表连接方式,那么表连接分成哪几种呢? 表连接有几种? sql表连接分成外连接.内连接和交叉连接. 一.外连接 概述: 外连接包括三种,分别是左外连接.右外连接.全外连接. 对应的sql关键字

C#连接mysql三种方式

第一种方式: 使用MySQLDriverCS.dll连接 MySQLDriverCS软件下载:http://sourceforge.net/projects/mysqldrivercs/?source=typ_redirect 安装完之后再引用中添加引用,找到安装目录,找到MySQLDriverCS.dll文件,然后添加using MySQLDriverCS.dll文件 参考网址:http://www.cnblogs.com/genli/articles/1956537.html C#连接mys

虚拟机 linux 网络连接的三种方式

在VMare虚拟机中,网络连接有三种方式 1.桥接模式   2.only host模式     3.NAT模式   当然还可以自定义,但是虚拟机提供的只有三种方式. 在装完虚拟机之后,网络适配器中就会多出来两个虚拟网卡,VMent1 和Vment8.在使用桥接模式的时候是不需要使用虚拟网卡的,因为它使用的是 你的真实网卡,也就是有线网卡,所以有时候使用桥接模式链接的时候会发现自己没有连通,原因可能是使用了无线网卡,在虚拟机桥接模式中将自动链接 改成自己的有线网卡就可以了. 使用桥接模式因为使用的

进行蓝牙连接的两种方式

为了在两台设备间创建一个连接,必须实现服务器端和客户端的机制,因为一个设备必须打开一个Server Socket,而另一个必须发起连接(使用服务器端设备的MAC地址发起连接).当服务器端和客户端在同一个RFCOMM信道上都有一个BluetoothSocket时,则两端就建立了连接.此刻,每个设备都能获得一个输入输出流,进行数据传输.服务器端和客户端获得BluetoothSocket的方法是不同的,服务器端是在客户端的连接被接受时才产生一个BluetoothSocket,客户端是在打开一个到服务器

C#与数据库的连接的三种方式

学习了.net的知识从C#一直到MVC,我一直觉得基础很重要,最近有复习一下数据库连接的三种方式 1 返回结果集的一张表 public static DataTable ExecuteDataTable(string sql, params SqlParameter[] parameters) { DataSet ds = new DataSet(); SqlDataAdapter adapter = new SqlDataAdapter(sql, str); adapter.SelectCom

java-数组连接的几种方式

多个数组进行拼接, 1, 使用java自己的 System#arrayCopy() byte[] message = new byte[heads.length + result.length + bodies.length]; System.arraycopy(heads, 0, message, 0, heads.length); System.arraycopy(result, 0, message, heads.length, result.length); System.arrayco