kafka

kafka生产实践

1.引言

最近接触到一个APP流量分析的项目,类似于友盟。涉及到几个C端(客户端)高并发的接口,这几个接口主要用于C端数据的提交。在没有任何缓冲的情况下,一个接口涉及到5张表的提交。压测的结果很不理想,主要瓶颈就在与RDS的交互。

一台双核,16G机子,单实例,jdbc最大连接数100,吞吐量竟然只有50TPS。

能想到的改造方案就是引入一层缓冲,让C端接口不与RDS直接交互,很自然就想到了rabbitmq,但是rabbitmq对分布式的支持比较一般,我们的数据体量也比较大,所以我们借鉴了友盟,引入了kafka,Kafka是一种高吞吐量的分布式发布订阅消息系统,起初在不做任何kafka优化的时候,简单地将C端提交的数据直接send到单节点kafka,就这样,我们的吞吐量达到了100TPS.还是有点小惊喜的。

最近一段时间研究了一下kafka,对一些参数进行调整,目前接口的吞吐量已经达到220TPS,写这篇文章主要想记录一下自己优化和部署经历。

2.kafka简介

kafka的结构图

这张图很好的诠释了kafka的结构,但是遗漏了一点,就是group的概念,我这里补充一下,一个组可以包含多个consumer对多个topic进行消费,但是不同组的消费都是独立的。

也就是说同一个topic的同一条消息可以被不同组的consumer消费。

我这里的主要的优化途径就是将kafka集群化,多partition化,使其并发度更高。

集群化都很好理解,那什么是多partition?

partition是topic的一个概念,即对topic进行分组,不同partition之间的消费相互独立,且有序。并且任意Partition在某一个时刻只能被一个Consumer Group内的一个Consumer消费,所以咯,假如topic只有一个partition的话,那么在一个Consumer Group内有效的消费者实例最多也就1个,并发度就会受限于kafka的partition数目。

上面都是讲消费,其实send操作也是一样的,要保证有序必然要等上一个发送ack之后,下一个发送才能进行,如果只有一个partition,那send之后的ack的等待时间必然会阻塞下面一次send,设计多个partition之后,可以同时往多个partition发送消息,自然吞吐量也就上去。

3.kafka集群的搭建以及参数配置

集群搭建

