tcc分布式事务框架解析

前言碎语

楼主之前推荐过2pc的分布式事务框架LCN。今天来详细聊聊TCC事务协议。

2pc实现:https://github.com/codingapi/tx-lcn

tcc实现:https://github.com/yu199195/hmily

首先我们了解下什么是tcc,如下图

tcc分布式事务协议控制整体业务事务分为三个阶段。

try:执行业务逻辑

confirm:确定业务逻辑执行无误后,确定业务逻辑执行完成

cancel:假如try阶段有问题,执行cancel阶段逻辑,取消try阶段的数据

这就需要我们在设计业务时,在try阶段多想想业务处理的折中状态,比如,处理中,支付中,进行中等,在confirm阶段变更为处理完成,或者在cancel阶段变更为处理失败。

下面以电商下单为例子:.

假设我们有一个电商下单的业务,有三个服务组成,订单服务处理下单逻辑,库存服务处理减库存逻辑,支付服务处理减账户余额逻辑。在下单服务里先后调用减库存和减余额的方法。如果使用tcc分布式事务来协调事务,我们服务就要做如下设计:

订单服务:

  • try:支付状态设置为支付中
  • confirm:设置为支付完成
  • cancel:设置为支付失败

库存服务:

多加一个锁定库存的字段记录,用于记录业务处理中状态

  • try:总库存-1,锁定库存+1
  • confirm:锁定库存-1
  • cancel:总库存+1,锁定库存-1

支付服务:

多加一个冻结金额的字段记录,用于记录业务处理中状态

  • try:余额-1,冻结金额+1
  • confirm:冻结金额-1
  • cancel:余额+1,冻结金额-1

tcc分布式事务在这里起到了一个事务协调者的角色。真实业务只需要调用try阶段的方法。confirm和cancel阶段的额方法由tcc框架来帮我们调用完成最终业务逻辑。下面我们假设如下三个场景的业务情况,看tcc如何协调业务最终一致的。

  1. 服务一切正常:所有服务的try方法执行后都没有问题,库存足够,余额足够。tcc事务协调器会触发订单服务的confirm方法,将订单更新为支付完成,触发库存服务的confirm方法锁定库存-1,触发支付服务的confirm方法冻结金额-1
  1. 库存服务故障,无法调通:这个时候订单已经生成,状态为待支付。当调用库存超时抛异常后,tcc事务协调器会触发订单服务的cancel方法将订单状态更新为支付失败。
  1. 支付服务故障,无法调通:这个时候订单已经生成,状态为待支付,总库存-1,锁定库存+1了。当调用支付服务超时抛异常时,tcc事务协调器会触发订单服务的cancel方法将订单状态更新为支付失败,触发库存服务的cancel方法将库存+1,锁定库存-1。

hmily事务框架怎么做的?

通过上面对tcc事务协议说明大家应该都了解了tcc的处理协调机制,下面我们来看看hmily是怎么做到的,我们以接入支持dubbo服务为例。

概要:首先最基础两个应用点是aop和dubbo的filter机制,其次针对一组事务,定义了启动事务处理器,参与事务处理器去协调处理不同的事务单元。外加一个disruptor+ScheduledService处理事务日志,补偿处理失败的事务。

hmily框架以@Hmily注解为切入点,定义了一个环绕织入的切面,注解必填两个参数confirmMethod和cancelMethod,也就是tcc协调的两个阶段方法。在需要tcc事务的方法上面加上这个注解,也就托管了tcc三个阶段的处理流程。下面是aspect切面的抽象类,不同的RPC框架支持会有不同的实现 。其中真正处理业务逻辑需要实现HmilyTransactionInterceptor接口

@Aspect
public abstract class AbstractHmilyTransactionAspect {

    private HmilyTransactionInterceptor hmilyTransactionInterceptor;

    protected void setHmilyTransactionInterceptor(final HmilyTransactionInterceptor hmilyTransactionInterceptor) {
        this.hmilyTransactionInterceptor = hmilyTransactionInterceptor;
    }

    /**
     * this is point cut with {@linkplain Hmily }.
     */
    @Pointcut("@annotation(org.dromara.hmily.annotation.Hmily)")
    public void hmilyInterceptor() {
    }

