一个基于RabbitMQ的可复用的事务消息方案

原文:一个基于RabbitMQ的可复用的事务消息方案

前提#

分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案。参考Ebay多年前提出的本地消息表方案,基于RabbitMQMySQLJDBC)做了轻量级的封装,实现了低入侵性的事务消息模块。本文的内容就是详细分析整个方案的设计思路和实施。环境依赖如下:

  • JDK1.8+
  • spring-boot-start-web:2.x.xspring-boot-start-jdbc:2.x.xspring-boot-start-amqp:2.x.x
  • HikariCP:3.x.xspring-boot-start-jdbc自带)、mysql-connector-java:5.1.48
  • redisson:3.12.1

方案设计思路#

事务消息原则上只适合弱一致性(或者说最终一致性)的场景,常见的弱一致性场景如:

  • 用户服务完成了注册动作,向短信服务推送一条营销相关的消息。
  • 信贷体系中,订单服务保存订单完毕,向审批服务推送一条待审批的订单记录信息。
  • ......

强一致性的场景一般不应该选用事务消息

一般情况下,要求强一致性说明要严格同步,也就是所有操作必须同时成功或者同时失败,这样就会引入同步带来的额外消耗。如果一个事务消息模块设计合理,补偿、查询、监控等等功能都完毕,由于系统交互是异步的,整体吞吐要比严格同步高。在笔者负责的业务系统中基于事务消息使用还定制了一条基本原则:消息内容正确的前提下,消费方出现异常需要自理

简单来说就是:上游保证了自身的业务正确性,成功推送了正确的消息到RabbitMQ就认为上游义务已经结束。

为了降低代码的入侵性,事务消息需要借助Spring编程式事务或者声明式事务。编程式事务一般依赖于TransactionTemplate,而声明式事务依托于AOP模块,依赖于注解@Transactional

接着需要自定义一个事务消息功能模块,新增一个事务消息记录表(其实就是本地消息表),用于保存每一条需要发送的消息记录。事务消息功能模块的主要功能是:

  • 保存消息记录。
  • 推送消息到RabbitMQ服务端。
  • 消息记录的查询、补偿推送等等。

事务执行的逻辑单元#

在事务执行的逻辑单元里面,需要进行待推送的事务消息记录的保存,也就是:本地(业务)逻辑和事务消息记录保存操作绑定在同一个事务

发送消息到RabbitMQ服务端这一步需要延后到事务提交之后,这样才能保证事务提交成功和消息成功发送到RabbitMQ服务端这两个操作是一致的。为了把保存待发送的事务消息发送消息到RabbitMQ两个动作从使用者感知角度合并为一个动作,这里需要用到Spring特有的事务同步器TransactionSynchronization,这里分析一下事务同步器的主要方法的回调位置,主要参考AbstractPlatformTransactionManager#commit()或者AbstractPlatformTransactionManager#processCommit()方法:

上图仅仅演示了事务正确提交的场景(不包含异常的场景)。这里可以明确知道,事务同步器TransactionSynchronizationafterCommit()afterCompletion(int status)方法都在真正的事务提交点AbstractPlatformTransactionManager#doCommit()之后回调,因此可以选用这两个方法其中之一用于执行推送消息到RabbitMQ服务端,整体的伪代码如下:

Copy

@Transactional
public Dto businessMethod(){
    business transaction code block ...
    // 保存事务消息
    [saveTransactionMessageRecord()]
    // 注册事务同步器 - 在afterCommit()方法中推送消息到RabbitMQ
    [register TransactionSynchronization,send message in method afterCommit()]
    business transaction code block ...
}

上面伪代码中,保存事务消息注册事务同步器两个步骤可以安插在事务方法中的任意位置,也就是说与执行顺序无关。

事务消息的补偿#

