kafka官方Quick Start

1、下载kafka,并上传到服务器

2、如果之前没安装zookeeper,这里可以启动一个简单的zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

3、配置kafka集群(多个broker)

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

config/server-1.properties:

broker.id=1

port=9093

log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2

port=9094

log.dir=/tmp/kafka-logs-2

3、启动kafka

bin/kafka-server-start.sh config/server.properties &

bin/kafka-server-start.sh config/server-1.properties &

bin/kafka-server-start.sh config/server-2.properties &

4、创建一个topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

5、通过此命令查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

6、启动一个生产者程序,向test这个topic发布一些消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

7、启动一个消费者,从test这个topic读取所有消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

8、停止kafka

bin/kafka-server-stop.sh

发布消息时出错,错误信息如下:

No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}

解决问题方法:http://blog.linuxphp.org/archives/1626/

就是把config/server.properties里的host.name=localhost

时间: 2024-12-18 15:35:28

kafka官方Quick Start的相关文章

Kafka官方文档翻译——设计

下面是博主的公众号,后续会发布和讨论一系列分布式消息队列相关的内容,欢迎关注. --------------------------------------------------------------------------------------------------------- Design 1. Motivation 我们设计Kafka用来作为统一的平台来处理大公司可能拥有的所有实时数据源.为了做到这点,我们必须思考大量的使用场景. 它必须有高吞吐去支持大数据流,例如实时日志聚合.

kafka 官方示例代码--消费者

kafka 0.9.0添加了一套新的Java 消费者API,用以替换之前的high-level API (基于ZK) 和low-level API.新的Java消费者API目前为测试版.另外kafka 0.9暂时还支持0.8的Client. 1.High Level Consumer(0.8) package com.test.groups; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; impo

kafka 客户端封装

kafka客户端封装源码. 1.为什么进行封装? kafka官方自带的客户端,需要对参数进行设置,如下代码,很多参数的key都是字符串,这样对于编程人员来说非常不友好.参数很多的时候,有两种处理方式:(1)传一个config类进去解析:(2)使用建造者模式,笔者在此就使用建造者模式,对官方客户端进行简单封装,使之易用. 官方的例子如下: 1 Properties props = new Properties(); 2 props.put("bootstrap.servers", &qu

Kafka是如何实现高吞吐率的

Kafka是如何实现高吞吐率的 原创 2016-02-27 杜亦舒 性能与架构 Kafka是分布式消息系统,需要处理海量的消息,Kafka的设计是把所有的消息都写入速度低容量大的硬盘,以此来换取更强的存储能力,但实际上,使用硬盘并没有带来过多的性能损失 kafka主要使用了以下几个方式实现了超高的吞吐率 顺序读写 kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能 顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写 Kafka官

kafka 0.8.1 新producer 源码简单分析

1 背景 最近由于项目需要,需要使用kafka的producer.但是对于c++,kafka官方并没有很好的支持. 在kafka官网上可以找到0.8.x的客户端.可以使用的客户端有C版本客户端,此客户端虽然目前看来还较为活跃,但是代码问题还是较多的,而且对于c++的支持并不是很好. 还有c++版本,虽然该客户端是按照c++的思路设计,但是最近更新时间为2013年12月19日,已经很久没有更新了. 从官方了解到,kafka作者对于现有的producer和consumer的设计是不太满意的.他们打算

Kafka集群安装与扩容

介绍略 集群安装: 一.准备工作: 1.版本介绍: 目前我们使用版本为kafka_2.9.2-0.8.1(scala-2.9.2为kafka官方推荐版本,此外还有2.8.2和2.10.2可以选择) 2.环境准备: 安装JDK6,目前使用版本为1.6,并配置JAVA_HOME 3.配置修改: 1)拷贝线上配置到本地kafka目录. 2)需要注意的是server.properties里broker和ip的指定,必须要唯一. 3)server.properties中log.dirs必须要手动指定.此配

1、Kafka学习分享-V1.0

Kafka学习分享 .1       什么是Kafka Apache Kafka是一个开源的流处理平台,由 Apache Software Foundation使用Scala and Java编写发展而来.Kafka?用于构建实时数据管道和流媒体应用. 它具有水平可扩展性,容错性,快速性,并在数千家公司生产中运行. 它的主要功能:数据流的发布和订阅.数据流的处理.数据流的存储.像一个消息系统一样发布和订阅数据流,有效且实时地处理数据流,在一个分布式备份的集群中安全地处理存储数据流. .2    

Kafka Java API操作topic

Kafka官方提供了两个脚本来管理topic,包括topic的增删改查.其中kafka-topics.sh负责topic的创建与删除:kafka-configs.sh脚本负责topic的修改和查询,但很多用户都更加倾向于使用程序API的方式对topic进行操作. 上一篇文章中提到了如何使用客户端协议(client protocol)来创建topic,本文则使用服务器端的Java API对topic进行增删改查.开始之前,需要明确的是,下面的代码需要引入kafka-core的依赖,以kafka 0

Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新Client API的使用要点,高可用性测试,以及各种坑 (转载)

Kafka 0.9版本对java client的api做出了较大调整,本文主要总结了Kafka 0.9在集群搭建.高可用性.新API方面的相关过程和细节,以及本人在安装调试过程中踩出的各种坑. 关于Kafka的结构.功能.特点.适用场景等,网上到处都是,我就不再赘述了,直接进入正文 Kafka 0.9集群安装配置 操作系统:CentOS 6.5 1. 安装Java环境 Zookeeper和Kafka的运行都需要Java环境,所以先安装JRE,Kafka默认使用G1垃圾回收器,如果不更改垃圾回收器