RabbitMq解决分布式事物

一、RabbitMQ解决分布式事务思路:

案例: 经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯。

二、RabbitMQ解决分布式事务原理采用最终一致性原理

需要保证以下三要素

1、确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)

2、MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题)

3、如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。

三、如果生产者投递消息到MQ服务器成功

场景1:如果消费者消费消息失败了,生产者是不需要回滚事务的。

解决方案:消费者采用手动ack应答模式,采用MQ进行补偿重试机制,注意MQ补偿幂等性问题。

问题:如何确保生产者投递消息到MQ服务器一定能成功?

解决方案:confirm机制(确认应答机制)。

场景2 如果生产者投递消息到MQ服务器失败,如何解决?

解决方案:使用生产者重试机制进行发消息,注意幂等性问题。

场景3  如何保证一个事务先执行,生产者投递消息到MQ服务器成功,消费者消费成功了,但是订单却回滚了。

解决方案:补单机制。

传统解决方式:

RabbitMq解决方案:

MQ解决分布式事务一致性

案例中 订单表 和 派单表必须一致!

用MQ 可以做流量削峰值

MQ解决分布式事务最终一致性思想

1.   确保生产者消息 一定要投递到MQ服务器端成功

如果生产者投递消息到MQ服务器成功

场景1  如果消费者消费消息失败了

生产者是不需要回滚事务。 消费者采用手动ack应答方式  进行补偿机制,补偿的过程中注意 幂等性 问题。

分布式事务中遵循base理论 遵循cpa理论

如何确保生产者发送消息一定发送到MQ消息服务器端成功? confirm机制 确认应答机制

场景2    如果生产者发送消息到MQ服务器端失败

使用生产者重试机制进行发消息

四、代码实现

1、派单表

create TABLE platoon(
  id INT PRIMARY KEY AUTO_INCREMENT,
  orderId VARCHAR(255),
  takeout_userId int
)

2、订单表

create TABLE order_info(
  id INT PRIMARY KEY AUTO_INCREMENT,
  name VARCHAR(30),
  order_money INT,
  orderId VARCHAR(255)
);

3、生产者

1.实现接口 implements RabbitTemplate.ConfirmCallback

2. 重写回调方法   成功 失败的  调用

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {

send方法里面调用回调函数:

this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);

yml需要配置回调机制:

###开启消息确认机制 confirms
publisher-confirms: true
publisher-returns: true

重试也是有一定次数限制的 如果超过一定次数 就需要进行人工补偿了

上面已经实现了确保消息发送给 消费者   此时的数据不一致问题 就是:

场景3.  如何保证第一个事务先执行,生产者投递消息到MQ服务器成功,消费者消费消息成功了,但是订单事务回滚了。

(生产者投递消息给消费者消费成功 然后 生产者回滚了)

MQ解决分布式原理通过最终一致性解决总体框架图:  交换机采用路由键模式  补单队列和派但队列都绑定同一个路由键

支付服务和积分服务

下单

pom文件

  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
    </parent>
    <dependencies>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- mysql 依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 阿里巴巴数据源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- springboot- 整个 lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

基础包:

@Component
public class BaseApiService {
    public BasicResult setResultError(Integer code, String msg) {
        return setResult(code, msg, null);
    }

    // 返回错误,可以传msg
    public BasicResult setResultError(String msg) {
        return setResult(ApiConstants.HTTP_RES_CODE_500, msg, null);
    }

    // 返回成功,可以传data值
    public BasicResult setResultSuccess(Object data) {
        return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, data);
    }

    // 返回成功,沒有data值
    public BasicResult setResultSuccess() {
        return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, null);
    }

    // 返回成功,沒有data值
    public BasicResult setResultSuccess(String msg) {
        return setResult(ApiConstants.HTTP_RES_CODE_200, msg, null);
    }

    // 通用封装
    public BasicResult setResult(Integer code, String msg, Object data) {
        return new BasicResult(code, msg, data);
    }
}
@Data
public class BasicResult {
    private Integer rtnCode;
    private String msg;
    private Object data;

    public BasicResult() {

    }

    public BasicResult(Integer rtnCode, String msg, Object data) {
        super();
        this.rtnCode = rtnCode;
        this.msg = msg;
        this.data = data;
    }

