kafka producer实例及原理分析

1.前言

首先,描述下应用场景:


假设,公司有一款游戏,需要做行为统计分析,数据的源头来自日志,由于用户行为非常多,导致日志量非常大。将日志数据插入数据库然后再进行分析,已经满足不了。最好的办法是存日志,然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。

步骤如下:

  • 搭建KAFKA系统运行环境

如果你还没有搭建起来,可以参考我的博客:

http://zhangfengzhe.blog.51cto.com/8855103/1556650

  • 设计数据存储格式
  • Producer端获取数据,并对数据按上述设计的格式进行编码

  • Producer将已经编码的数据发送到broker上,在broker上进行存储
  • Consumer端从broker中获取数据,分析计算。

2.实现过程

为了快速实现,我们简化日志消息格式。

在eclipse新建JAVA PROJECT,将kafka/libs下*.jar配置到项目build path即可。

Step 1 : 简单的POJO对象(MobileGameLog)

private String actionType;
private String appKey;
private String guid;
private String time;

说明:

actionType 代表行为类型

appKey     代表游戏ID

guid       代表角色

time       代表时间

提供getter/setter方法,并override toString()

Step 2 : 提供serializer

需要注意的是,POJO对象需要序列化转化成KAFKA识别的消息存储格式--byte[]

public class MobileGameKafkaMessage implements kafka.serializer.Encoder<MobileGameLog>{
@Override
public byte[] toBytes(MobileGameLog mobileGameLog) {
return mobileGameLog.toString().getBytes();
}
public MobileGameKafkaMessage(VerifiableProperties props){
}
}

Step 3 : 提供Partitioner

我们可以提供Partitioner,这样可以使得数据按照我们的策略来存储在brokers中。

这里,我根据appKey来进行分区。

Step 4 : 提供Producer

  • 提供配置

  • 运行kafka环境

启动zookeeper:

[[email protected] kafka_2.9.2-0.8.1.1]# bin/zookeeper-server-start.sh  
config/zookeeper.properties &

启动kafka broker(id=0):

