Kafka: Connect

Kafka Connect 简介

Kafka Connect 是一个可以在Kafka与其他系统之间提供可靠的、易于扩展的数据流处理工具。使用它能够使得数据进出Kafka变得很简单。Kafka Connect有如下特性:

·是一个通用的构造kafka connector的框架

·有单机、分布式两种模式。开发时建议使用单机模式,生产环境下使用分布式模式。

·提供restful的管理connector的API。

·自动化的offset管理。Kafka Connect自动的管理offset提交。

·分布式、可扩展。采用与concumer group中对partition rebalance同样的机制来管理在worker group中的connector、task。

·流/批处理的集成。

接下来会对Kafka Connect做一个全面的分析,来帮助了解上述特性。

  • 1、Kafka Connect 组件说明

    • 1.1 Worker
    • 1.2 Connector
    • 1.3 Task
      • 1.3.1 Kafka Connect 与KafkaProducer、KafkaConsumer的关系
    • 1.4 Converter
  • 2、Rebalance
  • 3、Storage
    • 3.1 内置Storage
  • 4、Connector 管理
    • 4.1 Herder
    • 4.2 Restful
    • 4.3 通用Connector Plugin升级步骤
  • 5、Configuration
    • 5.1 Worker

      • 5.1.1 Common
      • 5.1.2 Standalone
      • 5.1.3 Distributed

1、基本组件:

1.1 Worker

 

Worker用于调度source task、sink task 来处理数据的进出Kafka。一个Worker中包括多个Connector。一个Connector与多个task关联。当启动(或者加入)一个Connector时,与之关联的Tasks会被创建并提交到executor去执行;停止(或移除)一个Connector时,与之关联的Tasks会被停止并移除。也就是说Worker管理Connector插件的插、拔,Task的启、停。

 

1.2 Connector

Connector,可以看做是Kafka Connect的插件,用来与其他系统进行集成。有两类:SourceConnector、SinkConnector。如果要实现一个Connector,那么必须继承这两个类中的一个,这是硬性要求。Connector 与Task关联,Worker对Connector的启、停、插、拔,其实最终反映在对Task启、停、插、拔上。

Connector主要为创建Task实例提供配置,可以提供connector本身的配置,也可以提供task的配置。

1.3 Task

Task是由Worker中的executor来执行的。在Worker中包括了所有的WorkerTask。WorkerTask有两类:WorkerSourceTask、WorkSinkTask。

在运行过程中,WorkerSourceTask不断的做如下调用:

1)调用SourceTask.poll()方法取得数据SourceRecords,

2)使用相应的Converter将SourceRecords转换为ProducerRecords,

3)并由KafkaProducer#send推给Kafka Broker

在运行过程中,WorkerSinkTask不断的做如下调用:

1)调用KafkaConsumer#poll方法从Kafka Broker拉取ConsumerRecords,

2)使用相应的Converter将ConsumerRecords转换为SinkRecords,

3)调用SinkTask#flush方法来执行自定义的处理

4)提交 offset到Kafka Broker。

不难看出,只有系统运行正常,SinkTask、SourceTask都会被一次又一次的调用。即一个Task实例会占用一个线程。如果一个为Connector配置了task数量大于1,就会有多个tasks实例(这多个Task实例是同一个Java类的多个实例)与该Connector关联,那么这多个tasks在运行时必然是并发执行的。这就要求SourceTask#poll()、SinkTask#flush()在运行时能够保证线程安全性。

另外从WorkerSinkTask#flush如果出现异常,是不会提交offset的。此外,还会调用consumer#seek找到上一次提交的offset,seek的作用是:让下一次使用consumer#poll方法时,从上一次提交的offset开始poll。这样就可以保证数据在consume过程中不会丢失。

1.3.1 Kafka Connect与Kafka Producer、Kafka Consumer关系

Kafka Connect必然是要依赖于KafkaProducer、KafkaConsumer的。这种依赖关系到底是怎样的呢?从上面WorkerSinkTask、WorkerSourceTask的执行过程来看,他们是直接与KafkaConsumer、KafkaProducer直接关联的。经过源码的查看可以了解到:

在一个Worker内,所有的WorkerSourceTask共享同一个KafkaProducer对象来发送record到Kafka,这种设计也符合KafkaProducer的使用原则;每一个WorkerSinkTask拥有一个私有的KafkaConsumer,也就是说不存在KafkaConsumer共享使用的情况。

