Spring cloud stream【消息分区】

??在上篇文章中我们给大家介绍了Stream的消息分组,可以实现消息的重复消费的问题,但在某些场景下分组还不能满足我们的需求,比如,同时有多条同一个用户的数据,发送过来,我们需要根据用户统计,但是消息被分散到了不同的集群节点上了,这时我们就可以考虑消息分区了。
??当生产者将消息数据发送给多个消费者实例时,保证同一消息数据始终是由同一个消费者实例接收和处理。

Stream 消息分区

创建项目

??将我们上篇文章中的分组的三个项目,拷贝一份修改名称及服务名称

没有分区的情况下演示

发送多条消息查看效果

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {

    @Autowired
    private ISendeService sendService;

    @Test
    public void testStream(){
        Product p = new Product(999, "stream test ...999");
        // 将需要发送的消息封装为Message对象
        Message message = MessageBuilder
                                .withPayload(p)
                                .build();
        for (int i = 0; i < 10; i++) {
            // 发送多条消息到队列中
            sendService.send().send(message );
        }

    }
}

10条消息被随机的分散到了两个消费者中:


我们可以看到A中6条消息,B中4条消息,而且这是随机的,下次执行的结果肯定不一样。

分区

1.发送者中配置

spring.application.name=stream-partition-sender
server.port=9060
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:[email protected]:8761/eureka/,http://dpb:[email protected]:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  outputProduct自定义的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct

#通过该参数指定了分区键的表达式规则
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
#指定了消息分区的数量。
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2

2.消费者中配置

服务A

spring.application.name=stream-partition-receiverA
server.port=9070
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:[email protected]:8761/eureka/,http://dpb:[email protected]:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct999

#开启消费者分区功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2
#设置当前实例的索引号,从 0 开始
spring.cloud.stream.instanceIndex=0

服务B

spring.application.name=stream-partition-receiverB
server.port=9071
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:[email protected]:8761/eureka/,http://dpb:[email protected]:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct999

#开启消费者分区功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2
#设置当前实例的索引号,从 1 开始
spring.cloud.stream.instanceIndex=1

启动服务测试

10个消息都被消费者A给消费了,说明到达了我们需要的效果。
案例源码:https://github.com/q279583842q/springcloud-e-book

原文地址:https://www.cnblogs.com/dengpengbo/p/11104841.html

时间: 2024-10-04 08:01:30

Spring cloud stream【消息分区】的相关文章

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr

Spring Cloud Stream如何消费自己生产的消息

在上一篇<Spring Cloud Stream如何处理消息重复消费>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢? 常见错误 在放出标准答案前,先放出一个常见的错误姿势和告警信息(以便您可以通过搜索引擎找到这里^_^).以下错误基于Spring Boot 2.0.5.Spring Cloud Finchley SR1. 首先,根据入门示例,为了生产和消费消息,需

(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制.spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施: 1. 简

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(ve

Spring Cloud Stream在同一通道根据消息内容分发不同的消费逻辑

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(ve

Spring cloud stream【消息分组】

??上篇文章我们简单的介绍了stream的使用,发现使用还是蛮方便的,但是在上个案例中,如果有多个消息接收者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况.这时我们就可以使用Stream中的消息分组来解决了! Stream消息分组 ??消息分组的作用我们已经介绍了.注意在Stream中处于同一个gr

Spring Cloud Stream实现消息过滤的三种主要方式

消息过滤 消息过滤是指消费者不希望消费topic里的所有消息,而是只消费部分特定的消息.从topic中挑选出这些特定的消息,就是所谓的消息过滤.通过消息过滤可以实现消息的分流处理,例如生产者生产的消息,header可能都是不尽相同的,这样我们就可以编写两个或多个消费者,对不同header的消息进行针对性的处理. Spring Cloud Stream实现消息过滤的方式主要有三种,一是使用@StreamListener注解的condition属性指定条件表达式,二是在消息的header中设置TAG

Spring cloud stream【入门介绍】

案例代码:https://github.com/q279583842q/springcloud-e-book ??在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了哪个中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性. 一.什么是SpringCloudStream ??官方定义 Spring Cloud Strea

Spring Cloud Stream总结

概念 1.group: 组内只有1个实例消费.如果不设置group,则stream会自动为每个实例创建匿名且独立的group--于是每个实例都会消费 组内单次只有1个实例消费,并且会轮询负载均衡.通常,在将应用程序绑定到给定目标时,最好始终指定consumer group 2.destination binder: 与外部消息系统通信的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者.Binder使S