SpringBoot2 整合Kafka组件,应用案例和流程详解

本文源码:GitHub·点这里 || GitEE·点这里

一、搭建Kafka环境

1、下载解压

-- 下载
wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
-- 解压
tar -zxvf kafka_2.11-2.2.0.tgz
-- 重命名
mv kafka_2.11-2.2.0 kafka2.11

2、启动Kafka服务

kafka依赖ZooKeeper服务,需要本地安装并启动ZooKeeper。

参考文章:Linux系统搭建ZooKeeper3.4中间件,常用命令总结

-- 执行位置
-- /usr/local/mysoft/kafka2.11
bin/kafka-server-start.sh config/server.properties

3、查看服务

ps -aux |grep kafka

4、开放地址端口

-- 基础路径
-- /usr/local/mysoft/kafka2.11/config
vim server.properties
-- 添加下面注释
advertised.listeners=PLAINTEXT://192.168.72.130:9092

二、Kafka基础概念

1、基础描述

Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处理。日志类的数据需要高吞吐量的性能要求,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

2、功能特点

(1)、通过磁盘数据结构提供消息的持久化,消息存储也能够保持长时间稳定性;

(2)、高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒超高的并发量;

(3)、支持通过Kafka服务器和消费机集群来分区消息;

(4)、支持Hadoop并行数据加载;

(5)、API包封装的非常好,简单易用,上手快 ;

(6)、分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer;

3、消息功能

  • 点对点模式

点对点模型通常是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理,即使有多个消费者监听队列也是如此。

  • 发布订阅模式

发布订阅模型则是一个基于推送的消息传送模型,消息产生后,推送给所有订阅者。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

4、消息队列作用

  • 程序解耦,生产者和消费者独立,各自异步执行;
  • 消息数据进行持久化存储,直到被全部消费,规避了数据丢失风险;
  • 流量削峰,使用消息队列承接访问压力,尽量避免程序雪崩 ;
  • 降低进程间的耦合度,系统部分组件崩溃时,不会影响到整个系统;
  • 保证消息顺序执行,解决特定场景业务需求 ;

5、专业术语简介

  • Broker

一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

  • Producer

消息生产者,就是向kafka broker发消息的客户端。

  • Consumer

消息消费者,向kafka broker取消息的客户端。

  • Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,可以理解为一个队列。

  • Consumer Group

每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定group name,若不指定group name则属于默认的分组。

  • Partition

一个庞大大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。Partition是物理上的概念,方便在集群中扩展,提高并发。

三、整合SpringBoot2框架

1、案例结构

  • 消息生产者 : kafka-producer-server
  • 消息消费方 : kafka-consumer-server

2、基础依赖

<!-- SpringBoot依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.4.RELEASE</version>
</dependency>

3、生产者配置

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092

4、消息生成

@RestController
public class ProducerWeb {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String sendMsg () {
        MsgLog msgLog = new MsgLog(1,"消息生成",
                                 1,"消息日志",new Date()) ;
        String msg = JSON.toJSONString(msgLog) ;
        // 这里Topic如果不存在,会自动创建
        kafkaTemplate.send("cicada-topic", msg);
        return msg ;
    }
}

5、消费者配置

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: test-consumer-group

6、消息消费

@Component
public class ConsumerMsg {

    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);

    @KafkaListener(topics = "cicada-topic")
    public void listenMsg (ConsumerRecord<?,String> record) {
        String value = record.value();
        LOGGER.info("ConsumerMsg====>>"+value);
    }
}

四、消息流程分析

1、生产者分析

  • 写入方式

生产者基于推push推模式将消息发布到broker,每条消息都被追加到分区patition中,属于磁盘顺序写,效率比随机写内存要高,保障kafka高吞吐量。

  • 分区概念

消息发送时都被发送到一个topic,而topic是由Partition Logs(分区日志)组成,其组织结构如下图所示:

每个Partition中的消息都是有序的,生产的消息被不断追加到Partitionlog上,其中的每一个消息都被赋予了一个唯一的offset值。每个Partition可以通过调整以适配它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据。分区的原则:指定patition,则直接使用;未指定patition但指定key,通过对key的value进行hash出一个patition;patition和key都未指定,使用轮询选出一个patition。

2、消费者分析

  • 消费图解

消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费一个partition。

  • 消费方式

消费者采用pull拉模式从broker中读取数据。对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的数据传输场景。

五、源代码地址

GitHub·地址
https://github.com/cicadasmile/middle-ware-parent
GitEE·地址
https://gitee.com/cicadasmile/middle-ware-parent

推荐阅读:SpringBoot2整合中间件

整合 shard-jdbc 中间件,实现数据分库分表

整合 JavaMail ,实现异步发送邮件功能

整合 RocketMQ ,实现请求异步处理

整合 Swagger2 ,构建接口管理界面

整合 QuartJob ,实现定时器实时管理

整合 Redis集群 ,实现消息队列场景

整合 Dubbo框架 ,实现RPC服务远程调用

整合 ElasticSearch框架,实现高性能搜索引擎

整合 JWT 框架,解决Token跨域验证问题

整合 FastDFS 中间件,实现文件分布管理

整合 Shiro 框架,实现用户权限管理

整合 Security 框架,实现用户权限管理

整合 ClickHouse数据库,实现数据高性能查询分析

整合 Drools规则引擎,实现高效的业务规则

整合MybatisPlus增强插件,配置多数据源

整合 Zookeeper组件,管理架构中服务协调

整合Nacos组件,环境搭建和入门案例详解

