Spring Boot Kafka

1、创建集群

http://kafka.apache.org/documentation/#quickstart

有一句我觉得特别重要: For Kafka, a single broker is just a cluster of size one.

1.1、命令行操作

#解压文件
tar -zxf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

#启动Zookeerper
bin/zookeeper-server-start.sh config/zookeeper.properties

#启动Kafka
bin/kafka-server-start.sh config/server.properties &
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

#创建集群
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic myTopic

#查看主题
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic

1.2、图形化界面操作

除了命令行以为,也可以通过kafka-manager查看

2、Spring Boot集成Kafka

2.1、引入Maven依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2、配置

spring:
  kafka:
    bootstrap-servers: 10.123.52.76:9092,10.123.52.76:9093,10.123.52.76:9094
    consumer:
      group-id: myGroup

2.3、收发消息

package com.cjs.boot.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyListener {

    @KafkaListener(topics = "myTopic")
    public void processMessage2(String content) {
        log.info("【Received Message From ‘myTopic‘】: {}", content);
    }

}

package com.cjs.boot.controller;

import com.cjs.boot.response.RespResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Controller;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.ModelAndView;

@Controller
@RequestMapping("/message")
public class MessageController extends BaseController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/add.html")
    public ModelAndView add() {
        return new ModelAndView("message/add");
    }

    @PostMapping("/send.json")
    @ResponseBody
    public RespResult send(String text) {
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("myTopic", String.valueOf(System.currentTimeMillis()), text);
        return RespResult.success();
    }

}

2018-05-04 12:36:59.736  INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-05-04 12:36:59.736  INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e2018-05-04 12:36:59.830  INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener          : 【Received Message From ‘myTopic‘】: 大家好啊
2018-05-04 12:37:24.107  INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener          : 【Received Message From ‘myTopic‘】: 吃饭啦

2.4、截图

原文地址:https://www.cnblogs.com/mrchenzheng/p/12165740.html

时间: 2024-07-30 22:04:33

Spring Boot Kafka的相关文章

spring boot+kafka整合(未完待续)

springboot版本是2.0.4 在整合过程中,spring boot帮我们把kafka的大部分属性直接带出来了,但是有些不常用的属性,需要通过 spring.kafka.consumer.properties.* 来设置,例如max.partition.fetch.bytes,一次fetch请求,从一个partition中取得的records最大值. 在application.properties中添加kafka扩展属性, #设置一次fetch记录的最大值2M(2*1024*1024),默

Spring Kafka和Spring Boot整合实现消息发送与消费简单案例

本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息. 先前我已经分享了Kafka的基本介绍与集群环境搭建方法.关于Kafka的介绍请阅读Apache Kafka简介与安装(一),关于Kafka安装请阅读Apache Kafka安装,关于Kafka集群环境搭建请阅读Apache Kafka集群环境搭建 .这里关于服务器环境搭建不在赘述. Spring Kafka整合Spring Boot创建生产者客户端案例 创建一个kafk

Kafka 入门和 Spring Boot 集成

Kafka 入门和 Spring Boot 集成 标签:博客 [TOC] 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流).由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护. 应用场景 下面列举了一些kafka常见的应用场景. 消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景. 应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用

spring boot 整合kafka 报错 Exception thrown when sending a message with key=&#39;null&#39; and payload=JSON to topic proccess_trading_end: TimeoutException: Failed to update metadata after 60000 ms.

org.springframework.kafka.support.LoggingProducerListener- Exception thrown when sending a message with key='null' and payload='{"dataDts":["20180329","20180328","20180327","20180326","20180323"]

Kafka的安装及与Spring Boot的集成

安装JDK 下载jdk-8u202-ea-bin-b03-linux-x64-07_nov_2018.tar.gz 解压 配置 $ vi /etc/profile,在最后加入下面两行 export JAVA_HOME=/usr/local/bigdata/jdk1.8.0_202 export PATH=$JAVA_HOME/bin:$PATH 重新登录执行 java,验证JDK配置成功 安装Kafka 下载kafka_2.11-1.0.2.tgz,这里主要1.0.2这个Kafka Server

通过Spring Boot Webflux实现Reactor Kafka

在Apache Kafka简介中,我们研究了分布式流媒体平台Apache Kafka.这一次,我们将关注Reactor Kafka,这个库可以创建从Project Reactor到Kafka Topics的Reactive Streams,反之亦然. 我们将使用两个小型示例应用程序,Paymentprocessor Gateway和PaymentValidator.这些应用程序的代码可以在这里找到. Paymentprocessor网关提供了一个小网页,可以生成一个随机的信用卡号码(显然是伪造的

Spring boot 集成Kafka

搭建Kafka集群,参考: https://www.cnblogs.com/jonban/p/kafka.html 源码示例如下: 1.新建 Maven 项目 kafka 2.pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http:/

Spring Boot 之 Kafka

Kafka是一个分布式的流平台,具有以下功能: 1.发布和订阅消息. 2.以容错的方式存储消息流. 3.实时的流处理. 主要的两大应用: 1.作为实时的流数据通道,在应用程序之间传递消息. 2.构建实时流处理,对数据流进行转换或反应. 四个核心的API: Producer API(输出向),Consumer API(输入向),Streams API(输入&&输出向),Connector API(输入||输出向) https://www.orchome.com/5 Kafka的安装: htt

kafka学习(五)Spring Boot 整合 Kafka

一.创建Spring boot 工程 创建过程不再描述,创建后的工程结构如下: POM文件中要加入几个依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:sch