1.前言
首先,描述下应用场景:
假设,公司有一款游戏,需要做行为统计分析,数据的源头来自日志,由于用户行为非常多,导致日志量非常大。将日志数据插入数据库然后再进行分析,已经满足不了。最好的办法是存日志,然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。 |
步骤如下:
如果你还没有搭建起来,可以参考我的博客: http://zhangfengzhe.blog.51cto.com/8855103/1556650
|
2.实现过程
为了快速实现,我们简化日志消息格式。
在eclipse新建JAVA PROJECT,将kafka/libs下*.jar配置到项目build path即可。
Step 1 : 简单的POJO对象(MobileGameLog)
private String actionType; private String appKey; private String guid; private String time; 说明: actionType 代表行为类型 appKey 代表游戏ID guid 代表角色 time 代表时间 提供getter/setter方法,并override toString() |
Step 2 : 提供serializer
需要注意的是,POJO对象需要序列化转化成KAFKA识别的消息存储格式--byte[]
public class MobileGameKafkaMessage implements kafka.serializer.Encoder<MobileGameLog>{ @Override public byte[] toBytes(MobileGameLog mobileGameLog) { return mobileGameLog.toString().getBytes(); } public MobileGameKafkaMessage(VerifiableProperties props){ } } |
Step 3 : 提供Partitioner
我们可以提供Partitioner,这样可以使得数据按照我们的策略来存储在brokers中。
这里,我根据appKey来进行分区。
Step 4 : 提供Producer
启动zookeeper: [[email protected] kafka_2.9.2-0.8.1.1]# bin/zookeeper-server-start.sh config/zookeeper.properties & 启动kafka broker(id=0): [[email protected] kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/server.properties & 启动kafka broker(id=1) [[email protected] kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/server-1.properties & 上述过程,在我的博客【搭建kafka运行环境】里面都有详细记录,大家可以参考。 创建一个topic: [[email protected] kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic log_1 --replication-factor 2 --partitions 3 注意topic:log_1有3个分区,2个复制。
// Producer<key , value> // V: type of the message // K: type of the optional key associated with the message kafka.javaapi.producer.Producer<MobileGameLog, MobileGameLog> producer = new Producer<MobileGameLog, MobileGameLog>( config); List<KeyedMessage<MobileGameLog, MobileGameLog>> list = new ArrayList<KeyedMessage<MobileGameLog, MobileGameLog>>(); // 5条tlbb数据 for (int i = 1; i <= 5; i++) { MobileGameLog log = new MobileGameLog(); log.setActionType("YuanBaoShop"); log.setAppKey("tlbb"); log.setGuid("xxx_" + i); log.setTime("2014-10-01 10:00:20"); KeyedMessage<MobileGameLog, MobileGameLog> keyedMessage = new KeyedMessage<MobileGameLog, MobileGameLog>( "log_1", log, log); list.add(keyedMessage); } // 8条ldj数据 for (int i = 1; i <= 8; i++) { MobileGameLog log = new MobileGameLog(); log.setActionType("BlackMarket"); log.setAppKey("ldj"); log.setGuid("yyy_" + i); log.setTime("2014-10-02 10:00:20"); KeyedMessage<MobileGameLog, MobileGameLog> keyedMessage = new KeyedMessage<MobileGameLog, MobileGameLog>( "log_1", log, log); list.add(keyedMessage); } producer.send(list); producer.close(); 说明: a.producer既可以send 一个keyedMessage,可以是一个keyedMessage list. b.注意producer实例化时的泛型。value是消息对象,即POJO,key是这个pojo的标示,这个是要用来进行分区的。 c.producer向broker发送的是KeyedMessage,注意实例化时的泛型,KEY/VALUE的意义同b. d.KeyedMessage需要指明topic name.
-------start info 运行至MobileGameKafkaPartition VerifiableProperties : {metadata.broker.list=192.168.152.2:9092,192.168.152.2:9093, zk.connectiontimeout.ms=6000, request.required.acks=1, partitioner.class=com.sohu.game.kafka.day2.MobileGameKafkaPartition, serializer.class=com.sohu.game.kafka.day2.MobileGameKafkaMessage} -------end info SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : tlbb 存储的分区为:0 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : tlbb 存储的分区为:0 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : tlbb 存储的分区为:0 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : tlbb 存储的分区为:0 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : tlbb 存储的分区为:0 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : ldj 存储的分区为:2 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : ldj 存储的分区为:2 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : ldj 存储的分区为:2 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : ldj 存储的分区为:2 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : ldj 存储的分区为:2 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : ldj 存储的分区为:2 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : ldj 存储的分区为:2 -------end info -------start info 运行至MobileGameKafkaPartition的partition方法,分区大小为:3 分区key : ldj 存储的分区为:2 -------end info
|
3.原理分析
查看topic:log_1详细信息:
[[email protected] kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic log_1 Topic: log_1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: log_1 Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1 Topic: log_1 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: log_1 Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0,1
log_1有2个broker进行储存,每一个broker上有3个分区,并且每一个分区的leader都是broker(id=0)
查看broker(id=0)上的信息:
[[email protected] tmp]# ll total 52 drwxr-xr-x 2 root root 4096 Oct 7 01:23 hsperfdata_root drwxr-xr-x 10 root root 4096 Oct 7 02:40 kafka-logs drwxr-xr-x 8 root root 4096 Oct 7 02:40 kafka-logs-1 srwxr-xr-x 1 root root 0 Sep 20 18:15 mapping-root drwxrwxrwt 2 root root 4096 Oct 6 00:34 VMwareDnD drwx------ 2 root root 4096 Oct 6 18:05 vmware-root drwxr-xr-x 3 root root 4096 Sep 20 19:58 zookeeper [[email protected] tmp]# [[email protected] tmp]# [[email protected] tmp]# [[email protected] tmp]# cd kafka-logs [[email protected] kafka-logs]# pwd /tmp/kafka-logs [[email protected] kafka-logs]# ll total 80 drwxr-xr-x 2 root root 4096 Oct 7 01:02 log_1-0 drwxr-xr-x 2 root root 4096 Oct 7 01:02 log_1-1 drwxr-xr-x 2 root root 4096 Oct 7 01:02 log_1-2 drwxr-xr-x 2 root root 4096 Oct 6 01:01 my_first_topic-0 -rw-r--r-- 1 root root 100 Oct 7 02:40 recovery-point-offset-checkpoint -rw-r--r-- 1 root root 100 Oct 7 02:40 replication-offset-checkpoint drwxr-xr-x 2 root root 4096 Oct 6 01:01 test-0 drwxr-xr-x 2 root root 4096 Oct 6 01:01 topic_1-0 drwxr-xr-x 2 root root 4096 Sep 21 00:21 topic_2-0 drwxr-xr-x 2 root root 4096 Sep 21 00:22 topic_3-0 [[email protected] kafka-logs]# cd log_1-0/ [[email protected] log_1-0]# ll total 12 -rw-r--r-- 1 root root 10485760 Oct 7 01:16 00000000000000000000.index -rw-r--r-- 1 root root 1020 Oct 7 01:18 00000000000000000000.log [[email protected] log_1-0]# cat -A 00000000000000000000.log ^@^@^@^@^@^@^@^@^@^@^@[email protected]^L2M-V^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_1, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_1, time=2014-10-01 10:00:20]^@^@^@^@^@^@^@^A^@^@^@[email protected]^^M-46M-h^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_2, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_2, time=2014-10-01 10:00:20]^@^@^@^@^@^@^@^B^@^@^@[email protected]=^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_3, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_3, time=2014-10-01 10:00:20]^@^@^@^@^@^@^@^C^@^@^@[email protected]^\M-58M-U^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_4, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_4, time=2014-10-01 10:00:20]^@^@^@^@^@^@^@^D^@^@^@[email protected]^@^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_5, time=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_5, time=2014-10-01 10:00:20][[email protected] log_1-0]#
注意kafka broker(id=0)的日志信息显示:
有log_1-0,log_1-1,log_1-2三个目录,对应于0,1,2三个分区。
说明,topic在broker上是以partition为单位进行储存的。
上面的0分区的日志信息显示,tlbb的5条数据都被储存了2遍,并且可以发现在分区内,都是有序的。
我们在创建log_1时指定复制2份,所以数据在分区内被储存了2遍。
同理,我们继续分析broker(id=0)上的1,2分区的内容,有:
分区1无数据,分区2上8条ldj的数据被储存了2遍。
由于我们只制造了2种appkey的数据,根据分区函数,只会返回2个partition number,所以导致有一个分区没有数据。
同上的,继续分析broker(id=1)上的0,1,2分区的内容,有:
分区0,tlbb的5条数据被储存2遍
分区1,没有数据
分区2,ldj的8条数据被储存2遍
可见,broker(id=0),broker(id=1)他们的分区数据完全一致,这也就是为什么kafka的高可用性,某些broker挂了,其他的broker还可以继续提供服务和数据。