Spark Kudu 结合

Kudu的背景

Hadoop中有很多组件,为了实现复杂的功能通常都是使用混合架构

  • Hbase:实现快速插入和修改,对大量的小规模查询也很迅速
  • HDFS/Parquet + Impala/Hive:对超大的数据集进行查询分析,对于这类场景, Parquet这种列式存储文件格式具有极大的优势。
  • HDFS/Parquet + Hbase:这种混合架构需要每隔一段时间将数据从hbase导出成Parquet文件,然后用impala来实现复杂的查询分析 
    以上的架构没办法把复杂的实时查询集成在Hbase上

Kudu的设计

  • Kudu是对HDFS和HBase功能上的补充,能提供快速的分析和实时计算能力,并且充分利用CPU和I/O资源,支持数据原地修改,支持简单的、可扩展 
    的数据模型。
  • Kudu的定位是提供”fast analytics on fast data”,kudu期望自己既能够满足分析的需求(快速的数据scan),也能够满足查询的需求(快速的随机访问)。它定位OLAP和少量的OLTP工作流,如果有大量的random accesses,官方建议还是使用HBase最为合适

Kudu的结构

其实跟Hbase是有点像的

Kudu的使用

1:支持主键(类似 关系型数据库) 
2:支持事务操作,可对数据增删改查数据 
3:支持各种数据类型 
4:支持 alter table。可删除列(非主键) 
5:支持 INSERT, UPDATE, DELETE, UPSERT 
6:支持Hash,Range分区 
进入Impala-shell -i node1ip 
具体的CURD语法可以查询官方文档,我就不一一列了 
http://kudu.apache.org/docs/kudu_impala_integration.html 
建表 
Create table kudu_table (Id string,Namestring,Age int, 
Primary key(id,name) 
)partition by hash partitions 16 
Stored as kudu; 
插入数据 
Insert into kudu_table 
Select * from impala_table; 
注意 
以上的sql语句都是在impala里面执行的。Kudu和hbase一样都是nosql查询的,Kudu本身只提供api。impala集成了kudu。 

Kudu Api

奉上我的Git地址: 
https://github.com/LinMingQiang/spark-util/tree/spark-kudu

Scala Api

pom.xml

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>1.1.0</version>
    <exclusions>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.kududb</groupId>
    <artifactId>kudu-spark_2.10</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-mapreduce</artifactId>
    <version>1.3.1</version>
    <exclusions>
        <exclusion>
            <artifactId>jsp-api</artifactId>
            <groupId>javax.servlet.jsp</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
        val client = new KuduClientBuilder("master2").build()
    val table = client.openTable("impala::default.kudu_pc_log")
    client.getTablesList.getTablesList.foreach { println }
    val schema = table.getSchema();
    val kp = KuduPredicate.newComparisonPredicate(schema.getColumn("id"), KuduPredicate.ComparisonOp.EQUAL, "1")
    val scanner = client.newScanTokenBuilder(table)
      .addPredicate(kp)
      .limit(100)
      .build()
    val token = scanner.get(0)
    val scan = KuduScanToken.deserializeIntoScanner(token.serialize(), client)
    while (scan.hasMoreRows()) {
      val results = scan.nextRows()
      while (results.hasNext()) {
        val rowresult = results.next();
        println(rowresult.getString("id"))
      }
    }

Spark Kudu Api

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Test"))
  val sparksql = new SQLContext(sc)
  import sparksql.implicits._
  val a = new KuduContext(kuduMaster, sc)
def getKuduRDD() {
    val tableName = "impala::default.kudu_pc_log"
    val columnProjection = Seq("id", "name")
    val kp = KuduPredicate.newComparisonPredicate(new ColumnSchemaBuilder("id", Type.STRING).build(), KuduPredicate.ComparisonOp.EQUAL, "q")
    val df = a.kuduRDD(sc, tableName, columnProjection,Array(kp))
    df.foreach { x => println(x.mkString(",")) }
  }
 def writetoKudu() {
    val tableName = "impala::default.student"
    val rdd = sc.parallelize(Array("k", "b", "a")).map { n => STU(n.hashCode, n) }
    val data = rdd.toDF()
    a.insertRows(data, tableName)
  }
  case class STU(id: Int, name: String)

小结

    • Kudu简单来说就是加强版的Hbase,除了像hbase一样可以高效的单条数据查询,他的表结构是类型关系型数据库的。集合impala可以达到复杂sql的实时查询。适合做OLAP(官方也是这么定位的)
    • Kudu本质上是将性能的优化,寄托在以列式存储为核心的基础上,希望通过提高存储效率,加快字段投影过滤效率,降低查询时CPU开销等来提升性能。而其他绝大多数设计,都是为了解决在列式存储的基础上支持随机读写这样一个目的而存在的。比如类Sql的元数据结构,是提高列式存储效率的一个辅助手段,唯一主键的设定也是配合列式存储引入的定制策略,至于其他如Delta存储,compaction策略等都是在这个设定下为了支持随机读写,降低latency不确定性等引入的一些Tradeoff方案。 
      官方测试结果上,如果是存粹的随机读写,或者单行的检索请求这类场景,由于这些Tradeoff的存在,HBASE的性能吞吐率是要优于Kudu不少的(2倍到4倍),kudu的优势还是在支持类SQL检索这样经常需要进行投影操作的批量顺序检索分析场合。目前kudu还处在Incubator阶段,并且还没有成熟的线上应用(小米走在了前面,做了一些业务应用的尝试),在数据安全,备份,系统健壮性等方面也还要打个问号,所以是否使用kudu,什么场合,什么时间点使用,是个需要好好考量的问题 ;)