    /**
     * this is around in {@linkplain Hmily }.
     * @param proceedingJoinPoint proceedingJoinPoint
     * @return Object
     * @throws Throwable  Throwable
     */
    @Around("hmilyInterceptor()")
    public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return hmilyTransactionInterceptor.interceptor(proceedingJoinPoint);
    }

    /**
     * spring Order.
     *
     * @return int
     */
    public abstract int getOrder();
}

dubbo的aspect抽象实现

@Aspect
@Component
public class DubboHmilyTransactionAspect extends AbstractHmilyTransactionAspect implements Ordered {

    @Autowired
    public DubboHmilyTransactionAspect(final DubboHmilyTransactionInterceptor dubboHmilyTransactionInterceptor) {
        super.setHmilyTransactionInterceptor(dubboHmilyTransactionInterceptor);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

dubbo的HmilyTransactionInterceptor实现

@Component
public class DubboHmilyTransactionInterceptor implements HmilyTransactionInterceptor {

    private final HmilyTransactionAspectService hmilyTransactionAspectService;

    @Autowired
    public DubboHmilyTransactionInterceptor(final HmilyTransactionAspectService hmilyTransactionAspectService) {
        this.hmilyTransactionAspectService = hmilyTransactionAspectService;
    }

  @Override
public Object interceptor(final ProceedingJoinPoint pjp) throws Throwable {
    final String context = RpcContext.getContext().getAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT);
    HmilyTransactionContext hmilyTransactionContext;
    //判断dubbo上下文中是否携带了tcc事务,如果有就取出反序列化为事务上下文对象
    if (StringUtils.isNoneBlank(context)) {
        hmilyTransactionContext = GsonUtils.getInstance().fromJson(context, HmilyTransactionContext.class);
        RpcContext.getContext().getAttachments().remove(CommonConstant.HMILY_TRANSACTION_CONTEXT);
    } else {
        //如果dubbo上下文中没有,就从当前上下文中获取。如果是事务发起者,这里其实也获取不到事务
        hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get();
    }
    return hmilyTransactionAspectService.invoke(hmilyTransactionContext, pjp);
}

}

这里主要判断了dubbo上下文中是否携带了tcc事务。如果没有就从当前线程上下文中获取,如果是事务的发起者,这里其实获取不到事务上下文对象的。在invoke里有个获取事务处理器的逻辑,如果事务上下文入参 为null,那么获取到的就是启动事务处理器。启动事务处理器处理逻辑如下

public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context)
        throws Throwable {
    System.err.println("StarterHmilyTransactionHandler");
    Object returnValue;
    try {
        HmilyTransaction hmilyTransaction = hmilyTransactionExecutor.begin(point);
        try {
            //execute try
            returnValue = point.proceed();
            hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
            hmilyTransactionExecutor.updateStatus(hmilyTransaction);
        } catch (Throwable throwable) {
            //if exception ,execute cancel
            final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
            executor.execute(() -> hmilyTransactionExecutor
                    .cancel(currentTransaction));
            throw throwable;
        }
        //execute confirm
        final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
        executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
    } finally {
        HmilyTransactionContextLocal.getInstance().remove();
        hmilyTransactionExecutor.remove();
    }
    return returnValue;
}

真正业务处理方法,point.proceed();被try,catch包起来了,如果try里面的方法出现异常,就会走hmilyTransactionExecutor.cancel(currentTransaction)的逻辑,如果成功,就走hmilyTransactionExecutor.confirm(currentTransaction)逻辑。其中cancel和confirm里都有协调参与者事务的处理逻辑,以confirm逻辑为例。

public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
    LogUtil.debug(LOGGER, () -> "tcc confirm .......!start");
    if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
        return;
    }
    currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
    updateStatus(currentTransaction);
    final ListhmilyParticipants = currentTransaction.getHmilyParticipants();
    ListfailList = Lists.newArrayListWithCapacity(hmilyParticipants.size());
    boolean success = true;
    if (CollectionUtils.isNotEmpty(hmilyParticipants)) {
        for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
            try {
                HmilyTransactionContext context = new HmilyTransactionContext();
                context.setAction(HmilyActionEnum.CONFIRMING.getCode());
                context.setRole(HmilyRoleEnum.START.getCode());
                context.setTransId(hmilyParticipant.getTransId());
                HmilyTransactionContextLocal.getInstance().set(context);
                executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation());
            } catch (Exception e) {
                LogUtil.error(LOGGER, "execute confirm :{}", () -> e);
                success = false;
                failList.add(hmilyParticipant);
            } finally {
                HmilyTransactionContextLocal.getInstance().remove();
            }
        }
        executeHandler(success, currentTransaction, failList);
    }
}

