Kafka Connect 简介
Kafka Connect 是一个可以在Kafka与其他系统之间提供可靠的、易于扩展的数据流处理工具。使用它能够使得数据进出Kafka变得很简单。Kafka Connect有如下特性:
·是一个通用的构造kafka connector的框架
·有单机、分布式两种模式。开发时建议使用单机模式,生产环境下使用分布式模式。
·提供restful的管理connector的API。
·自动化的offset管理。Kafka Connect自动的管理offset提交。
·分布式、可扩展。采用与concumer group中对partition rebalance同样的机制来管理在worker group中的connector、task。
·流/批处理的集成。
接下来会对Kafka Connect做一个全面的分析,来帮助了解上述特性。
- 1、Kafka Connect 组件说明
- 1.1 Worker
- 1.2 Connector
- 1.3 Task
- 1.3.1 Kafka Connect 与KafkaProducer、KafkaConsumer的关系
- 1.4 Converter
- 2、Rebalance
- 3、Storage
- 3.1 内置Storage
- 4、Connector 管理
- 4.1 Herder
- 4.2 Restful
- 4.3 通用Connector Plugin升级步骤
- 5、Configuration
- 5.1 Worker
- 5.1.1 Common
- 5.1.2 Standalone
- 5.1.3 Distributed
- 5.1 Worker
1、基本组件:
1.1 Worker
Worker用于调度source task、sink task 来处理数据的进出Kafka。一个Worker中包括多个Connector。一个Connector与多个task关联。当启动(或者加入)一个Connector时,与之关联的Tasks会被创建并提交到executor去执行;停止(或移除)一个Connector时,与之关联的Tasks会被停止并移除。也就是说Worker管理Connector插件的插、拔,Task的启、停。
1.2 Connector
Connector,可以看做是Kafka Connect的插件,用来与其他系统进行集成。有两类:SourceConnector、SinkConnector。如果要实现一个Connector,那么必须继承这两个类中的一个,这是硬性要求。Connector 与Task关联,Worker对Connector的启、停、插、拔,其实最终反映在对Task启、停、插、拔上。
Connector主要为创建Task实例提供配置,可以提供connector本身的配置,也可以提供task的配置。
1.3 Task
Task是由Worker中的executor来执行的。在Worker中包括了所有的WorkerTask。WorkerTask有两类:WorkerSourceTask、WorkSinkTask。
在运行过程中,WorkerSourceTask不断的做如下调用:
1)调用SourceTask.poll()方法取得数据SourceRecords,
2)使用相应的Converter将SourceRecords转换为ProducerRecords,
3)并由KafkaProducer#send推给Kafka Broker
在运行过程中,WorkerSinkTask不断的做如下调用:
1)调用KafkaConsumer#poll方法从Kafka Broker拉取ConsumerRecords,
2)使用相应的Converter将ConsumerRecords转换为SinkRecords,
3)调用SinkTask#flush方法来执行自定义的处理
4)提交 offset到Kafka Broker。
不难看出,只有系统运行正常,SinkTask、SourceTask都会被一次又一次的调用。即一个Task实例会占用一个线程。如果一个为Connector配置了task数量大于1,就会有多个tasks实例(这多个Task实例是同一个Java类的多个实例)与该Connector关联,那么这多个tasks在运行时必然是并发执行的。这就要求SourceTask#poll()、SinkTask#flush()在运行时能够保证线程安全性。
另外从WorkerSinkTask#flush如果出现异常,是不会提交offset的。此外,还会调用consumer#seek找到上一次提交的offset,seek的作用是:让下一次使用consumer#poll方法时,从上一次提交的offset开始poll。这样就可以保证数据在consume过程中不会丢失。
1.3.1 Kafka Connect与Kafka Producer、Kafka Consumer关系
Kafka Connect必然是要依赖于KafkaProducer、KafkaConsumer的。这种依赖关系到底是怎样的呢?从上面WorkerSinkTask、WorkerSourceTask的执行过程来看,他们是直接与KafkaConsumer、KafkaProducer直接关联的。经过源码的查看可以了解到:
在一个Worker内,所有的WorkerSourceTask共享同一个KafkaProducer对象来发送record到Kafka,这种设计也符合KafkaProducer的使用原则;每一个WorkerSinkTask拥有一个私有的KafkaConsumer,也就是说不存在KafkaConsumer共享使用的情况。
既然一个WorkerSinkTask独自使用一个KafkaConsumer,那么一个WorkerSinkTask就可以认为是一个独立的KafkaConsumer。那么如果一个WorkerSinkTask因某种原因启动、加入、重启、结束、中断、停止,就等于是说一个KafkaConsumer加入、退出consumer group,那么该consumer group 的Coordinator Broker就会发起rebalance,使得该KafkaConsumer被分配到的partiion分配到group 内其他的KafkaConsumer上。
在Kafka-Connect中,如何对KafkaConsumer进行分组呢?
从WorkerSinkTask的相关源码来看:
private KafkaConsumer<byte[], byte[]> createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.putAll(workerConfig.originalsWithPrefix("consumer.")); KafkaConsumer<byte[], byte[]> newConsumer; try { newConsumer = new KafkaConsumer<>(props); } catch (Throwable t) { throw new ConnectException("Failed to create consumer", t); } return newConsumer; } |
是把与同一个Connector关联的Task会分到一个Consumer Group中。如果要自定义并且 组名默认是connector的名字。如果要自定义Consumer的属性,可以在worker 的配置文件中使用consumer.前缀的方式来完成。
通过上述代码也可以看出,默认情况下,关闭了KafkaConsumer的自动提交offset,改为由Kafka Connect来完成offset的提交。
1.4 Converter
WorkerSinkTask、WorkerSourceTask的执行过程中,会使用Key的Converter、Value的Converter对Key、Value进行转换。
这样做的原因:
作为一个Connector框架,在进行producer#send、consumer#pool时,框架本身并不知道用户的程序中,如何进行序列化、反序列化操作。并且进入Kafka的数据类别不同,所需要的序列化、反序列化工具也不同。
为了解决这一问题,Kafka Connect中引入了Converter、Schema。
WorkerSoureTask中,将SourceRecord中的key、value取出来,经过Converter转换(序列化)成byte[],转成ProducerRecord。
for (final SourceRecord record : toSend) { byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()); byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value); … } |
WorkerSinkTask中,将consumer#poll到的ConsumerRecord转换(反序列化)成SinkRecord:
private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) { for (ConsumerRecord<byte[], byte[]> msg : msgs) { log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key()); SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value()); messageBatch.add( new SinkRecord(msg.topic(), msg.partition(), keyAndSchema.schema(), keyAndSchema.value(), valueAndSchema.schema(), valueAndSchema.value(), msg.offset()) ); } } |
2、Rebalance
Kafka Connect支持单机、分布式(集群)两种模式。在生产环境下,通常会使用分布式模式。
通常情况下业务系统集群的数据进入到kafka,然后有Consumer集群进行数据消费处理。如果使用Kafka Connect,集群方案就变得很简单并且扩展性很好。
根据前面所了解的内容来看,一个Connector关联到一类Task,可以指定该类Task的数目,也就是Consumer的数目。也就是说一个Connector关联到一个Consumer Group上。如果Worker是单机模式,那么这个Consumer Group内的多个Consumer就是在同一进程内。如果进程挂掉,也就是整个Consumer Group全部挂掉。
在使用集群模式(Worker Cluster)时,这个问题如果不解决,所谓的集群就没有任何意义。所以在Worker Cluster模式时,重点要解决上述问题。
通过之前的Consumer、Broker的学习,知道Broker会为每一个Consumer Group提供一个协处理器。在Broker Coordinator、 Consumer Leader的配合下,当有Consumer 加入或退出Consumer Group时,会对partition进行rebalance。使得partition相对均匀的分配给各个Consumer上,以此来避免出现热点(partition分配给少数的Consumer上)问题。
Kafka Connect在解决这个问题时,也采用了同样的方案:即使用Broker Coordinator 配合 Worker Leader的方式,对connector、task进行rebalance。这样一来,使connector、task相对均匀的分配到同一个Worker Group中的不同的Worker上。如此便可以解决上述问题了。
一个Worker Leader分配connector、task时的处理是:
private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) { Map<String, List<String>> connectorAssignments = new HashMap<>(); Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>(); // Perform round-robin task assignment CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet())); for (String connectorId : sorted(configSnapshot.connectors())) { String connectorAssignedTo = memberIt.next(); log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo); List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo); if (memberConnectors == null) { memberConnectors = new ArrayList<>(); connectorAssignments.put(connectorAssignedTo, memberConnectors); } memberConnectors.add(connectorId); for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) { String taskAssignedTo = memberIt.next(); log.trace("Assigning task {} to {}", taskId, taskAssignedTo); List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo); if (memberTasks == null) { memberTasks = new ArrayList<>(); taskAssignments.put(taskAssignedTo, memberTasks); } memberTasks.add(taskId); } } this.leaderState = new LeaderState(allConfigs, connectorAssignments, taskAssignments); return fillAssignmentsAndSerialize(allConfigs.keySet(), ConnectProtocol.Assignment.NO_ERROR, leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments); } |
下面这张图展示了某个Worker失败情况下,对Task Rebalance的情况:
一个Connector 1设置了5个task:
第一列表示所有的Worker都正常工作情况下 Task的分配。
第二列表示Worker 2进程出现故障, task2、task3也就挂掉了。
第三列表示对Task进行rebalance后,task2、task3被rebalance到其他的Worker名下了。
3、Storage
3.1 内置Storage
在Kafka Connect中,内置了3种Storages。这三种配置都会(不论是Standalone,还是Distributed)有一份基于内存的Storage。下面只是针对内存Storage以外的方式进行说明:
·status’s storage
存储各个connector、task的状态信息。在Worker分布式的情况下,存储在Kafka 的一个Topic中,topic的名字由worker配置项status.storage.topic来指定。
Status 有5种:
1) UNASSIGNED:connector或者task是否被分配到了个Worker上。
2) RUNNING: connector或者task是否正在运行。
3) PAUSED: connector或者task是否已暂停运行。
4) FAILED: connector或者task是否失败(通常是抛出异常)。
5) DESTROYED:connector或者task是否已被删除。
在运行时,会定时更新状态信息。
·offset’s storage
在Kafka Connect中,为每一个WorkerSourceTask ID对应了一个序号(不是ID)。每启动一个WorkerSourceTask,这个序号就会加1。
例如一个单机的Worker上,Connector提交了两个Tasks,那么首次启动Worker时,连个任务对应的offset都是0:
conn1-task1:0, conn1-task2:0
如果重启worker后,两个任务的offset仍然是0,不过有可能在某种情况下(失败后重启)就变成了1:
conn1-task1:1, conn1-task2:1。
对于分布式的Worker,在rebalance后,offset也会加1。
存储各个source task 的下一次创建的task实例的序号offset。在运行时会更新这些source task的offset信息。单机模式、分布式模式下,都会存储这个offset的。只是在Standalone模式下,是以本地File方式存储。分布式模式下,是存储在Kafka中的一个topic中。Topic的名称由worker配置项offset.storage.topic来指定。
这个offset与Broker中的Log的offset 没有任何的关系。
·config’s storage
在Kafka connect中,每一个Connector,以及与之关联的Task都会有一些配置信息。在rebalance后,还是需要用到这些配置的。为了使得Worker Group内共享配置,也需要对connector、task的配置进行存储。
分布式模式下,会在一个topic中存储。Topic的名称由worker配置项 config.storage.topic来指定。
4、Connector 管理
4.1 Herder
在前面的基本组件中,可以了解到,Worker可以对Connector进行启、停、插、拔。为了更加方便的管理Connector,Kafka-Connect中在Worker之外提供了一个工具Herder(翻译为汉语:牧人,其实也就是管理者的意思)。
使用Herder可以插、拔Connector,可以启、停Task,可以修改Connector的配置,获取connector、task的状态。
针对Standalone、Distributed模式,实现的Herder也是不尽相同的。一些修改性质的操作都是只有Worker Leader能够进行的。所以一个Worker Group中的某个Worker的管理者Herder接收到一个修改的请求时,会将该请求路由(通过Restful请求)到Worker Leader。
在使用Kafka Connect时,如果是将Kafka Connect内嵌到业务系统中时,我们是可以通过Herder 对象来对Kafka Connect中的组件进行管理操作的。
但是很多情况下,并不是将Kafka Connect内嵌到我们的系统中的,而是使用它的CLI独立的启动一个或者多个进程的。这种情况下,如何来管理Kafka Connect的组件呢?
4.2 Restful
为了方便管理,另外也专门提供了一种基于Restful的方式来管理connector、task。不论是Standalone还是Distributed模式都支持Restful方式管理。
Restful管理方式,会在Kafka-Connect进程内部内嵌一个基于Jetty的Web Server。默认的端口是8083。
下面会列出当前Kafka-Connector 版本0.10.1.0支持的Restful请求:
- GET /connectors – 获取当前活动的 connectors列表
- POST /connectors – 创建一个新的Connector。请求体是一个JSON对象,包括connector的名称,和配置。
{ name:”conn-2”, config:{ // connector的配置项。 } } |
- GET /connectors/{name} – 获取指定的connector的信息。
- GET /connectors/{name}/config - 获取指定的connector的配置项。
- PUT /connectors/{name}/config – 修改指定的connector的配置项。
- GET /connectors/{name}/status – 获取指定的connector的当前状态。包括: 1)connector的status, 2)分配在哪个worker上, 3)如果失败的话,错误信息,4)与之关联的所有任务的状态。
- GET /connectors/{name}/tasks – 获取指定connector的任务列表。
- GET /connectors/{name}/tasks/{taskid}/status – 获取指定的connector的指定task的状态信息。包括:1)该任务的status,2)该任务分配在哪个worker上,3)如果有失败,错误信息。
- PUT /connectors/{name}/pause – 暂停某个Connector。暂停某个connector时,与之关联的task都会暂停。
- PUT /connectors/{name}/resume – 继续某个Connector。如果这个connector根本就没有暂停,那就什么也不用做。
- POST /connectors/{name}/restart – 重启一个Connector。通常在一Connector失败时。
- POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task。
- DELETE /connectors/{name} –删除一个Connector。删除Connector时,会停止它关联的task,并删除相关的配置信息。
Kafka Connect 也为获取Connector Plugin的信息提供了 REST API :
- GET /connector-plugins- 获取在改Kafka-Connect集群中安装的Connector Plugin列表。需要注意的是,该API只能获取到正在Worker中正常运行状态的connector。某些connector是看不到的,特别是在进行升级时,例如add一个connector jar包。
- PUT/connector-plugins/{connector-type}/config/validate 验证connector的配置。
4.3 通用Connector升级
目前已提供了很多通用的Connector插件。
地址:https://www.confluent.io/product/connectors/
在使用这些插件时,如果需要升级connector,需要按照下面步骤进行:
1) 下载新版的connector
2) 停止所有的Kafka connect workers。
3)根据connector plugin 的安装说明来安装。
4)启动workers
5)如果采用分布式模式,启动connector。
5、Configuration
5.1 Worker
Worker 可以在Standalone模式运行,也可以在Distributed模式下运行,两种情况下有不同的配置,所以这里需要对他们加以区分:
5.1.1 Common Configuration
·bootstrap.server
因为Worker必然要有至少一个KafkaProducer实例(分布式时有两个),所以需要至少配置一个Kafka broker的host:port对。如果是Broker集群模式,也没有必要将所有的broker配置上。格式是:host1:port1,host2:port2,host3:port3...
·key.converter, value.converter
配置record中的key、value 所采用的converter类名。比较流行的Converter有:JSON、Avro。
·internal.key.converter, internal.value.converter
配置connect-offset topic (也即 offset storage’s topic)中的record的key、value所采用的converter类名。比较流行的Converter有JSON、Avro。
·rest.host.name, rest.port
Rest Server的要绑定的IP和port。其中port模式是8083。
·session.time.out
每一个worker会周期性的发送一个heartbeat到Broker,以告诉Broker自己还活着。如果超过session.timeout.ms 还没有发送heartbeat给Broker,Worker就会断开相关的连接。那么Broker就不能收到heartbeat,就会导致Broker 认为该Worker已经挂掉了。然后Broker就会触发Rebalance,来重新分配connector、task。这个值必须配置的 group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
默认值:10000,即10s。
·heartbeat.interval.ms
心跳间隔。心跳用于keep-alive Worker与Broker之间的连接。这个值必须低于session.timeout.ms。建议设置的值不要高于session.timeout.ms的1/3。
默认值:3000,即3s 。
. rebalance.timeout.ms
一旦一个rebalance开始了,一个Worker加入到group的时间。在此期间,运行在该Worker上的所有的任务需要flush掉那些还没有处理完毕的数据,并且要提交offset。
如果超过了这个时间,offset commit就会失败。
默认值 60000,即60s 。
·connection.max.idle.ms
连接最大空闲时间。模式值:540000,即9 min。
·client.id
指定一个认为可读的worker id,只是用于跟踪。
5.1.2 Standalone Worker Configuration
·offset.storage.file.filename
存储connector offset 的文件。
5.1.3 Distributed Worker Configuration
·group.id
指定worker属于哪个worker group。
·config.storage.topic, offset.storage.topic, status.storage.topic
在一个Worker Group内, Workers 能够发现彼此并共享connector、Task相关的config、offset、status信息。这些共享信息分别存储在一个topic内。这3个配置项就是用来指定topic名称的。在同一个Worker Group下的每一个worker的配置文件中,这三项必须以一致的。
创建这三个topic时的注意事项:
1)Offset存储的topic 需要有很多个partation并且每一个partition至少三个replicas。
2)config存储的topic 只创建一个partition,并且至少3个replicas。
3)status存储的topic需要创建多个partition并且每一个partition至少3个replicas。
要了解更多信息,可以参考前面Storage部分。
如果想要了解更多关于Kafka-Connect的知识,可以参考:http://docs.confluent.io/current