kafka的c/c++高性能客户端librdkafka简介

Librdkafka是c语言实现的apachekafka的高性能客户端,为生产和使用kafka提供高效可靠的客户端,并且提供了c++接口

性能:

Librdkafka 是一款专为现代硬件使用而设计的高性能库,它尝试将内存复制保持在最小,可以让用户决定是需要高吞吐量还是低延迟的服务,性能调优的两个最重要的配置是:

*batch.num.messages:在发送消息之前累积在本地队列中等待的消息的最小数量。

*queue.buffering.max.ms:等待batch.num.messages多长时间来填写到本地队列中。

使用:

源码中的rdkafka.h、CONFIGURATION.md有Librdkafka的API的说明

初始化:

应用程序需要实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态,调用rd_kafka_new()创建。

还需要实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,通过调用`rd_kafka_topic_new()`创建。

`rd_kafka_t``rd_kafka_topic_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。

注意

1.应用程序可能会创建多个`rd_kafka_t`对象,并且它们不共享任何状态

2.一个`rd_kafka_topic_t`对象仅可以用于创建它的`rd_kafka_t`对象

配置

为了简化与Apache Kafka官方软件的集成,降低学习曲线,librdkafka实现了与Apache Kafka官方客户端相同的配置属性。

使用`rd_kafka_conf_set()` 和`rd_kafka_topic_conf_set()`在创建对象之前应用配置。

注意:

`rd_kafka.._conf_t`对象在传递给rd_kafka.._new()`之后不可重复使用,调用`rd_kafka.._new()`后,应用程序不需要free任何配置资源。

例子

[cpp] view plain copy

  1. rd_kafka_conf_t*conf;
  2. char errstr[512];
  3. conf = rd_kafka_conf_new();
  4. rd_kafka_conf_set(conf, "compression.codec","snappy", errstr, sizeof(errstr));
  5. rd_kafka_conf_set(conf, "batch.num.messages", "100",errstr, sizeof(errstr));
  6. rd_kafka_new(RD_KAFKA_PRODUCER,conf);

线程和回调函数

librdkafka内部使用多个线程来充分利用硬件资源.

API是线程安全的,应用程序可以在任意时间调用其线程内的任意api函数.

poll-based的API用于向应用程序提供信号,应用程序定期调用` rd_kafka_poll() `,poll API将会调用如下的API:

*消息传递报告回调函数:消息传递成功或失败的信号,允许应用程序释放消息中使用的任何应用程序资源。

*错误回调函数:发出错误信号,这些错误通常具有信息性质,例如连接broker失败,应用程序通常不需要做任何处理,错误的类型通过` rd_kafka_resp_err_t `枚举值传递,包括远程的broke错误和本地错误。

可选回调不是通过poll触发的,可以通过任意线程调用:

*Logging callback :允许应用程序输出librdkafka生成的日志消息

*partitioner callback:应用提供的消息分区器,可在任意时刻、任意线程中调用,对于相同的键,可以调用多次

Brokers

Librdkafka需要至少一个brokers的初始化list,称作` bootstrap brokers `,通过"metadata.broker.list"配置属性或`rd_kafka_brokers_add()`来指定,用来连接所有bootstrapbrokers,并查询每个元数据的信息,其中包含brokers、topic、partitions和它们在kafka cluster中的leaders的完整列表,

Brokers的名字被指定为"host[:port]",端口可选(默认9092),host是主机名或ip地址,如果主机解析到多个地址,librdkafka将轮询每个尝试连接的地址,因此,可以使用包含所有brokers地址的DNS记录来提供可靠的bootstrap broker。

Producer API

使用`RD_KAFKA_PRODUCER`设置了`rd_kafka_t`对象,并设置了一个或多个`rd_kafka_topic_t`对象后,librdkafka已经准备好接收要发送给brokers的消息。

`rd_kafka_produce()`函数有如下参数:

*`rkt` - 需要produce的topic,之前通过`rd_kafka_topic_new()`函数创

*`partition` - 生产到的partition,如果设置为`RD_KAFKA_PARTITION_UA`(UnAssigned),那么配置的分区函数将会用来选择目标分区。

*`msgflags` - 0,或者是:

* `RD_KAFKA_MSG_F_COPY` - librdkafka会立刻生成payload的一份拷贝,当payload在非持久化内存中(例如堆)时使用。

* `RD_KAFKA_MSG_F_FREE` - librdkafka使用完payload后,会使用`free(3)`将其释放。

这两个指标是互斥的,如果既不需要copy也不需要free,那么这两个指标都不需要设置。

如果`RD_KAFKA_MSG_F_COPY`没有设置,将不会执行数据的复制,librdkafka将会hold住payload的指针直到消息成功传输或传输失败。

当librdkafka完成消息的传递,使应用程序重新获得payload内存的所有权后,传递报告回调函数将会被调用

