Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据

 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1)、调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:


1


JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));

Scala版本如下:


1


val myRDD= sc.parallelize(List(1,2,3))

  这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。Java版如下:


01


/////////////////////////////////////////////////////////////////////


05


http://www.yfteach.com


06


07


云帆大数据博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货


08


微信公共帐号:yfteach


09


/////////////////////////////////////////////////////////////////////


10


import org.apache.spark.SparkConf;


11


import org.apache.spark.api.java.JavaRDD;


12


import org.apache.spark.api.java.JavaSparkContext;


13


14


SparkConf conf = new SparkConf().setAppName("Simple Application");


15


JavaSparkContext sc = new JavaSparkContext(conf);


16


sc.addFile("wyp.data");


17


JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));

Scala版本如下:


1


import org.apache.spark.SparkContext


2


import org.apache.spark.SparkConf


3


4


val conf = new SparkConf().setAppName("Simple Application")


5


val sc = new SparkContext(conf)


6


sc.addFile("spam.data")


7


val inFile = sc.textFile(SparkFiles.get("spam.data"))

  在实际情况下,我们需要的数据可能不是简单的存放在HDFS文本中,我们需要的数据可能就存放在Hbase中,那么我们如何用Spark来读取Hbase中的数据呢?本文的所有测试是基于Hadoop 2.2.0、Hbase 0.98.2、Spark 0.9.1,不同版本可能代码的编写有点不同。本文只是简单地用Spark来读取Hbase中的数据,如果需要对Hbase进行更强的操作,本文可能不能帮你。话不多说,Spark操作Hbase的核心的Java版本代码如下:


01


import org.apache.hadoop.conf.Configuration;


02


import org.apache.hadoop.hbase.HBaseConfiguration;


03


import org.apache.hadoop.hbase.client.Result;


04


import org.apache.hadoop.hbase.client.Scan;


05


import org.apache.hadoop.hbase.io.ImmutableBytesWritable;


06


import org.apache.hadoop.hbase.mapreduce.TableInputFormat;


07


import org.apache.hadoop.hbase.protobuf.ProtobufUtil;


08


import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;


09


import org.apache.hadoop.hbase.util.Base64;


10


import org.apache.hadoop.hbase.util.Bytes;


11


import org.apache.spark.api.java.JavaPairRDD;


12


import org.apache.spark.api.java.JavaSparkContext;


13


31


23


24


JavaSparkContext sc = new JavaSparkContext(master, "hbaseTest",


25


System.getenv("SPARK_HOME"), System.getenv("JARS"));


26


27


Configuration conf = HBaseConfiguration.create();


28


Scan scan = new Scan();


29


scan.addFamily(Bytes.toBytes("cf"));


30


scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName"));


32


try {


33


String tableName = "flight_wap_order_log";


34


conf.set(TableInputFormat.INPUT_TABLE, tableName);


35


ClientProtos.Scan proto = ProtobufUtil.toScan(scan);


36


String ScanToString = Base64.encodeBytes(proto.toByteArray());


37


conf.set(TableInputFormat.SCAN, ScanToString);


38


39


JavaPairRDD<ImmutableBytesWritable, Result> myRDD =


40


sc.newAPIHadoopRDD(conf,  TableInputFormat.class,


41


ImmutableBytesWritable.class, Result.class);


42


43


catch (Exception e) {


44


e.printStackTrace();


45


}

这样本段代码段是从Hbase表名为flight_wap_order_log的数据库中读取cf列簇上的airName一列的数据,这样我们就可以对myRDD进行相应的操作:


1


System.out.println(myRDD.count());

本段代码需要在pom.xml文件加入以下依赖:


01


<dependency>


02


<groupId>org.apache.spark</groupId>


03


<artifactId>spark-core_2.10</artifactId>


04


<version>0.9.1</version>


05


</dependency>


06


07


<dependency>


08


<groupId>org.apache.hbase</groupId>


09


<artifactId>hbase</artifactId>


10


<version>0.98.2-hadoop2</version>


11


</dependency>


12


13


<dependency>


14


<groupId>org.apache.hbase</groupId>


15


<artifactId>hbase-client</artifactId>


16


<version>0.98.2-hadoop2</version>


17


</dependency>


18


19


<dependency>


20


<groupId>org.apache.hbase</groupId>


21


<artifactId>hbase-common</artifactId>


22


<version>0.98.2-hadoop2</version>


23


</dependency>


24


25


<dependency>


26


<groupId>org.apache.hbase</groupId>


27


<artifactId>hbase-server</artifactId>


28


<version>0.98.2-hadoop2</version>


29


</dependency>

Scala版如下:


01


import org.apache.spark._


02


import org.apache.spark.rdd.NewHadoopRDD


03


import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}


04


import org.apache.hadoop.hbase.client.HBaseAdmin


05


import org.apache.hadoop.hbase.mapreduce.TableInputFormat


06


22


val conf = HBaseConfiguration.create()


15


/////////////////////////////////////////////////////////////////////


16


17


