RabbitMQ解决分布式事务

案例:
经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯。

RabbitMQ解决分布式事务原理: 采用最终一致性原理。
需要保证以下三要素
1、确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)
2、MQ消费者消息能够正确消费消息,采用手动ACK模式,使用不补偿机制(注意重试幂等性问题)
3、如何保证第一个事务先执行,采用补偿机制(补单机制),在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。

用传统的HTTP协议不能解决高并发:

MQ解决分布式事务一致性

案例中 订单表 和 派单表必须一致!

用MQ 可以做流量削峰值

MQ解决分布式事务最终一致性思想

1.   确保生产者消息 一定要投递到MQ服务器端成功

如果生产者投递消息到MQ服务器成功

场景1  如果消费者消费消息失败了

生产者是不需要回滚事务。 消费者采用手动ack应答方式  进行补偿机制,补偿的过程中注意 幂等性 问题。

分布式事务中遵循base理论 遵循cpa理论

如何确保生产者发送消息一定发送到MQ消息服务器端成功? confirm机制 确认应答机制

场景2    如果生产者发送消息到MQ服务器端失败

使用生产者重试机制进行发消息

派件表:

create TABLE platoon(
  id INT PRIMARY KEY AUTO_INCREMENT,
  orderId VARCHAR(255),
  takeout_userId int

)

订单表:

create TABLE order_info(
  id INT PRIMARY KEY AUTO_INCREMENT,
  name VARCHAR(30),
  order_money INT,
  orderId VARCHAR(255)

);

生产者

1.实现接口 implements RabbitTemplate.ConfirmCallback

2. 重写回调方法   成功 失败的  调用

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {

send方法里面调用回调函数:

this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);

yml需要配置回调机制:

###开启消息确认机制 confirms
publisher-confirms: true
publisher-returns: true

重试也是有一定次数限制的 如果超过一定次数 就需要进行人工补偿了

上面已经实现了确保消息发送给 消费者   此时的数据不一致问题 就是:

场景3.  如何保证第一个事务先执行,生产者投递消息到MQ服务器成功,消费者消费消息成功了,但是订单事务回滚了。    (生产者投递消息给消费者消费成功 然后 生产者回滚了)

MQ解决分布式原理通过最终一致性解决总体框架图:  交换机采用路由键模式  补单队列和派但队列都绑定同一个路由键

下单:

pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mayikt</groupId>
    <artifactId>rabbitmq_order</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>
    <dependencies>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- mysql 依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 阿里巴巴数据源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- springboot- 整个 lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>
</project>

基础包:

public interface ApiConstants {
    // 响应请求成功
    String HTTP_RES_CODE_200_VALUE = "success";
    // 系统错误
    String HTTP_RES_CODE_500_VALUE = "fial";
    // 响应请求成功code
    Integer HTTP_RES_CODE_200 = 200;
    // 系统错误
    Integer HTTP_RES_CODE_500 = 500;
    // 未关联QQ账号
    Integer HTTP_RES_CODE_201 = 201;

}
import org.springframework.stereotype.Component;

@Component
public class BaseApiService {

    public ResponseBase setResultError(Integer code, String msg) {
        return setResult(code, msg, null);
    }

    // 返回错误,可以传msg
    public ResponseBase setResultError(String msg) {
        return setResult(ApiConstants.HTTP_RES_CODE_500, msg, null);
    }

    // 返回成功,可以传data值
    public ResponseBase setResultSuccess(Object data) {
        return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, data);
    }

    // 返回成功,沒有data值
    public ResponseBase setResultSuccess() {
        return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, null);
    }

    // 返回成功,沒有data值
    public ResponseBase setResultSuccess(String msg) {
        return setResult(ApiConstants.HTTP_RES_CODE_200, msg, null);
    }

    // 通用封装
    public ResponseBase setResult(Integer code, String msg, Object data) {
        return new ResponseBase(code, msg, data);
    }

}
import lombok.Data;

@Data
public class ResponseBase {

    private Integer rtnCode;
    private String msg;
    private Object data;

    public ResponseBase() {

    }

