随着交易量逐步上升,业务越来越复杂,在设计整个帐务处理中考虑最终一致性的方案。
整个方案大概流程可以分为:
- 在业务完成后同步记录资金变动流水
- 有的业务场景需要实时处理的账务,则同步发出账务处理的异步消息
- 通过定时任务每分钟查询需要进行更新帐务的流水记录
- 启动线程池对每笔流水记录进行帐务更新
在大量数据需要更新的情况下,由于对于处理账务要求非常严格,所以在整个执行过程中需要引入很多技术手段才能达到快速并且正确记账。
- 和业务执行一起生成最关键的资金变动流水,需要和其他业务绑定在一个事务中,由于这是账务只需要insert流水,所以是一个执行开销很小的数据库操作。通过new DefaultTransactionDefinition() 来控制事务的提交。
//获取事务 DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); TransactionStatus ts = transactionManager.getTransaction(definition); try { //依次相关业务数据再生成对应的账务流水 // step 1 更新其他相关A表数据 recordADao.update(recordA); // step 2 更新信用卡信息 amortizePaymentBiz.update(record); // step 3 更新子商户账务即插入流水 subAccountBiz.insertAccountHistory(record); // step 4 更新主账户即插入流水 accountPaymentBiz.insertAccountHistory(record); } //统一提交事务 transactionManager.commit(ts); } catch (Exception e) { //所有数据操作出现异常都会导致整个事务回滚 transactionManager.rollback(ts); log.error("账务新增数据异常"); throw e; }
- 执行账务插入流水,根据业务设计设置UNIQUE KEY索引确保不重复记账,账务记录会变更状态则需要生成对应的流水记录。对于需要加急处理的账务直接发送异步消息进行处理。
AccountProcessPending processPending = new AccountProcessPending(); processPending.setAccountProcessNo(requestVo.getAccountProcessNo()); //设置待处理状态,定时任务通过这个状态位来拉取记录进行真正的账务更新 processPending.setProcessStage(AccountProcessPendingStageEnum.PENDING.getValue()); //为了确保不重复记账,由几个关键字段信息拼接出唯一标识dataUnqKey,表设计DataUnqKey为 UNIQUE KEY索引 processPending.setDataUnqKey(dataUnqKey); ... //执行账务插入,因为需要更新ProcessStage状态位,这个表也会有对应的流水表记录 isSuccess = accountProcessPendingBiz.add(processPending); //需要加急处理,则通过MQ通知立即执行账务处理 if(isSuccess && isUrgent){ this.notifyDoAccountProcess(requestVo.getAccountProcessNo()); }
- 通过系统的定时任务调度在规定的时间去拉取待处理状态PENDING的AccountProcessPending记录。
- 在生产环境由于交易数据量很大定时任务启动频率会比较高,一般一分钟会启动一次,所以为了能够快速处理这里采用线程池来承接每次定时任务。
- 由于使用了线程池任务能够并发执行,这样会带来另外一个问题,账务记录可能会重复执行。即一条待处理的账务记录A,定时任务a拿到几千条记录包含了A,但是还没处理完毕这时定时任务b启动,会去搜索同样数量的待处理记录这是包含了A,这种情况A记录就会被重复记录两边,所以为了防止这种情况发生增加了资源锁来控制确保不会并发执行。String clientFlag = "appAccountNotify_asynchronousScheduleTask"; //通过数据库字段标识来实现同步锁机制 final String clientId = lockHelper.tryLock(resourceId, expireSecond, clientFlag); if(StringUtil.isEmpty(clientId)){ logger.info("asynchronousScheduleTask 获取锁失败,当前任务可能已经在执行!"); return; } //启动线程池拉取需要处理的账务记录 addTaskExecutor.execute(new Runnable() { @Override public void run() { try{ accountAsyncBiz.accountAsyncTaskExecute(); } finally { //数据处理完毕释放资源锁 lockHelper.unlock(resourceId, clientId, true); } } });
- 对于已经拉取到的PENDING记录,启动线程池更新每一笔记录的账务金额
- 每次获取最近三天的固定数量的记录,如果交易量大可能最近三天待处理的记录已经超过固定的处理数量,这样就需要等待当前已经查询的待处理记录全部处理完成后才继续查询下一批数据进行更新。
- 对于查询到的每批数据启动线程池来加快处理速度,对于整批记录通过countDownLatch来控制当前启动的所有线程执行完毕再去查询下一批待处理记录。//每次根据日期限定获取固定最多数量的记录 accountProcessNoList = listAccountProcessNo(fetchDate,num); if(accountProcessNoList == null || accountProcessNoList.size() <= 0){ isNeedChangeDate = true; }else{ //获取处理线程数量,可以设计在缓存中动态调整线程数量 int threadNum = this.getThreadNum(); //按照线程数据平均分配需要处理的accountProcessNoList Map<Integer, List<String>> groupMap = this.splitGroup(accountProcessNoList, threadNum); //通过countDownLatch功能来控制每批获取的accountProcessNoList全部处理完毕后才能处理下一批, //多线程同时执行,每次执行完一个线程任务时countDownLatch数量减一 CountDownLatch countDownLatch = new CountDownLatch(groupMap.size()); for(Map.Entry<Integer, List<String>> entry : groupMap.entrySet()){ Integer groupNum = entry.getKey(); List<String> list = entry.getValue(); //启动一个线程处理分配好的账务记录 ThreadTask task = new ThreadTask(groupNum, list, countDownLatch); task.start(); } //等待所有任务执行完毕,确保取的accountProcessNoList只有单线程执行 countDownLatch.await(); }
- 对于查询的每批数据被平均分配到固定的几个线程中执行,在执行过程中通过Redis分布式锁来控制每个具体账务进行同步更新
- 更新AccountProcessPending记录的状态,从最开始的待处理变更为处理中
- 使用Redis分布式锁给所有处理的商户账户加锁再进行账务更新
- 更新完毕后将AccountProcessPending状态变更为处理完毕//step 1 将所有的AccountProcessPending记录状态变更为处理中 try{ accountProcessBiz.beforeSyncExecute(requestVo, processVoList); }catch(Throwable e){ return false; } //step 2 给所有需要处理的账户都加锁 List<RLock> lockList = null; try{ //使用Redis分布式锁 lockList = redisClient.tryLock(...); }catch(Throwable e){ //加锁出现异常则回滚step 1中变更的状态 accountProcessBiz.beforeSyncExecuteRevert(requestVo, processVoList); } //setp 3 真正变更每个商户的账务金额 try{ isSuccess = accountProcessBiz.executeSync(requestVo, groupMap); }catch(Throwable ex){ //账户变更出现异常则需要释放锁 redisClient.unlock(lockList); } //setp 4 账务处理成功后变更AccountProcessPending的状态为处理完毕 try{ processResultId = accountProcessBiz.afterSyncExecute(...); }catch (Throwable e){ //异常处理 }
整个账务更新过程中为了达到不重复记账,在每个阶段都采用不用的技术手段进行同步操作,为了能够加快处理速度都会使用线程池来执行,这样最后达到了快速且正确执行账务操作。
原文地址:https://www.cnblogs.com/kma-3/p/12323017.html
时间: 2024-10-07 23:59:38