既然一个WorkerSinkTask独自使用一个KafkaConsumer,那么一个WorkerSinkTask就可以认为是一个独立的KafkaConsumer。那么如果一个WorkerSinkTask因某种原因启动、加入、重启、结束、中断、停止,就等于是说一个KafkaConsumer加入、退出consumer group,那么该consumer group 的Coordinator Broker就会发起rebalance,使得该KafkaConsumer被分配到的partiion分配到group 内其他的KafkaConsumer上。

Kafka-Connect中,如何对KafkaConsumer进行分组呢?

从WorkerSinkTask的相关源码来看:


private KafkaConsumer<byte[], byte[]> createConsumer() {

// Include any unknown worker configs so consumer configs can be set globally on the worker

// and through to the task

Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

props.putAll(workerConfig.originalsWithPrefix("consumer."));

KafkaConsumer<byte[], byte[]> newConsumer;

try {

newConsumer = new KafkaConsumer<>(props);

} catch (Throwable t) {

throw new ConnectException("Failed to create consumer", t);

}

return newConsumer;

}

是把与同一个Connector关联的Task会分到一个Consumer Group中。如果要自定义并且 组名默认是connector的名字。如果要自定义Consumer的属性,可以在worker 的配置文件中使用consumer.前缀的方式来完成。

通过上述代码也可以看出,默认情况下,关闭了KafkaConsumer的自动提交offset,改为由Kafka Connect来完成offset的提交。

1.4 Converter

WorkerSinkTask、WorkerSourceTask的执行过程中,会使用Key的Converter、Value的Converter对Key、Value进行转换。

这样做的原因:

作为一个Connector框架,在进行producer#send、consumer#pool时,框架本身并不知道用户的程序中,如何进行序列化、反序列化操作。并且进入Kafka的数据类别不同,所需要的序列化、反序列化工具也不同。

为了解决这一问题,Kafka Connect中引入了Converter、Schema。

WorkerSoureTask中,将SourceRecord中的key、value取出来,经过Converter转换(序列化)成byte[],转成ProducerRecord。


for (final SourceRecord record : toSend) {

byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());

byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());

final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);

}

WorkerSinkTask中,将consumer#poll到的ConsumerRecord转换(反序列化)成SinkRecord:


private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {

for (ConsumerRecord<byte[], byte[]> msg : msgs) {

log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());

SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());

SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());

messageBatch.add(

new SinkRecord(msg.topic(), msg.partition(),

keyAndSchema.schema(), keyAndSchema.value(),

valueAndSchema.schema(), valueAndSchema.value(),

msg.offset())

);

}

}

2、Rebalance

Kafka Connect支持单机、分布式(集群)两种模式。在生产环境下,通常会使用分布式模式。

通常情况下业务系统集群的数据进入到kafka,然后有Consumer集群进行数据消费处理。如果使用Kafka Connect,集群方案就变得很简单并且扩展性很好。

根据前面所了解的内容来看,一个Connector关联到一类Task,可以指定该类Task的数目,也就是Consumer的数目。也就是说一个Connector关联到一个Consumer Group上。如果Worker是单机模式,那么这个Consumer Group内的多个Consumer就是在同一进程内。如果进程挂掉,也就是整个Consumer Group全部挂掉。

在使用集群模式(Worker Cluster)时,这个问题如果不解决,所谓的集群就没有任何意义。所以在Worker Cluster模式时,重点要解决上述问题。

通过之前的Consumer、Broker的学习,知道Broker会为每一个Consumer Group提供一个协处理器。在Broker Coordinator、 Consumer Leader的配合下,当有Consumer 加入或退出Consumer Group时,会对partition进行rebalance。使得partition相对均匀的分配给各个Consumer上,以此来避免出现热点(partition分配给少数的Consumer上)问题。

Kafka Connect在解决这个问题时,也采用了同样的方案:即使用Broker Coordinator 配合 Worker Leader的方式,对connector、task进行rebalance。这样一来,使connector、task相对均匀的分配到同一个Worker Group中的不同的Worker上。如此便可以解决上述问题了。

一个Worker Leader分配connector、task时的处理是:


private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) {

Map<String, List<String>> connectorAssignments = new HashMap<>();

Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();

// Perform round-robin task assignment

CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet()));

for (String connectorId : sorted(configSnapshot.connectors())) {

String connectorAssignedTo = memberIt.next();

log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);