虽然之前提到笔者建议下游服务自理自身服务消费异常的场景,但是有些时候迫于无奈还是需要上游把对应的消息重新推送,这个算是特殊的场景。另外还有一个场景需要考虑:事务提交之后触发事务同步器TransactionSynchronizationafterCommit()方法失败。这是一个低概率的场景,但是在生产中一定会出现,一个比较典型的原因就是:事务提交完成后尚未来得及触发TransactionSynchronization#afterCommit()方法进行推送服务实例就被重启。如下图所示:

为了统一处理补偿推送的问题,使用了有限状态判断消息是否已经推送成功:

  • 在事务方法内,保存事务消息的时候,标记消息记录推送状态为处理中
  • 事务同步器接口TransactionSynchronizationafterCommit()方法的实现中,推送对应的消息到RabbitMQ,然后更变事务消息记录的状态为推送成功

还有一种极为特殊的情况是RabbitMQ服务端本身出现故障导致消息推送异常,这种情况下需要进行重试(补偿推送),经验证明短时间内的反复重试是没有意义的,故障的服务一般不会瞬时恢复,所以可以考虑使用指数退避算法进行重试,同时需要限制最大重试次数。

指数值、间隔值和最大重试次数上限需要根据实际情况设定,否则容易出现消息延时过大或者重试过于频繁等问题。

方案实施#

引入核心依赖:

Copy

<properties>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <redisson.version>3.12.1</redisson.version>
    <mysql.connector.version>5.1.48</mysql.connector.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
</dependencies>

spring-boot-starter-jdbcmysql-connector-javaspring-boot-starter-aopMySQL事务相关,而spring-boot-starter-amqpRabbitMQ客户端的封装,redisson主要使用其分布式锁,用于补偿定时任务的加锁执行(以防止服务多个节点并发执行补偿推送)。

表设计#

事务消息模块主要涉及两张表,以MySQL为例,建表DDL如下:

Copy

