Background: as the system's data volume grows, logs can no longer simply be kept in plain files; the files grow without bound and become hard to search and analyze. After weighing the options, we use Flume to collect the logs and then push them to Kafka as messages. The concrete configuration is given below:
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = r1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
agent.sources.r1.type = netcat
agent.sources.r1.bind = localhost
agent.sources.r1.port = 10245
agent.sources.r1.charset = UTF-8

# The channel can be defined as follows.
agent.sources.r1.channels = c1

# Each sink's type must be defined
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = test
agent.sinks.s1.brokerList = ip:9092
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 20
agent.sinks.s1.channel = c1

# Each channel's type is defined.
agent.channels.c1.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well.
# In this case, it specifies the capacity of the memory channel.
agent.channels.c1.capacity = 100
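Before starting the agent it is worth making sure the topic named in the sink (test) actually exists. A minimal sketch, assuming an older Kafka distribution whose admin scripts sit in bin/ and whose ZooKeeper runs on localhost:2181 (newer Kafka versions take --bootstrap-server instead of --zookeeper):

# create the 'test' topic the KafkaSink writes to
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test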
Startup command:
bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name agent -Dflume.root.logger=INFO,console
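Once the agent is up, you can check the pipeline end to end. The netcat source accepts newline-terminated lines on localhost:10245 and replies OK for each event it takes, so a quick smoke test looks like the sketch below, assuming nc is installed and using the old ZooKeeper-based console consumer (newer Kafka versions use --bootstrap-server ip:9092 instead):

# send one test event into the Flume netcat source
echo "hello flume" | nc localhost 10245

# read it back from the Kafka topic on the broker machine
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

If everything is wired correctly, "hello flume" shows up in the consumer output.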
Be sure to start Kafka before starting the agent; otherwise you may run into the following error:
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired
This happens because by default Kafka advertises itself as localhost, so if Flume and the Kafka broker are not on the same machine the producer cannot reach the broker and its batches time out. To fix it, edit the broker configuration:
In Kafka's server.properties, set advertised.listeners=PLAINTEXT://ip:9092, replacing the default localhost with the broker's actual IP address, then restart Kafka and the error goes away.
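For reference, a sketch of the relevant line in the broker's conf/server.properties; the IP address below is a hypothetical example, substitute the broker's real address:

# conf/server.properties on the Kafka broker
# 192.168.0.10 is a placeholder for the broker's real IP
advertised.listeners=PLAINTEXT://192.168.0.10:9092

Restart the broker after the change so the new listener takes effect.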