kudu系列: Java API使用和效率测试

Kudu+Impala很适合数据分析, 但直接使用Insert values语句往Kudu表插入数据, 效率实在不好, 测试下来insert的速度仅为80笔/秒. 原因也是显然的, Kudu本身写入效率很高, 但是Impala并没有做这方面优化, 观察下来每次Impala语句执行的overhead都太大了, 导致频繁小批次写入效率非常差, Kudu官方推荐使用Java API或Python API完成数据写入工作. 下面是使用Java API的测试用例, 也可以看出Kudu API的大致用法.

=========================
准备测试Table
=========================

-- kudu table
CREATE TABLE kudu_testdb.tmp_test_perf
(
id string ENCODING PLAIN_ENCODING COMPRESSION SNAPPY,
name    string ENCODING DICT_ENCODING COMPRESSION SNAPPY,
PRIMARY KEY (id)
)
PARTITION BY HASH (id) PARTITIONS 6
STORED AS KUDU
TBLPROPERTIES (
‘kudu.table_name‘ = ‘testdb.tmp_test_perf‘,
‘kudu.master_addresses‘ = ‘10.0.0.100:7051,10.0.0.101:7051,10.0.0.101:7051‘,
‘kudu.num_tablet_replicas‘ = ‘1‘
)
;

=========================
编写测试java程序
=========================

package kudu_perf_test;

import java.sql.Timestamp;
import java.util.UUID;
import org.apache.kudu.client.*;

public class Test {
    private final static int OPERATION_BATCH = 500;

    //同时支持三个模式的测试用例
    public static void insertTestGeneric(KuduSession session, KuduTable table, SessionConfiguration.FlushMode mode,
            int recordCount) throws Exception {
        // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
        // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
        // SessionConfiguration.FlushMode.MANUAL_FLUSH
        session.setFlushMode(mode);
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC != mode) {
            session.setMutationBufferSpace(OPERATION_BATCH);
        }
        int uncommit = 0;

        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addString("name", mode.name());

            session.apply(insert);

            // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
            if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode) {
                uncommit = uncommit + 1;
                if (uncommit > OPERATION_BATCH / 2) {
                    session.flush();
                    uncommit = 0;
                }
            }
        }

        // 对于手工提交, 保证完成最后的提交
        if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode && uncommit > 0) {
            session.flush();
        }

        // 对于后台自动提交, 必须保证完成最后的提交, 并保证有错误时能抛出异常
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND == mode) {
            session.flush();
            RowErrorsAndOverflowStatus error = session.getPendingErrors();
            if (error.isOverflowed() || error.getRowErrors().length > 0) {
                if (error.isOverflowed()) {
                    throw new Exception("Kudu overflow exception occurred.");
                }
                StringBuilder errorMessage = new StringBuilder();
                if (error.getRowErrors().length > 0) {
                    for (RowError errorObj : error.getRowErrors()) {
                        errorMessage.append(errorObj.toString());
                        errorMessage.append(";");
                    }
                }
                throw new Exception(errorMessage.toString());
            }
        }

    }

    //仅支持手动flush的测试用例
    public static void insertTestManual(KuduSession session, KuduTable table, int recordCount) throws Exception {
        // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
        // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
        // SessionConfiguration.FlushMode.MANUAL_FLUSH
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
        session.setFlushMode(mode);
        session.setMutationBufferSpace(OPERATION_BATCH);

        int uncommit = 0;
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addString("name", mode.name());

            session.apply(insert);

            // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
            uncommit = uncommit + 1;
            if (uncommit > OPERATION_BATCH / 2) {
                session.flush();
                uncommit = 0;
            }
        }

        // 对于手工提交, 保证完成最后的提交
        if (uncommit > 0) {
            session.flush();
        }
    }

    //仅支持自动flush的测试用例
    public static void insertTestInAutoSync(KuduSession session, KuduTable table, int recordCount) throws Exception {
        // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
        // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
        // SessionConfiguration.FlushMode.MANUAL_FLUSH
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
        session.setFlushMode(mode);        

        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addString("name", mode.name());

            //对于AUTO_FLUSH_SYNC模式, apply()将立即完成kudu写入
            session.apply(insert);
        }
    }

    public static void test() throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("10.0.0.100:7051,10.0.0.101:7051,10.0.0.101:7051")
                .build();
        KuduSession session = client.newSession();
        KuduTable table = client.openTable("testdb.tmp_test_perf");

        SessionConfiguration.FlushMode mode;
        Timestamp d1 = null;
        Timestamp d2 = null;
        long millis;
        long seconds;
        int recordCount = 0;

        try {
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestGeneric(session, table, mode, recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            seconds = millis / 1000 % 60;
            System.out.println(mode.name() + "耗时秒数:" + seconds);

            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestInAutoSync(session, table,  recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            seconds = millis / 1000 % 60;
            System.out.println(mode.name() + "耗时秒数:" + seconds);

            mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestManual(session, table,  recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            seconds = millis / 1000 % 60;
            System.out.println(mode.name() + "耗时秒数:" + seconds);            

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            if (!session.isClosed()) {
                session.close();
            }
        }

    }

    public static void main(String[] args) {
        try {
            test();
        } catch (KuduException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Done");

    }
}

=========================
性能测试结果
=========================
MANUAL_FLUSH 模式:8000 row/second
AUTO_FLUSH_BACKGROUND 模式:8000 row/second
AUTO_FLUSH_SYNC 模式:1000 row/second
Impala SQL Insert 语句:80 row/second

=========================
Kudu API 使用总结
=========================
1. 尽量采用 MANUAL_FLUSH, 性能最好, 如果有写入kudu错误, flush()函数就会抛出异常, 逻辑非常清晰.
2. 在性能要求不高的情况下, AUTO_FLUSH_SYNC 也是一个好的选择.
3. 仅仅在demo场景下使用 AUTO_FLUSH_BACKGROUND, 不考虑异常处理时候代码可以很简单, 性能也很好. 在生产环境下, 不推荐的 原因是: 插入数据可能会是乱序, 一旦考虑捕获异常代码就很拖沓.

原文地址:https://www.cnblogs.com/harrychinese/p/kudu_java_api.html

时间: 2024-11-01 16:07:52

kudu系列: Java API使用和效率测试的相关文章

Kafka笔记整理(二):Kafka Java API使用

[TOC] Kafka笔记整理(二):Kafka Java API使用 下面的测试代码使用的都是下面的topic: $ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 Topic:hadoop PartitionCount:3 ReplicationFactor:3 Configs: Topic: hadoop Partition: 0 Leader:

Apache Kafka系列(三) Java API使用

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 摘要: Apache Kafka Java Client API 一.基本概念 Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如: 1.创建Topic 2.罗列出已存在的Topic 3.对已有Topic的Produce/Consume测试

5 weekend01、02、03、04、05、06、07的分布式集群的HA测试 + hdfs--动态增加节点和副本数量管理 + HA的java api访问要点

weekend01.02.03.04.05.06.07的分布式集群的HA测试 1)  weekend01.02的hdfs的HA测试 2)  weekend03.04的yarn的HA测试 1)  weekend01.02的hdfs的HA测试 首先,分布式集群都是正常的,且工作的 然后呢, 以上是,weekend01(active).weekend02(standby) 当weekend01给kill, 变成weekend01(standby).weekend02(active) 模拟weekend

kafka2.9.2的伪分布式集群安装和demo(java api)测试

1.什么是kafka? kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目.在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ.Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB). kafka目前支持多种客户端语言:java,python,c++,php等等. kafka集群的简要图解如下,producer写入消息,consumer读取消息

ubuntu12.04+kafka2.9.2+zookeeper3.4.5的分布式集群安装和demo(java api)测试

博文作者:迦壹 博客地址:http://idoall.org/home.php?mod=space&uid=1&do=blog&id=547 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! --------------------------------------- 目录: 一.什么是kafka? 二.kafka的官方网站在哪里? 三.在哪里下载?需要哪些组件的支持? 四.如何安装? 五.FAQ 六.扩展阅读 一.什么是kafka? ka

kafka2.9.2的分布式集群安装和demo(java api)测试

目录: 一.什么是kafka? 二.kafka的官方网站在哪里? 三.在哪里下载?需要哪些组件的支持? 四.如何安装? 五.FAQ 六.扩展阅读   一.什么是kafka? kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目.在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ.Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB). kafka目

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

spark2.x由浅入深深到底系列六之RDD java api详解一

以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了java api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: //从hdfs文件中创建 JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt"); //从

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext 一.简单实现scala版本的RDD和SparkContext class RDD[T](value: Seq[T]) {   //RDD的map操作   def map[U](f: T => U): RDD[U] = {     new RDD(value.map(f))