Spark 读取Hbase表数据并实现类似groupByKey操作

一、概述 
程序运行环境很重要,本次测试基于: 
hadoop-2.6.5 
spark-1.6.2 
hbase-1.2.4 
zookeeper-3.4.6 
jdk-1.8 
废话不多说了,直接上需求

Andy column=baseINFO:age,  value=21

Andy column=baseINFO:gender,  value=0

Andy column=baseINFO:telphone_number, value=110110110

Tom  column=baseINFO:age, value=18

Tom  column=baseINFO:gender, value=1

Tom  column=baseINFO:telphone_number, value=120120120

如上表所示,将之用spark进行分组,达到这样的效果:

[Andy,(21,0,110110110)] 
[Tom,(18,1,120120120)] 
需求比较简单,主要是熟悉一下程序运行过程

二、具体代码

package com.union.bigdata.spark.hbase;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.mapreduce.TableSplit;import org.apache.hadoop.hbase.util.Base64;import org.apache.hadoop.hbase.util.Bytes;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableInputFormat;import org.apache.spark.api.java.JavaPairRDD;import org.apache.hadoop.hbase.protobuf.ProtobufUtil;import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple10;import scala.Tuple2;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class ReadHbase {

    private static String appName = "ReadTable";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();

    //we can also run it at local:"local[3]"  the number 3 means 3 threads
        sparkConf.setMaster("spark://master:7077").setAppName(appName);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master"); 
        conf.set("hbase.zookeeper.property.clientPort", "2181"); 
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("baseINFO"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

        String scanToString = "";
        try {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            scanToString = Base64.encodeBytes(proto.toByteArray());
        } catch (IOException io) {
            System.out.println(io);
        }

        for (int i = 0; i < 2; i++) {
            try {
                String tableName = "VIPUSER";
                conf.set(TableInputFormat.INPUT_TABLE, tableName);
                conf.set(TableInputFormat.SCAN, scanToString);

                //get the Result of query from the Table of Hbase
                JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(conf,
                        TableInputFormat.class, ImmutableBytesWritable.class,
                        Result.class);

                //group by row key like : [(Andy,110,21,0),(Tom,120,18,1)]
                JavaPairRDD<String, List<Integer>> art_scores = hBaseRDD.mapToPair(
                        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<Integer>>() {
                            @Override
                            public Tuple2<String, List<Integer>> call(Tuple2<ImmutableBytesWritable, Result> results) {

                                List<Integer> list = new ArrayList<Integer>();

                                byte[] telphone_number = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
                                byte[] age = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
                                byte[] gender = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

                //the type of storage at Hbase is Byte Array,so we must let it be normal like Int,String and so on 
                                list.add(Integer.parseInt(Bytes.toString(telphone_number)));
                                list.add(Integer.parseInt(Bytes.toString(age)));
                                list.add(Integer.parseInt(Bytes.toString(gender)));

                                return new Tuple2<String, List<Integer>>(Bytes.toString(results._1().get()), list);
                            }
                        }
                );

                //switch to Cartesian product 
                JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart = art_scores.cartesian(art_scores);

                //use Row Key to delete the repetition from the last step "Cartesian product"  
                JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart2 = cart.filter(
                        new Function<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>, Boolean>() {
                            public Boolean call(Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> tuple2Tuple2Tuple2) throws Exception {

                                return tuple2Tuple2Tuple2._1()._1().compareTo(tuple2Tuple2Tuple2._2()._1()) < 0;
                            }
                        }
                );

                System.out.println("Create the List ‘collect‘...");

        //get the result we need
                 List<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>> collect = cart2.collect();
                 System.out.println("Done..");
                 System.out.println(collect.size() > i ? collect.get(i):"STOP");

                 if (collect.size() > i ) break;
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

三、程序运行过程分析 
1、spark自检以及Driver和excutor的启动过程 
实例化一个SparkContext(若在spark2.x下,这里初始化的是一个SparkSession对象),这时候启动SecurityManager线程去检查用户权限,OK之后创建sparkDriver线程,spark底层远程通信模块(akka框架实现)启动并监听sparkDriver,之后由sparkEnv对象来注册BlockManagerMaster线程,由它的实现类对象去监测运行资源 
2、zookeeper与Hbase的自检和启动 
第一步顺利完成之后由sparkContext对象去实例去启动程序访问Hbase的入口,触发之后zookeeper完成自己的一系列自检活动,包括用户权限、操作系统、数据目录等,一切OK之后初始化客户端连接对象,之后由Hbase的ClientCnxn对象来建立与master的完整连接 
3、spark job 的运行 
程序开始调用spark的action类方法,比如这里调用了collect,会触发job的执行,这个流程网上资料很详细,无非就是DAGScheduler搞的一大堆事情,连带着出现一大堆线程,比如TaskSetManager、TaskScheduler等等,最后完成job,返回结果集 
4、结束程序 
正确返回结果集之后,sparkContext利用反射调用stop()方法,这之后也会触发一系列的stop操作,主要线程有这些:BlockManager,ShutdownHookManager,后面还有释放actor的操作等等,最后一切结束,临时数据和目录会被删除,资源会被释放

时间: 2024-10-22 09:03:54

Spark 读取Hbase表数据并实现类似groupByKey操作的相关文章

Spark读取Hbase的数据

val conf = HBaseConfiguration.create() conf.addResource(new Path("/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/lib/hbase/conf/hbase-site.xml")) conf.addResource(new Path("/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/lib/hadoop/etc/had

Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1).调用parallelize函数直接从集合中获取数据,并存入RDD中:Java版本如下: 1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初

数据分页处理系列之二:HBase表数据分页处理

  HBase是Hadoop大数据生态技术圈中的一项关键技术,是一种用于分布式存储大数据的列式数据库,关于HBase更加详细的介绍和技术细节,朋友们可以在网络上进行搜寻,笔者本人在接下来的日子里也会写一个HBase方面的技术专题,有兴趣的朋友们可以稍微的期待一下.不过本章节的重点是介绍下HBase表数据的分页处理,其他的就不多说了. 首先说一下表数据分页中不可回避的一个指标:总记录数.在关系数据库中很容易统计出记录总数,但在HBase中,这却是一个大难题,至少在目前,朋友们根本不要奢望能够通过类

HBase表数据分页处理

HBase表数据分页处理 HBase是Hadoop大数据生态技术圈中的一项关键技术,是一种用于分布式存储大数据的列式数据库,关于HBase更加详细的介绍和技术细节,朋友们可以在网络上进行搜寻,笔者本人在接下来的日子里也会写一个HBase方面的技术专题,有兴趣的朋友们可以稍微的期待一下.不过本章节的重点是介绍下HBase表数据的分页处理,其他的就不多说了. 首先说一下表数据分页中不可回避的一个指标:总记录数.在关系数据库中很容易统计出记录总数,但在HBase中,这却是一个大难题,至少在目前,朋友们

Spark 读取 Hbase 优化 --手动划分 region 提高并行数

一. Hbase 的 region 我们先简单介绍下 Hbase 的 架构和 region : 从物理集群的角度看,Hbase 集群中,由一个 Hmaster 管理多个 HRegionServer,其中每个 HRegionServer 都对应一台物理机器,一台 HRegionServer 服务器上又可以有多个 Hregion(以下简称 region).要读取一个数据的时候,首先要先找到存放这个数据的 region.而 Spark 在读取 Hbase 的时候,读取的 Rdd 会根据 Hbase 的

HBase表数据导出和导入

本文不是技术收集贴,就是记录一下,因此没有收集所有的HBase表导入导出方式,只是记录一下自己用过的一种. 数据表的导出: 1 $ bin/hbase org.apache.hadoop.hbase.mapreduce.Driver export <tablename> <outputdir> [<versions> [<starttime> [<endtime>]]] 需要注意的是,outputdir指的是HDFS上的路径,建议使用绝对路径(没

ACCESS(VBA)上的一个小项目 —— 2、读取ACCESS表数据到TreeView和ListView

有人问我能不能做一个程序的时候,我第一反应都说“能”. --这次在ACCESS中,借助TreeView和ListView做了一个数据联动的模型. 简析过程: 1)从网上找了一份TreeView学习教程<三小时学会树控件>,了解了TreeView的建立以及节点的使用方法: 2)把数据表中的某列按一定规则生成的数据再按一种规则解析成TreeView中的树结构(VPPS): 3)通过遍历TreeView中节点的折叠状态,通过上述一定规则生成的数据(唯一性)把数据表中的内容读取到ListView中:

spark读取hbase数据,如果表存在则不做任何操作,如果表不存在则新建表。

import java.text.SimpleDateFormat import java.util.Date import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result, Scan} import org.apache.hadoop.hbase.io.I

Spark 读取 HBase 数据

1.pom.xml 版本号 <properties> <hbase.version>2.2.2</hbase.version> <hadoop.version>2.10.0</hadoop.version> <spark.version>2.4.2</spark.version> </properties> 依赖包 <dependencies> <dependency> <grou