List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo);

if (memberConnectors == null) {

memberConnectors = new ArrayList<>();

connectorAssignments.put(connectorAssignedTo, memberConnectors);

}

memberConnectors.add(connectorId);

for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) {

String taskAssignedTo = memberIt.next();

log.trace("Assigning task {} to {}", taskId, taskAssignedTo);

List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo);

if (memberTasks == null) {

memberTasks = new ArrayList<>();

taskAssignments.put(taskAssignedTo, memberTasks);

}

memberTasks.add(taskId);

}

}

this.leaderState = new LeaderState(allConfigs, connectorAssignments, taskAssignments);

return fillAssignmentsAndSerialize(allConfigs.keySet(), ConnectProtocol.Assignment.NO_ERROR,

leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);

}

下面这张图展示了某个Worker失败情况下,对Task Rebalance的情况:

一个Connector 1设置了5个task:

第一列表示所有的Worker都正常工作情况下 Task的分配。

第二列表示Worker 2进程出现故障, task2、task3也就挂掉了。

第三列表示对Task进行rebalance后,task2、task3被rebalance到其他的Worker名下了。

3、Storage

3.1 内置Storage

在Kafka Connect中,内置了3种Storages。这三种配置都会(不论是Standalone,还是Distributed)有一份基于内存的Storage。下面只是针对内存Storage以外的方式进行说明:

·status’s storage

存储各个connector、task的状态信息。在Worker分布式的情况下,存储在Kafka 的一个Topic中,topic的名字由worker配置项status.storage.topic来指定。

Status 有5种:

1) UNASSIGNED:connector或者task是否被分配到了个Worker上。

2) RUNNING: connector或者task是否正在运行。

3) PAUSED: connector或者task是否已暂停运行。

4) FAILED: connector或者task是否失败(通常是抛出异常)。

5) DESTROYED:connector或者task是否已被删除。

在运行时,会定时更新状态信息。

·offset’s storage

在Kafka Connect中,为每一个WorkerSourceTask  ID对应了一个序号(不是ID)。每启动一个WorkerSourceTask,这个序号就会加1。

例如一个单机的Worker上,Connector提交了两个Tasks,那么首次启动Worker时,连个任务对应的offset都是0:

conn1-task1:0, conn1-task2:0

如果重启worker后,两个任务的offset仍然是0,不过有可能在某种情况下(失败后重启)就变成了1:

conn1-task1:1, conn1-task2:1。

对于分布式的Worker,在rebalance后,offset也会加1。

存储各个source task 的下一次创建的task实例的序号offset。在运行时会更新这些source task的offset信息。单机模式、分布式模式下,都会存储这个offset的。只是在Standalone模式下,是以本地File方式存储。分布式模式下,是存储在Kafka中的一个topic中。Topic的名称由worker配置项offset.storage.topic来指定。

这个offset与Broker中的Log的offset 没有任何的关系。

·config’s storage

在Kafka connect中,每一个Connector,以及与之关联的Task都会有一些配置信息。在rebalance后,还是需要用到这些配置的。为了使得Worker Group内共享配置,也需要对connector、task的配置进行存储。

分布式模式下,会在一个topic中存储。Topic的名称由worker配置项 config.storage.topic来指定。

4、Connector 管理

4.1 Herder

在前面的基本组件中,可以了解到,Worker可以对Connector进行启、停、插、拔。为了更加方便的管理Connector,Kafka-Connect中在Worker之外提供了一个工具Herder(翻译为汉语:牧人,其实也就是管理者的意思)。

使用Herder可以插、拔Connector,可以启、停Task,可以修改Connector的配置,获取connector、task的状态。

针对Standalone、Distributed模式,实现的Herder也是不尽相同的。一些修改性质的操作都是只有Worker Leader能够进行的。所以一个Worker Group中的某个Worker的管理者Herder接收到一个修改的请求时,会将该请求路由(通过Restful请求)到Worker Leader。

在使用Kafka Connect时,如果是将Kafka Connect内嵌到业务系统中时,我们是可以通过Herder 对象来对Kafka Connect中的组件进行管理操作的。

但是很多情况下,并不是将Kafka Connect内嵌到我们的系统中的,而是使用它的CLI独立的启动一个或者多个进程的。这种情况下,如何来管理Kafka Connect的组件呢?

4.2 Restful

