Flume fails to connect to some Kafka brokers

When starting Flume, the exceptions below appear. The agent keeps running and Kafka still receives data, but delivery occasionally breaks off. Reading the log, the producer walks through the configured broker list: brokers 2 (10.208.129.5) and 1 (10.208.129.4) refuse the connection, and only broker 0 (10.208.129.3) accepts.

2016-08-25 15:32:54,561 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:2,host:10.208.129.5,port:9092 with correlation id 192126244 for 1 topic(s) Set(TRAFFICxxx_LOG)
2016-08-25 15:32:54,562 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - kafka.utils.Logging$class.error(Logging.scala:103)] Producer connection to 10.208.129.5:9092 unsuccessful
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:454)
        at sun.nio.ch.Net.connect(Net.java:446)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:129)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
2016-08-25 15:32:54,563 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - kafka.utils.Logging$class.warn(Logging.scala:89)] Fetching topic metadata with correlation id 192126244 for topics [Set(TRAFFICxxx_LOG)] from broker [id:2,host:10.208.129.5,port:9092] failed
java.net.ConnectException: Connection refused
        ... (same stack trace as above, trimmed)
2016-08-25 15:32:54,564 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:1,host:10.208.129.4,port:9092 with correlation id 192126244 for 1 topic(s) Set(TRAFFICxxx_LOG)
2016-08-25 15:32:54,565 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - kafka.utils.Logging$class.error(Logging.scala:103)] Producer connection to 10.208.129.4:9092 unsuccessful
java.net.ConnectException: Connection refused
        ... (same stack trace as above, trimmed)
2016-08-25 15:32:54,566 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - kafka.utils.Logging$class.warn(Logging.scala:89)] Fetching topic metadata with correlation id 192126244 for topics [Set(TRAFFICxxx_LOG)] from broker [id:1,host:10.208.129.4,port:9092] failed
java.net.ConnectException: Connection refused
        ... (same stack trace as above, trimmed)
2016-08-25 15:32:54,566 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:10.208.129.3,port:9092 with correlation id 192126244 for 1 topic(s) Set(TRAFFICxxx_LOG)
2016-08-25 15:32:54,567 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to 10.208.129.3:9092 for producing
2016-08-25 15:32:54,569 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from 10.208.129.3:9092
2016-08-25 15:32:54,570 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from kafka2:9092
2016-08-25 15:32:54,572 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to kafka5:9092 for producing

At first I suspected that the .4 and .5 machines were unreachable or had port 9092 closed. Checking the ports revealed the real cause: Kafka was not running on .4 or .5 at all. To verify this, I also inspected the Kafka broker registrations in ZooKeeper.
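
A quick way to test the ports (a minimal sketch; `nc` is just one option, and the IPs are the suspect brokers from the log above):

# Probe port 9092 on each suspect broker; "Connection refused" means nothing is listening
nc -zv 10.208.129.4 9092
nc -zv 10.208.129.5 9092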

There is a ZooKeeper at 10.208.129.3:2182, but no Kafka is registered on it (note that /brokers/topics is empty below); it only hosts four Storm topologies:
[zk: 10.208.129.3:2182(CONNECTED) 0] ls /
[consumers, storm, brokers, zookeeper]
[zk: 10.208.129.3:2182(CONNECTED) 1] ls /storm
[workerbeats, storms, supervisors, errors, assignments]
[zk: 10.208.129.3:2182(CONNECTED) 2] ls /storm/storms
[rule-hot-config-stat-5-1471598199, traffic-info-stat-8-1471599337, ruleStatTopology-9-1471599960, rule-isp-stat-4-1471598183]
[zk: 10.208.129.3:2182(CONNECTED) 3] ls /consumers   
[]
[zk: 10.208.129.3:2182(CONNECTED) 4] ls /brokers  
[topics]
[zk: 10.208.129.3:2182(CONNECTED) 5] ls /brokers/topics
[]
[zk: 10.208.129.3:2182(CONNECTED) 6] ls /zookeeper     
[quota]
[zk: 10.208.129.3:2182(CONNECTED) 7] ls /zookeeper/quota
[]

The ZooKeeper at 10.208.129.4:2181 does have Kafka registered, under the /kafka chroot, and the live broker ids are 144, 139, and 141, corresponding to 10.208.129.3, 10.208.129.6, and 10.208.129.11 respectively:
[zk: 10.208.129.4:2181(CONNECTED) 0] ls /
[traffic-infoxxx-stat-offset, rule-hot-config-offset, rule-info-stat-offset, traffic-info-stat-offset, zookeeper, rule-isp-stat-offset, consumers, kafka, hive_zookeeper_namespace_hive, rule-config-stat-offset, brokers]
[zk: 10.208.129.4:2181(CONNECTED) 1] ls /kafka
[consumers, config, controller, brokers, admin, controller_epoch]
[zk: 10.208.129.4:2181(CONNECTED) 2] ls /kafka/brokers
[topics, ids]
[zk: 10.208.129.4:2181(CONNECTED) 3] ls /kafka/brokers/ids
[144, 139, 141]
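
To map a broker id to its host and port, you can read the registration znode itself. On a 0.8.x-era Kafka the JSON looks roughly like this (values illustrative, not captured from the actual cluster):

[zk: 10.208.129.4:2181(CONNECTED) 4] get /kafka/brokers/ids/144
{"jmx_port":-1,"timestamp":"...","host":"10.208.129.3","version":1,"port":9092}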

Since the Kafka cluster was set up by another team and wasn't ours to touch, the only option was to change the Flume configuration file as follows:

agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = TRAFFICxxx_LOG
# old broker list: these machines are not actually running Kafka
#agent1.sinks.kafkaSink.brokerList = 10.208.129.3:9092,10.208.129.4:9092,10.208.129.5:9092
#agent1.sinks.kafkaSink.metadata.broker.list = 10.208.129.3:9092,10.208.129.4:9092,10.208.129.5:9092
agent1.sinks.kafkaSink.brokerList = 10.208.129.3:9092,10.208.129.6:9092,10.208.129.11:9092
agent1.sinks.kafkaSink.metadata.broker.list = 10.208.129.3:9092,10.208.129.6:9092,10.208.129.11:9092
agent1.sinks.kafkaSink.producer.type = sync
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.DefaultEncoder
agent1.sinks.kafkaSink.channel = memoryChannel

After pointing brokerList and metadata.broker.list at the IPs that are actually running Kafka, everything runs normally.
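
As a final sanity check (a hypothetical command, assuming a 0.8.x Kafka and the /kafka chroot observed in ZooKeeper above), the console consumer should show events arriving on the topic:

bin/kafka-console-consumer.sh --zookeeper 10.208.129.4:2181/kafka --topic TRAFFICxxx_LOG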
