帐务处理最终一致性方案

随着交易量逐步上升,业务越来越复杂,在设计整个帐务处理中考虑最终一致性的方案。

整个方案大概流程可以分为:

  • 在业务完成后同步记录资金变动流水
  • 有的业务场景需要实时处理的账务,则同步发出账务处理的异步消息
  • 通过定时任务每分钟查询需要进行更新帐务的流水记录
  • 启动线程池对每笔流水记录进行帐务更新

在大量数据需要更新的情况下,由于对于处理账务要求非常严格,所以在整个执行过程中需要引入很多技术手段才能达到快速并且正确记账。

  1. 和业务执行一起生成最关键的资金变动流水,需要和其他业务绑定在一个事务中,由于这是账务只需要insert流水,所以是一个执行开销很小的数据库操作。通过new DefaultTransactionDefinition() 来控制事务的提交。

    //获取事务
    DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
    TransactionStatus ts = transactionManager.getTransaction(definition);
    try {
     //依次相关业务数据再生成对应的账务流水
    
     // step 1 更新其他相关A表数据
     recordADao.update(recordA);
     // step 2 更新信用卡信息
     amortizePaymentBiz.update(record);
     // step 3 更新子商户账务即插入流水
     subAccountBiz.insertAccountHistory(record);
     // step 4 更新主账户即插入流水
     accountPaymentBiz.insertAccountHistory(record);
     }
     //统一提交事务
     transactionManager.commit(ts);
    } catch (Exception e) {
     //所有数据操作出现异常都会导致整个事务回滚
     transactionManager.rollback(ts);
     log.error("账务新增数据异常");
     throw e;
    }
    
  2. 执行账务插入流水,根据业务设计设置UNIQUE KEY索引确保不重复记账,账务记录会变更状态则需要生成对应的流水记录。对于需要加急处理的账务直接发送异步消息进行处理。

    AccountProcessPending processPending = new AccountProcessPending();
    processPending.setAccountProcessNo(requestVo.getAccountProcessNo());
    //设置待处理状态,定时任务通过这个状态位来拉取记录进行真正的账务更新
    processPending.setProcessStage(AccountProcessPendingStageEnum.PENDING.getValue());
    //为了确保不重复记账,由几个关键字段信息拼接出唯一标识dataUnqKey,表设计DataUnqKey为 UNIQUE KEY索引
    processPending.setDataUnqKey(dataUnqKey);
    ...
    //执行账务插入,因为需要更新ProcessStage状态位,这个表也会有对应的流水表记录
    isSuccess  = accountProcessPendingBiz.add(processPending);
    //需要加急处理,则通过MQ通知立即执行账务处理
     if(isSuccess && isUrgent){
        this.notifyDoAccountProcess(requestVo.getAccountProcessNo());
     }
    

     

  3. 通过系统的定时任务调度在规定的时间去拉取待处理状态PENDING的AccountProcessPending记录。
    - 在生产环境由于交易数据量很大定时任务启动频率会比较高,一般一分钟会启动一次,所以为了能够快速处理这里采用线程池来承接每次定时任务。
    - 由于使用了线程池任务能够并发执行,这样会带来另外一个问题,账务记录可能会重复执行。即一条待处理的账务记录A,定时任务a拿到几千条记录包含了A,但是还没处理完毕这时定时任务b启动,会去搜索同样数量的待处理记录这是包含了A,这种情况A记录就会被重复记录两边,所以为了防止这种情况发生增加了资源锁来控制确保不会并发执行。
    String clientFlag = "appAccountNotify_asynchronousScheduleTask";
    //通过数据库字段标识来实现同步锁机制
    final String clientId = lockHelper.tryLock(resourceId, expireSecond, clientFlag);
    if(StringUtil.isEmpty(clientId)){
    	logger.info("asynchronousScheduleTask 获取锁失败,当前任务可能已经在执行!");
    	return;
    }
    //启动线程池拉取需要处理的账务记录
    addTaskExecutor.execute(new Runnable() {
    	@Override
    	public void run() {
    		try{
    			accountAsyncBiz.accountAsyncTaskExecute();
    		} finally {
    			//数据处理完毕释放资源锁
    			lockHelper.unlock(resourceId, clientId, true);
    		}
    	}
    });
  4. 对于已经拉取到的PENDING记录,启动线程池更新每一笔记录的账务金额
    - 每次获取最近三天的固定数量的记录,如果交易量大可能最近三天待处理的记录已经超过固定的处理数量,这样就需要等待当前已经查询的待处理记录全部处理完成后才继续查询下一批数据进行更新。
    - 对于查询到的每批数据启动线程池来加快处理速度,对于整批记录通过countDownLatch来控制当前启动的所有线程执行完毕再去查询下一批待处理记录。 
    //每次根据日期限定获取固定最多数量的记录
    accountProcessNoList = listAccountProcessNo(fetchDate,num);
    if(accountProcessNoList == null || accountProcessNoList.size() <= 0){
    	isNeedChangeDate = true;
    }else{
    	//获取处理线程数量,可以设计在缓存中动态调整线程数量
    	int threadNum = this.getThreadNum();
    	//按照线程数据平均分配需要处理的accountProcessNoList
    	Map<Integer, List<String>> groupMap = this.splitGroup(accountProcessNoList, threadNum);
    	//通过countDownLatch功能来控制每批获取的accountProcessNoList全部处理完毕后才能处理下一批,
    	//多线程同时执行,每次执行完一个线程任务时countDownLatch数量减一
    	CountDownLatch countDownLatch = new CountDownLatch(groupMap.size());
    	for(Map.Entry<Integer, List<String>> entry : groupMap.entrySet()){
    		Integer groupNum = entry.getKey();
    		List<String> list = entry.getValue();
    		//启动一个线程处理分配好的账务记录
    		ThreadTask task = new ThreadTask(groupNum, list, countDownLatch);
    		task.start();
    	}
    	//等待所有任务执行完毕,确保取的accountProcessNoList只有单线程执行
    	countDownLatch.await();
    }
    
  5. 对于查询的每批数据被平均分配到固定的几个线程中执行,在执行过程中通过Redis分布式锁来控制每个具体账务进行同步更新
    - 更新AccountProcessPending记录的状态,从最开始的待处理变更为处理中
    - 使用Redis分布式锁给所有处理的商户账户加锁再进行账务更新
    - 更新完毕后将AccountProcessPending状态变更为处理完毕

     //step 1 将所有的AccountProcessPending记录状态变更为处理中
    try{
    	accountProcessBiz.beforeSyncExecute(requestVo, processVoList);
    }catch(Throwable e){
    	return false;
    }
    
    //step 2 给所有需要处理的账户都加锁
    List<RLock> lockList = null;
    try{
    	//使用Redis分布式锁
    	lockList = redisClient.tryLock(...);
    }catch(Throwable e){
    	//加锁出现异常则回滚step 1中变更的状态
    	accountProcessBiz.beforeSyncExecuteRevert(requestVo, processVoList);
    }
    //setp 3 真正变更每个商户的账务金额
    try{
    	isSuccess = accountProcessBiz.executeSync(requestVo, groupMap);
    }catch(Throwable ex){
    	//账户变更出现异常则需要释放锁
    	redisClient.unlock(lockList);
    }
    
    //setp 4 账务处理成功后变更AccountProcessPending的状态为处理完毕
    try{
    	processResultId = accountProcessBiz.afterSyncExecute(...);
    }catch (Throwable e){
    	//异常处理
    }
    

    整个账务更新过程中为了达到不重复记账,在每个阶段都采用不用的技术手段进行同步操作,为了能够加快处理速度都会使用线程池来执行,这样最后达到了快速且正确执行账务操作。  

