Using Kafka with Flume

This documentation applies to Cloudera Distribution of Apache Kafka 1.3.x. Documentation for other versions is available at Cloudera Documentation.

In CDH 5.2.0 and later, Flume includes a Kafka source and sink. Use these to stream data from Kafka to Hadoop, or from any Flume source to Kafka.

Important: Do not configure a Kafka source to send data to a Kafka sink. If you do, the Kafka source sets the topic in the event header, overriding the sink configuration and creating an infinite loop, sending messages back and forth between the source and sink. If you need to use both a source and a sink, use an interceptor to modify the event header and set a different topic.
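One way to apply the interceptor approach described in the note above is Flume's built-in static interceptor, which overwrites the topic header before events reach the sink. The sketch below is illustrative only; the topic name processed-weblogs is a placeholder, not part of the original example:

```properties
# Hypothetical sketch: rewrite the "topic" event header on the Kafka
# source so the Kafka sink publishes to a different topic and no
# source-to-sink loop forms.
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = static
tier1.sources.source1.interceptors.i1.preserveExisting = false
tier1.sources.source1.interceptors.i1.key = topic
tier1.sources.source1.interceptors.i1.value = processed-weblogs
```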

Kafka Source

Use the Kafka source to stream data from Kafka topics into Hadoop. The Kafka source can be combined with any Flume sink, making it easy to write data from Kafka to HDFS, HBase, and Solr.

The following Flume configuration example uses a Kafka source to send data to an HDFS sink:

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = zk01.example.com:2181
tier1.sources.source1.topic = weblogs
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

For higher throughput, configure multiple Kafka sources to read from the same topic. If you configure all the sources with the same groupID, and the topic has multiple partitions, each source reads data from a different set of partitions, improving the ingest rate.
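A sketch of this pattern, extending the example above with a second source (the second source name is added here for illustration; everything else follows the original configuration):

```properties
# Two Kafka sources in the same consumer group reading the same topic.
# With a multi-partition topic, Kafka assigns different partitions to
# each source, increasing throughput.
tier1.sources = source1 source2

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = zk01.example.com:2181
tier1.sources.source1.topic = weblogs
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1

tier1.sources.source2.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source2.zookeeperConnect = zk01.example.com:2181
tier1.sources.source2.topic = weblogs
tier1.sources.source2.groupId = flume
tier1.sources.source2.channels = channel1
```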

The following table describes parameters the Kafka source supports; required properties are listed in bold.

Table 1. Kafka Source Properties

  • type: Required. Must be set to org.apache.flume.source.kafka.KafkaSource.
  • zookeeperConnect: Required. The URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181).
  • topic: Required. The Kafka topic from which the source reads messages. Flume supports only one topic per source.
  • groupID (default: flume): The unique identifier of the Kafka consumer group. Set the same groupID in all sources to indicate that they belong to the same consumer group.
  • batchSize (default: 1000): The maximum number of messages that can be written to a channel in a single batch.
  • batchDurationMillis (default: 1000): The maximum time (in milliseconds) before a batch is written to the channel.
  • Other properties supported by the Kafka consumer: Used to configure the Kafka consumer used by the Kafka source. You can use any consumer properties supported by Kafka. Prepend the consumer property name with the prefix kafka. (for example, kafka.fetch.min.bytes). See the Kafka documentation for the full list of Kafka consumer properties.

Tuning

The Kafka source overrides two Kafka consumer properties:

  1. auto.commit.enable is set to false by the source, and every batch is committed. To improve performance, set this to true instead, using kafka.auto.commit.enable. Note that this can lead to data loss if the source goes down before committing.
  2. consumer.timeout.ms is set to 10, so when Flume polls Kafka for new data, it waits no more than 10 ms for the data to be available. Setting this to a higher value can reduce CPU utilization due to less frequent polling, but introduces latency in writing batches to the channel.
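The two tuning knobs above can be applied through the kafka. prefix on the source. A minimal sketch of the trade-offs described, with example values only:

```properties
# Faster, but can lose data if the source fails before a commit.
tier1.sources.source1.kafka.auto.commit.enable = true
# Less frequent polling: lower CPU use, higher batch latency.
tier1.sources.source1.kafka.consumer.timeout.ms = 100
```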

Kafka Sink

Use the Kafka sink to send data from a Flume source to Kafka. You can use the Kafka sink in addition to Flume sinks such as HBase or HDFS.

The following Flume configuration example uses a Kafka sink with an exec source:

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = exec
tier1.sources.source1.command = /usr/bin/vmstat 1
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = sink1
tier1.sinks.sink1.brokerList = kafka01.example.com:9092,kafka02.example.com:9092
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize = 20

The following table describes parameters the Kafka sink supports; required properties are listed in bold.

Table 2. Kafka Sink Properties

  • type: Required. Must be set to org.apache.flume.sink.kafka.KafkaSink.
  • brokerList: Required. The brokers the Kafka sink uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability.
  • topic (default: default-flume-topic): The Kafka topic to which messages are published by default. If the event header contains a topic field, the event is published to the designated topic, overriding the configured topic.
  • batchSize (default: 100): The number of messages to process in a single batch. Specifying a larger batchSize can improve throughput but increases latency.
  • requiredAcks (default: 1): The number of replicas that must acknowledge a message before it is written successfully. Possible values are 0 (do not wait for an acknowledgement), 1 (wait for the leader to acknowledge only), and -1 (wait for all replicas to acknowledge). To avoid potential loss of data in case of a leader failure, set this to -1.
  • Other properties supported by the Kafka producer: Used to configure the Kafka producer used by the Kafka sink. You can use any producer properties supported by Kafka. Prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec). See the Kafka documentation for the full list of Kafka producer properties.
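A short sketch of the kafka. prefix mechanism for producer properties; the property values here are examples, not recommendations:

```properties
# Pass-through Kafka producer settings on the sink, prefixed with "kafka."
tier1.sinks.sink1.kafka.compression.codec = snappy
tier1.sinks.sink1.kafka.message.send.max.retries = 5
```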

The Kafka sink uses the topic and key properties from the FlumeEvent headers to determine where to send events in Kafka. If the header contains the topic property, that event is sent to the designated topic, overriding the configured topic. If the header contains the key property, that key is used to partition events within the topic. Events with the same key are sent to the same partition. If the key parameter is not specified, events are distributed randomly to partitions. Use these properties to control the topics and partitions to which events are sent through the Flume source or interceptor.
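The key-to-partition behavior can be illustrated with a simplified sketch. Kafka's actual default partitioner hashes the key with murmur2; the deterministic byte-sum hash below is an assumption made purely so the example is self-contained:

```python
# Simplified sketch of key-based partitioning: events that carry the
# same key always map to the same partition, so their relative order
# within that partition is preserved.

def partition_for(key, num_partitions):
    """Map an event key to a partition index deterministically."""
    if key is None:
        # A real producer spreads keyless events across partitions
        # (e.g. randomly); this sketch just uses partition 0.
        return 0
    # Stable, toy hash: sum of the key's bytes (NOT Kafka's murmur2).
    h = sum(key.encode()) if isinstance(key, str) else hash(key)
    return h % num_partitions

events = [("user-1", "login"), ("user-2", "click"), ("user-1", "logout")]
placement = {k: partition_for(k, 3) for k, _ in events}
# Both "user-1" events land on the same partition.
```

In Flume terms, a source or interceptor that sets the key header on related events (for example, all events for one user) guarantees they share a partition in the destination topic.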

Kafka Channel

CDH 5.3 and later include a Kafka channel for Flume, in addition to the existing memory and file channels. You can use the Kafka channel:

  • To write to Hadoop directly from Kafka without using a source.
  • To write to Kafka directly from Flume sources without additional buffering.
  • As a reliable and highly available channel for any source/sink combination.

The following Flume configuration uses a Kafka channel with an exec source and an HDFS sink:

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = exec
tier1.sources.source1.command = /usr/bin/vmstat 1
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.brokerList = kafka02.example.com:9092,kafka03.example.com:9092
tier1.channels.channel1.topic = channel2
tier1.channels.channel1.zookeeperConnect = zk01.example.com:2181
tier1.channels.channel1.parseAsFlumeEvent = true

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

The following table describes parameters the Kafka channel supports; required properties are listed in bold.

Table 3. Kafka Channel Properties

  • type: Required. Must be set to org.apache.flume.channel.kafka.KafkaChannel.
  • brokerList: Required. The brokers the Kafka channel uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability.
  • zookeeperConnect: Required. The URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181).
  • topic (default: flume-channel): The Kafka topic the channel will use.
  • groupID (default: flume): The unique identifier of the Kafka consumer group the channel uses to register with Kafka.
  • parseAsFlumeEvent (default: true): Set to true if a Flume source is writing to the channel and expects Avro datums with the FlumeEvent schema (org.apache.flume.source.avro.AvroFlumeEvent) in the channel. Set to false if other producers are writing to the topic that the channel is using.
  • readSmallestOffset (default: false): If true, reads all data in the topic. If false, reads only data written after the channel has started. Only used when parseAsFlumeEvent is false.
  • kafka.consumer.timeout.ms (default: 100): The polling interval, in milliseconds, when the channel is writing to the sink.
  • Other properties supported by the Kafka producer: Used to configure the Kafka producer. You can use any producer properties supported by Kafka. Prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec). See the Kafka documentation for the full list of Kafka producer properties.

Source: http://www.cloudera.com/content/cloudera/en/documentation/cloudera-kafka/latest/topics/kafka_flume.html