原文地址:https://blog.51cto.com/14439672/2471556

时间: 2024-11-13 09:57:22

SpringBoot2 整合Kafka组件,应用案例和流程详解的相关文章

Spring整合JUnit框架进行单元测试代码使用详解

[转]Spring整合JUnit框架进行单元测试代码使用详解 转自 http://blog.csdn.net/yaerfeng/article/details/25187775 感谢博主 :云淡风轻 .仅此一抹 一.Spring提供的JUnit框架扩展: 1. AbstractSpringContextTests:spring中使用spring上下文测试的Junit扩展类,我们一般不会使用这个类来进行单元测试,它是spring内部设计使用到的类    2. AbstractDependencyI

unity3d-配置Android环境,打包发布Apk流程详解

31:unity3d-配置Android环境,打包发布Apk流程详解 作者 阿西纳尼 关注 2016.08.28 22:52 字数 498 阅读 1806评论 0喜欢 5 Unity配置Android环境,打包发布安卓流程 一:SDK与JDK下载地址:http://pan.baidu.com/s/1mhVaXHe下载完成后,解压文件 SDK文件 二.安装 JDK 运行安装程序jdk-7u67-windows-x64 Java-JDK 分别点击下一步进行安装. 安装中 在安装过程中先后会出现两次选

CentOS 5,6 系统启动流程详解

一.linux 组成介绍 1.linux 组成: Linux: kernel+rootfs(根文件系统) kernel: 进程管理.内存管理.网络管理.驱动程序.文件系统.安全功能 rootfs: 程序和glibc 库:函数集合, function, 调用接口(头文件负责描述) 过程调用: procedure,无返回值 函数调用: function 程序:二进制执行文件 2.内核设计流派: 单内核(monolithic kernel): Linux 把所有功能集成于同一个程序 微内核(micro

Android4.0 input事件输入流程详解(中间层到应用层)

在Android系统中,类似于键盘按键.触摸屏等事件是由WindowManagerService服务来管理的,然后再以消息的形式来分发给应用程序进行处理.系统启动时,窗口管理服务也会启动,该服务启动过程中,会通过系统输入管理器InputManager来负责监控键盘消息.当某一个Activity激活时,会在该Service下注册一个接收消息的通道,表明可以处理具体的消息,然后当有消息时,InputManager就会分发给当前处于激活状态下的Activity进行处理. InputManager的启动

ssl协议工作流程详解

SSL 协议 (HTTPS) 握手.工作流程详解 (双向 HTTPS 流程 )SSL 协议的工作流程:服务器认证阶段: 1)客户端向服务器发送一个开始信息"Hello"以便开始一个新的会话连接; 2)服务器根据客户的信息确定是否需要生成新的主密钥,如需要则服务器在响应客户的"Hello"信息时将包含生成主密钥所需的信息; 3)客户根据收到的服务器响应信息,产生一个主密钥,并用服务器的公开密钥加密后传给服务器; 4)服务器恢复该主密钥,并返回给客户一个用主密钥认证的信

[nRF51822] 5、 霸屏了——详解nRF51 SDK中的GPIOTE(从GPIO电平变化到产生中断事件的流程详解)

:由于在大多数情况下GPIO的状态变化都会触发应用程序执行一些动作.为了方便nRF51官方把该流程封装成了GPIOTE,全称:The GPIO Tasks and Events (GPIOTE) . 从GPIO电平变化到产生中断事件的流程详解  1.GPIOTE概览 nRF51上面有32个GPIO,由于在大多数情况下GPIO的状态变化都会触发应用程序执行一些动作.为了方便nRF51官方把该流程封装成了GPIOTE,全称:The GPIO Tasks and Events (GPIOTE) .GP

linux中断流程详解

异常体系比较复杂,但是linux已经准备了很多的函数和框架,但是因为中断是和具体的开发板相关,所以中断需要我们自己来处理一些方面,但是这也是很少的一部分,很多公用的处理函数内核已经实现,linux内核搭建了一个非常容易扩充的中断处理体系. 中 断系统结构涉及的方面很多,而且分布在很多的函数中,这里我主要理清一些结构和流程顺序已经在哪些函数中实现,我不知道其他人怎么样?但是我自己一开始怎 是找不到linux内核是怎么把GPIO设置成中断的,我找了很久都找不到,还有我们很多的设置,初始化等等东西好像

CentOS开机流程详解

CentOS开机流程详解 一.linux开机流程: BIOS:(Basic Input Output System)基本输入输出系统,它是一组固化到计算机内主板上一个ROM芯片 上的程序,保存着计算机最重要的基本输入输出的程序.开机后自检程序和系统自启动程序,可从CMOS中读写系统设置的具体信息. MBR:Master Boot Record,主要引导记录区. Boot Loader:启动引导程序. 二.详细流程 第一步:加载BIOS 打开计算机电源,计算机硬件会自动加载BIOS,读取BIOS内

Linux启动流程详解【转载】

在BIOS阶段,计算机的行为基本上被写死了,可以做的事情并不多:一般就是通电.BIOS.主引导记录.操作系统这四步.所以我们一般认为加载内核是linux启动流程的第一步. 第一步.加载内核 操作系统接管硬件以后,首先读入 /boot 目录下的内核文件. 我们查看一下,/boot 目录下面大概是这样一些文件: 第二步.启动初始化进程 内核文件加载以后,就开始运行第一个程序 /sbin/init,它的作用是初始化系统环境. 由于init是第一个运行的程序,它的进程编号(pid)就是1.其他所有进程都