为了方便管理,另外也专门提供了一种基于Restful的方式来管理connector、task。不论是Standalone还是Distributed模式都支持Restful方式管理。

Restful管理方式,会在Kafka-Connect进程内部内嵌一个基于Jetty的Web Server。默认的端口是8083。

下面会列出当前Kafka-Connector 版本0.10.1.0支持的Restful请求:

  • GET /connectors – 获取当前活动的 connectors列表
  • POST /connectors – 创建一个新的Connector。请求体是一个JSON对象,包括connector的名称,和配置。

{

name:”conn-2”,

config:{

// connector的配置项。

}

}

  • GET /connectors/{name} – 获取指定的connector的信息。
  • GET /connectors/{name}/config -  获取指定的connector的配置项。
  • PUT /connectors/{name}/config – 修改指定的connector的配置项。
  • GET /connectors/{name}/status – 获取指定的connector的当前状态。包括: 1)connector的status, 2)分配在哪个worker上, 3)如果失败的话,错误信息,4)与之关联的所有任务的状态。
  • GET /connectors/{name}/tasks – 获取指定connector的任务列表。
  • GET /connectors/{name}/tasks/{taskid}/status – 获取指定的connector的指定task的状态信息。包括:1)该任务的status,2)该任务分配在哪个worker上,3)如果有失败,错误信息。
  • PUT /connectors/{name}/pause – 暂停某个Connector。暂停某个connector时,与之关联的task都会暂停。
  • PUT /connectors/{name}/resume – 继续某个Connector。如果这个connector根本就没有暂停,那就什么也不用做。
  • POST /connectors/{name}/restart – 重启一个Connector。通常在一Connector失败时。
  • POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task。
  • DELETE /connectors/{name} –删除一个Connector。删除Connector时,会停止它关联的task,并删除相关的配置信息。

Kafka Connect 也为获取Connector Plugin的信息提供了 REST API :

  • GET /connector-plugins- 获取在改Kafka-Connect集群中安装的Connector Plugin列表。需要注意的是,该API只能获取到正在Worker中正常运行状态的connector。某些connector是看不到的,特别是在进行升级时,例如add一个connector jar包。
  • PUT/connector-plugins/{connector-type}/config/validate   验证connector的配置。

4.3 通用Connector升级

目前已提供了很多通用的Connector插件。

地址:https://www.confluent.io/product/connectors/

在使用这些插件时,如果需要升级connector,需要按照下面步骤进行:

1) 下载新版的connector

2) 停止所有的Kafka connect workers。

3)根据connector plugin 的安装说明来安装。

4)启动workers

5)如果采用分布式模式,启动connector。

5、Configuration

5.1 Worker

Worker 可以在Standalone模式运行,也可以在Distributed模式下运行,两种情况下有不同的配置,所以这里需要对他们加以区分:

5.1.1 Common Configuration

·bootstrap.server

因为Worker必然要有至少一个KafkaProducer实例(分布式时有两个),所以需要至少配置一个Kafka broker的host:port对。如果是Broker集群模式,也没有必要将所有的broker配置上。格式是:host1:port1,host2:port2,host3:port3...

·key.converter, value.converter

配置record中的key、value 所采用的converter类名。比较流行的Converter有:JSON、Avro。

·internal.key.converter, internal.value.converter

配置connect-offset topic (也即 offset storage’s topic)中的record的key、value所采用的converter类名。比较流行的Converter有JSON、Avro。

·rest.host.name, rest.port

Rest Server的要绑定的IP和port。其中port模式是8083。

·session.time.out

每一个worker会周期性的发送一个heartbeat到Broker,以告诉Broker自己还活着。如果超过session.timeout.ms 还没有发送heartbeat给Broker,Worker就会断开相关的连接。那么Broker就不能收到heartbeat,就会导致Broker 认为该Worker已经挂掉了。然后Broker就会触发Rebalance,来重新分配connector、task。这个值必须配置的 group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。

默认值:10000,即10s。

·heartbeat.interval.ms

心跳间隔。心跳用于keep-alive Worker与Broker之间的连接。这个值必须低于session.timeout.ms。建议设置的值不要高于session.timeout.ms的1/3。

默认值:3000,即3s 。

. rebalance.timeout.ms

一旦一个rebalance开始了,一个Worker加入到group的时间。在此期间,运行在该Worker上的所有的任务需要flush掉那些还没有处理完毕的数据,并且要提交offset。