CREATE TABLE `t_transactional_message`
(
    id                  BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time         DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time           DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator             VARCHAR(20)     NOT NULL DEFAULT ‘admin‘,
    editor              VARCHAR(20)     NOT NULL DEFAULT ‘admin‘,
    deleted             TINYINT         NOT NULL DEFAULT 0,
    current_retry_times TINYINT         NOT NULL DEFAULT 0 COMMENT ‘当前重试次数‘,
    max_retry_times     TINYINT         NOT NULL DEFAULT 5 COMMENT ‘最大重试次数‘,
    queue_name          VARCHAR(255)    NOT NULL COMMENT ‘队列名‘,
    exchange_name       VARCHAR(255)    NOT NULL COMMENT ‘交换器名‘,
    exchange_type       VARCHAR(8)      NOT NULL COMMENT ‘交换类型‘,
    routing_key         VARCHAR(255) COMMENT ‘路由键‘,
    business_module     VARCHAR(32)     NOT NULL COMMENT ‘业务模块‘,
    business_key        VARCHAR(255)    NOT NULL COMMENT ‘业务键‘,
    next_schedule_time  DATETIME        NOT NULL COMMENT ‘下一次调度时间‘,
    message_status      TINYINT         NOT NULL DEFAULT 0 COMMENT ‘消息状态‘,
    init_backoff        BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT ‘退避初始化值,单位为秒‘,
    backoff_factor      TINYINT         NOT NULL DEFAULT 2 COMMENT ‘退避因子(也就是指数)‘,
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT ‘事务消息表‘;

CREATE TABLE `t_transactional_message_content`
(
    id         BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT ‘事务消息记录ID‘,
    content    TEXT COMMENT ‘消息内容‘
) COMMENT ‘事务消息内容表‘;

因为此模块有可能扩展出一个后台管理模块,所以要把消息的管理和状态相关字段和大体积的消息内容分别存放在两个表,从而避免大批量查询消息记录的时候MySQL服务IO使用率过高的问题(这是和上一个公司的DBA团队商讨后得到的一个比较合理的方案)。预留了两个业务字段business_modulebusiness_key用于标识业务模块和业务键(一般是唯一识别号,例如订单号)。

一般情况下,如果服务通过配置自行提前声明队列和交换器的绑定关系,那么发送RabbitMQ消息的时候其实只依赖于exchangeNameroutingKey两个字段(header类型的交换器是特殊的,也比较少用,这里暂时不用考虑),考虑到服务可能会遗漏声明操作,发送消息的时候会基于队列进行首次绑定声明并且缓存相关的信息(RabbitMQ中的队列-交换器绑定声明只要每次声明绑定关系的参数一致,则不会抛出异常)。

方案代码设计#

下面的方案设计描述中,暂时忽略了消息事务管理后台的API设计,这些可以在后期补充。

定义贫血模型实体类TransactionalMessageTransactionalMessageContent

Copy

@Data
public class TransactionalMessage {

    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {

    private Long id;
    private Long messageId;
    private String content;
}

然后定义dao接口(这里暂时不展开实现的细节代码,存储使用MySQL,如果要替换为其他类型的数据库,只需要使用不同的实现即可):

Copy

public interface TransactionalMessageDao {

    void insertSelective(TransactionalMessage record);

    void updateStatusSelective(TransactionalMessage record);

    List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,
                                                               LocalDateTime maxScheduleTime,
                                                               int limit);
}

public interface TransactionalMessageContentDao {

    void insert(TransactionalMessageContent record);

    List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}

接着定义事务消息服务接口TransactionalMessageService

Copy

// 对外提供的服务类接口
public interface TransactionalMessageService {

    void sendTransactionalMessage(Destination destination, TxMessage message);
}

@Getter
@RequiredArgsConstructor
public enum ExchangeType {

    FANOUT("fanout"),

    DIRECT("direct"),

    TOPIC("topic"),

    DEFAULT(""),

    ;

    private final String type;
}

// 发送消息的目的地
public interface Destination {

    ExchangeType exchangeType();

    String queueName();

    String exchangeName();

    String routingKey();
}

@Builder
public class DefaultDestination implements Destination {

    private ExchangeType exchangeType;
    private String queueName;
    private String exchangeName;
    private String routingKey;

    @Override
    public ExchangeType exchangeType() {
        return exchangeType;
    }

    @Override
    public String queueName() {
        return queueName;
    }

    @Override
    public String exchangeName() {
        return exchangeName;
    }

    @Override
    public String routingKey() {
        return routingKey;
    }
}

// 事务消息
public interface TxMessage {

    String businessModule();

    String businessKey();

    String content();
}

@Builder
public class DefaultTxMessage implements TxMessage {

    private String businessModule;
    private String businessKey;
    private String content;

    @Override
    public String businessModule() {
        return businessModule;
    }

    @Override
    public String businessKey() {
        return businessKey;
    }

    @Override
    public String content() {
        return content;
    }
}

// 消息状态
@RequiredArgsConstructor
public enum TxMessageStatus {

    /**
     * 成功
     */
    SUCCESS(1),

    /**
     * 待处理
     */
    PENDING(0),

    /**
     * 处理失败
     */
    FAIL(-1),

    ;

    private final Integer status;
}

TransactionalMessageService的实现类是事务消息的核心功能实现,代码如下:

Copy

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {

    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;

    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

    @Override
    public void sendTransactionalMessage(Destination destination, TxMessage message) {
        String queueName = destination.queueName();
        String exchangeName = destination.exchangeName();
        String routingKey = destination.routingKey();
        ExchangeType exchangeType = destination.exchangeType();
        // 原子性的预声明
        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
            Queue queue = new Queue(queueName);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(queueName);
        record.setExchangeName(exchangeName);
        record.setExchangeType(exchangeType.getType());
        record.setRoutingKey(routingKey);
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
        // 保存事务消息记录
        managementService.saveTransactionalMessageRecord(record, content);
        // 注册事务同步器
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                managementService.sendMessageSync(record, content);
            }
        });
    }
}

消息记录状态和内容持久化的管理统一放在TransactionalMessageManagementService中:

Copy

