用Spark查询HBase中的表数据

java代码如下:

package db.query;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

import db.insert.HBaseDBDao;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class HBaseSparkQuery implements Serializable {

    private static final long serialVersionUID = 1L;

    public Log log = LogFactory.getLog(HBaseSparkQuery.class);

    /**
     * 将scan编码,该方法copy自 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
     *
     * @param scan
     * @return
     * @throws IOException
     */
    static String convertScanToString(Scan scan) throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(proto.toByteArray());
    }

    public void start() {
        //初始化sparkContext,
        SparkConf sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        //使用HBaseConfiguration.create()生成Configuration
        // 必须在项目classpath下放上hadoop以及hbase的配置文件。
        Configuration conf = HBaseConfiguration.create();
        //设置查询条件,这里值返回用户的等级
        Scan scan = new Scan();
//        scan.setStartRow(Bytes.toBytes("57348556169_0000000000000"));
//        scan.setStopRow(Bytes.toBytes("57348556169_9999999999999"));
        scan.addFamily(Bytes.toBytes("cids"));
        scan.addFamily(Bytes.toBytes("times"));
        scan.addFamily(Bytes.toBytes("pcis"));
        scan.addFamily(Bytes.toBytes("angle"));
        scan.addFamily(Bytes.toBytes("tas"));
        scan.addFamily(Bytes.toBytes("gis"));
        scan.addColumn(Bytes.toBytes("cids"), Bytes.toBytes("cid"));
        scan.addColumn(Bytes.toBytes("times"), Bytes.toBytes("time"));
        scan.addColumn(Bytes.toBytes("pcis"), Bytes.toBytes("pci"));
        scan.addColumn(Bytes.toBytes("angle"), Bytes.toBytes("st"));
        scan.addColumn(Bytes.toBytes("angle"), Bytes.toBytes("ed"));
        scan.addColumn(Bytes.toBytes("tas"), Bytes.toBytes("ta"));
        scan.addColumn(Bytes.toBytes("gis"), Bytes.toBytes("lat"));
        scan.addColumn(Bytes.toBytes("gis"), Bytes.toBytes("lng"));
        try {
            //需要读取的hbase表名
            String tableName = "mapCar";
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            conf.set(TableInputFormat.SCAN, convertScanToString(scan));

            //获得hbase查询结果Result
            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class,
                    Result.class);

            //从result中取出用户年龄
            JavaRDD<String> cars = hBaseRDD.flatMap(new FlatMapFunction<Tuple2<ImmutableBytesWritable, Result>, String>(){
                private static final long serialVersionUID = 1L;

                @Override
                public Iterable<String> call(
                        Tuple2<ImmutableBytesWritable, Result> t)
                        throws Exception {
                    String cid = Bytes.toString(t._2.getValue(Bytes.toBytes("cids"), Bytes.toBytes("cid")));
                    String time = Bytes.toString(t._2.getValue(Bytes.toBytes("times"), Bytes.toBytes("time")));
                    String pci = Bytes.toString(t._2.getValue(Bytes.toBytes("pcis"), Bytes.toBytes("pci")));
                    String st = Bytes.toString(t._2.getValue(Bytes.toBytes("angle"), Bytes.toBytes("st")));
                    String ed = Bytes.toString(t._2.getValue(Bytes.toBytes("angle"), Bytes.toBytes("ed")));
                    String ta = Bytes.toString(t._2.getValue(Bytes.toBytes("tas"), Bytes.toBytes("ta")));
                    String lat = Bytes.toString(t._2.getValue(Bytes.toBytes("gis"), Bytes.toBytes("lat")));
                    String lng = Bytes.toString(t._2.getValue(Bytes.toBytes("gis"), Bytes.toBytes("lng")));
                    return Arrays.asList("cid : "+cid+", time: "+time+", pci: "+pci+", st: "+st+", ed: "+ed+", ta: "+ta+", lat: "+lat+", lon: "+lng);
//                    return Arrays.asList(cid);
                }

            });
//            JavaRDD<String>car = cars.distinct();
            //打印出最终结果
            cars.foreach(new VoidFunction<String>(){
                private static final long serialVersionUID = 1L;

                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
;

//            //打印出最终结果
//            List<String> output = car.collect();
//            for (String s : output) {
//                System.out.println(s);
//            }

        } catch (Exception e) {
            log.warn(e);
        }

    }

    /**
     * spark如果计算没写在main里面,实现的类必须继承Serializable接口,<br>
     * </>否则会报 Task not serializable: java.io.NotSerializableException 异常
     */
    public static void main(String[] args) throws InterruptedException {
        new HBaseSparkQuery().start();
    }
}