[[email protected] kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh 
config/server.properties &

启动kafka broker(id=1)

[[email protected] kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh  
config/server-1.properties &

上述过程,在我的博客【搭建kafka运行环境】里面都有详细记录,大家可以参考。

创建一个topic:

[[email protected] kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --zookeeper localhost:2181 
--create --topic log_1 --replication-factor 2 --partitions 3

注意topic:log_1有3个分区,2个复制。

  • 制造数据并发送
// Producer<key , value>
// V: type of the message
// K: type of the optional key associated with the message
kafka.javaapi.producer.Producer<MobileGameLog, MobileGameLog> producer 
= new Producer<MobileGameLog, MobileGameLog>(
config);

List<KeyedMessage<MobileGameLog, MobileGameLog>> list 
= new ArrayList<KeyedMessage<MobileGameLog, MobileGameLog>>();

// 5条tlbb数据
for (int i = 1; i <= 5; i++) {
MobileGameLog log = new MobileGameLog();
log.setActionType("YuanBaoShop");
log.setAppKey("tlbb");
log.setGuid("xxx_" + i);
log.setTime("2014-10-01 10:00:20");
KeyedMessage<MobileGameLog, MobileGameLog> keyedMessage 
= new KeyedMessage<MobileGameLog, MobileGameLog>(
"log_1", log, log);
list.add(keyedMessage);
}

// 8条ldj数据
for (int i = 1; i <= 8; i++) {
MobileGameLog log = new MobileGameLog();
log.setActionType("BlackMarket");
log.setAppKey("ldj");
log.setGuid("yyy_" + i);
log.setTime("2014-10-02 10:00:20");
KeyedMessage<MobileGameLog, MobileGameLog> keyedMessage 
= new KeyedMessage<MobileGameLog, MobileGameLog>(
"log_1", log, log);
list.add(keyedMessage);
}

producer.send(list);
producer.close();

说明:

a.producer既可以send 一个keyedMessage,可以是一个keyedMessage list.

b.注意producer实例化时的泛型。value是消息对象,即POJO,key是这个pojo的标示,这个是要用来进行分区的。

c.producer向broker发送的是KeyedMessage,注意实例化时的泛型,KEY/VALUE的意义同b.

d.KeyedMessage需要指明topic name.

  • eclipse 运行结果如下:

-------start info

运行至MobileGameKafkaPartition

VerifiableProperties : {metadata.broker.list=192.168.152.2:9092,192.168.152.2:9093,

zk.connectiontimeout.ms=6000, request.required.acks=1,

partitioner.class=com.sohu.game.kafka.day2.MobileGameKafkaPartition,

serializer.class=com.sohu.game.kafka.day2.MobileGameKafkaMessage}

-------end info

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

  • kafka consumer console 结果如下:

3.原理分析

查看topic:log_1详细信息:

[[email protected] kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --zookeeper localhost:2181 
--describe --topic log_1
Topic: log_1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: log_1 Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: log_1 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: log_1 Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0,1

log_1有2个broker进行储存,每一个broker上有3个分区,并且每一个分区的leader都是broker(id=0)

查看broker(id=0)上的信息:

[[email protected] tmp]# ll
total 52
drwxr-xr-x  2 root root 4096 Oct  7 01:23 hsperfdata_root
drwxr-xr-x 10 root root 4096 Oct  7 02:40 kafka-logs
drwxr-xr-x  8 root root 4096 Oct  7 02:40 kafka-logs-1
srwxr-xr-x  1 root root    0 Sep 20 18:15 mapping-root
drwxrwxrwt  2 root root 4096 Oct  6 00:34 VMwareDnD
drwx------  2 root root 4096 Oct  6 18:05 vmware-root
drwxr-xr-x  3 root root 4096 Sep 20 19:58 zookeeper
[[email protected] tmp]# 
[[email protected] tmp]# 
[[email protected] tmp]# 
[[email protected] tmp]# cd kafka-logs
[[email protected] kafka-logs]# pwd
/tmp/kafka-logs
[[email protected] kafka-logs]# ll
total 80
drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-0
drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-1
drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-2
drwxr-xr-x 2 root root 4096 Oct  6 01:01 my_first_topic-0
-rw-r--r-- 1 root root  100 Oct  7 02:40 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  100 Oct  7 02:40 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 Oct  6 01:01 test-0
drwxr-xr-x 2 root root 4096 Oct  6 01:01 topic_1-0
drwxr-xr-x 2 root root 4096 Sep 21 00:21 topic_2-0
drwxr-xr-x 2 root root 4096 Sep 21 00:22 topic_3-0
[[email protected] kafka-logs]# cd log_1-0/
[[email protected] log_1-0]# ll
total 12
-rw-r--r-- 1 root root 10485760 Oct  7 01:16 00000000000000000000.index
-rw-r--r-- 1 root root     1020 Oct  7 01:18 00000000000000000000.log
[[email protected] log_1-0]# cat -A 00000000000000000000.log 
^@^@^@^@^@^@^@^@^@^@^@[email protected]^L2M-V^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_1, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_1, time=2014-10-01 10:00:20]^@^@^@^@^@^@^@^A^@^@^@[email protected]^^M-46M-h^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_2, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_2, 
time=2014-10-01 10:00:20]^@^@^@^@^@^@^@^B^@^@^@[email protected]=^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_3, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_3, 
time=2014-10-01 10:00:20]^@^@^@^@^@^@^@^C^@^@^@[email protected]^\M-58M-U^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_4, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_4, 
time=2014-10-01 10:00:20]^@^@^@^@^@^@^@^D^@^@^@[email protected]^@^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_5, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_5, 
time=2014-10-01 10:00:20][[email protected] log_1-0]#

注意kafka broker(id=0)的日志信息显示:

有log_1-0,log_1-1,log_1-2三个目录,对应于0,1,2三个分区。

说明,topic在broker上是以partition为单位进行储存的。

上面的0分区的日志信息显示,tlbb的5条数据都被储存了2遍,并且可以发现在分区内,都是有序的。

我们在创建log_1时指定复制2份,所以数据在分区内被储存了2遍。

同理,我们继续分析broker(id=0)上的1,2分区的内容,有:

分区1无数据,分区2上8条ldj的数据被储存了2遍。

由于我们只制造了2种appkey的数据,根据分区函数,只会返回2个partition number,所以导致有一个分区没有数据。

同上的,继续分析broker(id=1)上的0,1,2分区的内容,有:

分区0,tlbb的5条数据被储存2遍

分区1,没有数据

分区2,ldj的8条数据被储存2遍

可见,broker(id=0),broker(id=1)他们的分区数据完全一致,这也就是为什么kafka的高可用性,某些broker挂了,其他的broker还可以继续提供服务和数据。

