Kafka Installation and Usage

Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz

Installing and Starting Kafka

Step 1: Install Kafka

user@localhost:~$ tar -xzf kafka_2.10-0.8.1.1.tgz
user@localhost:~$ cd kafka_2.10-0.8.1.1

Step 2: Configure server.properties

Point Kafka at ZooKeeper (this assumes ZooKeeper is already installed; if not, look up the installation steps online).

From the root of the Kafka installation directory, edit the config file: vim config/server.properties

Set the property zookeeper.connect=ip1:2181,ip2:2181

Step 3: server.properties configuration notes

The three most important Kafka settings are, in order: broker.id, log.dirs, and zookeeper.connect.

The server-side parameters in config/server.properties are explained at:

(Reference: http://blog.csdn.net/lizhitao/article/details/25667831)

# Real-world example: the Kafka config file on host 211 (192.168.1.211)

broker.id=1
port=9092
host.name=192.168.1.211
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000
# Real-world example: the Kafka config on host 210 (192.168.1.210)
broker.id=2
port=9092
host.name=192.168.1.210
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000

Step 4: Start Kafka

(Start ZooKeeper first, using the scripts bundled with Kafka: bin/zookeeper-server-start.sh config/zookeeper.properties &)

cd kafka_2.10-0.8.1.1

user@localhost:~$ bin/kafka-server-start.sh -daemon config/server.properties

(-daemon already detaches the process, so no trailing & is needed. For the replication experiments below, start at least two brokers: bin/kafka-server-start.sh -daemon config/server-1.properties)
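To confirm a broker actually came up, a quick sanity check (assuming a JDK and the usual Linux tools on the PATH) is to look for the Kafka JVM process and its listening port:

jps -l | grep kafka.Kafka       # one kafka.Kafka process per broker
netstat -tlnp | grep 9092       # the broker listening on its configured port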

Step 5: Create a topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
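If creation succeeds, the tool prints a confirmation along the lines of:

Created topic "test".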

Step 6: Verify the topic was created

user@localhost:~$ bin/kafka-topics.sh --list --zookeeper localhost:2181

Here localhost is the ZooKeeper address.

To describe a topic:

bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test
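For this single-partition, single-replica topic the output has the same shape as the cluster example later in this article, e.g. (the broker id depends on your setup):

Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0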

Step 7: Send messages

To verify the setup, send a few messages from the console producer:

bin/kafka-console-producer.sh --broker-list 192.168.1.9:9092 --topic zjcTest

(Use the machine's real IP here instead of localhost; since host.name is set in server.properties, the broker binds to and advertises that address, which is why connections to localhost fail.)

Example message:

{"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"高中半年会员"}

Step 8: Start a consumer

user@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Deleting a topic (use with caution: in this version it only removes the topic metadata from ZooKeeper; the message files on disk must be removed by hand):

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181,192.168.197.171:2181
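In later releases (0.8.2 and up) deletion is built into kafka-topics.sh instead, and additionally requires delete.topic.enable=true in server.properties:

bin/kafka-topics.sh --delete --zookeeper 192.168.197.170:2181 --topic test666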

Configuring Kafka in cluster mode (multiple brokers)

This is the same as the single-machine setup; only the broker configuration files need to change.

1. Copy the single-machine Kafka directory to the other machines.

2. On each machine, edit server.properties under the Kafka directory (e.g. with sed, as sketched below):

broker.id=1  # must be unique across the brokers in the cluster (a non-negative integer)

3. Start Kafka on each machine.
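A minimal sketch of step 2 using sed (the install path is hypothetical; substitute each machine's own unique id):

sed -i 's/^broker.id=.*/broker.id=2/' /usr/local/kafka_2.10-0.8.1.1/config/server.properties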

Configuring pseudo-distributed mode on a single machine

First create a config file for each node:

> cp config/server.properties config/server-1.properties

> cp config/server.properties config/server-2.properties

Then set the following parameters in the copied files:

config/server-1.properties:

broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

Now start the two additional nodes:

> bin/kafka-server-start.sh config/server-1.properties &

...

> bin/kafka-server-start.sh config/server-2.properties &

...
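At this point all three brokers should be registered in ZooKeeper. One quick way to check is the zookeeper-shell.sh script that ships with Kafka (assuming it accepts a one-shot command like zkCli does; otherwise run it interactively and type the same command):

bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

which should list the broker ids, e.g. [0, 1, 2].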

Create a topic with a replication factor of 3:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Run the "describe topics" command to see where each partition lives:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

leader: the node responsible for all reads and writes for the partition; each node becomes the leader for a randomly selected share of the partitions.

replicas: the full list of nodes that replicate this partition's log, whether or not they are alive.

isr: the set of "in-sync" replicas, i.e. the subset of replicas that are currently alive and caught up with the leader.
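To see the fault tolerance at work, kill the broker that is currently the leader and describe the topic again, as in the official 0.8 quickstart (pids and ids depend on your environment):

ps aux | grep server-1.properties     # find the pid of broker 1
kill -9 <pid>
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

In the new output the Leader will have moved to one of the other replicas, and broker 1 will have dropped out of the Isr list.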

Setting Up a Kafka Development Environment

1. Add the Kafka dependency to pom.xml

<!-- Kafka dependency -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
        <!-- In practice these jars are pulled in separately rather than using the versions bundled with Kafka -->
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zkclient</artifactId>
            <groupId>com.101tec</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

2. Property file: kafka.properties

#zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
zookeeper.connect=192.168.1.179:2181
metadata.broker.list=192.168.1.179:9092
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092

#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000

group.id=llx
kafka.sellstat.topics=llx

Load this properties file in the Spring configuration:

<!-- Loaded for Spring's property-placeholder resolution -->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <list>
            <value>classpath:kafka.properties</value>
        </list>
    </property>
</bean>

<!-- Exposed as a Properties bean so values can be injected in code -->
<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="locations">
        <list>
            <value>classpath:kafka.properties</value>
        </list>
    </property>
</bean>

3. Define the receiver

<!-- Define the receiver -->
<bean id="testReceiver" class="cn.vko.index.Receiver">
    <constructor-arg index="0" value="${zookeeper.connect}" />
    <constructor-arg index="1" value="${group.id}" />
    <constructor-arg index="2" value="${kafka.sellstat.topics}" />
    <constructor-arg index="3" ref="testConsumer" />
</bean>

4. Define a message handler in Spring (it must implement VkoConsumer)

<!-- Define the message handler -->
<bean id="testConsumer" class="cn.vko.index.TestConsumer"></bean>

5. Code

package cn.vko.index;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import cn.vko.common.base.JsonMsg;

/**
 * Test sending messages to Kafka.
 * @author lilixin
 */
@Controller
public class TestProducer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("#{configProperties['metadata.broker.list']}")
    private String metadataBrokerList;

    @Value("#{configProperties['kafka.sellstat.topics']}")
    private String topic;

    @ResponseBody
    @RequestMapping("send")
    public JsonMsg send(String msg) {
        logger.info("send start -------------------------");
        // Note: creating a producer per request is fine for a smoke test,
        // but in production a single producer instance should be reused.
        VkoProducer vkoProducer = new VkoProducer(metadataBrokerList);
        logger.info("producer created -------------------------");
        vkoProducer.send(topic, msg);
        logger.info("send finished -------------------------");
        return new JsonMsg();
    }
}
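Once the application is deployed, the endpoint can be exercised with a plain HTTP call; the host, port, and context path below are hypothetical:

curl 'http://localhost:8080/app/send?msg=hello-kafka'

The message should then show up in the logs of whichever consumer is subscribed to the topic.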

package cn.vko.index;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Test receiving messages from Kafka.
 * @author llx
 */
@Service
public class TestConsumer implements VkoConsumer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void dealMsg(String msg) {
        logger.info("-------- kafka message received: start ---------");
        logger.info(msg);
        logger.info("-------- kafka message received: end ---------");
    }
}

package cn.vko.index;

import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class VkoProducer {

    private Logger log = LoggerFactory.getLogger(VkoProducer.class);
    private String metadataBrokerList;
    private Producer<String, String> producer;

    public VkoProducer(String metadataBrokerList) {
        super();
        if (StringUtils.isEmpty(metadataBrokerList)) {
            String message = "metadataBrokerList must not be empty";
            // log.error(message);
            throw new RuntimeException(message);
        }
        this.metadataBrokerList = metadataBrokerList;
        // Producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list", metadataBrokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // request.required.acks enables the acknowledgement mechanism; without it the
        // producer is fire-and-forget and data may be lost. Valid values are 0, 1, -1;
        // see http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        //props.put("producer.type", "async");
        props.put("queue.buffering.max.ms", "5000");
        props.put("queue.buffering.max.messages", "30000");
        props.put("queue.enqueue.timeout.ms", "-1");
        props.put("batch.num.messages", "1");
        // Optional; if unset, the default partitioner is used
        //props.put("partitioner.class", "cn.vko.kafka.PartitionerDemo");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    /**
     * Send a single message to the queue.
     * @param topic topic name
     * @param msg message body
     * @return "ok" on success
     */
    public String send(String topic, String msg) {
        log.info("sending message to topic {}: {}", topic, msg);
        // Long start = System.currentTimeMillis();
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
        producer.send(data);
        // log.info("send took {} ms", System.currentTimeMillis() - start);
        return "ok";
    }
}

package cn.vko.index;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.vko.component.pageframework.util.StringUtil;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Receiver {

    private Logger log = LoggerFactory.getLogger(Receiver.class);
    private String zookeeperConnect;
    private String groupId;
    private String topic;
    private VkoConsumer vkoConsumer;

    /**
     * Create a receiver.
     * @param zookeeperConnect comma-separated ZooKeeper cluster addresses
     * @param groupId consumer group id
     * @param topic topic name
     * @param vkoConsumer message handler
     */
    public Receiver(String zookeeperConnect, String groupId, String topic, VkoConsumer vkoConsumer) {
        super();
        if (StringUtil.isEmpty(zookeeperConnect)) {
            String message = "zookeeperConnect must not be empty";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (StringUtil.isEmpty(groupId)) {
            String message = "groupId must not be empty";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (StringUtil.isEmpty(topic)) {
            String message = "topic must not be empty";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (vkoConsumer == null) {
            String message = "vkoConsumer must not be null";
            log.error(message);
            throw new RuntimeException(message);
        }
        this.zookeeperConnect = zookeeperConnect;
        this.groupId = groupId;
        this.topic = topic;
        this.vkoConsumer = vkoConsumer;
        log.info("kafka vkoConsumer created: groupId:{}, topic:{}, zookeeperConnect:{}", groupId, topic, zookeeperConnect);
        receive();
    }

    private void receive() {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "14000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig conf = new ConsumerConfig(props);
        ConsumerConnector cc = Consumer.createJavaConsumerConnector(conf);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Each topic currently has 2 partitions, so request 2 streams
        topicCountMap.put(topic, 2);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        // One thread per stream, each blocking on the stream's iterator
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            new Thread() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        String msg = new String(it.next().message());
                        try {
                            vkoConsumer.dealMsg(msg);
                        } catch (Exception e) {
                            log.error("kafka vkoConsumer topic:{} message:{} handler threw exception", topic, msg, e);
                        }
                        log.info("kafka vkoConsumer topic:{} received message:{}", topic, msg);
                    }
                }
            }.start();
            log.info("kafka vkoConsumer thread started: groupId:{}, topic:{}, zookeeperConnect:{}", groupId, topic, zookeeperConnect);
        }
        log.info("kafka vkoConsumer ready to receive: groupId:{}, topic:{}, zookeeperConnect:{}", groupId, topic, zookeeperConnect);
    }
}

package cn.vko.index;

public interface VkoConsumer {
    public void dealMsg(String msg);
}






// A consumer written for a real project
// (imports omitted: slf4j and Spring annotations, net.sf.json.JSONObject, and the project-specific service and form classes)
@Service
public class SellStatConsumer implements VkoConsumer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private ISellDetailService sellDetailService;
    @Autowired
    private ISellerService sellerService;
    @Autowired(required = false)
    private IIpServiceRemote ipServiceRemote;
    @Autowired(required = false)
    private IPhoneCityServiceRemote phoneCityServiceRemote;

    @Override
    public void dealMsg(String rowData) {
        // Validate the payload before parsing; skip malformed JSON
        if (!new JsonValidator().validate(rowData)) {
            logger.error("json error ...... : {}", rowData);
            return;
        }
        logger.info("======== start kafka consumer ==============" + rowData);
        JSONObject json = JSONObject.fromObject(rowData);
        PayInfoForm form = (PayInfoForm) JSONObject.toBean(json, PayInfoForm.class);
        // do something
    }
}

The integration above is wrapped in three classes: VkoProducer, Receiver, and the VkoConsumer interface that message handlers implement.