可以看到executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation()),这里执行了事务参与者的confirm方法。同理cancel里面也有类似代码,执行事务参与者的cancel方法。那么事务参与者的信息是怎么获取到的呢?我们需要回到一开始提到的dubbo的filter机制。

@Activate(group = {Constants.SERVER_KEY, Constants.CONSUMER})
public class DubboHmilyTransactionFilter implements Filter {

    private HmilyTransactionExecutor hmilyTransactionExecutor;

    /**
     * this is init by dubbo spi
     * set hmilyTransactionExecutor.
     *
     * @param hmilyTransactionExecutor {@linkplain HmilyTransactionExecutor }
     */
    public void setHmilyTransactionExecutor(final HmilyTransactionExecutor hmilyTransactionExecutor) {
        this.hmilyTransactionExecutor = hmilyTransactionExecutor;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Result invoke(final Invoker invoker, final Invocation invocation) throws RpcException {
        String methodName = invocation.getMethodName();
        Class clazz = invoker.getInterface();
        Class[] args = invocation.getParameterTypes();
        final Object[] arguments = invocation.getArguments();
        converterParamsClass(args, arguments);
        Method method = null;
        Hmily hmily = null;
        try {
            method = clazz.getMethod(methodName, args);
            hmily = method.getAnnotation(Hmily.class);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }
        if (Objects.nonNull(hmily)) {
            try {
                final HmilyTransactionContext hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get();
                if (Objects.nonNull(hmilyTransactionContext)) {
                    if (hmilyTransactionContext.getRole() == HmilyRoleEnum.LOCAL.getCode()) {
                        hmilyTransactionContext.setRole(HmilyRoleEnum.INLINE.getCode());
                    }
                    RpcContext.getContext().setAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT, GsonUtils.getInstance().toJson(hmilyTransactionContext));
                }
                final Result result = invoker.invoke(invocation);
                //if result has not exception
                if (!result.hasException()) {
                    final HmilyParticipant hmilyParticipant = buildParticipant(hmilyTransactionContext, hmily, method, clazz, arguments, args);
                    if (hmilyTransactionContext.getRole() == HmilyRoleEnum.INLINE.getCode()) {
                        hmilyTransactionExecutor.registerByNested(hmilyTransactionContext.getTransId(),
                                hmilyParticipant);
                    } else {
                        hmilyTransactionExecutor.enlistParticipant(hmilyParticipant);
                    }
                } else {
                    throw new HmilyRuntimeException("rpc invoke exception{}", result.getException());
                }
                return result;
            } catch (RpcException e) {
                e.printStackTrace();
                throw e;
            }
        } else {
            return invoker.invoke(invocation);
        }
    }

    @SuppressWarnings("unchecked")
    private HmilyParticipant buildParticipant(final HmilyTransactionContext hmilyTransactionContext,
                                              final Hmily hmily,
                                              final Method method, final Class clazz,
                                              final Object[] arguments, final Class... args) throws HmilyRuntimeException {

        if (Objects.isNull(hmilyTransactionContext)
                || (HmilyActionEnum.TRYING.getCode() != hmilyTransactionContext.getAction())) {returnnull;}//获取协调方法String confirmMethodName = hmily.confirmMethod();if(StringUtils.isBlank(confirmMethodName)){
            confirmMethodName = method.getName();}String cancelMethodName = hmily.cancelMethod();if(StringUtils.isBlank(cancelMethodName)){
            cancelMethodName = method.getName();}HmilyInvocation confirmInvocation =newHmilyInvocation(clazz, confirmMethodName, args, arguments);HmilyInvocation cancelInvocation =newHmilyInvocation(clazz, cancelMethodName, args, arguments);//封装调用点returnnewHmilyParticipant(hmilyTransactionContext.getTransId(), confirmInvocation, cancelInvocation);}privatevoid converterParamsClass(finalClass[] args,finalObject[] arguments){if(arguments ==null|| arguments.length <1){return;}for(int i =0; i < arguments.length; i++){
            args[i]= arguments[i].getClass();}}}