如果设置了`RD_KAFKA_MSG_F_FREE`,传递报告回调函数不能对payload进行free

*`payload`,`len` - 消息的payload

*`key`,`keylen` - 可以用来进行消息分区的消息键

它将被传递到topic分区回调函数(如果存在的话),并在发送给broker的时候附加在消息上

*`msg_opaque` - 应用程序提供的一个可选的每条消息的不透明指针,在消息回调函数中提供,让应用程序引用一个特定的指针。

`rd_kafka_produce()`是一个非阻塞API,它会在内部队列中排列消息并立即返回。如果已排列的消息个数超过了"queue.buffering.max.messages"配置项,`rd_kafka_produce()`返回-1并将errno设置为`ENOBUFS`,从而提供了一种背压机制

Simple Consumer API

NOTE: 对于高级KafkaConsumer接口,查看rd_kafka_subscribe(rdkafka.h) 或者 KafkaConsumer (rdkafkacpp.h)。

使用`RD_KAFKA_CONSUMER`和`rd_kafka_topic_t`实例创建`rd_kafka_t`后,应用程序还必须通过调用`rd_kafka_consume_start()`来为给定的分区启动consumer。

`rd_kafka_consume_start()` 参数:

* `rkt` - 需要消费的topic,之前通过`rd_kafka_topic_new()`创建。

 *`partition` - 从哪个分区消费

  *`offset` - 开始消费的消息offset,这可能是绝对消息偏移或两个特殊偏移之一:

`RD_KAFKA_OFFSET_BEGINNING` :从partition队列的起始位置开始消费(最老的message)

         `RD_KAFKA_OFFSET_END`:在下一个要生产到该partition上的消息处开始消费

         `RD_KAFKA_OFFSET_STORED`:使用存储的offset

一个topic+partition的consumer启动后,librdkafka将会尝试通过反复从broker获取批次消息以保持本地队列中保存"queued.min.messages"条消息,然后这个本地消息队列将会通过三个不同的consume API传递给应用程序:

*`rd_kafka_consume()` - consume单条消息

*`rd_kafka_consume_batch()` - consume单条或多条消息

*`rd_kafka_consume_callback()` - consume本地队列中的所有消息,并给每条消息调用一个回调函数

这三个API按照性能升序排列,`rd_kafka_consume()`最慢,`rd_kafka_consume_callback()`最快。

使用`rd_kafka_message_t`类型标识一条已消费的消息,其成员为:

*`err` - 发回到应用程序的错误信号,如果不为0,那么`payload`成员将被认为是一条错误消息,`err`是错误码(`rd_kafka_resp_err_t`),如果为0,`payload`则包含消息数据。

*`rkt`,`partition` - 该消息的topic和partition

*`payload`,`len` - payload消息,或者是错误信息(err!=0)

*`key`,`key_len` - 生产者指定的可选消息key

*`offset` - Message offset

`payload`和`key`以及整个消息的内存,属于librdkafka,调用`rd_kafka_message_destroy()`后不可再次使用,librdkafka将为该消息集的所有消息payloads共享相同的消息集接收缓冲存储器,以避免过度复制,这意味着如果应用程序决定hang on单个rd_kafka_message_t,它将阻止从相同消息集中释放所有其他消息的备份内存。

当应用程序完成从topic+partition的消息消费后,需要调用`rd_kafka_consume_stop()`来停止这个consumer,这也将清除本地队列中的当前的消息。

Offset management

broker version >= 0.9.0结合使用高版本的KafkaConsumer接口,可实现基于Broker的offset管理(查看rdkafka.h或 rdkafkacpp.h)

还可以通过本地文件存储来实现Offset管理,通过如下的topic配置参数,offset被永久写在本地文件中:

  * `auto.commit.enable`

  * `auto.commit.interval.ms`

  * `offset.store.path`

  * `offset.store.sync.interval.ms`

目前还没有对ZooKeeper的偏移量管理的支持。

Consumer groups

当kafka broker 版本>= 0.9 ,librdkafka支持基于broker的consumer groups

Topics

Librdkafka支持自动创建topic,broker需要配置"auto.create.topics.enable=true"

时间: 2024-07-30 23:14:54

kafka的c/c++高性能客户端librdkafka简介的相关文章

kafka C客户端librdkafka producer源码分析

简介 kafka网站上提供了C语言的客户端librdkafka,地址在这. librdkafka是使用C语言根据apache kafka 协议实现的客户端.另外这个客户端还有简单的c++接口.客户端作者对这个客户端比较上心,经常会修改bug并提交新功能. librdkafka的基本原理和我之前博客说的java版producer类似,一个线程向队列中加数据,另一个线程通过非阻塞的方式从队列中取出数据,并写入到broker. 源码分析 源码包含两个文件夹src和src-cpp src是用c实现的源码