原文地址:https://www.cnblogs.com/kma-3/p/12323017.html

时间: 2024-10-07 23:59:38

帐务处理最终一致性方案的相关文章

分布式事务解决(5):可靠消息的最终一致性方案-消息重复发送问题与业务接口的幂等性设计

一.消息消费流程的异常分析与处理 1.1.消息消费流程的异常点 1.2.消息消费流程的异常处理 方法:对于未确认的消息,采用按规则重新投递的方式进行处理. 问题:消息的重复发送会导致业务处理接口出现重复调用的问题. 未完,待续....

分布式事务最终一致性常用方案

目前的应用系统,不管是企业级应用还是互联网应用,最终数据的一致性是每个应用系统都要面临的问题,随着分布式的逐渐普及,数据一致性更加艰难,但是也很难有银弹的解决方案,也并不是引入特定的中间件或者特定的开源框架能够解决的,更多的还是看业务场景,根据场景来给出解决方案.根据笔者最近几年的了解,总结了几个点,更多的应用系统在编码的时候,更加关注数据的一致性,这样系统才是健壮的. 一.基础理论 目前关于事务的几大理论包括:ACID事务特性,CAP分布式理论,以及BASE等.ACID在数据库事务中体现,CA

分布式事务一致性方案

http://www.infoq.com/cn/articles/solution-of-distributed-system-transaction-consistency 在OLTP系统领域,我们在很多业务场景下都会面临事务一致性方面的需求,例如最经典的Bob给Smith转账的案例.传统的企业开发,系统往往是以单体应用形式存在的,也没有横跨多个数据库.我们通常只需借助开发平台中特有数据访问技术和框架(例如Spring.JDBC.ADO.NET),结合关系型数据库自带的事务管理机制来实现事务性

