springboot整合kafka

参考地址:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/

1、pom文件

<!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
<dependency>   <groupId>org.projectlombok</groupId>   <artifactId>lombok</artifactId></dependency>

2、配置文件

########################kafka相关配置##########################################
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=172.16.0.79:9092,172.16.0.79:9093
#=============== provider  =======================
#retries=0,时允许重试失败的发送
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
#指定消息key和消息体的编码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
#当没有初始化偏移量或者偏移量不存在时,自动重置偏移量为最开始的偏移量
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3、消息类

@Data
public class Message {

    private Long id;

    private String msg;

    private Date sendTime;
}

4、kafka消息发送类

@Component
@Slf4j
public class KafkaSenderService {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(){
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        String s = JSONObject.toJSONString(message);
        log.info("+++++++++++++++++++++  message = {}", s);     //如果主题不存在,则会自动创建
        kafkaTemplate.send("test",s);
    }
}

5、消息接收

@Component
@Slf4j
public class KafkaReceiverService {

    @KafkaListener(topics="test")
    public void listen(ConsumerRecord<?,?>record){
        Optional<?> value = Optional.of(record.value());
        if (value.isPresent()){
            Object o = value.get();

            log.info("-----------record:"+record);
            log.info("-----------message:"+o);
        }
    }
}

原文地址:https://www.cnblogs.com/cq-yangzhou/p/11428927.html

时间: 2024-08-23 23:54:45

springboot整合kafka的相关文章

SpringBoot整合Kafka和Storm

前言 本篇文章主要介绍的是SpringBoot整合kafka和storm以及在这过程遇到的一些问题和解决方案. kafka和storm的相关知识 如果你对kafka和storm熟悉的话,这一段可以直接跳过!如果不熟,也可以看看我之前写的博客.一些相关博客如下. kafka 和 storm的环境安装 地址:http://www.panchengming.com/2018/01/26/pancm70/ kafka的相关使用 地址:http://www.panchengming.com/2018/01

SpringBoot(八) SpringBoot整合Kafka

window下安装kafka和zooker,超详细:https://blog.csdn.net/weixin_33446857/article/details/81982455 kafka:安装下载教程网址(CentOS Linux):https://www.cnblogs.com/subendong/p/7786547.html zooker的下载安装网址:https://blog.csdn.net/ring300/article/details/80446918 一.准备工作提前说明:如果你

SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQ、SpringBoot 整合 RabbitMQ、SpringBoot 整合 Kafka)

https://www.cnblogs.com/xuyiqing/p/10851859.html https://www.cnblogs.com/leeSmall/p/8721556.html https://www.cnblogs.com/linyufeng/p/9885645.html 原文地址:https://www.cnblogs.com/418836844qqcom/p/11540020.html

SpringBoot进阶教程(六十二)整合Kafka

在上一篇文章<Linux安装Kafka>中,已经介绍了如何在Linux安装Kafka,以及Kafka的启动/关闭和创建发话题并产生消息和消费消息.这篇文章就介绍介绍SpringBoot整合Kafka. v创建项目 若是已有的项目中添加kafka, 请直接跳至1.3 1.1 创建springboot: 1.2 选web和kafka: 1.3 已有的项目中添加kafka, pom.xml中添加依赖 <dependency> <groupId>org.springframew

SpringBoot系列十二:SpringBoot整合 Shiro

1.概念:SpringBoot 整合 Shiro 2.具体内容 Shiro 是现在最为流行的权限认证开发框架,与它起名的只有最初的 SpringSecurity(这个开发框架非常不好用,但是千万不要 以为 SpringSecurity 没有用处,它在 SpringCloud 阶段将发挥重大的作用).但是现在如果要想整合 Shiro 开发框架有一点很遗憾, SpringBoot 没有直接的配置支持,它不像整合所谓的 Kafka.Redis.DataSource,也就是说如果要想整合 Shiro 开

SpringBoot 2.SpringBoot整合Mybatis

一.创建Springboot的配置文件:application.properties SpringApplication 会从 application.properties 文件中加载配置信息,下面是添加Spring配置信息的文件目录顺序: 当前目录下的/config子目录中 当前目录中 一个 classpath 包下的 /config 目录中 classpath 根目录中 大家根据自己习惯来即可. /application.properties 文件配置如下: spring.datasourc

springboot学习笔记-6 springboot整合RabbitMQ

一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

SpringBoot整合Quartz定时任务

记录一个SpringBoot 整合 Quartz 的Demo实例 POM.XML文件 <!-- 定时器任务 quartz需要导入的坐标 --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>1.8.5</version> </dependency> 类似于控制