    public ResponseBase(Integer rtnCode, String msg, Object data) {
        super();
        this.rtnCode = rtnCode;
        this.msg = msg;
        this.data = data;
    }

    @Override
    public String toString() {
        return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
    }

}

补偿队列:

import lombok.Data;

@Data
public class ResponseBase {

    private Integer rtnCode;
    private String msg;
    private Object data;

    public ResponseBase() {

    }

    public ResponseBase(Integer rtnCode, String msg, Object data) {
        super();
        this.rtnCode = rtnCode;
        this.msg = msg;
        this.data = data;
    }

    @Override
    public String toString() {
        return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
    }

}

entity:

import lombok.Data;

@Data
public class OrderEntity {

    private Long id;
    // 订单名称
    private String name;
    // 下单金额
    private Double orderMoney;
    // 订单id
    private String orderId;
}

mapper:

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import com.mayikt.entity.OrderEntity;

public interface OrderMapper {

    @Insert(value = "INSERT INTO `order_info` VALUES (#{id}, #{name}, #{orderMoney},#{orderId})")
    @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
    public int addOrder(OrderEntity orderEntity);

    @Select("SELECT id as id ,name as name , order_money as orderMoney,orderId as orderId from order_info where orderId=#{orderId};")
    public OrderEntity findOrderId(@Param("orderId") String orderId);

}

MQ的配置:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitmqConfig {

    // 下单并且派单存队列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue";
    // 补单队列,判断订单是否已经被创建
    public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    // 下单并且派单交换机
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

    // 1.定义订单队列
    @Bean
    public Queue directOrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }

    // 2.定义补订单队列
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }

    // 2.定义交换机
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    // 3.订单队列与交换机绑定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

    // 3.补单队列与交换机绑定
    @Bean
    Binding bindingExchangeCreateOrder() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

}

service:

import java.util.UUID;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONObject;
import com.mayikt.base.BaseApiService;
import com.mayikt.base.ResponseBase;
import com.mayikt.entity.OrderEntity;
import com.mayikt.mapper.OrderMapper;

@Service
public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public ResponseBase addOrderAndDispatch() {
        //先下单 订单表插入数据
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setName("黄焖鸡米饭");
        // 价格是300元
        orderEntity.setOrderMoney(300d);
        // 商品id
        String orderId = UUID.randomUUID().toString();
        orderEntity.setOrderId(orderId);
        // 1.先下单,创建订单 (往订单数据库中插入一条数据)
        int orderResult = orderMapper.addOrder(orderEntity);
        System.out.println("orderResult:" + orderResult);
        if (orderResult <= 0) {
            return setResultError("下单失败!");
        }
        // 2.订单表插插入完数据后 订单表发送 外卖小哥
        send(orderId);
    //    int i = 1/0;   //发生异常
        return setResultSuccess();
    }

    private void send(String orderId) {
        JSONObject jsonObect = new JSONObject();
        jsonObect.put("orderId", orderId);
        String msg = jsonObect.toJSONString();
        System.out.println("msg:" + msg);
        // 封装消息
        Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8").setMessageId(orderId).build();
        // 构建回调返回的数据
        CorrelationData correlationData = new CorrelationData(orderId);
        // 发送消息
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);

    }

    // 生产消息确认机制 生产者往服务器端发送消息的时候 采用应答机制
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String orderId = correlationData.getId(); //id 都是相同的哦  全局ID
        System.out.println("消息id:" + correlationData.getId());
        if (ack) { //消息发送成功
            System.out.println("消息发送确认成功");
        } else {
            //重试机制
            send(orderId);
            System.out.println("消息发送确认失败:" + cause);
        }

    }

}

controller:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.mayikt.base.BaseApiService;
import com.mayikt.base.ResponseBase;
import com.mayikt.service.OrderService;

@RestController
public class OrderController extends BaseApiService {
    @Autowired
    private OrderService orderService;

    @RequestMapping("/addOrder")
    public ResponseBase addOrder() {
        return orderService.addOrderAndDispatch();
    }

}

yml:

spring:
  rabbitmq:
  ####连接地址
    host: 192.168.91.6
   ####端口号
    port: 5672
   ####账号
    username: admin
   ####密码
    password: admin
   ### 地址
    virtual-host: /admin_toov5
    ###开启消息确认机制 confirms
    publisher-confirms: true
    publisher-returns: true
  #数据库连接信息
  datasource:
        name: test
        url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
        username: root
        password: root
        # 使用druid数据源
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver

启动类:

package com.mayikt;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@MapperScan("com.mayikt.mapper")
@SpringBootApplication
public class AppOrder {

    public static void main(String[] args) {
        SpringApplication.run(AppOrder.class, args);
    }

}

派单:

Entity:

package com.mayikt.entity;

import lombok.Data;

@Data
public class DispatchEntity {

    private Long id;
    // 订单号
    private String orderId;
    // 外卖员id
    private Long takeoutUserId;

}

Mapper:

package com.mayikt.mapper;

import org.apache.ibatis.annotations.Insert;

import com.mayikt.entity.DispatchEntity;

public interface DispatchMapper {

    /**
     * 新增派单任务
     */
    @Insert("INSERT into platoon values (null,#{orderId},#{takeoutUserId});")
    public int insertDistribute(DispatchEntity distributeEntity);

}

consumer:

做一些路由器 队列 路由键的绑定 声明工作

package com.mayikt.rabbitmq.consumer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitmqConfig {

    // 下单并且派单存队列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue";
    // 补单队列,判断订单是否已经被创建
    public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    // 下单并且派单交换机
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

    // 1.定义派单队列
    @Bean
    public Queue OrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }
/*
    // 2.定义补订单队列
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }*/

    // 2.定义交换机
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    // 3.订单队列与交换机绑定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

    /*// 3.补单队列与交换机绑定
    @Bean
    Binding bindingExchangeCreateOrder() {
        return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }*/

}

派件消费:

package com.mayikt.rabbitmq.consumer;

import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.mayikt.entity.DispatchEntity;
import com.mayikt.mapper.DispatchMapper;
import com.rabbitmq.client.Channel;

/**
 * 派单服务
 */
@Component
public class DispatchConsumer {
    @Autowired
    private DispatchMapper dispatchMapper;

    @RabbitListener(queues = "order_dic_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("派单服务平台" + msg + ",消息id:" + messageId);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        if (StringUtils.isEmpty(orderId)) {
            // 日志记录
            return;
        }
        DispatchEntity dispatchEntity = new DispatchEntity();
        // 订单id
        dispatchEntity.setOrderId(orderId);
        // 外卖员id
        dispatchEntity.setTakeoutUserId(12l);

        try {
            int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
            if (insertDistribute > 0) {
                // 手动签收消息,通知mq服务器端删除该消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // // 丢弃该消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

}

启动类:

package com.mayikt;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@MapperScan("com.mayikt.mapper")
@SpringBootApplication
public class AppDispatch {

    public static void main(String[] args) {
        SpringApplication.run(AppDispatch.class, args);
    }

}

yml:

spring:
  rabbitmq:
  ####连接地址
    host: 192.168.91.6
   ####端口号
    port: 5672
   ####账号
    username: admin
   ####密码
    password: admin
   ### 地址
    virtual-host: /admin_toov5
    listener:
      simple:
        retry:
        ####开启消费者(程序出现异常的情况下会)进行重试
          enabled: true
         ####最大重试次数
          max-attempts: 5
        ####重试间隔次数
          initial-interval: 3000
        ####开启手动ack
        acknowledge-mode: manual

  #数据库连接信息
  datasource:
        name: test
        url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
        username: root
        password: root
        # 使用druid数据源
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
server:
  port: 8081

pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.itmayiedu</groupId>
    <artifactId>rabbitmq_stock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- mysql 依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 阿里巴巴数据源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- springboot- 整个 lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>
</project>

原文地址:https://www.cnblogs.com/toov5/p/10289999.html

时间: 2024-09-30 10:15:53

RabbitMQ解决分布式事务的相关文章

RabbitMq解决分布式事物

一.RabbitMQ解决分布式事务思路: 案例: 经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯. 二.RabbitMQ解决分布式事务原理:采用最终一致性原理. 需要保证以下三要素 1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制) 2.MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题) 3.如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听