需要注意三个地方。

  1. 一个是filter的group定义@Activate(group = {Constants.SERVER_KEY, Constants.CONSUMER}),这里这样定义后,就只有服务的消费者会生效,也就是事务的发起者,服务的调用方会进filter的invoke逻辑。
  1. 只有加@Hmily注解的方法或进事务处理逻辑,其他的方法直接跳过处理
  1. 最关键的是buildParticipant(hmilyTransactionContext, hmily, method, clazz, arguments, args)方法。dubbo的filter唯一的作用就是收集事务参与者信息并更新当前事务上线文信息。那么在事务协调时就能够从当前事务上线文里面获取到需要协调的事务参与者信息了。

参数者事务处理器

public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
    HmilyTransaction hmilyTransaction = null;
    HmilyTransaction currentTransaction;
    switch (HmilyActionEnum.acquireByCode(context.getAction())) {
        case TRYING:
            try {
                hmilyTransaction = hmilyTransactionExecutor.beginParticipant(context, point);
                final Object proceed = point.proceed();
                hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
                //update log status to try
                hmilyTransactionExecutor.updateStatus(hmilyTransaction);
                return proceed;
            } catch (Throwable throwable) {
                //if exception ,delete log.
                hmilyTransactionExecutor.deleteTransaction(hmilyTransaction);
                throw throwable;
            } finally {
               HmilyTransactionContextLocal.getInstance().remove();
            }
        case CONFIRMING:
            currentTransaction = HmilyTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
            hmilyTransactionExecutor.confirm(currentTransaction);
            break;
        case CANCELING:
            currentTransaction = HmilyTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
            hmilyTransactionExecutor.cancel(currentTransaction);
            break;
        default:
            break;
    }
    Method method = ((MethodSignature) (point.getSignature())).getMethod();
    logger.error(HmilyActionEnum.acquireByCode(context.getAction()).getDesc());

    return DefaultValueUtils.getDefaultValue(method.getReturnType());
}

参与者事务处理器的逻辑比启动事务处理器要简单很多,try阶段记录事务日志用于事务补偿的时候使用。其他的confirm和cancel都是由启动事务管理器来触发调用执行的。这个地方之前纠结了楼主几个小时,怎么一个环绕织入的切面会被触发执行两次,其实是启动事务处理器里的confirm或cancel触发的。

disruptor+ScheduledService处理事务日志,补偿处理失败的事务

这个不细聊了,简述下。disruptor是一个高性能的队列。对事务日志落地的所有操作都是通过disruptor来异步完成的。ScheduledService默认128秒执行一次,来检查是否有处理失败的事务日志,用于补偿事务协调失败的事务

文末结语

相比较2pc的LCN而言,tcc分布式事务对业务侵入性更高。也因2pc的长时间占用事务资源,tcc的性能肯定比2pc要好。两者之间本身不存在谁优谁劣的问题。所以在做分布式事务选型时,选一个对的适合自身业务的分布式事务框架就比较重要了。

原文地址:https://www.cnblogs.com/jay-wu/p/10417449.html

时间: 2024-07-30 14:32:36

tcc分布式事务框架解析的相关文章

如何实现一个TCC分布式事务框架的一点思考

一个TCC事务框架需要解决的当然是分布式事务的管理.关于TCC事务机制的介绍,可以参考TCC事务机制简介.http://www.bytesoft.org/tcc-intro TCC事务模型虽然说起来简单,然而要基于TCC实现一个通用的分布式事务框架,却比它看上去要复杂的多,不只是简单的调用一下Confirm/Cancel业务就可以了的. 本文将以Spring容器为例,试图分析一下,实现一个通用的TCC分布式事务框架需要注意的一些问题. 一.TCC全局事务必须基于RM本地事务来实现全局事务 TCC

分布式事务原理解析