分布式事务:两段式提交(最终一致性)

[MySQL如何实现分布式事务?] http://www.linuxidc.com/Linux/2013-10/91925.htm Innodb存储引擎支持XA事务,通过XA事务可以支持分布式事务的实现.分布式事务指的是允许多个独立的事务资源(transac tional resources)参与一个全局的事务中.事务资源通常是关系型数据库系统,也可以是其它类型的资源. 全局事务要求在其中所有参与的事务要么全部提交,要么全部回滚,这对于事务原有的ACID要求又有了提高.另外,在使用分布式事务时候

Eventually Consistent(最终一致性)(转)

应该说搞分布式系统必读的文章了,转过来,这是2008年12月Werner revise过的版本,先贴上内容简介:分布式系统的CAP理论 CAP理论(data consistency, system availability, and tolerance),也就是数据一致性,系统可用性和网络分区容错性,在一个分布式系统中CAP是不能同时保证的,最多只能同时满足两个.如果一个系统不必考虑网络分区容错性,那么它可以同时取得数据一致性和可用性,这通常可以通过处理协议来保证.    然而不考虑网络分区容错

分布式之数据库和缓存双写一致性方案解析

引言 为什么写这篇文章? 首先,缓存由于其高并发和高性能的特性,已经在项目中被广泛使用.在读取缓存方面,大家没啥疑问,都是按照下图的流程来进行业务操作. 但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存.又或者是先删除缓存,再更新数据库,其实大家存在很大的争议.目前没有一篇全面的博客,对这几种方案进行解析.于是博主战战兢兢,顶着被大家喷的风险,写了这篇文章. 正文 先做一个说明,从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案.这种方案下,我们可以对存入缓存的数据设置

为什么分布式一定要有一致性方案?

0 引言为什么写这篇文章? 首先,缓存由于其高并发和高性能的特性,已经在项目中被广泛使用.在读取缓存方面,大家没啥疑问,都是按照下图的流程来进行业务操作.但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存.又或者是先删除缓存,再更新数据库,其实大家存在很大的争议.目前没有一篇全面的博客,对这几种方案进行解析.于是博主战战兢兢,顶着被大家喷的风险,写了这篇文章. 1 正文先做一个说明,从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案.这种方案下,我们可以对存入缓存的数据设

布式之数据库和缓存双写一致性方案解析(转)

引言 为什么写这篇文章? 首先,缓存由于其高并发和高性能的特性,已经在项目中被广泛使用.在读取缓存方面,大家没啥疑问,都是按照下图的流程来进行业务操作.但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存.又或者是先删除缓存,再更新数据库,其实大家存在很大的争议.目前没有一篇全面的博客,对这几种方案进行解析.于是博主战战兢兢,顶着被大家喷的风险,写了这篇文章. 文章结构 本文由以下三个部分组成1.讲解缓存更新策略2.对每种策略进行缺点分析3.针对缺点给出改进方案 正文 先做一个说明,

分布式系统最终一致性

前言 目前的应用系统,不管是企业级应用还是互联网应用,最终数据的一致性是每个应用系统都要面临的问题,随着分布式的逐渐普及,数据一致性更加艰难,但是也很难有银弹的解决方案,也并不是引入特定的中间件或者特定的开源框架能够解决的,更多的还是看业务场景,根据场景来给出解决方案.根据笔者最近几年的了解,总结了几个点,更多的应用系统在编码的时候,更加关注数据的一致性,这样系统才是健壮的. 基础理论相关 说起事务,目前的几个理论,ACID事务特性,CAP分布式理论,以及BASE等,ACID在数据库事务中体现,