使用kafka消息队列解决分布式事务

微服务框架Spring Cloud介绍 Part1: 使用事件和消息队列实现分布式事务 本文转自:http://skaka.me/blog/2016/04/21/springcloud1/ 不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行

【分布式事务】使用atomikos+jta解决分布式事务问题

一.前言 分布式事务,这个问题困惑了小编很久,在3个月之前,就间断性的研究分布式事务.从MQ方面,数据库事务方面,jta方面.近期终于成功了,使用JTA解决了分布式事务问题.先写一下心得,后面的二级提交也会在研究. 二.介绍 分布式事务 说到分布式事务,可以理解为,由于分布式而引起的事务不一致的问题.随着项目做大,模块拆分,数据库拆分.一次包含增删改操作数据库涉及到了更新两个不同物理节点的数据库,这样的数据库事务只能保证自己处理的部分的事务,但是整个的事务就不能保证一致性. 网上针对分布式事务常

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息. rocketmq-client:提供发送.接受消息的客户端API. rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息.(有点NameNode的味道) rocketm

一文教你迅速解决分布式事务 XA 一致性问题

欢迎大家前往腾讯云技术社区,获取更多腾讯海量技术实践干货哦~ 作者:腾讯云数据库团队 近日,腾讯云发布了分布式数据库解决方案(DCDB),其最明显的特性之一就是提供了高于开源分布式事务XA的性能.大型业务系统有着用户多.并发高的特点,在这方面,集中式数据库(单机数据库)的性能很难支持,因此主流的互联网公司往往采用分布式(架构)数据库,物理上利用更多的低端设备,逻辑上对大表水平拆分支撑业务的需要. 虽然分布式数据库能解决性能难题,但事务一致性(Consistency)的问题,却很难在分布式数据库上

Spring Boot微服务如何集成fescar解决分布式事务?

什么是fescar? 关于fescar的详细介绍,请参阅fescar wiki. 传统的2PC提交协议,会持有一个全局性的锁,所有局部事务预提交成功后一起提交,或有一个局部事务预提交失败后一起回滚,最后释放全局锁.锁持有的时间较长,会对并发造成较大的影响,死锁的风险也较高. fescar的创新之处在于,每个局部事务执行完立即提交,释放本地锁:它会去解析你代码中的sql,从数据库中获得事务提交前的事务资源即数据,存放到undo_log中,全局事务协调器在回滚的时候直接使用undo_log中的数据覆

Spring Boot 集成 Seata 解决分布式事务问题

文章首发于公众号<程序员果果>地址 : https://mp.weixin.qq.com/s/aDhGG3Y2t4lPYetK01Wmxg seata 简介 Seata 是 阿里巴巴2019年开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务.在 Seata 开源之前,Seata 对应的内部版本在阿里内部一直扮演着分布式一致性中间件的角色,帮助阿里度过历年的双11,对各业务进行了有力的支撑.经过多年沉淀与积累,2019.1 Seata 正式宣布对外开源 .目前

RocketMQ解决分布式事务

1.原理图: 2.设计实现思路: 1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 不可以被消费: 2.开始执行我们的本地事务,将本地事务执行的结果(回滚或者提交)发送给Broker: 3.Broker获取回滚或者提交,如果是回滚的情况则删除该消息.如果是提交的话,该消息就可以被消费者消费: 4.Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果. 核心代码发送方 1 @RestController 2 public class ProducerCo

【故障处理】分布式事务ORA-01591错误解决

[故障处理]分布式事务ORA-01591错误解决 1  BLOG文档结构图       2  前言部分 2.1  导读和注意事项 各位技术爱好者,看完本文后,你可以掌握如下的技能,也可以学到一些其它你所不知道的知识,~O(∩_∩)O~: ① 分布式事务的简单概念         ② ORA-01591错误解决   Tips: ① 本文在ITpub(http://blog.itpub.net/26736162).博客园(http://www.cnblogs.com/lhrbest)和微信公众号(x