    @Override
    public String toString() {
        return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
    }
}

常量

public interface ApiConstants {
    // 响应请求成功
    String HTTP_RES_CODE_200_VALUE = "success";
    // 系统错误
    String HTTP_RES_CODE_500_VALUE = "fial";
    // 响应请求成功code
    Integer HTTP_RES_CODE_200 = 200;
    // 系统错误
    Integer HTTP_RES_CODE_500 = 500;
    // 未关联QQ账号
    Integer HTTP_RES_CODE_201 = 201;
}

config类

@Configuration
public class RabbitmqConfig {

    // 下单并且派单存队列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue";
    // 补单队列,判断订单是否已经被创建
    public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    // 下单并且派单交换机
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

    // 1.定义订单队列
    @Bean
    public Queue directOrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }

    // 2.定义补订单队列
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }

    // 2.定义交换机
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    // 3.订单队列与交换机绑定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

    // 3.补单队列与交换机绑定
    @Bean
    Binding bindingExchangeCreateOrder() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

}

实体类

@Data
public class OrderEntity {

    private Long id;
    // 订单名称
    private String name;
    // 下单金额
    private Double orderMoney;
    // 订单id
    private String orderId;
}

mapper接口

public interface OrderMapper {

    @Insert(value = "INSERT INTO `order_info` VALUES (#{id}, #{name}, #{orderMoney},#{orderId})")
    @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
    public int addOrder(OrderEntity orderEntity);

    @Select("SELECT id as id ,name as name , order_money as orderMoney,orderId as orderId from order_info where orderId=#{orderId};")
    public OrderEntity findOrderId(@Param("orderId") String orderId);

}

service类

@Service
public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public BasicResult addOrderAndDispatch(){
        //先下单 订单表插入数据
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setName("黄焖鸡米饭");
        // 价格是300元
        orderEntity.setOrderMoney(300d);
        // 商品id
        String orderId = UUID.randomUUID().toString();
        orderEntity.setOrderId(orderId);
        // 1.先下单,创建订单 (往订单数据库中插入一条数据)
        int orderResult = orderMapper.addOrder(orderEntity);
        System.out.println("orderResult:" + orderResult);
        if (orderResult <= 0) {
            return setResultError("下单失败!");
        }
        // 2.订单表插插入完数据后 订单表发送 外卖小哥
        send(orderId);
        //出现异常的时候
        //int i = 1/0;
        return setResultSuccess();
    }

    /**
     * 发送消息
     * @param orderId
     */
    private void send(String orderId) {
        JSONObject jsonObect = new JSONObject();
        jsonObect.put("orderId", orderId);
        String msg = jsonObect.toJSONString();
        System.out.println("msg:" + msg);
        //封装消息
        MessageBuilder.withBody(msg.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(orderId);
        //构建回调参数
        CorrelationData correlationData = new CorrelationData(orderId);
        //发送消息
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.convertAndSend("order_exchange_name","orderRoutingKey"
        ,msg,correlationData);
    }
    // 生产消息确认机制 生产者往服务器端发送消息的时候 采用应答机制
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //全局ID 都是相同的
        String orderId = correlationData.getId();
        System.out.println("消息id:" + correlationData.getId());
        if (ack) { //消息发送成功
            System.out.println("消息发送确认成功");
        } else {
            //重试机制
            send(orderId);
            System.out.println("消息发送确认失败:" + cause);
        }
    }
}

cotroller类

@RestController
public class OrderController extends BaseApiService {
    @Autowired
    private OrderService orderService;

    @RequestMapping("/addOrder")
    public BasicResult addOrder() {
        return orderService.addOrderAndDispatch();
    }
}

启动类

@MapperScan("com.yehui.mapper")
@SpringBootApplication
public class AppOrder {

    public static void main(String[] args) {
        SpringApplication.run(AppOrder.class, args);
    }

}

yml配置文件

spring:
  rabbitmq:
  ####连接地址
    host: localhost
   ####端口号
    port: 5672
   ####账号
    username: guest
   ####密码
    password: guest
   ### 地址
    virtual-host: /
    ###开启消息确认机制 confirms
    publisher-confirms: true
    publisher-returns: true
  #数据库连接信息
  datasource:
        name: test
        url: jdbc:mysql://127.0.0.1:3306/project?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
        username: root
        password: root
        # 使用druid数据源
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver

派单

pom文件

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
    </parent>
    <dependencies>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- mysql 依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 阿里巴巴数据源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- springboot- 整个 lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

常量类

public interface ApiConstants {
    // 响应请求成功
    String HTTP_RES_CODE_200_VALUE = "success";
    // 系统错误
    String HTTP_RES_CODE_500_VALUE = "fial";
    // 响应请求成功code
    Integer HTTP_RES_CODE_200 = 200;
    // 系统错误
    Integer HTTP_RES_CODE_500 = 500;
    // 未关联QQ账号
    Integer HTTP_RES_CODE_201 = 201;

}

实体类

@Data
public class DispatchEntity {

    private Long id;
    // 订单号
    private String orderId;
    // 外卖员id
    private Long takeoutUserId;

}

mapper接口

public interface DispatchMapper {

    /**
     * 新增派单任务
     */
    @Insert("INSERT into platoon values (null,#{orderId},#{takeoutUserId});")
    public int insertDistribute(DispatchEntity distributeEntity);
    /**
     * 查询是否已经派单了
     */
    @Select("SELECT * FROM platoon WHERE orderid =#{OrderId}")
    public DispatchEntity findByOrderId(@Param("orderId") String OrderId);

}

config类

@Configuration
public class RabbitmqConfig {

    // 下单并且派单存队列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue";
    // 补单队列,判断订单是否已经被创建
    public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    // 下单并且派单交换机
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

    // 1.定义派单队列
    @Bean
    public Queue OrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }
/*
    // 2.定义补订单队列
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }*/

    // 2.定义交换机
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    // 3.订单队列与交换机绑定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

}

派单服务

/**
 * 派单服务
 */
@Component
public class DispatchConsumer {

    @Autowired
    private DispatchMapper dispatchMapper;