所用jar包如下:

时间: 2024-08-06 07:58:29

用Spark查询HBase中的表数据的相关文章

Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1).调用parallelize函数直接从集合中获取数据,并存入RDD中:Java版本如下: 1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初

Hive整合HBase——通过Hive读/写 HBase中的表

写在前面一: 本文将Hive与HBase整合在一起,使Hive可以读取HBase中的数据,让Hadoop生态系统中最为常用的两大框架互相结合,相得益彰. 写在前面二: 使用软件说明 约定所有软件的存放目录: /home/yujianxin 一.Hive整合HBase原理 Hive与HBase整合的实现是利用两者本身对外的API接口互相进行通信,相互通信主要是依靠hive-hbase-handler-0.9.0.jar工具类,如下图 Hive与HBase通信示意图 二.具体步骤 安装前说明 1.关

清空SQL Server数据库中所有表数据的方法

其实删除数据库中数据的方法并不复杂,为什么我还要多此一举呢,一是我这里介绍的是删除数据库的所有数据,因为数据之间可能形成相互约束关系,删除操作可能陷入死循环,二是这里使用了微软未正式公开的sp_MSForEachTable存储过程. 也许很多读者朋友都经历过这样的事情:要在开发数据库基础上清理一个空库,但由于对数据库结构缺乏整体了解,在删除一个表的记录时,删除不了,因为可能有外键约束,一个常见的数据库结构是一个主表,一个子表,这种情况下一般都得先删除子表记录,再删除主表记录. 说道删除数据记录,

MySql 查询数据库中所有表名

查询数据库中所有表名select table_name from information_schema.tables where table_schema='csdb' and table_type='base table'; 查询指定数据库中指定表的所有字段名column_nameselect column_name from information_schema.columns where table_schema='csdb' and table_name='users'

查询数据库中所有表的记录数,所占空间,索引使用空间

常用 --查询数据库中所有表的记录数,所占空间,索引使用空间 exec sp_MSForEachTable @precommand=N'create table ##(表名 sysname,记录数 int,保留空间 Nvarchar(20),使用空间 varchar(20),索引使用空间 varchar(20),未用空间 varchar(20))', @command1=N'insert ## exec sp_spaceused ''?''', @postcommand=N'select * f

MySql 查询数据库中所有表

查询数据库中所有表名select table_name from information_schema.tables where table_schema='csdb' and table_type='base table'; 查询指定数据库中指定表的所有字段名column_nameselect column_name from information_schema.columns where table_schema='csdb' and table_name='users' 原文地址:htt

使用bulkload向hbase中批量写入数据

1.数据样式 写入之前,需要整理以下数据的格式,之后将数据保存到hdfs中,本例使用的样式如下(用tab分开): row1 N row2 M row3 B row4 V row5 N row6 M row7 B 2.代码 假设要将以上样式的数据写入到hbase中,列族为cf,列名为colb,可以使用下面的代码(参考) 1 package com.testdata; 2 3 import java.io.IOException; 4 import org.apache.hadoop.conf.Co

Sql Server 导入还有一个数据库中的表数据

在涉及到SQL Server编程或是管理时一定会用到数据的导入与导出, 导入导出的方法有多种,此处以SQL Server导入表数据为例.阐述一下: 1.打开SQL Server Management Studio.在对象资源管理器中,展开"SQL Server 实例"→"数据库"→"你须要导入数据的数据库"节点.单击鼠标右键,选择"任务"→"导出数据"命令. 出现导入和导出向导的欢迎界面,单击"下

Sql Server 导入另一个数据库中的表数据

在涉及到SQL Server编程或是管理时一定会用到数据的导入与导出, 导入导出的方法有多种,此处以SQL Server导入表数据为例,阐述一下: 1.打开SQL Server Management Studio,在对象资源管理器中,展开"SQL Server 实例"→"数据库"→"你需要导入数据的数据库"节点,单击鼠标右键,选择"任务"→"导出数据"命令.出现导入和导出向导的欢迎界面,单击"下一