(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制。spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施:

1. 简介:

Spring cloud Stream 数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。

2. 使用工具:

rabbit,具体的下载和安装细节我这里不做太多讲解,网上的实例太多了

3. 创建commonservice-mq-producer消息的发送者项目,在pom里面配置stream-rabbit的依赖

<span style="font-size: 16px;"><!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency></span>  

4. 在yml文件里面配置rabbit mq

<span style="font-size: 16px;">server:
  port: 5666
spring:
  application:
    name: commonservice-mq-producer
  profiles:
    active: dev
  cloud:
    config:
      discovery:
        enabled: true
        service-id: commonservice-config-server
  <span style="color: #ff0000;"># rabbitmq和kafka都有相关配置的默认值,如果修改,可以再次进行配置
    stream:
      bindings:
        mqScoreOutput:
          destination: honghu_exchange
          contentType: application/json  

  rabbitmq:
     host: localhost
     port: 5672
     username: honghu
     password: honghu</span>
eureka:
  client:
    service-url:
      defaultZone: http://honghu:[email protected]:8761/eureka
  instance:
    prefer-ip-address: true</span>  

5. 定义接口ProducerService

<span style="font-size: 16px;">package com.honghu.cloud.producer;  

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;  

public interface ProducerService {  

    String SCORE_OUPUT = "mqScoreOutput";  

    @Output(ProducerService.SCORE_OUPUT)
    SubscribableChannel sendMessage();
}</span>  

6. 定义绑定

<span style="font-size: 16px;">package com.honghu.cloud.producer;  

import org.springframework.cloud.stream.annotation.EnableBinding;  

@EnableBinding(ProducerService.class)
public class SendServerConfig {  

}</span>  

7. 定义发送消息业务ProducerController

<span style="font-size: 16px;">package com.honghu.cloud.controller;  

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;  

import com.honghu.cloud.common.code.ResponseCode;
import com.honghu.cloud.common.code.ResponseVO;
import com.honghu.cloud.entity.User;
import com.honghu.cloud.producer.ProducerService;  

import net.sf.json.JSONObject;  

@RestController
@RequestMapping(value = "producer")
public class ProducerController {  

    @Autowired
    private ProducerService producerService;  

    /**
     * 通过get方式发送</span>对象<span style="font-size: 16px;">
     * @param name 路径参数
     * @return 成功|失败
     */
    @RequestMapping(value = "/sendObj", method = RequestMethod.GET)
    public ResponseVO sendObj() {
        User user = new User(1, "hello User");
        <span style="color: #ff0000;">Message<User> msg = MessageBuilder.withPayload(user).build();</span>
        boolean result = producerService.sendMessage().send(msg);
        if(result){
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
        }
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
    }  

    /**
     * 通过get方式发送字符串消息
     * @param name 路径参数
     * @return 成功|失败
     */
    @RequestMapping(value = "/send/{name}", method = RequestMethod.GET)
    public ResponseVO send(@PathVariable(value = "name", required = true) String name) {
        Message msg = MessageBuilder.withPayload(name.getBytes()).build();
        boolean result = producerService.sendMessage().send(msg);
        if(result){
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
        }
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
    }  

    /**
     * 通过post方式发送</span>json对象<span style="font-size: 16px;">
     * @param name 路径参数
     * @return 成功|失败
     */
    @RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST)
    public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) {
        Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build();
        boolean result = producerService.sendMessage().send(msg);
        if(result){
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
        }
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
    }
}
</span>  

8. 创建commonservice-mq-consumer1消息的消费者项目,在pom里面配置stream-rabbit的依赖

<!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>  

9. 在yml文件中配置:

server:
  port: 5111
spring:
  application:
    name: commonservice-mq-consumer1
  profiles:
    active: dev
  cloud:
    config:
      discovery:
        enabled: true
        service-id: commonservice-config-server  

    <span style="color: #ff0000;">stream:
      bindings:
        mqScoreInput:
          group: honghu_queue
          destination: honghu_exchange
          contentType: application/json  

  rabbitmq:
     host: localhost
     port: 5672
     username: honghu
     password: honghu</span>
eureka:
  client:
    service-url:
      defaultZone: http://honghu:[email protected]:8761/eureka
  instance:
    prefer-ip-address: true  

10. 定义接口ConsumerService

package com.honghu.cloud.consumer;  

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;  

public interface ConsumerService {  

    <span style="color: #ff0000;">String SCORE_INPUT = "mqScoreInput";  

    @Input(ConsumerService.SCORE_INPUT)
    SubscribableChannel sendMessage();</span>  

}  

11. 定义启动类和消息消费

package com.honghu.cloud;  

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;  

import com.honghu.cloud.consumer.ConsumerService;
import com.honghu.cloud.entity.User;  

@EnableEurekaClient
@SpringBootApplication
@EnableBinding(ConsumerService.class) //可以绑定多个接口
public class ConsumerApplication {  

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }  

    <span style="color: #ff0000;">@StreamListener(ConsumerService.SCORE_INPUT)
    public void onMessage(Object obj) {
        System.out.println("消费者1,接收到的消息:" + obj);
    }</span>  

}  

12. 分别启动commonservice-mq-producer、commonservice-mq-consumer1

13. 通过postman来验证消息的发送和接收


 

 

 

 

 

可以看到接收到了消息,下一章我们介绍mq的集群方案。

到此,整个消息中心方案集成完毕

