Spark读取Hbase中的数据
大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1)、调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:
1 |
JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); |
Scala版本如下:
1 |
val myRDD= sc.parallelize(List(1,2,3)) |
这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。Java版如下:
01 |
///////////////////////////////////////////////////////////////////// |
05 |
http://www.yfteach.com |
06 |
07 |
云帆大数据博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 |
08 |
微信公共帐号:yfteach |
09 |
///////////////////////////////////////////////////////////////////// |
10 |
import org.apache.spark.SparkConf; |
11 |
import org.apache.spark.api.java.JavaRDD; |
12 |
import org.apache.spark.api.java.JavaSparkContext; |
13 |
14 |
SparkConf conf = new SparkConf().setAppName("Simple Application"); |
15 |
JavaSparkContext sc = new JavaSparkContext(conf); |
16 |
sc.addFile("wyp.data"); |
17 |
JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data")); |
Scala版本如下:
1 |
import org.apache.spark.SparkContext |
2 |
import org.apache.spark.SparkConf |
3 |
4 |
val conf = new SparkConf().setAppName("Simple Application") |
5 |
val sc = new SparkContext(conf) |
6 |
sc.addFile("spam.data") |
7 |
val inFile = sc.textFile(SparkFiles.get("spam.data")) |
在实际情况下,我们需要的数据可能不是简单的存放在HDFS文本中,我们需要的数据可能就存放在Hbase中,那么我们如何用Spark来读取Hbase中的数据呢?本文的所有测试是基于Hadoop 2.2.0、Hbase 0.98.2、Spark 0.9.1,不同版本可能代码的编写有点不同。本文只是简单地用Spark来读取Hbase中的数据,如果需要对Hbase进行更强的操作,本文可能不能帮你。话不多说,Spark操作Hbase的核心的Java版本代码如下:
01 |
import org.apache.hadoop.conf.Configuration; |
02 |
import org.apache.hadoop.hbase.HBaseConfiguration; |
03 |
import org.apache.hadoop.hbase.client.Result; |
04 |
import org.apache.hadoop.hbase.client.Scan; |
05 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
06 |
import org.apache.hadoop.hbase.mapreduce.TableInputFormat; |
07 |
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; |
08 |
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; |
09 |
import org.apache.hadoop.hbase.util.Base64; |
10 |
import org.apache.hadoop.hbase.util.Bytes; |
11 |
import org.apache.spark.api.java.JavaPairRDD; |
12 |
import org.apache.spark.api.java.JavaSparkContext; |
13 |
31 |
23 |
24 |
JavaSparkContext sc = new JavaSparkContext(master, "hbaseTest", |
25 |
System.getenv("SPARK_HOME"), System.getenv("JARS")); |
26 |
27 |
Configuration conf = HBaseConfiguration.create(); |
28 |
Scan scan = new Scan(); |
29 |
scan.addFamily(Bytes.toBytes("cf")); |
30 |
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName")); |
32 |
try { |
33 |
String tableName = "flight_wap_order_log"; |
34 |
conf.set(TableInputFormat.INPUT_TABLE, tableName); |
35 |
ClientProtos.Scan proto = ProtobufUtil.toScan(scan); |
36 |
String ScanToString = Base64.encodeBytes(proto.toByteArray()); |
37 |
conf.set(TableInputFormat.SCAN, ScanToString); |
38 |
39 |
JavaPairRDD<ImmutableBytesWritable, Result> myRDD = |
40 |
sc.newAPIHadoopRDD(conf, TableInputFormat.class, |
41 |
ImmutableBytesWritable.class, Result.class); |
42 |
43 |
catch (Exception e) { |
44 |
e.printStackTrace(); |
45 |
} |
这样本段代码段是从Hbase表名为flight_wap_order_log的数据库中读取cf列簇上的airName一列的数据,这样我们就可以对myRDD进行相应的操作:
1 |
System.out.println(myRDD.count()); |
本段代码需要在pom.xml文件加入以下依赖:
01 |
<dependency> |
02 |
<groupId>org.apache.spark</groupId> |
03 |
<artifactId>spark-core_2.10</artifactId> |
04 |
<version>0.9.1</version> |
05 |
</dependency> |
06 |
07 |
<dependency> |
08 |
<groupId>org.apache.hbase</groupId> |
09 |
<artifactId>hbase</artifactId> |
10 |
<version>0.98.2-hadoop2</version> |
11 |
</dependency> |
12 |
13 |
<dependency> |
14 |
<groupId>org.apache.hbase</groupId> |
15 |
<artifactId>hbase-client</artifactId> |
16 |
<version>0.98.2-hadoop2</version> |
17 |
</dependency> |
18 |
19 |
<dependency> |
20 |
<groupId>org.apache.hbase</groupId> |
21 |
<artifactId>hbase-common</artifactId> |
22 |
<version>0.98.2-hadoop2</version> |
23 |
</dependency> |
24 |
25 |
<dependency> |
26 |
<groupId>org.apache.hbase</groupId> |
27 |
<artifactId>hbase-server</artifactId> |
28 |
<version>0.98.2-hadoop2</version> |
29 |
</dependency> |
Scala版如下:
01 |
import org.apache.spark._ |
02 |
import org.apache.spark.rdd.NewHadoopRDD |
03 |
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} |
04 |
import org.apache.hadoop.hbase.client.HBaseAdmin |
05 |
import org.apache.hadoop.hbase.mapreduce.TableInputFormat |
06 |
22 |
val conf = HBaseConfiguration.create() |
15 |
///////////////////////////////////////////////////////////////////// |
16 |
17 |
object HBaseTest { |
18 |
def main(args: Array[String]) { |
19 |
val sc = new SparkContext(args(0), "HBaseTest", |
20 |
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) |
21 |
23 |
conf.set(TableInputFormat.INPUT_TABLE, args(1)) |
24 |
25 |
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], |
26 |
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], |
27 |
classOf[org.apache.hadoop.hbase.client.Result]) |
28 |
29 |
hBaseRDD.count() |
30 |
31 |
System.exit(0) |
32 |
} |
33 |
} |
我们需要在加入如下依赖:
1 |
libraryDependencies ++= Seq( |
2 |
"org.apache.spark" % "spark-core_2.10" % "0.9.1", |
3 |
"org.apache.hbase" % "hbase" % "0.98.2-hadoop2", |
4 |
"org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2", |
5 |
"org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2", |
6 |
"org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2" |
7 |
) |
在测试的时候,需要配置好Hbase、Hadoop环境,否则程序会出现问题,特别是让程序找到Hbase-site.xml配置文件
云帆大数据学院www.cloudyhadoop.com
详情请加入QQ群:374152400 ,咨询课程顾问!
关注云帆教育微信公众号yfteach,第一时间获取公开课信息。