时间: 2024-10-13 06:45:14

kafka producer实例及原理分析的相关文章

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

【转】Kafka producer原理 (Scala版同步producer)

转载自:http://www.cnblogs.com/huxi2b/p/4583249.html     供参考 本文分析的Kafka代码为kafka-0.8.2.1.另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本:一套是Java版的新版本.虽然Kafka社区极力推荐大家使用Java版本的producer,但目前很多已有的程序还是调用了Scala版的API.今天我们就分析一下旧版producer的代码. producer还分为同步和异步模式,由属性produc

【原创】Kafka producer原理 (Scala版同步producer)

本文分析的Kafka代码为kafka-0.8.2.1.另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本:一套是Java版的新版本.虽然Kafka社区极力推荐大家使用Java版本的producer,但目前很多已有的程序还是调用了Scala版的API.今天我们就分析一下旧版producer的代码. producer还分为同步和异步模式,由属性producer.type指定,默认是sync,即同步发送模式.本文也主要关注于同步发送的代码走读.下面以console-pr

关于高并发下kafka producer send异步发送耗时问题的分析

最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafka producer 进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试中发现单线程发送反而比多线程发送效率高出几倍.所以就对kafka API send 的源码进行了一下跟踪和分析,在此总结记录一下. 首先看springboot下 kafka producer 的使用 在config中进行配置,向IOC容器中注入DefaultKa

Adaboost算法原理分析和实例+代码(简明易懂)

Adaboost算法原理分析和实例+代码(简明易懂) [尊重原创,转载请注明出处] http://blog.csdn.net/guyuealian/article/details/70995333     本人最初了解AdaBoost算法着实是花了几天时间,才明白他的基本原理.也许是自己能力有限吧,很多资料也是看得懵懵懂懂.网上找了一下关于Adaboost算法原理分析,大都是你复制我,我摘抄你,反正我也搞不清谁是原创.有些资料给出的Adaboost实例,要么是没有代码,要么省略很多步骤,让初学者

Kafka Producer相关代码分析

Kafka Producer相关代码分析 标签(空格分隔): kafka Kafka Producer将用户的消息发送到Kafka集群(准确讲是发送到Broker).本文将分析Producer相关的代码实现. 类kafka.producer.Producer 如果你自己实现Kafka客户端来发送消息的话,你就是用到这个类提供的接口来发送消息.(如果你对如何利用Producer API来发送消息还不是很熟悉的话,可以参看官方的例子).这个类提供了同步和异步两种方式来发送消息. 异步发送消息是基于同

kafka 0.8.1 新producer 源码简单分析

1 背景 最近由于项目需要,需要使用kafka的producer.但是对于c++,kafka官方并没有很好的支持. 在kafka官网上可以找到0.8.x的客户端.可以使用的客户端有C版本客户端,此客户端虽然目前看来还较为活跃,但是代码问题还是较多的,而且对于c++的支持并不是很好. 还有c++版本,虽然该客户端是按照c++的思路设计,但是最近更新时间为2013年12月19日,已经很久没有更新了. 从官方了解到,kafka作者对于现有的producer和consumer的设计是不太满意的.他们打算

MyBatis的深入原理分析之1-架构设计以及实例分析

MyBatis是目前非常流行的ORM框架,它的功能很强大,然而其实现却比较简单.优雅.本文主要讲述MyBatis的架构设计思路,并且讨论MyBatis的几个核心部件,然后结合一个select查询实例,深入代码,来探究MyBatis的实现. 一.MyBatis的框架设计        注:上图很大程度上参考了iteye 上的chenjc_it所写的博文原理分析之二:框架整体设计 中的MyBatis架构体图,chenjc_it总结的非常好,赞一个! 1.接口层---和数据库交互的方式 MyBatis

从安全攻击实例看数据库安全(三)数据库攻击原理分析

摘要:本文将通过对SQL注入攻击技术和数据库加密技术原理以及防护效果进行深入的分析,来辨析数据库安全技术误区"数据库加密能解决SQL注入",同时本文也给出了SQL注入的防护方法. 1. 数据库安全误区 针对2015年4月互联网大规模报道的全国30省市社保等行业用户信息泄露事件,安华金和对乌云历史报道的社保行业相关漏洞进行集中分析,得出的结论为:大量的信息泄露主要是由于软件中存在的SQL注入漏洞被黑客利用引起的,我们可以把SQL注入比作黑客攻击数据库"锋利的矛".