kafka 客户端封装

kafka客户端封装源码

1.为什么进行封装?

kafka官方自带的客户端,需要对参数进行设置,如下代码,很多参数的key都是字符串,这样对于编程人员来说非常不友好。参数很多的时候,有两种处理方式:(1)传一个config类进去解析;(2)使用建造者模式,笔者在此就使用建造者模式,对官方客户端进行简单封装,使之易用。

官方的例子如下:

 1  Properties props = new Properties();
 2  props.put("bootstrap.servers", "localhost:4242");
 3  props.put("acks", "all");
 4  props.put("retries", 0);
 5  props.put("batch.size", 16384);
 6  props.put("linger.ms", 1);
 7  props.put("buffer.memory", 33554432);
 8  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 9  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
10
11  Producer<String, String> producer = new KafkaProducer<>(props);
12  for(int i = 0; i < 100; i++)
13      producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
14
15  producer.close();

封装后:

CloudKafkaConsumer<String, String> kafkaConsumer =
                KafkaConsumerBuilder.builder()
                        .withBootstrapServers("172.31.1.161:9092")
                        .withGroupId("connector")
                        .withAutoCommit("true")
                        .withCommitIntervalMs("1000")
                        .withSessionimeoutMs("30000")
                        .build();
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(5000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf(record.value());
        }

2.这样其实还不是很好,对于value的类型没有限定,所以我们可以对value的类型限定。如果是基础类型就限定为:boolean、int、String。如果不是基础类型就限定为Enum,这样客户端使用起来会优雅很多。

时间: 2024-10-09 16:20:01

kafka 客户端封装的相关文章

Kafka客户端二次封装扩展总体设计

前言背景 消息系统经过多年使用和运维管理平台开发迭代,能较好支持支撑业务发展,公司主流语言为java,但缺乏一个基于Kafka二次封装简单好用的java客户端.遇到问题如下所示: 使用好kafka客户端对业务要求高,非专业技术方向很难有精力全面掌握 异常情况会catch不全 客户端生产消息及双活机房容灾缺失 集群升级难度增加,因为无法全面及时掌握客户端信息(kafka版本.groupid) 不支持动态配置更新,业务使用错误及引发的潜在故障无法及时修正,例如Producer写入倾斜导致磁盘报警,参

Kafka 客户端实现逻辑分析

这里主要分析kafka 客户端实现 (代码分析以perl kafka实现为准) kafka客户端分为生产者和消费者,生产者发送消息,消费者获取消息. 在kafka协议里客户端通信中用到的最多的四个协议命令是fetch,fetchoffset,send,metadata.这四个分别是获取消息,获取offset,发送消息,获取metadata.剩下的其他协议命令大多都是kafka server内部通信用到的.offsetcommit这个命令在有些语言的client api的实现里给出了接口可以自己提

Erlang 编写 Kafka 客户端之最简单入门

Erlang 编写 Kafka 客户端之最简单入门 费劲周折,终于测通了 erlang 向kafka 发送消息,使用了ekaf 库,参考: An advanced but simple to use, Kafka producer written in Erlang https://github.com/helpshift/ekaf 1 准备kafka客户端 准备2台机器,一台是ekaf运行的kafka客户端(192.168.191.2),一台是kafka服务端(zookeeper+kafka)

Apache kafka客户端开发-java

apache kafka中国社区QQ群:162272557 1.依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1</version> </dependency> 2.producer程序开发例子 2.1 producer参数说明 #指定kafka节点列表,

kafka客户端代码解析

转载:http://backend.blog.163.com/blog/static/202294126201431724652597/ 可以使用服务器端下载的kafka二进制包及依赖,也可以通过mavne获取(注意实测发现该方式拿到的包是用jdk7打的): <dependency> <groupId>com.sksamuel.kafka</groupId> <artifactId>kafka_2.10</artifactId> <vers

Elasticsearch RestHighLevelClient客户端封装

目录 EsClientRHL 开源源码地址 开发原因: 使用前你应该具有哪些技能 工具功能范围介绍 工具源码结构介绍 开始使用 未来规划 EsClientRHL EsClientRHL是一个可基于springboot的elasticsearch RestHighLevelClient客户端调用封装工具,主要提供了es索引结构工具.es索引数据增删改工具.es查询工具.es数据分析工具. 基于elasticsearch6.6+版本进行开发,由于采用RestHighLevelClient,所以版本兼

Java6 WebService客户端封装

在开发WebService客户端应用的时候,面临的最大挑战是事先不知道服务端要部署在哪里,端口是什么,因此Hello World的例子并不适合生产环境,还需要进一步的做改进. 改进的总体思想是将静态的部分替换为动态的,也就是说,将服务的ip和端口,做成动态参数,然后根据服务端的部署灵活改变这些配置参数. 下面针对前面的例子对客户端做一个改进. 1.写WebService服务的配置参数文件 wscfg.properties #Java6WS服务的配置 Java6WS.wsip=192.168.14

android 蓝牙连接端(客户端)封装

0.权限  AndroidManifest.xml <uses-permission android:name="android.permission.BLUETOOTH"/><uses-permission android:name="android.permission.BLUETOOTH_ADMIN"/> <activity android:name=".DeviceListActivity" android:

c++ kafka 客户端rdkafka报Receive failed: Disconnected问题原因以及解决方法

%3|1538976114.812|FAIL|rdkafka#producer-1| [thrd:kafka-server:9092/bootstrap]: kafka-server:9092/0: Receive failed: Disconnected%3|1538976114.812|ERROR|rdkafka#producer-1| [thrd:kafka-server:9092/bootstrap]: kafka-server:9092/0: Receive failed: Disco