Kafka学习--spring boot 整合kafka

一、启动kafka

  启动kafka之前一定要启动zookeeper,因为要使用kafka必须要使用zookeeper。

  windows环境下启动,直接使用kafka自带的zookeeper:

  E:\kafka_2.12-2.4.0\bin\windows  zookeeper-server-start.bat ..\..\config\zookeeper.properties

  接下来启动kafka

  E:\kafka_2.12-2.4.0\bin\windows  kafka-server-start.bat ..\..\config\server.properties

二、spring boot整合kafka项目实例

1.导入的maven

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件:

server.port=80
#kafka地址,可以有多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
#------生产者配置文件---------
#指定kafka消息体和key的编码格式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#设置等待acks返回的机制,有三个值
# 0:不等待返回的acks(可能会丢数据,因为发送消息没有了失败重试机制,但是这是最低延迟)
# 1:消息发送给kafka分区中的leader后就返回(如果follower没有同步完成leader就宕机了,就会丢数据)
# -1(默认):等待所有follower同步完消息后再发送(绝对不会丢数据)
spring.kafka.producer.acks=-1

# 消息累计到batch-size的值后,才会发送消息,默认为16384
spring.kafka.producer.batch-size=16384
#如果kafka迟迟不发送消息(这里指的是消息没堆积到指定数量),那么过了这个时间(单位:毫米)开始发送
spring.kafka.producer.properties.linger.ms=1
#设置缓冲区大小,默认是33554432
#这个缓冲区是kafka中两个线程里的共享变量
#这个两个线程是main和sender,main负责把消息发送到共享变量,sender从共享变量拉数据
spring.kafka.producer.buffer-memory=33554432

#失败重试发送的次数
spring.kafka.producer.retries=2
#------消费者配置文件---------
#指定kafka消息体和key的编码格式
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#指定消费者组的group_id
spring.kafka.consumer.group-id=kafka_test
#kafka意外宕机时,再次开启消息消费的策略,共有三种策略
#earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据、
#none:当所有分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=earliest

#自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#提交offset时间间隔
spring.kafka.consumer.auto-commit-interval=100
#消费监听接口监听的主题不存在时,默认会报错因此要关掉这个
spring.kafka.listener.missing-topics-fatal=false

2.创建topic

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 使用代码创建的topic
 * 三个参数意思:topic的名称;分区数量,新主题的复制因子;如果指定了副本分配,则为-1。
 */
@Configuration
public class KafkaTopic {

     @Bean
    public NewTopic batchTopic() {
        return new NewTopic("testTopic", 8, (short) 1);
    }
}

3.生产者代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * kafka生产者代码
 */
@RestController
public class ProductorController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/test")
    public String show() {
        kafkaTemplate.send("testTopic", "你好");
        return "发送成功";
    }

}

4.消费者代码

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;

/**
 * kafka消费者代码
 */
@Configuration
public class KafkaConsumer {

    @KafkaListener(topics = "testTopic")
    public void consumer(ConsumerRecord consumerRecord){
        Optional<Object> kafkaMassage = Optional.ofNullable(consumerRecord.value());
        if(kafkaMassage.isPresent()){
            Object o = kafkaMassage.get();
            System.out.println("接收到的消息是:"+o);
        }

    }

}

测试结果:

原文地址:https://www.cnblogs.com/daijiting/p/12243520.html

时间: 2024-08-23 22:58:00

Kafka学习--spring boot 整合kafka的相关文章

spring boot 整合kafka 报错 Exception thrown when sending a message with key=&#39;null&#39; and payload=JSON to topic proccess_trading_end: TimeoutException: Failed to update metadata after 60000 ms.

org.springframework.kafka.support.LoggingProducerListener- Exception thrown when sending a message with key='null' and payload='{"dataDts":["20180329","20180328","20180327","20180326","20180323"]

Spring Kafka和Spring Boot整合实现消息发送与消费简单案例

本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息. 先前我已经分享了Kafka的基本介绍与集群环境搭建方法.关于Kafka的介绍请阅读Apache Kafka简介与安装(一),关于Kafka安装请阅读Apache Kafka安装,关于Kafka集群环境搭建请阅读Apache Kafka集群环境搭建 .这里关于服务器环境搭建不在赘述. Spring Kafka整合Spring Boot创建生产者客户端案例 创建一个kafk

spring boot整合kafka

最近项目需求用到了kafka信息中间件,在此做一次简单的记录,方便以后其它项目用到. 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 配置文件 kafka.consumer.servers=127.0.0.1:9092 kafka.consumer.ena

kafka学习(五)Spring Boot 整合 Kafka

一.创建Spring boot 工程 创建过程不再描述,创建后的工程结构如下: POM文件中要加入几个依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:sch

Spring Boot 之 Kafka

Kafka是一个分布式的流平台,具有以下功能: 1.发布和订阅消息. 2.以容错的方式存储消息流. 3.实时的流处理. 主要的两大应用: 1.作为实时的流数据通道,在应用程序之间传递消息. 2.构建实时流处理,对数据流进行转换或反应. 四个核心的API: Producer API(输出向),Consumer API(输入向),Streams API(输入&&输出向),Connector API(输入||输出向) https://www.orchome.com/5 Kafka的安装: htt

spring boot整合jsp的那些坑(spring boot 学习笔记之三)

Spring Boot 整合 Jsp 步骤: 1.新建一个spring boot项目 2.修改pom文件 <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <depend

activeMQ入门+spring boot整合activeMQ

最近想要学习MOM(消息中间件:Message Oriented Middleware),就从比较基础的activeMQ学起,rabbitMQ.zeroMQ.rocketMQ.Kafka等后续再去学习. 上面说activeMQ是一种消息中间件,可是为什么要使用activeMQ? 在没有使用JMS的时候,很多应用会出现同步通信(客户端发起请求后需要等待服务端返回结果才能继续执行).客户端服务端耦合.单一点对点(P2P)通信的问题,JMS可以通过面向消息中间件的方式很好的解决了上面的问题. JMS规

Spring boot整合jsp

这几天在集中学习Spring boot+Shiro框架,因为之前view层用jsp比较多,所以想在spring boot中配置jsp,但是spring boot官方不推荐使用jsp,因为jsp相对于一些模板引擎,性能都比较低,官方推荐使用thymeleaf,但是Spring boot整合jsp的过程已经完成,在这里记录一下. 这篇博文是在LZ上篇文章spring boot+mybatis整合基础上写的,开发工具仍然是Intellij idea.这篇文章的重点是Intellij idea的设置,否

Spring Boot整合EhCache

本文讲解Spring Boot与EhCache的整合. 1 EhCache简介 EhCache 是一个纯Java的进程内缓存框架,具有快速.精干等特点,是Hibernate中默认CacheProvider.Ehcache是一种广泛使用的开源Java分布式缓存.主要面向通用缓存,Java EE和轻量级容器.它具有内存和磁盘存储,缓存加载器,缓存扩展,缓存异常处理程序,一个gzip缓存servlet过滤器,支持REST和SOAP api等特点. 2 Spring Boot整合EhCache步骤 2.