object HBaseTest {


18


def main(args: Array[String]) {


19


val sc = new SparkContext(args(0), "HBaseTest",


20


System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))


21


23


conf.set(TableInputFormat.INPUT_TABLE, args(1))


24


25


val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],


26


classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],


27


classOf[org.apache.hadoop.hbase.client.Result])


28


29


hBaseRDD.count()


30


31


System.exit(0)


32


}


33


}

我们需要在加入如下依赖:


1


libraryDependencies ++= Seq(


2


"org.apache.spark" % "spark-core_2.10" % "0.9.1",


3


"org.apache.hbase" % "hbase" % "0.98.2-hadoop2",


4


"org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",


5


"org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",


6


"org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"


7


)

  在测试的时候,需要配置好Hbase、Hadoop环境,否则程序会出现问题,特别是让程序找到Hbase-site.xml配置文件

云帆大数据学院www.cloudyhadoop.com

详情请加入QQ群:374152400 ,咨询课程顾问!

关注云帆教育微信公众号yfteach,第一时间获取公开课信息。

时间: 2024-12-08 01:40:48

Spark读取Hbase中的数据_云帆大数据分享的相关文章

企业级Hadoop 2.x入门系列之一Apache Hadoop 2.x简介与版本_云帆大数据学院

1.1 Hadoop简介 从Hadoop官网获得Hadoop的介绍:http://hadoop.apache.org/ (1)What Is Apache Hadoop? TheApache Hadoop project develops open-source software for reliable, scalable, distributed computing. TheApache Hadoop software library is a framework that allows f

Spark 读取 Hbase 优化 --手动划分 region 提高并行数

一. Hbase 的 region 我们先简单介绍下 Hbase 的 架构和 region : 从物理集群的角度看,Hbase 集群中,由一个 Hmaster 管理多个 HRegionServer,其中每个 HRegionServer 都对应一台物理机器,一台 HRegionServer 服务器上又可以有多个 Hregion(以下简称 region).要读取一个数据的时候,首先要先找到存放这个数据的 region.而 Spark 在读取 Hbase 的时候,读取的 Rdd 会根据 Hbase 的

C#中几种数据库的大数据批量插入

C#语言中对SqlServer.Oracle.SQLite和MySql中的数据批量插入是支持的,不过Oracle需要使用Orace.DataAccess驱动. IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider.批量插入的实现可以通过实现该接口来实现. /// <summary> /// 提供数据批量处理的方法. /// </summary> public interface IBatcherProvider : IProviderServic

分类推荐&amp;通俗易懂 :数据科学与大数据技术专业领域的实用工具

数据科学与大数据技术是一门偏向应用的学科领域,因此工具就成为重要的组成部分.在工作中,数据科学家如果选择有效的工具会带来事半功倍的效果.一般来说,数据科学家应该具有操作数据库.数据处理和数据可视化等相关技能,还有很多人还认为计算机技能也是不可或缺的,可以提高数据科学家工作的效率. 在这里相信有许多想要学习大数据的同学,大家可以+下大数据学习裙:957205962,即可免费领取套系统的大数据学习教程 开源社区多年来对数据科学工具包开发有着巨大贡献,这也让数据科学领域得以不断进步.这里我们收集了一些

网络天然是大数据的,大数据天然是网络的

The network is naturally bigdataing, while bigdata is inherently networking. [email protected] 用英文表达似乎更加准确一些. 计算机科学发展了半个世纪,而网络的出现极大推动了计算机相关技术的爆发式进步. 计算机或网络领域所研究的典型问题,往往都是追求高性能.精确.准确,而大数据技术则往往提供一些统筹.模糊的结论. 一方面,网络中产生了海量的数据,无法被传统技术处理而白白浪费:反过来,要实现大数据处理的平

Spark 读取 HBase 数据

1.pom.xml 版本号 <properties> <hbase.version>2.2.2</hbase.version> <hadoop.version>2.10.0</hadoop.version> <spark.version>2.4.2</spark.version> </properties> 依赖包 <dependencies> <dependency> <grou

spark读取hbase数据,如果表存在则不做任何操作,如果表不存在则新建表。

import java.text.SimpleDateFormat import java.util.Date import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result, Scan} import org.apache.hadoop.hbase.io.I

spark读取hbase(NewHadoopAPI 例子)

package cn.piesat.controller import java.text.{DecimalFormat, SimpleDateFormat}import java.utilimport java.util.concurrent.{CountDownLatch, Executors, Future} import ba.common.log.enums.{LogLevel, LogType}import ba.common.log.utils.LogUtilimport cn.p

Spark项目之电商用户行为分析大数据平台之(十)IDEA项目搭建及工具类介绍

一.创建Maven项目 创建项目,名称为LogAnalysis 二.常用工具类 2.1 配置管理组建 ConfigurationManager.java 1 import java.io.InputStream; 2 import java.util.Properties; 3 4 /** 5 * 配置管理组件 6 * 7 * 1.配置管理组件可以复杂,也可以很简单,对于简单的配置管理组件来说,只要开发一个类,可以在第一次访问它的 8 * 时候,就从对应的properties文件中,读取配置项,