Kafka版本升级 ( 0.10.0 -> 0.10.2 )

升级Kafka集群的版本其实很简单,核心步骤只需要4步,但是我们需要在升级的过程中确保每一步操作都不会“打扰”到producer和consumer的正常运转。为此,笔者在本机搭了一个测试环境进行实际的版本升级实验。在开始之前,简要介绍一下测试环境的部署情况及目标:Kafka 0.10.0.0 双broker测试环境,而目标是把该集群升级到0.10.2版本

两个broker启动时分别读取server.properties和server2.properties。

一、启动测试环境
打开两个终端,分别执行startBroker1.sh和startBroker2.sh。startBroker*.sh内容很简单就是:

CURRENT_PATH=<your_path>/kafka_2.11-0.10.0.0
cd $CURRENT_PATH
JMX_PORT=9997 bin/kafka-server-start.sh ../configs/server.properties

二、创建测试topic
创建一个双分区,replication-factor=2的topic:test,然后使用kafka-topics工具describe一下:

okay,目前一切正常。

三、启动producer

很简单的producer程序,每1秒发送一条消息,然后打印成功提交的消息数和提交失败的消息数。特别注意提交失败的消息数,后续我们依赖此值来确保升级流程不会影响到producer。 主要程序代码如下:

 1 Properties props = new Properties();
 2         props.put("bootstrap.servers", "localhost:9092,localhost:9093");
 3         props.put("acks", "all");
 4         props.put("retries", Integer.MAX_VALUE);
 5         props.put("batch.size", 16384);
 6         props.put("linger.ms", 1);
 7         props.put("buffer.memory", 33554432);
 8         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 9         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
10
11         Producer<String, String> producer = new KafkaProducer<>(props);
12         final AtomicInteger success = new AtomicInteger(0);
13         final AtomicInteger failed = new AtomicInteger(0);
14         try {
15             while (true) {
16                 producer.send(new ProducerRecord<String, String>("test", "a message"), new Callback() {
17                     @Override
18                     public void onCompletion(RecordMetadata metadata, Exception exception) {
19                         if (exception != null) {
20                             System.out.println("Current failed count: " + failed.incrementAndGet());
21                         } else
22                             System.out.println("Current success count: " + success.incrementAndGet()
23                                     + ", failed: " + failed.get());
24                     }
25                 });
26                 Thread.sleep(2000);
27
28             }
29         } finally {
30             producer.close();
31         } 

四、启动consumer

为简单起见,我使用了console-consumer,如下所示。另外, 是Kafka 0.10.0.0版本,所以一定要加上`--new-consumer`才能使用新版本consumer!

bin/kafka-console-consumer.sh --topic test --from-beginning --new-consumer --bootstrap-server localhost:9092,localhost:9093

此时,你应该可以看到producer和consumer都可以正常地工作。

----------------------------- 升级的流程正式开始 -----------------------------

切记: 每做完一步都要观察producer和consumer是否出现严重错误!

五、 更新broker间通讯版本号和消息格式版本
向所有broker的server.properties文件中增加下面两行:

inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

六、依次更新代码,重启所有broker
注意一定要依次重启,即先重启broker1,然后再重启broker2

七、再次更新broker间通讯版本和消息格式版本

inter.broker.protocol.version=0.10.2
log.message.format.version=0.10.2

注意,这次要更新成你要升级到的目标版本。比如我们要升级到0.10.2,那么就更新为0.10.2

八、再次依次重启broker
依然要依次重启

好了,当前集群版本已经升级完毕了。值得一提的是,在整个升级过程中producer应该是可以正常工作的,但consumer可能会出现位移提交失败的警告,因此有可能会造成重复消费,而broker端可能会出现“org.apache.kafka.common.errors.NotLeaderForPartitionException”异常,因为__consumers_offsets各分区的leader有可能会发生瞬时的变化,因此通常也是不必在意的。

时间: 2024-07-28 13:58:49

Kafka版本升级 ( 0.10.0 -> 0.10.2 )的相关文章

Kafka 0.10.1.0 Cluster的搭建和Topic简单操作实验

[kafka cluster机器]:机器名称 用户名称sht-sgmhadoopdn-01/02/03 root [安装目录]: /root/learnproject/app 1.将scala文件夹同步到集群其他机器(scala 2.11版本,可单独下载解压) [[email protected] app]# scp -r scala [email protected]:/root/learnproject/app/ [[email protected] app]# scp -r scala [

day04_oracle版本升级--10.2.0.1.0升级到10.2.0.4.0