    @RabbitListener(queues = "order_dic_queue")
    public void process(Message message, Channel channel) throws UnsupportedEncodingException {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("派单服务平台" + msg + ",消息id:" + messageId);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        if (StringUtils.isEmpty(orderId)) {
            // 日志记录
            return;
        }
        DispatchEntity dispatchEntity = new DispatchEntity();
        // 订单id
        dispatchEntity.setOrderId(orderId);
        // 外卖员id
        dispatchEntity.setTakeoutUserId(12l);
        // 使用orderId查询是否已经派单了 网络重试间隔
        DispatchEntity byOrderId = dispatchMapper.findByOrderId(orderId);
        if (byOrderId == null) {
            // 手动签收消息,通知mq服务器端删除该消息 已经派过单了,通知MQ不要在继续重试。
            basicNack(message, channel);
            return;
        }
        //插入数据库
        int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
        if (insertDistribute > 0) {
            // 手动签收消息,通知mq服务器端删除该消息
            basicNack(message, channel);
        }
    }
    // 消费者获取到消息之后 手动签收 通知MQ删除该消息
    private void basicNack(Message message, Channel channel) {
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

启动类

@MapperScan("com.yehui.mapper")
@SpringBootApplication
public class AppDispatch {

    public static void main(String[] args) {
        SpringApplication.run(AppDispatch.class, args);
    }

}

yml文件

spring:
  rabbitmq:
  ####连接地址
    host: localhost
   ####端口号
    port: 5672
   ####账号
    username: guest
   ####密码
    password: guest
   ### 地址
    virtual-host: /
    listener:
      simple:
        retry:
        ####开启消费者(程序出现异常的情况下会)进行重试
          enabled: true
         ####最大重试次数
          max-attempts: 5
        ####重试间隔次数
          initial-interval: 3000
        ####开启手动ack
        acknowledge-mode: manual

  #数据库连接信息
  datasource:
        name: test
        url: jdbc:mysql://127.0.0.1:3306/project?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
        username: root
        password: root
        # 使用druid数据源
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
server:
  port: 8081

原文地址:https://www.cnblogs.com/cxyyh/p/11072120.html

时间: 2024-10-10 10:11:58

RabbitMq解决分布式事物的相关文章

RabbitMQ解决分布式事务

案例:经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯. RabbitMQ解决分布式事务原理: 采用最终一致性原理.需要保证以下三要素1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)2.MQ消费者消息能够正确消费消息,采用手动ACK模式,使用不补偿机制(注意重试幂等性问题)3.如何保证第一个事务先执行,采用补偿机制(补单机制),在创建一个补单消费者进行监听,如果订单没有创建成功,进

mq解决分布式事物问题

今天只看看原理,下一节看项目怎么集成mq进行解决分布式事物. 1.什么情况下会使用到分布式事物? 举例说明:现有一个支付系统,因为项目使用的是微服务框架,有订单模块和支付模块两个模块.生产者进行订单的下单操作购买100元,这时候订单表数据应该是新增一条,然后支付模块的个人账户资金应该是加100元.同一个方法中既需要对订单数据库进行新增又需要调用支付模块对数据进行修改(不同项目不同数据库),这时候就会用到分布式事物,只是举个典型的例子. 那么问题来了,生产者进行订单的下单操作怎么保证订单和支付的数

使用kafka消息队列解决分布式事务

微服务框架Spring Cloud介绍 Part1: 使用事件和消息队列实现分布式事务 本文转自:http://skaka.me/blog/2016/04/21/springcloud1/ 不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行

MySQL分布式事物(XA事物)的使用

有时一个系统的数据 放在不同的库之中.如果用普通的事物 一个分支库提交成功了,另外一个分支库提交失败了, 这候 两个库没有同步的成功或者失败.会导致系统数据的不完整. 对于处理这种情况 MySQL有了处理分布式(XA)事物的语法 XA START xid 用于启动一个带给定xid的XA事物. xid包含3个部分 gtrid,bqual,formatID gtrid 是一个分布式事物的标识符,一个分布式事物的每个XA事物的gtrid必须相同,这样可以明确知道每个XA事物属于哪个分布式事物. bqu

使用Spring Session和Redis解决分布式Session跨域共享问题

前言 对于分布式使用Nginx+Tomcat实现负载均衡,最常用的均衡算法有IP_Hash.轮训.根据权重.随机等.不管对于哪一种负载均衡算法,由于Nginx对不同的请求分发到某一个Tomcat,Tomcat在运行的时候分别是不同的容器里,因此会出现session不同步或者丢失的问题. 文末分享了我一部分私人收藏 有兴趣的可以收藏看一下的 都是架构师进阶的内容 实际上实现Session共享的方案很多,其中一种常用的就是使用Tomcat.Jetty等服务器提供的Session共享功能,将Sessi

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息. rocketmq-client:提供发送.接受消息的客户端API. rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息.(有点NameNode的味道) rocketm

Spring 分布式事物详解

在学习分布式事物的过程中会遇到以下关键名词: 相关名词: XA :XA规范的目的是允许多个资源(如数据库,应用服务器,消息队列,等等)在同一事务中访问,这样可以使ACID属性跨越应用程序而保持有效.XA使用两阶段提交来保证所有资源同时提交或回滚任何特定的事务. JTA: Java事务API(Java Transaction API,简称JTA ) 是一个Java企业版 的应用程序接口,在Java环境中,允许完成跨越多个XA资源的分布式事务. 分布式事物要解决的问题: 把不同支援放到一个事物中,实

分布式事物解决方案-TCC

分布式框架下,如何保证事物一致性一直是一个热门话题.当然事物一致性解决方案有很多种(请参考:分布式事物一致性设计思路),我们今天主要介绍TCC方案解决的思路.以下是参与设计讨论的一种解决思路,大家有问题请留言. 1.基本概念 TI:Transaction Interceptor,事务拦截器,位于dapeng容器的filterChain链中. 由于TI的逻辑会比较复杂, 不太适合在IO线程中操作 TM:Transaction Manager, 事务管理器,作为一个独立的服务存在. 事务发起方: 服

基于RabbitMQ实现分布式延时任务调度

一.分布式延时任务 传统做法是将延时任务插入数据库,使用定时去扫描,比对任务是否到期,到期则执行并设置任务状态为完成.这种做法在分布式环境下还需要对定时扫描做特殊处理(加分布式锁)避免任务被重复执行. 然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行).基于RabbitMQ做延时任务的核心是利用RabbitMQ的消息到期转发特性.发送消息时设置消息到期时间,等消息到期未被消费时会将消息转发到一