@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {

    private final TransactionalMessageDao messageDao;
    private final TransactionalMessageContentDao contentDao;
    private final RabbitTemplate rabbitTemplate;

    private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
    private static final long DEFAULT_INIT_BACKOFF = 10L;
    private static final int DEFAULT_BACKOFF_FACTOR = 2;
    private static final int DEFAULT_MAX_RETRY_TIMES = 5;
    private static final int LIMIT = 100;

    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,
                DEFAULT_BACKOFF_FACTOR, 0));
        record.setCurrentRetryTimes(0);
        record.setInitBackoff(DEFAULT_INIT_BACKOFF);
        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
        messageDao.insertSelective(record);
        TransactionalMessageContent messageContent = new TransactionalMessageContent();
        messageContent.setContent(content);
        messageContent.setMessageId(record.getId());
        contentDao.insert(messageContent);
    }

    public void sendMessageSync(TransactionalMessage record, String content) {
        try {
            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
            if (log.isDebugEnabled()) {
                log.debug("发送消息成功,目标队列:{},消息内容:{}", record.getQueueName(), content);
            }
            // 标记成功
            markSuccess(record);
        } catch (Exception e) {
            // 标记失败
            markFail(record, e);
        }
    }

    private void markSuccess(TransactionalMessage record) {
        // 标记下一次执行时间为最大值
        record.setNextScheduleTime(END);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private void markFail(TransactionalMessage record, Exception e) {
        log.error("发送消息失败,目标队列:{}", record.getQueueName(), e);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        // 计算下一次的执行时间
        LocalDateTime nextScheduleTime = calculateNextScheduleTime(
                record.getNextScheduleTime(),
                record.getInitBackoff(),
                record.getBackoffFactor(),
                record.getCurrentRetryTimes()
        );
        record.setNextScheduleTime(nextScheduleTime);
        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    /**
     * 计算下一次执行时间
     *
     * @param base          基础时间
     * @param initBackoff   退避基准值
     * @param backoffFactor 退避指数
     * @param round         轮数
     * @return LocalDateTime
     */
    private LocalDateTime calculateNextScheduleTime(LocalDateTime base,
                                                    long initBackoff,
                                                    long backoffFactor,
                                                    long round) {
        double delta = initBackoff * Math.pow(backoffFactor, round);
        return base.plusSeconds((long) delta);
    }

    /**
     * 推送补偿 - 里面的参数应该根据实际场景定制
     */
    public void processPendingCompensationRecords() {
        // 时间的右值为当前时间减去退避初始值,这里预防把刚保存的消息也推送了
        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
        // 时间的左值为右值减去1小时
        LocalDateTime min = max.plusHours(-1);
        Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
                .stream()
                .collect(Collectors.toMap(TransactionalMessage::getId, x -> x));
        if (!collect.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "(", ")");
            collect.keySet().forEach(x -> joiner.add(x.toString()));
            contentDao.queryByMessageIds(joiner.toString())
                    .forEach(item -> {
                        TransactionalMessage message = collect.get(item.getMessageId());
                        sendMessageSync(message, item.getContent());
                    });
        }
    }
}

这里有一点尚待优化:更新事务消息记录状态的方法可以优化为批量更新,在limit比较大的时候,批量更新的效率会更高。

最后是定时任务的配置类:

Copy

@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {

    private final TransactionalMessageManagementService managementService;

    /**
     * 这里用的是本地的Redis,实际上要做成配置
     */
    private final RedissonClient redisson = Redisson.create();

    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        // 等待时间5秒,预期300秒执行完毕,这两个值需要按照实际场景定制
        boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
        if (tryLock) {
            try {
                long start = System.currentTimeMillis();
                log.info("开始执行事务消息推送补偿定时任务...");
                managementService.processPendingCompensationRecords();
                long end = System.currentTimeMillis();
                long delta = end - start;
                // 以防锁过早释放
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("执行事务消息推送补偿定时任务完毕,耗时:{} ms...", end - start);
            } finally {
                lock.unlock();
            }
        }
    }
}