软件升级 1.oracle很少升级,除非你要用什么新的功能.因为升级要停库,好多和库相关的组件可能全要升级.升级前一定要开会研究很长时间. 2.一些大公司定期委托第三方公司做<系统安全防护评测>,扫描出一些漏洞时,会要求升级并打补丁[联通] 神州数码信息服务股份有限公司.绿盟科技 如下形式的漏洞: 漏洞详细信息 Oracle数据库Network Foundation组件远程拒绝服务漏洞 详细描述 本次扫描是通过版本进行的,可能发生误报. Oracle Network Foundation是 O

Kafka0.10.2.0分布式集群安装

一.依赖文件安装 1.1 JDK 参见博文:http://www.cnblogs.com/liugh/p/6623530.html 1.2 Scala 参见博文:http://www.cnblogs.com/liugh/p/6624491.html 1.3 Zookeeper 参见博文:http://www.cnblogs.com/liugh/p/6671460.html 二.文件准备 2.1 文件名称 kafka_2.11-0.10.2.0.tgz 2.2 下载地址 http://kafka.

【Oracle】RAC 10.2.0.1升级10.2.0.5

环境: OS:OEL5.6 RAC:10.2.0.1.0 相关环境变量: CRS_HOME /u01/app/oracle/product/10.2.0/db_1 ORACLE_HOME   /u01/app/oracle/product/10.2.0/db_2 crs_stop -all关闭所有资源 [[email protected] ~]$ crs_stat -t Name           Type           Target    State     Host --------

Oracle升级_oracle 10g版本由 10.2.0.1.0升级为10.2.0.4.0(即CPU升级)

CPU升级_ oracle 10g版本由 10.2.0.1.0升级为10.2.0.4.0 ***********************************************声明************************************************ 原创作品,出自 "深蓝的blog" 博客,欢迎转载,转载时请务必注明出处(http://blog.csdn.net/huangyanlong). 表述有错误之处,请您留言,不胜感激. 提醒:点击目录,更有

升级_开阔视野之Oracle图形化升级(dbca建库后升级)—10.2.0.1.0升为10.2.0.5.0

***********************************************声明***********************************************************************  原创作品,出自 "深蓝的blog" 博客,欢迎转载,转载时请务必注明出处,否则追究版权法律责任. 表述有错误之处,请您留言或邮件([email protected])指明,不胜感激. 本文转载必须保留此处:http://blog.csdn.net

Oracle 10.2.0.5 RMAN迁移并升级11.2.0.4一例

一.环境介绍 1. 源数据库环境 操作系统版本: OEL 5.4 x64数据库版本  : 10.2.0.5 x64数据库sid名 : orcl Oracle 10g 10.2.0.5(64bit)安装目录如下: 数据库软件:/u01/app/oracle/product/10.2.0/db_1数据库文件:/u01/app/oracle/oradata/orcl 归档目录:/u01/archivelog RMAN目录:/backup/dbbak/orabak 背景:一个老oracle10g数据库,

maven中引入oracle驱动报错Missing artifact com.oracle:ojdbc14:jar:10.2.0.4.0

问题:引入依赖之后会报错. 1.首先我们要去下载一个oracle的驱动jar包,ojdbc6.jar(我这里本机安装了oracle,所以在oracle安装目录 F:\app\zyh\product\11.1.0\db_1\jdbc\lib 目录下会有驱动的jar包,如果你没有安装oracle,那么可以从官网上下载jar包,反正就是必须要先有一个ojdbc6.jar,只要搞到就行了). 2.以我本机为例子,打开命令提示符(cmd),进入ojdbc6.jar所在目录,在我这里就进入F:\app\zy

核心CEI Ensight 10.2.0(c) Win32_64 & Linux32_64 & MacOSX 5DVD

CEI Ensight 10.2.0(c) Win32_64 & Linux32_64 & MacOSX 5DVD Ensight是Computational Engineering International (CEI) 公司的最核心工具,可用于工业.科研.产品设计等领域,处理CFD.FEA等领域的百万甚至上亿的结点单元,具有并行处理与渲染的优势,并支持VR系统环境以及实时协同等功能.QQ:16264558   电话TEL:13963782271 CEI提供几种形式的EnSight: 包

Oracle Linux 5.8安装Oracle 10.2.0.5 x64

一.正常方式全新安装 1. oracle安装环境准备 (1) 确认安装以下包 # rpm -q --qf '%{NAME}-%{VERSION}-%{RELEASE}  (%{ARCH})\n' binutils compat-libstdc++-33 compat-gcc-34-c++ elfutils-libelf elfutils-libelf-devel elfutils-libelf-devel-static gcc gcc-c++ glibc glibc-common glibc-d