准备两台机子,然后去官网(http://kafka.apache.org/downloads)下载一个包。通过scp到服务器上,解压进入config目录,编辑server.config.

第一台机子配置(172.18.240.36):

broker.id=0  每台服务器的broker.id都不能相同

#hostname
host.name=172.18.240.36

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=172.18.240.36:4001#默认partition数num.partitions=2

第二台机子配置(172.18.240.62):

broker.id=1  每台服务器的broker.id都不能相同

#hostname
host.name=172.18.240.62

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=172.18.240.36:4001
#默认partition数
num.partitions=2

新增或者修改成以上配置。

对了,在此之前请先安装zookeeper,如果你用的是zookeeper集群的话,zookeeper.connect可以填写多个,中间用逗号隔开。

然后启动


1

nohup  ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

测试一下:

在第一台机子上开启一个producer

./kafka-console-producer.sh --broker-list 172.18.240.36:9092 --topic test-test

在第二台机子上开启一个consumer

./kafka-console-consumer.sh --bootstrap-server 172.18.240.62:9092 --topic test-test --from-beginning

第一台机子发送一条消息

第二台机子立马收到消息

这样kafka的集群部署就完成了。就下来我们来看看,java的客户端代码如何编写。

4.kafka客户端代码示例

我这里的工程是建立在spring boot 之下的,仅供参考。

在 application.yml下添加如下配置:

kafka:
  consumer:
    default:
      server: 172.18.240.36:9092,172.18.240.62:9092
      enableAutoCommit: false
      autoCommitIntervalMs: 100
      sessionTimeoutMs: 15000
      groupId: data_analysis_group
      autoOffsetReset: latest
  producer:
    default:
      server: 172.18.240.36:9092,172.18.240.62:9092
      retries: 0
      batchSize: 4096
      lingerMs: 1
      bufferMemory: 40960

添加两个配置类

package com.dtdream.analysis.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import java.util.HashMap;
import java.util.Map;

@ConfigurationProperties(
        prefix = "kafka.consumer.default"
)
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    private static String autoCommitIntervalMs;

    private static String sessionTimeoutMs;

    private static Class keyDeserializerClass = StringDeserializer.class;

    private static Class valueDeserializerClass = StringDeserializer.class;

    private static String groupId = "test-group";

    private static String autoOffsetReset = "latest";

    private static String server;

    private static boolean enableAutoCommit;

    public static String getServer() {
        return server;
    }

    public static void setServer(String server) {
        KafkaConsumerConfig.server = server;
    }

    public static boolean isEnableAutoCommit() {
        return enableAutoCommit;
    }

    public static void setEnableAutoCommit(boolean enableAutoCommit) {
        KafkaConsumerConfig.enableAutoCommit = enableAutoCommit;
    }

    public static String getAutoCommitIntervalMs() {
        return autoCommitIntervalMs;
    }

    public static void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
        KafkaConsumerConfig.autoCommitIntervalMs = autoCommitIntervalMs;
    }

    public static String getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }

    public static void setSessionTimeoutMs(String sessionTimeoutMs) {
        KafkaConsumerConfig.sessionTimeoutMs = sessionTimeoutMs;
    }

    public static Class getKeyDeserializerClass() {
        return keyDeserializerClass;
    }

    public static void setKeyDeserializerClass(Class keyDeserializerClass) {
        KafkaConsumerConfig.keyDeserializerClass = keyDeserializerClass;
    }

    public static Class getValueDeserializerClass() {
        return valueDeserializerClass;
    }

    public static void setValueDeserializerClass(Class valueDeserializerClass) {
        KafkaConsumerConfig.valueDeserializerClass = valueDeserializerClass;
    }

    public static String getGroupId() {
        return groupId;
    }

    public static void setGroupId(String groupId) {
        KafkaConsumerConfig.groupId = groupId;
    }

    public static String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    public static void setAutoOffsetReset(String autoOffsetReset) {
        KafkaConsumerConfig.autoOffsetReset = autoOffsetReset;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
            @Override
            public boolean filter(ConsumerRecord<String, String> consumerRecord) {
                log.debug("partition is {},key is {},topic is {}",
                        consumerRecord.partition(), consumerRecord.key(), consumerRecord.topic());
                return false;
            }
        });
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;

    }

   /* @Bean
    public Listener listener() {
        return new Listener();
    }*/
}

package com.dtdream.analysis.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 * User: chenqimiao
 * Date: 2017/7/24
 * Time: 9:43
 * To change this template use File | Settings | File Templates.
 */
@ConfigurationProperties(
        prefix = "kafka.producer.default",
        ignoreInvalidFields = true
)//注入一些属性域
@EnableKafka
@Configuration//使得@Bean注解生效
public class KafkaProducerConfig {
    private static String server;
    private static Integer retries;
    private static Integer batchSize;
    private static Integer lingerMs;
    private static Integer bufferMemory;
    private static Class keySerializerClass = StringSerializer.class;
    private static Class valueSerializerClass = StringSerializer.class;

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
        return props;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public static String getServer() {
        return server;
    }

    public static void setServer(String server) {
        KafkaProducerConfig.server = server;
    }

    public static Integer getRetries() {
        return retries;
    }

    public static void setRetries(Integer retries) {
        KafkaProducerConfig.retries = retries;
    }

    public static Integer getBatchSize() {
        return batchSize;
    }

    public static void setBatchSize(Integer batchSize) {
        KafkaProducerConfig.batchSize = batchSize;
    }

    public static Integer getLingerMs() {
        return lingerMs;
    }

