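Kudu + Impala is a good fit for data analytics, but inserting rows into a Kudu table with Impala INSERT ... VALUES statements is slow: in my tests it managed only about 80 rows per second. The reason is easy to see. Kudu itself writes quickly, but Impala is not optimized for this pattern, and the per-statement overhead I observed is so large that frequent small-batch writes perform very poorly. The Kudu project recommends using the Java API or the Python API for writes. Below is a test case using the Java API, which also shows roughly how the Kudu API is used.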
=========================
Preparing the test table
=========================
-- kudu table
CREATE TABLE kudu_testdb.tmp_test_perf (
  id string ENCODING PLAIN_ENCODING COMPRESSION SNAPPY,
  name string ENCODING DICT_ENCODING COMPRESSION SNAPPY,
  PRIMARY KEY (id)
)
PARTITION BY HASH (id) PARTITIONS 6
STORED AS KUDU
TBLPROPERTIES (
  'kudu.table_name' = 'testdb.tmp_test_perf',
  'kudu.master_addresses' = '10.0.0.100:7051,10.0.0.101:7051,10.0.0.101:7051',
  'kudu.num_tablet_replicas' = '1'
);
=========================
Writing the Java test program
=========================
package kudu_perf_test;

import java.sql.Timestamp;
import java.util.UUID;

import org.apache.kudu.client.*;

public class Test {

    private final static int OPERATION_BATCH = 500;

    // Test case that supports all three flush modes:
    //   SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
    //   SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
    //   SessionConfiguration.FlushMode.MANUAL_FLUSH
    public static void insertTestGeneric(KuduSession session, KuduTable table,
            SessionConfiguration.FlushMode mode, int recordCount) throws Exception {
        session.setFlushMode(mode);
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC != mode) {
            session.setMutationBufferSpace(OPERATION_BATCH);
        }
        int uncommit = 0;
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addString("name", mode.name());
            session.apply(insert);
            // With manual flush, flush before the buffer fills up;
            // here we flush once the buffer is half full.
            if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode) {
                uncommit = uncommit + 1;
                if (uncommit > OPERATION_BATCH / 2) {
                    session.flush();
                    uncommit = 0;
                }
            }
        }

        // With manual flush, make sure the last partial batch is flushed.
        if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode && uncommit > 0) {
            session.flush();
        }

        // With background auto flush, make sure the final flush completes
        // and that any errors are raised as an exception.
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND == mode) {
            session.flush();
            RowErrorsAndOverflowStatus error = session.getPendingErrors();
            if (error.isOverflowed() || error.getRowErrors().length > 0) {
                if (error.isOverflowed()) {
                    throw new Exception("Kudu overflow exception occurred.");
                }
                StringBuilder errorMessage = new StringBuilder();
                for (RowError errorObj : error.getRowErrors()) {
                    errorMessage.append(errorObj.toString());
                    errorMessage.append(";");
                }
                throw new Exception(errorMessage.toString());
            }
        }
    }

    // Test case for MANUAL_FLUSH only.
    public static void insertTestManual(KuduSession session, KuduTable table,
            int recordCount) throws Exception {
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
        session.setFlushMode(mode);
        session.setMutationBufferSpace(OPERATION_BATCH);

        int uncommit = 0;
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addString("name", mode.name());
            session.apply(insert);
            // With manual flush, flush before the buffer fills up;
            // here we flush once the buffer is half full.
            uncommit = uncommit + 1;
            if (uncommit > OPERATION_BATCH / 2) {
                session.flush();
                uncommit = 0;
            }
        }

        // Make sure the last partial batch is flushed.
        if (uncommit > 0) {
            session.flush();
        }
    }

    // Test case for AUTO_FLUSH_SYNC only.
    public static void insertTestInAutoSync(KuduSession session, KuduTable table,
            int recordCount) throws Exception {
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
        session.setFlushMode(mode);
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addString("name", mode.name());
            // In AUTO_FLUSH_SYNC mode, apply() writes to Kudu immediately.
            session.apply(insert);
        }
    }

    public static void test() throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder(
                "10.0.0.100:7051,10.0.0.101:7051,10.0.0.101:7051").build();
        KuduSession session = client.newSession();
        KuduTable table = client.openTable("testdb.tmp_test_perf");

        SessionConfiguration.FlushMode mode;
        Timestamp d1 = null;
        Timestamp d2 = null;
        long millis;
        long seconds;
        // NOTE: set this to the number of rows to insert per mode;
        // with 0 the insert loops do nothing.
        int recordCount = 0;

        try {
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestGeneric(session, table, mode, recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            // Total elapsed seconds (the original "% 60" wrapped after one minute).
            seconds = millis / 1000;
            System.out.println(mode.name() + " elapsed seconds: " + seconds);

            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestInAutoSync(session, table, recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            seconds = millis / 1000;
            System.out.println(mode.name() + " elapsed seconds: " + seconds);

            mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestManual(session, table, recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            seconds = millis / 1000;
            System.out.println(mode.name() + " elapsed seconds: " + seconds);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!session.isClosed()) {
                session.close();
            }
            // Release the client's connections and threads.
            client.close();
        }
    }

    public static void main(String[] args) {
        try {
            test();
        } catch (KuduException e) {
            e.printStackTrace();
        }
        System.out.println("Done");
    }
}
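The test program reports elapsed time per mode; to compare modes it helps to convert a timed run into rows per second. The following is a minimal sketch of that conversion using only the JDK. The class name Throughput and its methods rowsPerSecond and timeNanos are hypothetical helpers introduced for illustration, not part of the Kudu client, and the sleeping job merely stands in for a real insert call.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of the Kudu client API.
final class Throughput {
    private Throughput() {}

    /** Rows per second for rowCount rows written in elapsedNanos nanoseconds. */
    static double rowsPerSecond(long rowCount, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return rowCount * 1_000_000_000.0 / elapsedNanos;
    }

    /** Runs the job once and returns the elapsed wall-clock time in nanoseconds. */
    static long timeNanos(Runnable job) {
        long start = System.nanoTime();
        job.run();
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        // Stand-in workload; a real run would call one of the insert test methods here.
        long elapsed = timeNanos(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.printf("elapsed: %d ms%n", TimeUnit.NANOSECONDS.toMillis(elapsed));
        System.out.printf("rate: %.0f rows/second%n", rowsPerSecond(1_000, elapsed));
    }
}
```

System.nanoTime is used here because it is monotonic, so the measurement is not affected by wall-clock adjustments during a long run.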
=========================
Performance test results
=========================
MANUAL_FLUSH mode: 8000 rows/second
AUTO_FLUSH_BACKGROUND mode: 8000 rows/second
AUTO_FLUSH_SYNC mode: 1000 rows/second
Impala SQL INSERT statement: 80 rows/second
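To put these figures in perspective, the sketch below computes each mode's speedup over the Impala INSERT baseline and how long a given load would take at each rate. The throughput numbers are the ones measured above; the one-million-row load and the class name ThroughputComparison are assumptions chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; the rates come from the results above.
final class ThroughputComparison {
    private ThroughputComparison() {}

    /** How many times faster rate is than baseline (both in rows/second). */
    static double speedup(double rate, double baseline) {
        return rate / baseline;
    }

    /** Seconds needed to write the given number of rows at rate rows/second. */
    static double secondsFor(long rows, double rate) {
        return rows / rate;
    }

    public static void main(String[] args) {
        Map<String, Double> measured = new LinkedHashMap<>();
        measured.put("MANUAL_FLUSH", 8000.0);
        measured.put("AUTO_FLUSH_BACKGROUND", 8000.0);
        measured.put("AUTO_FLUSH_SYNC", 1000.0);
        measured.put("Impala SQL INSERT", 80.0);

        double baseline = measured.get("Impala SQL INSERT");
        long rows = 1_000_000L; // assumed example load, not from the original test

        for (Map.Entry<String, Double> e : measured.entrySet()) {
            System.out.printf("%-22s %6.1fx baseline, %8.0f s for %d rows%n",
                    e.getKey(),
                    speedup(e.getValue(), baseline),
                    secondsFor(rows, e.getValue()),
                    rows);
        }
    }
}
```

At the measured rates, the two batched modes are 100 times faster than Impala INSERT and AUTO_FLUSH_SYNC is 12.5 times faster. For the assumed one-million-row load that is 125 seconds versus 12,500 seconds (roughly three and a half hours).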
=========================
Kudu API usage summary
=========================
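1. Prefer MANUAL_FLUSH where possible. It performs best, and if a write to Kudu fails, the failure surfaces at the flush() call (as a thrown exception or as row errors in the responses it returns), so the logic stays very clear.
2. When performance requirements are modest, AUTO_FLUSH_SYNC is also a good choice.
3. Use AUTO_FLUSH_BACKGROUND only for demos: if you ignore error handling, the code can be very simple and performance is good. It is not recommended in production because rows may be written out of order, and once you add error handling the code becomes unwieldy.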
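The manual-flush batching pattern recommended in point 1 can be tried without a cluster. The sketch below replays the same counting logic (flush once more than half the buffer is used, then flush the remainder) against FakeManualSession, a hypothetical in-memory stand-in that only counts operations. It is not the Kudu KuduSession class, and its "buffer full" check is a modeling assumption that mirrors the requirement to flush before the buffer fills.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a session in manual-flush mode.
// It only counts buffered operations; it does not talk to Kudu.
final class FakeManualSession {
    private final int bufferSpace;
    private int buffered = 0;
    final List<Integer> flushedBatchSizes = new ArrayList<>();

    FakeManualSession(int bufferSpace) {
        this.bufferSpace = bufferSpace;
    }

    void apply() {
        // Assumption: applying to a full buffer is an error, so callers must flush first.
        if (buffered >= bufferSpace) {
            throw new IllegalStateException("buffer full: flush() before applying more");
        }
        buffered++;
    }

    void flush() {
        if (buffered > 0) {
            flushedBatchSizes.add(buffered);
            buffered = 0;
        }
    }
}

final class ManualFlushBatching {
    private ManualFlushBatching() {}

    /** Applies recordCount operations and returns the size of each flushed batch. */
    static List<Integer> write(int recordCount, int bufferSpace) {
        FakeManualSession session = new FakeManualSession(bufferSpace);
        int pending = 0;
        for (int i = 0; i < recordCount; i++) {
            session.apply();
            pending++;
            // Flush once more than half of the buffer is in use.
            if (pending > bufferSpace / 2) {
                session.flush();
                pending = 0;
            }
        }
        // Flush whatever is left from the last partial batch.
        if (pending > 0) {
            session.flush();
        }
        return session.flushedBatchSizes;
    }

    public static void main(String[] args) {
        System.out.println(write(1000, 500));
    }
}
```

With a buffer of 500 operations the threshold is 250, so each full batch holds 251 rows, and the final flush picks up the remainder. Flushing at the halfway mark trades a few extra round trips for a wide safety margin against overfilling the buffer.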
Original article: https://www.cnblogs.com/harrychinese/p/kudu_java_api.html