消息驱动的微服务 - Spring Cloud Alibaba RocketMQ

引入MQ后的架构演进


MQ的选择

消息队列对比参照表:

RocketMQ vs. ActiveMQ vs. Kafka:

参考至:


CentOS7上搭建RocketMQ

环境要求:

  • CentOS 7.2
  • 64位JDK1.8+
  • 4G+的可用磁盘空间

1、下载RocketMQ的二进制包,我这里使用的是4.5.1版本,下载地址如下:

http://rocketmq.apache.org/release_notes/release-notes-4.5.1/

使用wget命令下载:

[[email protected] ~]# cd /usr/local/src
[[email protected] /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip

2、解压下载好的压缩包,并移动到合适的目录下:

[[email protected] /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip
[[email protected] /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1

注:若没有安装unzip命令则使用如下命令安装:
yum install -y unzip

3、进入rocketmq的根目录并查看是否包含如下目录及文件:

[[email protected] /usr/local/src]# cd /usr/local/rocketmq-4.5.1
[[email protected] /usr/local/rocketmq-4.5.1]# ls
benchmark  bin  conf  lib  LICENSE  NOTICE  README.md

4、没问题后,使用如下命令启动Name Server:

[[email protected] /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv &
[1] 2448
[[email protected] /usr/local/rocketmq-4.5.1]# 

5、查看默认的9876端口是否被监听,以验证Name Server是否启动成功:

[[email protected] /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java
tcp6       0      0 :::9876                 :::*                    LISTEN      2454/java
[[email protected] /usr/local/rocketmq-4.5.1]#

6、启动Broker:

[[email protected] /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 &
[2] 2485
[[email protected] /usr/local/rocketmq-4.5.1]# 

7、验证Broker是否启动成功,如果启动成功,能看到类似如下的日志::

[[email protected] /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success"
2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876
[[email protected] /usr/local/rocketmq-4.5.1]# 

若想停止Name Server和Broker,则依次执行以下两条命令即可:

[[email protected] /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker
The mqbroker(2492) is running...
Send shutdown request to mqbroker(2492) OK  # 输出该信息说明停止成功
[[email protected] /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv
The mqnamesrv(2454) is running...
Send shutdown request to mqnamesrv(2454) OK  # 输出该信息说明停止成功
[2]+  退出 143              nohup sh bin/mqbroker -n localhost:9876
[[email protected] /usr/local/rocketmq-4.5.1]#

验证RocketMQ功能是否正常

1、验证生产消息正常,执行如下命令:

[[email protected] /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876
[[email protected] /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情况下,会看到一堆的类似于如下的输出,这是生产消息后成功的result:

SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]

2、验证消费消息正常,执行如下命令:

[[email protected] /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情况下,会看到一堆的类似于如下的输出,这是消费的消息内容:

ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest‘, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId=‘null‘}]]

搭建RocketMQ控制台

RocketMQ官方提供了一个基于Spring Boot开发的可视化控制台,可以方便我们查看RocketMQ的运行情况以及提升运维效率。所以本小节将介绍一下如何搭建搭建RocketMQ的控制台,由于我们使用的RocketMQ版本是4.5.1,所以需要对控制台的源码进行一些改动以适配RocketMQ的4.5.1版本。

1、首先需要下载源码,有两种方式,一是使用git克隆代码仓库,二是直接下载rocketmq-externals的zip包,我这里使用git方式,执行如下命令:

git clone https://github.com/apache/rocketmq-externals.git

2、修改控制台代码,使用IDE打开rocketmq-console项目,如下图所示:

2.1、修改项目中的application.properties配置文件,我这里主要是修改了监听端口和Name Server的连接地址,至于其他配置项有需要的话可按照说明自行修改:

# console的监听端口,默认是8080
server.port=8011
# Name Server的连接地址;非必须,可以在启动了console后,在控制台导航栏 - 运维 - NameSvrAddrList一栏设置
rocketmq.config.namesrvAddr=192.168.190.129:9876

2.2、修改依赖,由于console项目默认使用的rocketmq版本是4.4.0,与我们这里使用的是4.5.1不完全兼容,所以需要修改一下依赖版本,找到这一行:

<rocketmq.version>4.4.0</rocketmq.version>

修改为:

<rocketmq.version>4.5.1</rocketmq.version>

2.3、修改代码,由于修改了rocketmq的版本,会导致org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic方法编译报错,所以需要改动一下此处代码 ,将:

@Override
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
    ...

修改为:

@Override
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
    RPCHook rpcHook = null;
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
    ...

3、打包构建并启动,打开idea的terminal,执行如下命令:

# 在rocketmq-console目录下执行
mvn clean package -DskipTests

# 进入jar包存放目录
cd target

# 启动rocketmq console
java -jar rocketmq-console-ng-1.0.1.jar

4、使用浏览器访问控制台,我这里由于修改了端口,所以访问地址是:http://localhost:8011,正常的情况下能看到如下界面:

不习惯英文的话可以在右上角切换语言:

由于控制台是可视化界面并且支持中文,这里就不过多介绍了,可以参考官方的控制台使用说明文档:


RocketMQ术语与概念

我这里将基本的术语与概念简单总结成了思维导图:

官方文档:


Spring消息编程模型 - 编写生产者

在以上小节搭建完RocketMQ之后,我们来使用Spring的消息编程模型,编写一个简单的示例。首先需要在项目中添加相关依赖如下:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

在配置文件中添加rocketmq相关的配置如下:

rocketmq:
  name-server: 192.168.190.129:9876
  producer:
    # 小坑:必须指定group
    group: test-group

编写生产者的代码,这里以Controller做示例,具体代码如下:

package com.zj.node.contentcenter.controller.content;

import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 生产者
 *
 * @author 01
 * @date 2019-08-03
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController {

    /**
     * 用于发送消息到 RocketMQ 的api
     */
    private final RocketMQTemplate rocketMQTemplate;

    @GetMapping("/test-rocketmq/sendMsg")
    public String testSendMsg() {
        String topic = "test-topic";
        // 发送消息
        rocketMQTemplate.convertAndSend(topic, Message.getInstance());

        return "send message success";
    }
}

@Data
class Message {
    private Integer id;
    private String name;
    private String status;
    private Date createTime;

    static Message getInstance() {
        Message message = new Message();
        message.id = 1;
        message.name = "×××";
        message.status = "default";
        message.createTime = new Date();

        return message;
    }
}

编写完成后,启动项目,访问该接口:

消息发送成功后,可以到RocketMQ的控制台中进行查看:

消息体可以在消息详情中查看,如下:

从生产者的代码来看,可以说是十分的简单了,只需要使用一个RocketMQTemplate就可以实现将对象转换成消息体并发送消息。实际上除了RocketMQ外,其他的MQ也有对应的Template,如下:

  • RocketMQ:RocketMQTemplate
  • ActiveMQ/Artemis:JmsTemplate
  • RabbitMQ:AmqpTemplate
  • Kafka:KafkaTemplate

Spring消息编程模型 - 编写消费者

在消费者项目中,也需要添加rocketmq的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

同样需要配置Name Server的连接地址:

rocketmq:
  name-server: 192.168.190.129:9876

编写消费者的代码,具体代码如下:

package com.zj.node.usercenter.rocketmq;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 消费者监听器
 *
 * @author 01
 * @date 2019-08-03
 **/
@Slf4j
@Component
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
public class TestConsumerListener implements RocketMQListener<Message> {

    /**
     * 监听到消息的时候就会调用该方法
     *
     * @param message 消息体
     */
    @Override
    public void onMessage(Message message) {
        log.info("从test-topic中监听到消息");
        log.info(JSON.toJSONString(message));
    }
}

/**
 * 消息体结构需要一致
 */
@Data
class Message {
    private Integer id;
    private String name;
    private String status;
    private Date createTime;
}

编写完成后启动项目,由于之前我们已经往队列里发送了消息,所以此时消费者项目一启动,就可以监听到消息并消费,控制台就会输出如下日志:

原文地址:https://blog.51cto.com/zero01/2426303

时间: 2024-10-05 11:57:01

消息驱动的微服务 - Spring Cloud Alibaba RocketMQ的相关文章

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr

Spring Cloud构建微服务架构 消息驱动的微服务(消费分区)【Dalston版】

通过上一篇<消息驱动的微服务(消费组)>的学习,我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理.但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费.这时候我们就需要对消息进行分区处理. 使用消息分区 在Spring Cloud Stream中实现消息分区非常简单,我们可以根据消费组示例做一些配置修改就能实现,具体如下: 在消费者应用SinkReceiver中,我们对配置文件做一些修改,具体如下: spring.c

微服务Spring Cloud 入门

什么是微服务? 微服务就是把原本臃肿的一个项目的所有模块拆分开来并做到互相没有关联,甚至可以不使用同一个数据库.   比如:项目里面有User模块和Power模块,但是User模块和Power模块并没有直接关系,仅仅只是一些数据需要交互,那么就可以吧这2个模块单独分开来,当user需要调用power的时候,power是一个服务方,但是power需要调用user的时候,user又是服务方了, 所以,他们并不在乎谁是服务方谁是调用方,他们都是2个独立的服务,这时候,微服务的概念就出来了. 微服务和分

Spring Cloud Alibaba微服务从入门到进阶 完整版

第1章 课程介绍课程的总体介绍,课程需要的环境搭建和一些常用的快捷键介绍. 第2章 Spring Boot基础前期先带着学习Spring Boot基础,创建Spring Boot项目,讲解Spring Boot的配置,是学习Spring Cloud Alibaba的必知必会. 第3章 微服务的拆分与编写这一章讲解的微服务的概念,使用场景,建模,架构通览,讲师带着拆分微服务并且一步步分析,编写一些基础的微服务功能 第4章 Spring Cloud Alibaba介绍学习Spring Cloud A

Spring Cloud Alibaba 新一代微服务解决方案

本篇是「跟我学 Spring Cloud Alibaba」系列的第一篇, 每期文章会在公众号「架构进化论」进行首发更新,欢迎关注. 1.Spring Cloud Alibaba 是什么 Spring Cloud Alibaba 是阿里巴巴提供的微服务开发一站式解决方案,是阿里巴巴开源中间件与 Spring Cloud 体系的融合. 马老师左手双十一,右手阿里开源组件,不仅占据了程序员的购物车,还要攻占大家的开发工具. 先说说 Spring Cloud 提起微服务,不得不提 Spring Clou

spring cloud alibaba 简介

Spring Cloud Alibaba 官方github地址 Spring Cloud Alibaba 致力于提供微服务开发的一站式解决方案.此项目包含开发分布式应用微服务的必需组件,方便开发者通过 Spring Cloud 编程模型轻松使用这些组件来开发分布式应用服务. 主要功能 服务限流降级:默认支持 WebServlet.WebFlux, OpenFeign.RestTemplate.Spring Cloud Gateway, Zuul, Dubbo 和 RocketMQ 限流降级功能的

Spring Cloud Alibaba发布第二个版本,Spring 发来贺电

还是熟悉的面孔,还是熟悉的味道,不同的是,这次的配方升级了. 今年10月底,Spring Cloud联合创始人Spencer Gibb在Spring官网的博客页面宣布:阿里巴巴开源 Spring Cloud Alibaba,并发布了首个预览版本.随后,Spring Cloud 官方Twitter也发布了此消息.- 传送门 时隔 51天,Spencer Gibb再次在Spring官网的博客页面宣布:Spring Cloud Alibaba发布了其开源后的第二个版本0.2.1,随后,Spring C

阿里巴巴开源 Spring Cloud Alibaba,加码微服务生态建设

摘要: 本周,Spring Cloud联合创始人Spencer Gibb在Spring官网的博客页面宣布:阿里巴巴开源 Spring Cloud Alibaba,并发布了首个预览版本.随后,Spring Cloud 官方Twitter也发布了此消息. 本周,Spring Cloud联合创始人Spencer Gibb在Spring官网的博客页面宣布:阿里巴巴开源 Spring Cloud Alibaba,并发布了首个预览版本.随后,Spring Cloud 官方Twitter也发布了此消息. 大家

Spring Cloud Alibaba | 微服务分布式事务之Seata

Spring Cloud Alibaba | 微服务分布式事务之Seata 本篇实战所使用Spring有关版本: SpringBoot:2.1.7.RELEASE Spring Cloud:Greenwich.SR2 Spring CLoud Alibaba:2.1.0.RELEASE 1. 概述 在构建微服务的过程中,不管是使用什么框架.组件来构建,都绕不开一个问题,跨服务的业务操作如何保持数据一致性. 2. 什么是分布式事务? 首先,设想一个传统的单体应用,无论多少内部调用,最后终归是在同一