基本代码编写完,整个项目的结构如下:

最后添加两个测试类:

Copy

@RequiredArgsConstructor
@Component
public class MockBusinessRunner implements CommandLineRunner {

    private final MockBusinessService mockBusinessService;

    @Override
    public void run(String... args) throws Exception {
        mockBusinessService.saveOrder();
    }
}

@Slf4j
@RequiredArgsConstructor
@Service
public class MockBusinessService {

    private final JdbcTemplate jdbcTemplate;
    private final TransactionalMessageService transactionalMessageService;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        BigDecimal amount = BigDecimal.valueOf(100L);
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> {
            p.setString(1, orderId);
            p.setBigDecimal(2, amount);
        });
        String content = objectMapper.writeValueAsString(message);
        transactionalMessageService.sendTransactionalMessage(
                DefaultDestination.builder()
                        .exchangeName("tm.test.exchange")
                        .queueName("tm.test.queue")
                        .routingKey("tm.test.key")
                        .exchangeType(ExchangeType.DIRECT)
                        .build(),
                DefaultTxMessage.builder()
                        .businessKey(orderId)
                        .businessModule("SAVE_ORDER")
                        .content(content)
                        .build()
        );
        log.info("保存订单:{}成功...", orderId);
    }
}

某次测试结果如下:

Copy

2020-02-05 21:10:13.287  INFO 49556 --- [           main] club.throwable.cm.MockBusinessService    : 保存订单:07a75323-460b-42cb-aa63-1a0a45ce19bf成功...

模拟订单数据成功保存,而且RabbitMQ消息在事务成功提交后正常发送到RabbitMQ服务端中,如RabbitMQ控制台数据所示。

小结#

事务消息模块的设计仅仅是使异步消息推送这个功能实现趋向于完备,其实一个合理的异步消息交互系统,一定会提供同步查询接口,这一点是基于异步消息没有回调或者没有响应的特性导致的。一般而言,一个系统的吞吐量和系统的异步化处理占比成正相关(这一点可以参考Amdahl‘s Law),所以在系统架构设计实际中应该尽可能使用异步交互,提高系统吞吐量同时减少同步阻塞带来的无谓等待。事务消息模块可以扩展出一个后台管理,甚至可以配合MicrometerPrometheusGrafana体系做实时数据监控。

本文demo项目仓库:rabbit-transactional-message

demo必须本地安装MySQLRedisRabbitMQ才能正常启动,本地必须新建一个数据库命名local

个人博客#

Throwable‘s Blog

(本文完 c-5-d e-a-20200202 疫情严重,马上要开始在家办公,少出门多看书)

原文地址:https://www.cnblogs.com/lonelyxmas/p/12303315.html

时间: 2024-10-10 12:19:18

一个基于RabbitMQ的可复用的事务消息方案的相关文章

RocketMQ实现事务消息方案

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目.据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现.Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持. RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性

RocketMQ事务消息学习及刨坑过程

一.背景 MQ组件是系统架构里必不可少的一门利器,设计层面可以降低系统耦合度,高并发场景又可以起到削峰填谷的作用,从单体应用到集群部署方案,再到现在的微服务架构,MQ凭借其优秀的性能和高可靠性,得到了广泛的认可. 随着数据量增多,系统压力变大,开始出现这种现象:数据库已经更新了,但消息没发出来,或者消息先发了,但后来数据库更新失败了,结果研发童鞋各种数据修复,这种生产问题出现的概率不大,但让人很郁闷.这个其实就是数据库事务与MQ消息的一致性问题,简单来讲,数据库的事务跟普通MQ消息发送无法直接绑

C#编写了一个基于Lucene.Net的搜索引擎查询通用工具类:SearchEngineUtil

