SpringBoot(八) SpringBoot整合Kafka

window下安装kafka和zooker,超详细:https://blog.csdn.net/weixin_33446857/article/details/81982455

kafka:安装下载教程网址(CentOS Linux):https://www.cnblogs.com/subendong/p/7786547.html

zooker的下载安装网址:https://blog.csdn.net/ring300/article/details/80446918

一、准备工作
提前说明:如果你运行出问题,请检查Kafka的版本与SpringBoot的版本是否与我文中的一致,本文中的环境已经经过测试。

Kafka服务版本为 kafka_2.11-1.1.0 (Scala), 也就是1.1.0
SpringBoot版本:1.5.10.RELEASE

提前启动zk,kafka,并且创建一个Topic

[[email protected] kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic

确保你的kafka能够访问,如果访问不了,需要打开外网访问。
config/server.properties

advertised.listeners=PLAINTEXT://192.168.239.128:9092

Maven 依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

二、项目结构
为了更加体现实际开发需求,一般生产者都是在调用某些接口的服务处理完逻辑之后然后往kafka里面扔数据,然后有一个消费者不停的监控这个Topic,然后处理数据,所以这里把生产者作为一个接口,消费者放到kafka这个目录下,注意@Component注解,不然扫描不到@KafkaListener

三、具体实现代码
SpringBoot配置文件
application.yml

spring:
kafka:
bootstrap-servers: 192.168.239.128:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生产者
package cn.saytime.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 测试kafka生产者
*/
@RestController
@RequestMapping("kafka")
public class TestKafkaProducerController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping("send")
public String send(String msg){
kafkaTemplate.send("test_topic", msg);
return "success";
}

}
消费者
这里的消费者会监听这个主题,有消息就会执行,不需要进行while(true)

package cn.saytime.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
* kafka消费者测试
*/
@Component
public class TestConsumer {

@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception {
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}

项目启动类

package cn.saytime;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestApplication{

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}

四、测试
运行项目,执行:http://localhost:8080/kafka/send?msg=hello

控制台输出:

topic = test_topic, offset = 19, value = hello
1
为了体现消费者不止执行一次就结束,再调用一次接口:
http://localhost:8080/kafka/send?msg=kafka

topic = test_topic, offset = 20, value = kafka
1
所以可以看到这里消费者实际上是不停的poll Topic数据的。

原文地址:https://www.cnblogs.com/2019lgg/p/11679389.html

时间: 2024-07-30 12:55:21

SpringBoot(八) SpringBoot整合Kafka的相关文章

SpringBoot(八)----SpringBoot配置日志文件

今天介绍一下SpringBoot配置日志文件 SpringBoot在所有的内部日志中使用Commons Logging,但是默认配置也提供了对常用日志的支持,如Java Util Logging,Log4J,Log4J2和Logback.但是每种Logger都可以通过配置使用控制台或者文件输出日志内容. 一.SpringBoot默认日志Logback SLF4J,是一个针对各类Java日志框架的统一Façade抽象.Java有很多的日志框架,如java.util.logging,log4j,lo

SpringBoot(八) -- SpringBoot与Docker

一.Docker简介 Docker是一个开源的应用容器引擎,基于Go语言并遵从Apache2.0协议开源.Docker可以让开发者打包他们的应用以及依赖到一个轻量级,可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化. 容器完全使用沙箱机制,相互之间不会有任何接口,更重要的是容器性能开销极低.Docker支持将软件编译成一个镜像;然后在镜像中各种软件做好布置,将镜像发布出去,其他使用者可以直接使用这个镜像.运行中的这个镜像称为容器,容器启动时非常快速的. 二.Docker核心

SpringBoot整合Kafka和Storm

前言 本篇文章主要介绍的是SpringBoot整合kafka和storm以及在这过程遇到的一些问题和解决方案. kafka和storm的相关知识 如果你对kafka和storm熟悉的话,这一段可以直接跳过!如果不熟,也可以看看我之前写的博客.一些相关博客如下. kafka 和 storm的环境安装 地址:http://www.panchengming.com/2018/01/26/pancm70/ kafka的相关使用 地址:http://www.panchengming.com/2018/01

SpringBoot进阶教程(六十二)整合Kafka

在上一篇文章<Linux安装Kafka>中,已经介绍了如何在Linux安装Kafka,以及Kafka的启动/关闭和创建发话题并产生消息和消费消息.这篇文章就介绍介绍SpringBoot整合Kafka. v创建项目 若是已有的项目中添加kafka, 请直接跳至1.3 1.1 创建springboot: 1.2 选web和kafka: 1.3 已有的项目中添加kafka, pom.xml中添加依赖 <dependency> <groupId>org.springframew

springboot+mybatis+springmvc整合实例

以往的ssm框架整合通常有两种形式,一种是xml形式,一种是注解形式,不管是xml还是注解,基本都会有一大堆xml标签配置,其中有很多重复性的.springboot带给我们的恰恰是"零配置","零配置"不等于什么也不配置,只是说相对于传统的ssm框架的xml配置或是注解配置,要少的多.作为常规的来说,一个ssm框架整合,拿maven来说,首先在src/main/resource下加入jdbc.properties,spring-mvc.xml,spring-myba

springboot同mybatis整合

springboot和mybatis整合有两种开发模式,首先要做的是配置好开发环境, 实现步骤: 在maven文件pom中配置: 1)SpringBoot同Mybatis整合的依赖. <dependency> <groupId>com.ruijc</groupId> <artifactId>spring-boot-starter-mybatis</artifactId> <version>3.2.2</version> &

SpringBoot: 10.整合mybatis(转)

需求:通过使用 SpringBoot+SpringMVC+MyBatis 整合实现一个对数据库中的 t_user 表的 CRUD 的操作 1.创建maven项目,添加项目所需依赖 <!--springboot项目依赖的父项目--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId>

springboot + mybatis + mycat整合

1.mycat服务 搭建mycat服务并启动,windows安装参照. 系列文章: [Mycat 简介] [Mycat 配置文件server.xml] [Mycat 配置文件schema.xml] [Mycat 配置文件rule.xml] 2.相关配置文件 此处我的配置为: schema.xml <?xml version="1.0"?> <!DOCTYPE mycat:schema SYSTEM "schema.dtd"> <myca

springboot与dubbo整合入门(三种方式)

Springboot与Dubbo整合三种方式详解 整合环境: jdk:8.0 dubbo:2.6.2 springboot:2.1.5 项目结构: 1.搭建项目环境: (1)创建父项目与三个子项目,创建项目时,都使用spring initializr,创建时,父项目中注意的一点: (2)创建三个子项目,在已有的父项目上右键,新建模块: (3)创建完成后:将三个子项目在父项目pom.xml中配置: (4)修改所有子项目中的parent标签:(删掉之前parent中的springboot)修改为:

springBoot和MyBatis整合中出现SpringBoot无法启动时处理方式

在springBoot和Myatis   整合中出现springBoot无法启动   并且报以下错误 Description: Field userMapper in cn.lijun.controller.UserController required a bean of type 'cn.lijun.mapper.UserMapper' that could not be found. Action: Consider defining a bean of type 'cn.lijun.ma