    public static void setLingerMs(Integer lingerMs) {
        KafkaProducerConfig.lingerMs = lingerMs;
    }

    public static Integer getBufferMemory() {
        return bufferMemory;
    }

    public static void setBufferMemory(Integer bufferMemory) {
        KafkaProducerConfig.bufferMemory = bufferMemory;
    }

    public static Class getKeySerializerClass() {
        return keySerializerClass;
    }

    public static void setKeySerializerClass(Class keySerializerClass) {
        KafkaProducerConfig.keySerializerClass = keySerializerClass;
    }

    public static Class getValueSerializerClass() {
        return valueSerializerClass;
    }

    public static void setValueSerializerClass(Class valueSerializerClass) {
        KafkaProducerConfig.valueSerializerClass = valueSerializerClass;
    }

    @Bean(name = "kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

利用kafkaTemplate即可完成发送。

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

 @RequestMapping(
            value = "/openApp",
            method = RequestMethod.POST,
            produces = MediaType.APPLICATION_JSON_UTF8_VALUE,
            consumes = MediaType.APPLICATION_JSON_UTF8_VALUE
    )
    @ResponseBody
    public ResultDTO openApp(@RequestBody ActiveLogPushBo activeLogPushBo, HttpServletRequest request) {

        logger.info("openApp: activeLogPushBo {}, dateTime {}", JSONObject.toJSONString(activeLogPushBo),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));

        String ip = (String) request.getAttribute("ip");

        activeLogPushBo.setIp(ip);

        activeLogPushBo.setDate(new Date());

        //ResultDTO resultDTO = dataCollectionService.collectOpenInfo(activeLogPushBo);

        kafkaTemplate.send("data_collection_open",JSONObject.toJSONString(activeLogPushBo));

       // logger.info("openApp: resultDTO {} ,dateTime {}", resultDTO.toJSONString(),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));

        return new ResultDTO().success();
    }

kafkaTemplate的send方法会更根据你指定的key进行hash,再对partition数进行去模,最后决定发送到那一个分区,假如没有指定key,那send方法对分区的选择是随机。具体怎么随机的话,这里就不展开讲了,有兴趣的同学可以自己看源码,我们可以交流交流。

接着配置一个监听器

package com.dtdream.analysis.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;
@Component
public class Listener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test-topic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("message is {} ", message);
        }
    }
}

@KafkaListener其实可以具体指定消费哪个分区,如果不指定的话,并且只有一个消费者实例,那么这个实例会消费所有的分区的消息。

消费者的数量是一定要少于partition的数量的,不然没有任何意义。会出现消费者过剩的情况。

消费者数量和partition数量的多与少,会动态影响消费节点所消费的partition数目,最终会在整个集群中达到一种动态平衡。具体的关系可以参考http://www.jianshu.com/p/6233d5341dfe

5.总结

理论上只要cpu核心数无限,那么partition数也可以无上限,与此同时消费者节点和生产者节点也可以无上限,最终会使单个topic的并发无上限。单机的cpu的核心数总是会达到一个上限,kafka作为分布式系统,可以很好利用集群的运算能力,进行动态扩展,在DT时代,应该会慢慢成为主流吧。

时间: 2024-10-26 10:26:14

kafka的相关文章

Kafka----Apache Kafka官网首页

Apache Kafka  是发布-订阅机制的消息系统,可以认为具有分布式日志提交功能. Fast-快速 一个单独的Kafka  broker每秒可以处理来自成千上万个客户端的数百兆字节的读写操作. Scalable-可扩展性 对于大规模系统来说,一个单独的kafka集群从设计上就实现了数据中心的功能,而且无需宕机就能提供弹性而又透明的扩展,在数据存储方式上,kafka采用了分区设计理念,它通过将数据分别存储在集群中服务器这种方式,使得集群存储能力远大于单个服务器,这样也使得消费者可以从集群中不

rabbitMQ、activeMQ、zeroMQ、Kafka、Redis 比较