欢迎大家和我一起学习spring cloud构建微服务云架构,我这边会将近期研发的spring cloud微服务云架构的搭建过程和精髓记录下来,帮助更多有兴趣研发spring cloud框架的朋友,大家来一起探讨spring cloud架构的搭建过程及如何运用于企业项目。
需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码请加企鹅求求: 贰一四七七七五六叁叁    完整项目的源码来源

原文地址:https://www.cnblogs.com/sccoming/p/10141634.html

时间: 2024-10-09 00:14:22

(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream的相关文章

(十六)JAVA springcloud ssm b2b2c多用户商城系统-使用spring cloud Bus刷新配置

我们使用spring cloud分布式微服务云架构做了b2b2c的电子商务系统,除了架构本身自带的系统服务外,我们将b2b2c的业务服务进行了细粒度拆分,做成了不同的业务微服务. 当我们的业务系统越来越庞大复杂的时候,各种配置也会随之增多.配置文件只要一修改,会对commonservice-config配置中心先停止服务,然后再重新启动,最后使配置生效. 如果服务少,我们可以手动方式来启动,但是对业务和系统的稳定性肯定有一定的影响. 如果是成百上千的服务都靠手动操作,我估计运维人员或技术人员会疯

JAVA springcloud ssm b2b2c多用户商城系统源码-docker-feign-hystrix(六)

简介 上一节我们讨论feign的配置,这节我们讨论一下,feign+hystrix调用生产者时,进行容错处理 一.创建模块(microservice-consumer-movie-feign-with-hystrix) 二.pom.xml文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmln

(三)JAVA springcloud ssm b2b2c多用户商城系统:服务提供与调用

上一篇文章我们介绍了eureka服务注册中心的搭建,这篇文章介绍一下如何使用eureka服务注册中心,搭建一个简单的服务端注册服务,客户端去调用服务使用的案例. 案例中有三个角色:服务注册中心.服务提供者.服务消费者,其中服务注册中心就是我们上一篇的eureka单机版启动既可,流程是首先启动注册中心,服务提供者生产服务并注册到服务中心中,消费者从服务中心中获取服务并执行. 服务提供 我们假设服务提供者有一个hello方法,可以根据传入的参数,提供输出"hello xxx,this is firs

JAVA springboot ssm b2b2c多用户商城系统源码(六) 分布式配置中心(Spring Cloud Config)

一.简介 在分布式系统中,由于服务数量巨多,为了方便服务配置文件统一管理,实时更新,所以需要分布式配置中心组件.在Spring Cloud中,有分布式配置中心组件spring cloud config ,它支持配置服务放在配置服务的内存中(即本地),也支持放在远程Git仓库中.在spring cloud config 组件中,分两个角色,一是config server,二是config client. 二.构建Config Server 创建一个spring-boot项目,取名为config-s

SpringBoot b2b2c 多用户商城系统-docker-feign配置(五)

简介 上一节我们讨论了怎么用feign声明式调用cloud的生产者,这节我们讨论一下feign配置,通过编写配置类,我们可以自定义feign的日志级别,日志扫描目录,可以通过feign调用服务在eureka上的调用信息 feign声明接口之后,在代码中通过@Resource或者@Autowired注入之后即可使用. @FeignClient标签的常用属性如下: name:指定FeignClient的名称,如果项目使用了Ribbon,name属性会作为微服务的名称,用于服务发现 url: url一

SpringBoot b2b2c 多用户商城系统 (十五)Springboot整合RabbitMQ

这篇文章带你了解怎么整合RabbitMQ服务器,并且通过它怎么去发送和接收消息.我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter去订阅一个POJO类型的消息. 准备工作 15min IDEA maven 3.0 在开始构建项目之前,机器需要安装rabbitmq,你可以去官网下载,http://www.rabbitmq.com/download.html ,如果你是用的Mac,你可以这样下载: brew install rab

SpringBoot b2b2c 多用户商城系统 (十六)用restTemplate消费服务

构架工程 创建一个springboot工程,去消费RESTFUL的服务.这个服务是 http:///gturnquist-quoters.cfapps.io/api/random ,它会随机返回Json字符串. 在Spring项目中,它提供了一个非常简便的类,叫RestTemplate,它可以很简便的消费服务. 消费服务 通过RestTemplate消费服务,需要先context中注册一个RestTemplate bean.代码如下: @Bean public RestTemplate rest

Java B2B2C多用户商城 springboot架构-config-bus(十三)

简介 当我们的业务系统越来越庞大复杂的时候,各种配置就会层出不群.一旦配置修改了,那么我们就是必须修改后停服务,然后再上线,如果服务少,我们可以手动来操作,如果是成千上百的服务,如果是手动操作,肯定就不合适宜了,然后SpringCloudConfig就出来了,就是我们通常意义上的配置中心,把应用原本放在本地文件的配置抽取出来放在中心服务器,从而能够提供更好的管理.发布能力.Java B2B2C多用户商城 springboot架构-config-bus(十三) SpringCloudConfig分

(十二)Java B2B2C多用户商城 springboot架构-SSO单点登录之OAuth2.0

上一篇我根据框架中OAuth2.0的使用总结,画了一个根据用户名+密码实现OAuth2.0的登录认证的流程图,今天我们看一下logout的流程: /** * 用户注销 * @param accessToken * @return */ @RequestMapping(value = "/user/logout", method = RequestMethod.POST) public ResponseVO userLogout(@RequestHeader(value = "