spring boot整合kafka

最近项目需求用到了kafka信息中间件,在此做一次简单的记录,方便以后其它项目用到。

引入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置文件

kafka.consumer.servers=127.0.0.1:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.group.id=kafka-test-group
kafka.consumer.concurrency=10

kafka.producer.servers=127.0.0.1:9092
kafka.producer.retries=1
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

生产者配置类

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

消费者配置类

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(8);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}

生产者类

@Component
public class KafkaProducer {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String message) {
        logger.info("on message:{}", message);
        kafkaTemplate.send(topic, message);
    }
}

消费者类

@Component
public class VideoCosConsumer {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test-topic"})
    public void consumerMessage(String message) {
        logger.info("on message:{}", message);
    }
}

  以上就是spring cloud整合kafka的过程,现在spring让我们代码搬运工越来越没有活干了,连复制粘贴都不行了,只能简单的拼装需要的实体类。

原文地址:https://www.cnblogs.com/wolf-bin/p/8179001.html

时间: 2024-10-10 12:20:14

spring boot整合kafka的相关文章

spring boot 整合kafka 报错 Exception thrown when sending a message with key=&#39;null&#39; and payload=JSON to topic proccess_trading_end: TimeoutException: Failed to update metadata after 60000 ms.

org.springframework.kafka.support.LoggingProducerListener- Exception thrown when sending a message with key='null' and payload='{"dataDts":["20180329","20180328","20180327","20180326","20180323"]

Kafka学习--spring boot 整合kafka

一.启动kafka 启动kafka之前一定要启动zookeeper,因为要使用kafka必须要使用zookeeper. windows环境下启动,直接使用kafka自带的zookeeper: E:\kafka_2.12-2.4.0\bin\windows  zookeeper-server-start.bat ..\..\config\zookeeper.properties 接下来启动kafka E:\kafka_2.12-2.4.0\bin\windows  kafka-server-sta

kafka学习(五)Spring Boot 整合 Kafka

一.创建Spring boot 工程 创建过程不再描述,创建后的工程结构如下: POM文件中要加入几个依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:sch

Spring Kafka和Spring Boot整合实现消息发送与消费简单案例

本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息. 先前我已经分享了Kafka的基本介绍与集群环境搭建方法.关于Kafka的介绍请阅读Apache Kafka简介与安装(一),关于Kafka安装请阅读Apache Kafka安装,关于Kafka集群环境搭建请阅读Apache Kafka集群环境搭建 .这里关于服务器环境搭建不在赘述. Spring Kafka整合Spring Boot创建生产者客户端案例 创建一个kafk

activeMQ入门+spring boot整合activeMQ

最近想要学习MOM(消息中间件:Message Oriented Middleware),就从比较基础的activeMQ学起,rabbitMQ.zeroMQ.rocketMQ.Kafka等后续再去学习. 上面说activeMQ是一种消息中间件,可是为什么要使用activeMQ? 在没有使用JMS的时候,很多应用会出现同步通信(客户端发起请求后需要等待服务端返回结果才能继续执行).客户端服务端耦合.单一点对点(P2P)通信的问题,JMS可以通过面向消息中间件的方式很好的解决了上面的问题. JMS规

Spring Boot 之 Kafka

Kafka是一个分布式的流平台,具有以下功能: 1.发布和订阅消息. 2.以容错的方式存储消息流. 3.实时的流处理. 主要的两大应用: 1.作为实时的流数据通道,在应用程序之间传递消息. 2.构建实时流处理,对数据流进行转换或反应. 四个核心的API: Producer API(输出向),Consumer API(输入向),Streams API(输入&&输出向),Connector API(输入||输出向) https://www.orchome.com/5 Kafka的安装: htt

spring boot 整合 quartz 集群环境 实现 动态定时任务配置【原】

最近做了一个spring boot 整合 quartz  实现 动态定时任务配置,在集群环境下运行的 任务.能够对定时任务,动态的进行增删改查,界面效果图如下: 1. 在项目中引入jar 2. 将需要的表导入数据库 官网上有不同数据库的脚本,找到对应的,导入即可 3. java 代码 将quartz 的相关配置文件,配置为暴露bean,方便后期引用. 有一处关键的地方,就是注入spring 上下文,也可以算是一个坑.如果,不注入spring 上下文,那么新添加的定时任务job,是新new 的一个

spring boot 整合spring Data JPA+Spring Security+Thymeleaf框架(上)

最近上班太忙所以耽搁了给大家分享实战springboot 框架的使用. 下面是spring boot 整合多个框架的使用. 首先是准备工作要做好. 第一  导入框架所需的包,我们用的事maven 进行对包的管理. 以上的举例是本人的H5DS的真实的后台管理项目,这个项目正在盛情融资中,各位多多捧点人场.关注一下软件发展的动态,说不定以后就是您的生活不可或缺的软件哟. 点击打开链接.闲话少说.现在切入正题. 第二,写点配置文件 第三,spring data -设计一个简单的po关系,这里需要下载一

spring boot整合jsp的那些坑(spring boot 学习笔记之三)

Spring Boot 整合 Jsp 步骤: 1.新建一个spring boot项目 2.修改pom文件 <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <depend