时间: 2024-10-09 23:37:52

Spark Kudu 结合的相关文章

列式存储数据库-kudu

一.kudu概念 Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力.Kudu支持水平扩展,使用Raft协议进行一致性保证,并且与Cloudera Impala和Apache Spark等当前流行的大数据查询和分析工具结合紧密. 这是一个为块数据的快分析而生的存储架构 二.kudu架构Master:master节点负责整个集群的元数据管理和服务协调.它承担着以下功能:作为catalog manager,master节点管理着集群中所有tab

使用Spark Streaming + Kudu + Impala构建一个预测引擎

随着用户使用天数的增加,不管你的业务是扩大还是缩减了,为什么你的大数据中心架构保持线性增长的趋势?很明显需要一个稳定的基本架构来保障你的业务线.当你的客户处在休眠期,或者你的业务处在淡季,你增加的计算资源就处在浪费阶段:相对应地,当你的业务在旺季期,或者每周一每个人对上周的数据进行查询分析,有多少次你忒想拥有额外的计算资源. 根据需求水平动态分配资源 VS 固定的资源分配方式,似乎不太好实现.幸运的是,借助于现今强大的开源技术,可以很轻松的实现你所愿.在这篇文章中,我将给出一个解决例子,基于流式

[原创]kudu vs parquet, impala vs spark Benchmark

测试环境 节点: 2 台主节点,6台计算节点 机器配置: 16个物理核 128G内存 12*3T磁盘 操作系统: redhat 7.2 版本: CDH 5.7.1-1.cdh5.7.1.p0.11 impala_kudu 2.7.0-1.cdh5.9.0.p0.23 kudu 0.9.1-1.kudu0.9.1.p0.32 spark 2.0.0 对照组: Spark on Parquet Impala on Parquet Impala on Kudu 测试数据.语句.场景 TPC-DS,是用

spark/java连接 kudu incompatible RPC 异常解决

如果是新搭的CDH 集群并使用默认的kudu设置, 使用spark /java 连接kudu时候,一般都会碰到此类异常. 程序: val masteraddr = "kudumasterip:7051" //创建kudu的数据库链接 val client = new KuduClient.KuduClientBuilder(masteraddr).build //打开表 val table = client.openTable("impala::testdb.PERSON&q

Hadoop/Spark生态圈里的新气象

令人惊讶的是,Hadoop在短短一年的时间里被重新定义.让我们看看这个火爆生态圈的所有主要部分,以及它们各自具有的意义. 对于Hadoop你需要了解的最重要的事情就是 ,它不再是原来的Hadoop. 这边厢,Cloudera有时换掉HDFS改用Kudu,同时宣布Spark是其圈子的核心(因而一概取代发现的MapReduce);那边厢,Hortonworks加入了Spark阵营.在Cloudera和Hortonworks之间,"Hadoop"集群中唯一可以确信的项目就是 YARN.但是D

kudu论文阅读

#????????????Kudu:Storage?for?Fast?Analytics?on?Fast?Data##?Abstract??kudu?是一个用于存储结构化数据的开源存储引擎,它支持低延迟的随机访问及高效的分析访问模式.kudu采用水平分区的方式对数据分布到集群中,使用Raft一致性协议复制每个分区,提供较低的平均恢复时间and?low?tail?latencies.?kudu在hadoop生态圈的背景下所设计的,它支持通过Cloudera?Impala,Apache?Spark,

Apache Kudu

Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力.Kudu支持水平扩展,使用Raft协议进行一致性保证,并且与Cloudera Impala和Apache Spark等当前流行的大数据查询和分析工具结 合紧密.本文将为您介绍Kudu的一些基本概念和架构以及在企业中的应用,使您对Kudu有一个较为全面的了解. 比较有意思的是,同为Cloudera公司开源的另一款产品Impala,是另一种非洲的羚羊,叫做“黑斑羚”,也叫“高角羚”.不知道Cl

[原创]Kudu:支持快速分析的新型Hadoop存储系统

Kudu是Cloudera开源的新型列式存储系统,是Apache Hadoop生态圈的新成员之一(incubating),专门为了对快速变化的数据进行快速的分析,填补了以往Hadoop存储层的空缺.本文主要对Kudu的动机.背景,以及架构进行简单介绍. 背景——功能上的空白 Hadoop生态系统有很多组件,每一个组件有不同的功能.在现实场景中,用户往往需要同时部署很多Hadoop工具来解决同一个问题,这种架构称为混合架构 (hybrid architecture).比如,用户需要利用Hbase的

通过Spark Streaming处理交易数据

Apache Spark 是加州大学伯克利分校的 AMPLabs 开发的开源分布式轻量级通用计算框架. 由于 Spark 基于内存设计,使得它拥有比 Hadoop 更高的性能(极端情况下可以达到 100x),并且对多语言(Scala.Java.Python)提供支持. 其一栈式设计特点使得我们的学习和维护成本大大地减少,而且其提供了很好的容错解决方案 业务场景 我们每天都有来自全国各地的天然气购气数据,并根据用户的充气,退气,核销等实时计算分析的是用户订单数数据,由于数据量比较大,单台机器处理已