如果超过了这个时间,offset commit就会失败。

默认值 60000,即60s 。

·connection.max.idle.ms

连接最大空闲时间。模式值:540000,即9 min。

·client.id

指定一个认为可读的worker id,只是用于跟踪。

5.1.2 Standalone Worker Configuration

·offset.storage.file.filename

存储connector offset 的文件。

5.1.3 Distributed Worker Configuration

·group.id

指定worker属于哪个worker group。

 

·config.storage.topic, offset.storage.topic, status.storage.topic

在一个Worker Group内,  Workers 能够发现彼此并共享connector、Task相关的config、offset、status信息。这些共享信息分别存储在一个topic内。这3个配置项就是用来指定topic名称的。在同一个Worker Group下的每一个worker的配置文件中,这三项必须以一致的。

创建这三个topic时的注意事项:

1)Offset存储的topic 需要有很多个partation并且每一个partition至少三个replicas。

2)config存储的topic 只创建一个partition,并且至少3个replicas。

3)status存储的topic需要创建多个partition并且每一个partition至少3个replicas。

要了解更多信息,可以参考前面Storage部分。

如果想要了解更多关于Kafka-Connect的知识,可以参考:http://docs.confluent.io/current

时间: 2024-10-25 15:54:21

Kafka: Connect的相关文章

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

Kafka Connect Details 详解

目录 1. Kafka Connect Details 详解 1.1. 概览 1.2. 启动和配置 1.2.1. Standalone 单机模式 1.2.2. Distribute 分布式模式 1.2.3. Connector的配置 1.3. Transformations 转换器 1.4. REST API 1.5. Kafka Connect 开发详解 1.6. Kafka Connect VS Producer Consumer 1.6.1. Kafka Connect的优点 1.7. 第

Build an ETL Pipeline With Kafka Connect via JDBC Connectors

This article is an in-depth tutorial for using Kafka to move data from PostgreSQL to Hadoop HDFS via JDBC connections. Read this eGuide to discover the fundamental differences between iPaaS and dPaaS and how the innovative approach of dPaaS gets to t

Kafka connect in practice(2): distributed mode mysql binlog -&gt;kafka-&gt;hive

In the previous post Kafka connect in practice(1): standalone, I have introduced about the basics of kafka connect  configuration and demonstrate a local standalone demo. In this post we will show the knowledge about distributed data pull an sink. To

Kafka Connect Architecture

Kafka Connect's goal of copying data between systems has been tackled by a variety of frameworks, many of them still actively developed and maintained. This section explains the motivation behind Kafka Connect, where it fits in the design space, and

Kafka Connect REST Interface

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You

Kafka Connect HDFS

概述 Kafka 的数据如何传输到HDFS?如果仔细思考,会发现这个问题并不简单. 不妨先想一下这两个问题? 1)为什么要将Kafka的数据传输到HDFS上? 2)为什么不直接写HDFS而要通过Kafka? HDFS一直以来是为离线数据的存储和计算设计的,因此对实时事件数据的写入并不友好,而Kafka生来就是为实时数据设计的,但是数据在Kafka上无法使用离线计算框架来作批量离线分析. 那么,Kafka为什么就不能支持批量离线分析呢?想象我们将Kafka的数据按天拆分topic,并建足够多的分区

Kafka系列之1—Kafka的总体认识

Kafka的总体认识 1.非中心的架构模型 2.基于TCP的一套Kafka通信协议 3.消息中间件&存储系统 4.存储逻辑层的高并发保证 5.isr机制降低了保证分布式一致性的代价 1. 非中心的架构模型 我们知道,在分布式系统的架构类型里,既有主从式的架构,也有非中心式的架构,像hadoop和hbase都采用了主从式的架构模型,主从式的架构优点有很多,但是主从式下为了避免单点故障而采取的各种策略使得主从式架构的优点并不那么理想,kafka作为一个分布式的消息系统,它采用了非中心式的架构模型,每

[翻译和注解]Kafka Streams简介: 让流处理变得更简单

Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Kafka Streams还没有正式发布,所以具体的API和功能和0.10.0.0版(2016年6月发布)有所区别.但是Jay Krpes在这简文章里介绍了很多Kafka Streams在设计方面的考虑,还是很值得一看的. 以下的并不会完全按照原文翻译,因为那么搞太累了……这篇文件的确很长,而且Jay