1. 分布式事务原理解析 1.1. TCC分布式事务 了解过TCC分布式事务的都知道它有三个阶段:try,confirm,cancel,但很多文章就只有原理图,和对原理图的解释,看一遍也留不下印象,这里用实际场景举个例子,说明TCC分布式事务原理 try阶段:假设我们又订单系统,它需要调用库存和积分系统,try阶段我们进行的是预处理,比如下单1个商品,在try操作中,我们在库存表设置个冻结字段,表示冻结1个商品,商品的存量先不扣除,而积分表同样添加个预增加积分字段,添加个预积分比如10 conf

终于有人把“TCC分布式事务”实现原理讲明白了!

之前网上看到很多写分布式事务的文章,不过大多都是将分布式事务各种技术方案简单介绍一下.很多朋友看了还是不知道分布式事务到底怎么回事,在项目里到底如何使用. 所以这篇文章,就用大白话+手工绘图,并结合一个电商系统的案例实践,来给大家讲清楚到底什么是 TCC 分布式事务. 首先说一下,这里可能会牵扯到一些 Spring Cloud 的原理,如果有不太清楚的同学,可以参考之前的文章:<拜托,面试请不要再问我Spring Cloud底层原理!>. 业务场景介绍 咱们先来看看业务场景,假设你现在有一个电

TCC分布式事务

转: https://www.cnblogs.com/jajian/p/10014145.html 终于有人把“TCC分布式事务”实现原理讲明白了! - JaJian - 博客园 之前网上看到很多写分布式事务的文章,不过大多都是将分布式事务各种技术方案简单介绍一下.很多朋友看了还是不知道分布式事务到底怎么回事,在项目里到底如何使用. 所以这篇文章,就用大白话+手工绘图,并结合一个电商系统的案例实践,来给大家讲清楚到底什么是 TCC 分布式事务. 首先说一下,这里可能会牵扯到一些 Spring C

分布式事务框架-seata初识

摘自:https://www.cnblogs.com/iceggboom/p/12144570.html 分布式事务框架-seata初识 一.事务与分布式事务 事务,在数据库中指的是操作数据库的最小单位,往大了看,事务是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤消. 那为什么会有分布式事务呢?单机事务是通过将操作限制在一个会话内通过数据库本身的锁以及日志来实现ACID.因为引入了分布式架构,所以事务的参与者.支持事务的服务器.资源服务器以及事务管理器

基于Dubbo的分布式事务框架(LCN)

原文地址:http://原文地址:https://github.com/1991wangliang/transaction 该框架依赖Redis/dubbo/txManager服务.依赖第三方框架lorne_core 原理与功能 基于对spring tx PlatformTransactionManager的本地模块事务控制从而达到全局控制事务的目的.该框架兼容任何依赖PlatformTransactionManager的DB框架.利用三阶段提交的方式来确保事务的一致性,支持本地事务和分布式事务

MQ关于实现最终一致性分布式事务原理解析

本文讲述阿里云官方文档中关于通过MQ实现分布式事务最终一致性原理 概念介绍 事务消息:消息队列 MQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 MQ 事务消息能达到分布式事务的最终一致. 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息. 消息回查:由于网络闪断.生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 MQ

分布式事务框架选型

个人发展建议:研究源码,知核心,知原理,清楚了解细节,甚至改造,自己写!! 阿里Fescar(GTS开源版本,GTS有阿里云服务)官方中文:https://github.com/alibaba/fescar/wiki/Home_Chinese 开源消息和与其他分布式框架(ByteTCC.LCN)对比:https://blog.csdn.net/xlgen157387/article/details/86289066 介绍和对比:https://blog.csdn.net/weixin_39800

【原创】分布式事务之TCC事务模型

引言 在上篇文章<老生常谈--利用消息队列处理分布式事务>一文中留了一个坑,今天来填坑.如下图所示 如果服务A和服务B之间是同步调用,比如服务C需要按流程调服务A和服务B,服务A和服务B要么一起成功,要么一起失败. 针对这种情况,目前业内普遍推荐使用TCC事务来解决的! 正文 ok,老规矩,我们先套一个业务场景进去,如下图所示 那页面点了支付按钮,调用支付服务,那我们后台要实现下面三个步骤 [1] 订单服务-修改订单状态 [2] 账户服务-扣减金钱 [3] 库存服务-扣减库存 达到事务的效果,