Kafka作为时下最流行的开源消息系统,被广泛地应用在数据缓冲.异步通信.汇集日志.系统解耦等方面.相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一流的读写性能. 针对Kafka性能方面进行简单分析,相关数据请参考:https://segmentfault.com/a/1190000003985468,下面介绍一下Kafka的架构和涉及到的名词: Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Parti

kafka入门:简介、使用场景、设计原理、主要配置及集群搭建(转)

问题导读: 1.zookeeper在kafka的作用是什么? 2.kafka中几乎不允许对消息进行"随机读写"的原因是什么? 3.kafka集群consumer和producer状态信息是如何保存的? 4.partitions设计的目的的根本原因是什么? 一.入门 1.简介 Kafka is a distributed,partitioned,replicated commit logservice.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.k

kafka producer实例及原理分析

1.前言 首先,描述下应用场景: 假设,公司有一款游戏,需要做行为统计分析,数据的源头来自日志,由于用户行为非常多,导致日志量非常大.将日志数据插入数据库然后再进行分析,已经满足不了.最好的办法是存日志,然后通过对日志的分析,计算出有用的数据.我们采用kafka这种分布式日志系统来实现这一过程. 步骤如下: 搭建KAFKA系统运行环境 如果你还没有搭建起来,可以参考我的博客: http://zhangfengzhe.blog.51cto.com/8855103/1556650 设计数据存储格式

windows 下部署kafka 日记 转

windows 下部署kafka 日记 转一.下载去apache 的官网(http://kafka.apache.org/downloads.html)下载最新的二进制版的压缩包.目前的最新版本是kafka_2.11-0.8.2.1.tgz.二.解压直接解压到D 盘根目录下.三.修改配置文件注意版本不同,可能配置文件不同.请参照实际情况修改.1.修改log4j.properties 文件中的“kafka.logs.dir=logs ”为“kafka.logs.dir=/tmp/logs”.2.修

Kafka Server写数据的时候报错org.apache.kafka.common.errors.RecordTooLargeException

向Kafka中输入数据,抛异常org.apache.kafka.common.errors.RecordTooLargeException 官网两个参数描述如下: message.max.bytes The maximum size of message that the server can receive int 1000012 [0,...] high fetch.message.max.bytes 1024 * 1024 The number of byes of messages to

kafka Consumer分区数与多线程消费topic

单线程消费数据适合在本地跑. 参考文档: http://kafka.apache.org/documentation.html 对于一个topic,可以发送给若干个partitions. partition在创建topic的时候就指定分区的数目. 分区.Offset.消费线程.group.id的关系 1)一组(类)消息通常由某个topic来归类,我们可以把这组消息"分发"给若干个分区(partition),每个分区的消息各不相同: 2)每个分区都维护着他自己的偏移量(Offset),记

kafka中处理超大消息的一些考虑

Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试).但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理? 针对这个问题,有以下几个建议: 最好的方法是不直接传送这些大的数据.如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息. 第二个方法是,将大的消息数据切片或切块,在生产端将数

跟我一起学kafka(一)

从昨天下午接到新任务,要采集一个法院网站得所有公告,大概是需要采集这个网站得所有公告列表里得所有txt内容,txt文件里边是一件件赤裸裸得案件,记录这案由,原告被告等相关属性(不知道该叫什么就称之为属性吧,汗),把这些文件放到本地某个目录,并把一个案件作为一条数据放入数据库中.本以为很轻松得用Jsoup就可以完成,但是我还是低估了政府部门填写数据得人得不规范性,你妹啊,一会英文冒号,一会中文冒号,一会当事人,一会原告人得......气死我了,昨天晚回家了一个钟头,今天又忙活到下午3点才算采集完毕

kafka无法发送消息问题处理

背景 在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常.当在本地使用JAVA程序发送消息时,一直出错. 抛出的错误为: Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12] kafka.common.FailedToSendMessageException: Failed to send messag