kafka无法发送消息问题处理

背景

在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常。当在本地使用JAVA程序发送消息时,一直出错。

抛出的错误为:

Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

问题追踪

最初怀疑是防火墙限制了端口,因此在本地使用telnet连接服务器端口,发现无法连接,因此关闭服务器防火墙。重试,还是抛同样的问题。

后来查看kafka日志,发现TCP连接可以正常关闭,而且IP也是客户端的IP,证明网络没问题,客户端确实可以连上kafka服务器。

那到底是什么问题呢?

通过查看kafka配置,发现有个属性:advertised.host.name。官方文档里的备注信息表明,该字段的值是生产者和消费者使用的。如果没有设置,则会取host.name的值,默认情况下,该值为localhost。思考一下,如果生产者拿到localhost这个值,只往本地发消息,必然会报错(因为本地没有kafka服务器)。

问题处理

将advertised.host.name设置为服务器IP地址,经测试,消息顺利发送。

总结

其实,在生产者的日志中,也看到先连接kafka服务器,然后关闭;然后又连接了本地,再关闭。

Connected to 192.168.56.101:9092 for producing
Disconnecting from 192.168.56.101:9092
Connected to localhost:9092 for producing
Disconnecting from localhost:9092

但是因为不了解kafka的配置信息,所以也没仔细分析。

另外,在配置生产者时,metadata.broker.list会设置成kafka服务器的IP和地址。但这个只是获取一些元信息,后续发送消息时会根据获取的元信息来发送,而获取得元信息中,由于advertised.host.name被默认为localhost,所以本地当然会把消息发到本地,结果导致问题出现。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-12-10 23:10:39

kafka无法发送消息问题处理的相关文章

【原创】Kafka接受发送消息对象Object基础版

首先感谢 kafka 中国社区 王扬庭例子的帮助和指导~~~~~(kafka_2.9.2-0.8.1.1) kafka常用的发送消息的方法如下: Properties props = new Properties(); props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181"); props.put("serializer.class", "kafka

Golang之发送消息至kafka

windows下安装zookeeper 1.安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的) 2.安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/ 3.重命名conf/zoo_sample.cfg 为conf/zoo.cfg 4.编辑 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\ 4.运行bin/zkServer.cmd 启动结果如下: 安装kafka 1

log4j2发送消息至Kafka

title: 自定义log4j2发送日志到Kafka 图片描述(最多50字) tags: log4j2,kafka 为了给公司的大数据平台提供各项目组的日志,而又使各项目组在改动上无感知.做了一番调研后才发现log4j2默认有支持将日志发送到kafka的功能,惊喜之下赶紧看了下log4j对其的实现源码!发现默认的实现是同步阻塞的,如果kafka服务一旦挂掉会阻塞正常服务的日志打印,为此本人在参考源码的基础上做了一些修改. log4j日志工作流程 log4j2对于log4j在性能上有着显著的提升,

kafka producer batch 发送消息

1. 使用 KafkaProducer 发送消息,是按 batch 发送的,producer 首先把消息放入 ProducerBatch 中: org.apache.kafka.clients.producer.internals.ProducerBatch 2. KafkaProduer 类中有一个 Thread 属性,负责 IO,发送和接收数据: this.sender = new Sender(logContext, client, this.metadata, this.accumula

java客户端向单机版kafka发送消息没有接收到

kafka版本:kafka_2.11-0.10.0.0 在kafka服务器命令发送消息,消费者可以接受到, 但是在java客户端向kafka发送消息时消费者接受不到, 在kafka/config/sever.properties把这个注解打开 advertised.listeners=PLAINTEXT://ip.137:9092        #本机服务器ip 意思就是说:hostname.port都会广播给producer.consumer.如果你没有配置了这个属性的话,则使用listene

NET中解决KafKa多线程发送多主题的问题

一般在KafKa消费程序中消费可以设置多个主题,那在同一程序中需要向KafKa发送不同主题的消息,如异常需要发到异常主题,正常的发送到正常的主题,这时候就需要实例化多个主题,然后逐个发送. 在NET中用RdKafka组件来做消息处理,在Nuget中引用. 在程序中初始化Producer,并创建多个Topic private string comtopic = "topic1"; private string errtopic = "topic2"; private

【原创】Kafka 0.11消息设计

Kafka 0.11版本增加了很多新功能,包括支持事务.精确一次处理语义和幂等producer等,而实现这些新功能的前提就是要提供支持这些功能的新版本消息格式,同时也要维护与老版本的兼容性.本文将详细探讨Kafka 0.11新版本消息格式的设计,其中会着重比较新旧两版本消息格式在设计上的异同.毕竟只有深入理解了Kafka的消息设计,我们才能更好地学习Kafka所提供的各种功能. 1. Kafka消息层次设计 不管是0.11版本还是之前的版本,Kafka的消息层次都是分为两层:消息集合(messa

Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析

Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析 一.概述 kafka默认使用DefaultPartitioner类作为默认的partition策略规则,具体默认设置是在ProducerConfig类中(如下图) 二.DefaultPartitioner.class 源码分析 1.类关系图 2.源码分析 public class DefaultPartitioner implements Partitioner { //缓存map key->to

Kafka 生产者无消息丢失配置

prodcer同步发送的机制虽然能保证消息不丢失,但是不推荐生产环境使用,性能很差.一般都是采用异步方式发送消息,把消息先放入缓冲区,然后再由IO线程推送出去,存在消息丢失的风险,而且可能发生乱序. 下面给出Kafka无消息丢失的配置: Producer端配置 block.on.buffer.full = true0.9版本后已经废弃,改用max.block.ms代替,推荐用户显示设置为true,使得缓冲期填满时,producer处于阻塞状态并停止接收新的消息而不是抛出异常. acks = al