kafka环境搭建二---Windows客户端Linux服务器

一.对于服务器端的搭建可以参考上一篇文章:kafka单机版环境搭建与测试 服务器端IP :10.0.30.221 运行环境的目录如下: 需要改动config文件夹下的server.properties中的以下两个属性 zookeeper.connect=localhost:2181改成zookeeper.connect=10.0.30.221:2181 以及默认注释掉的 #host.name=localhost改成host.name=10.0.30.221 host.name不更改会造成客户端报

使用Kafka建立可靠的高性能分布式消息传递基础结构

在优锐课学习中了解到,我们可以看到实施资源适配器以将Kafka与企业Java解决方案集成.码了很多专业的相关知识, 分享给大家参考学习. 由于世界已经变得移动化,因此应用程序现在必须实时提供数据. 不仅重要的是存储在数据库表中的最终结果,而且重要的是用户在使用应用程序时执行的所有操作. 无论可用的任何信息(例如用户点击,日志数据或传感器数据)用于增强用户体验,生成报告,为机器学习系统供稿,等等. 今天,开发人员必须专注于基于实时事件流的系统. 下图显示了基于事件流处理的体系结构示例. Apach

第4章4节《MonkeyRunner源码剖析》ADB协议及服务: ADB命令行客户端使用简介(原创)

天地会珠海分舵注:本来这一系列是准备出一本书的,详情请见早前博文"寻求合作伙伴编写<深入理解 MonkeyRunner>书籍".但因为诸多原因,没有如愿.所以这里把草稿分享出来,所以错误在所难免.有需要的就参考下吧,转发的话还请保留每篇文章结尾的出处等信息. 从前面几个小节我们知道ADB命令行客户端是存在与主机端的一个命令,用户可以使用该命令来发送服务请求到ADB服务器,ADB服务器再判断该服务请求是主机服务请求还是本地服务请求来决定是否应该将请求传送给远程adbd守护进程

客户端交互技术简介

随着网络应用的不断丰富,客户端交互技术也如雨后春笋一般,遍地开花.正是这些技术的支持,我们的互联网世界变得更加丰富多彩.一个浏览器上,不用说是简单的动画效果,就是一个Office应用也能顺畅的使用.所有这些都要感谢客户端交互技术的快速发展.甚至,现在的客户端交互技术的发展的趋势是,可以以浏览器为载体,也可以脱离浏览器以单独的应用形式存在. 总体来看,目前的客户端交互技术主要分为如下几大类别; microsoft系列: ActiveX, Silverlight java系列:Java Applet

Windows客户端开发简介(一)

在这样一个移动当道的年代,我跟大家讨论Windows客户端开发,似乎有些倚老卖老的意思了.然而我却觉得无论什么时候,Windows客户端开发其实还是有着不少实用经典的技术的.对了,确切说我是要说说Windows C++客户端开发,什么WinForm,WPF,并不在讨论范围之内,我承认用.NET ,C#做Windows客户端对开发人员来说确实是件轻松愉快的事,但是因为这些技术由于种种原因(主要还是效率问题)在经典的Windows客户端程序采用的少之又少,所以我打算把他们略过. 我并不是什么微软技术

Windows客户端开发简介(二)

一个典型的Windows客户端程序要有哪几部分构成呢?下面我会以一个国内比较流行的互联网客户端程序的基本架构来跟大家逐步展开分析,由于涉及到知识产权的问题,请大家不要问我是什么产品,当然,如果你能猜到,那我就管不着了^_^. 某视频影音互联网PC客户端产品基本架构 如上只是个粗略的分层架构图,没有更细致的划分,但是有几个地方是需要特别关注的,比如最上层的那几个部分,音视频解码引擎,UI引擎,WebKit浏览器内核,内核通信模块,日志系统. 因为音视频解码引擎和内核通信模块只是对于视频客户端和P2

Windows客户端开发简介(三)

之前的一篇文章里,我简单概要的介绍了一下界面库的知识.既然是跟界面有关,那么必然少不了很多关于绘制的内容.对于Windows开发而言,界面绘制使用的一类API就是所谓的"GDI". GDI这个东西可有历史了,但是我们就不去追根朔源了.首先,我不能免俗的要先介绍一下它的全称:"Graphic Device Interface",即"图形设备接口",从这个名称我们可以大略吸收到的信息就是:GDI是个跟图形绘制有关的接口,对的,正是这样! 先让我们来看

redis高性能客户端 - redissdk

我们首先在我们自己的工程下放置redis.properties,内容如下: #redis地址 server=192.168.0.8 #redis端口 port=6379 auth=admin max_active=600 max_idle=300 #最大等待时间 max_wait=1000 #超时 timeout=3000 test_on_borrow=true 然后我们写一个测试类,测试下redis,我们要测试对象写入,读取,那么我们先写一个简单的User类. 1 package com.re