最近由于工作原因,一直忙于公司的各种项目(大部份都是基于spring cloud的微服务项目),故有一段时间没有与大家分享总结最近的技术研究成果的,其实最近我一直在不断的深入研究学习Spring.Spring Boot.Spring Cloud的各种框架原理,同时也随时关注着.NET CORE的发展情况及最新技术点,也在极客时间上订阅相关的专栏,只要下班有空我都会去认真阅读观看,纸质书箱也买了一些,总之近一年都是在通过:微信技术公众号(.NET.JAVA.算法.前端等技术方向).极客时间.技术书

一个基于redis和disque实现的轻量级异步任务执行器

简介 horae是一个基于redis和disque实现的轻量级.高性能的异步任务执行器,它的核心是disque提供的任务队列,而队列有先进先出的时序关系,顾得名:horae. horae : 时序女神,希腊神话中司掌季节时间和人间秩序的三女神,又译"荷莱". horae的关注点不是队列服务的实现本身(已经有不少队列服务的实现了),而是希望借助于redis与disque提供的纯内存的高性能的队列机制,实现一个异步任务执行器.它可以自由配置任务来自哪种队列服务,它不关注任务执行的最终状态(

一个基于SSM框架开发的高并发电商秒杀Web系统

0 前言 一个基于SSM框架的高并发秒杀系统采用IDEA+Maven+SSM+Mysql+Redis+Jetty.Bootstrap/Jquery开发. 通过这个小项目,理清了基于SSM框架开发Web应用的流程以及常见的避坑方法,并在最后简单采用了Redis缓存以及Mysql Procedure对项目进行了高并发优化. 接下来从DAO层.Service层.Web层开发以及高并发优化4个方面梳理整个项目开发过程. 源码地址https://github.com/Allegr0/seckill 项目准

基于CMS的组件复用实践

作者 个推高级前端开发工程师沈创目前前端项目大多基于Vue.React.Angular等框架来实现,这一类框架都有一个明显的特点:基于模块化以及组件化思维.所以,开发者在使用上述框架时,实际上是在写一个一个的组件,并且组件与组件之间呈嵌套的形式.当一个项目中多次出现同一功能时,他们会选择将其提取出来,并且放到components文件夹中,以达到复用的目的,但是这些复用都是基于同一项目的,所以,当写另一个项目时,又要开始写一些重复的代码. 个推拥有多条业务线,在进行前端项目时总会遇到重复写代码的困

ASP.NET Core2基于RabbitMQ对Web前端实现推送功能

在我们很多的Web应用中会遇到需要从后端将指定的数据或消息实时推送到前端,通常的做法是前端写个脚本定时到后端获取,或者借助WebSocket技术实现前后端实时通讯.因定时刷新的方法弊端很多(已不再采用),所以基于WebSocket技术实现的通讯方案正越来越受大家喜爱,于是在ASP.NET中就有了鼎鼎大名的Signalr.但Signalr不是咱们这里的主角,这里将给大家介绍另一套基于WebSocket的前后端通讯方案,可以给大家在应用中多一个选择. 准备 在开始动手前,咱们先简单介绍下方案的组成部

eShopOnContainers 是一个基于微服务的.NET Core示例框架

找到一个好的示例框架很难,但不是不可能.大多数是小型Todo风格的应用程序,通常基于SimpleCRUD.值得庆幸的是,Microsoft已经为eShopOnContainers创建了一个基于微服务的.NET Core示例应用程序. eShopOnContainers是 .NET Core示例应用框架,由Microsoft提供支持,基于简化的微服务架构和Docker容器技术. 这个示例应用程序在服务器和客户端是跨平台的,这要归功于.NET Core服务能够在Linux或Windows容器上运行,

基于RabbitMQ实现分布式延时任务调度

一.分布式延时任务 传统做法是将延时任务插入数据库,使用定时去扫描,比对任务是否到期,到期则执行并设置任务状态为完成.这种做法在分布式环境下还需要对定时扫描做特殊处理(加分布式锁)避免任务被重复执行. 然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行).基于RabbitMQ做延时任务的核心是利用RabbitMQ的消息到期转发特性.发送消息时设置消息到期时间,等消息到期未被消费时会将消息转发到一