切分大任务成多个子任务(事务),汇总后统一提交或回滚

示例代码可以从github上获取 https://github.com/git-simm/simm-framework.git

一、业务场景:

  系统中存在一个盘库的功能,用户一次盘库形成一两万条的盘库明细单,一次性提交给服务器进行处理。服务器性能比较优越,平均也得运行30秒左右。性能上需要进行优化。

二、处理方案:

  做过代码分析后,发现单线程逻辑没有什么优化空间。开始考虑引入多线程处理模型,用10个子线程进行任务切分处理。切分子线程问题需要考虑事务的一致性。10个子线程对应10个事务,需要保证所有事务一起提交或一起回滚。这里使用synchronized(wait,notifyall)机制做线程协作。

三、代码实现:

  3.1、添加一个多线程协作标志类,用于做子线程运行状态统计,通知子线程做事务提交还是回滚的操作;

package simm.framework.threadutils.multi;

import java.util.UUID;

/**
 * 多线程结束标志
 * 2018.09.22 by simm
 */
public class MultiEndFlag {
    private volatile boolean fired = false;
    //是否执行成功
    private volatile boolean isAllSuccess = false;
    private volatile int threadCount = 0;
    private volatile int failCount = 0;

    /**
     * 初始化子线程的总数
     * @param count
     */
    public MultiEndFlag(int count){
        threadCount = count;
    }

    public boolean isAllSuccess() {
        return isAllSuccess;
    }

    /**
     * 等待全部结束
     * @param threadId
     * @param result
     */
    public synchronized void waitForEnd(UUID threadId,int result){
        //统计失败的线程个数
        if(result==0){
            failCount++;
        }
        threadCount--;
        while (!fired){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 执行结束通知
     */
    public synchronized void go(){
        fired = true;
        //结果都显示成功
        isAllSuccess = (failCount == 0);
        notifyAll();
    }
    /**
     * 等待结束
     */
    public void end(){
        while (threadCount > 0){
            waitFunc(50);
        }
        System.out.println("线程全部执行完毕通知");
        go();
    }

    /**
     * 等待
     */
    private void waitFunc(long millis){
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  3.2、提供一个数据保存服务的接口定义,一个默认的子线程任务执行类(需要接收数据保存服务实现,业务数据,协作标志变量);

package simm.framework.threadutils.multi;

import java.util.List;
import java.util.UUID;

/**
 * 保存服务接口
 * 2018.09.22 by simm
 * @param <T>
 */
public interface ISaveService<T> {
    /**
     * 子线程批量保存方法
     * @param list
     * @param endFlag
     * @param threadId
     * @return
     * @throws Exception
     */
    Integer batchSave(List<T> list, MultiEndFlag endFlag, UUID threadId) throws Exception;
}
package simm.framework.threadutils.multi;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;

/**
 * 默认的执行任务
 * 2018.09.22 by simm
 */
public class DefaultExecTask<T> implements Callable<Integer> {
    private List<T> list;
    private ISaveService saveService;
    private MultiEndFlag endFlag;
    private UUID threadId;
    /**
     * 盘库子任务
     * @param saveService
     * @param notes
     * @param flag
     */
    public DefaultExecTask(ISaveService saveService, List<T> notes, MultiEndFlag flag){
        this.saveService = saveService;
        this.list = notes;
        this.endFlag = flag;
        this.threadId = UUID.randomUUID();
    }
    @Override
    public Integer call() throws Exception {
        return saveService.batchSave(this.list,this.endFlag,this.threadId);
    }
}

  3.3、实现最核心的线程池分发子线程,并汇总结果通知子线程事务做最终的提交或回滚。线程池使用定长池 newFixedThreadPool,子线程使用futureTask,可接收返回值和异常信息。

package simm.framework.threadutils.multi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 多线程切分执行器
 * 2018.09.22 by simm
 */
public class MultiExecutor {
    private static int maxThreadCount = 10;
    /**
     * 执行方法(分批创建子线程)
     * @param saveService
     * @param notes
     * @param groupLen
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static <T> Boolean exec(ISaveService saveService,List<T> notes,int groupLen) throws ExecutionException, InterruptedException {
        if(notes==null || notes.size()==0) return true;
        //创建一个线程池,最大10个线程
        ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);
        List<Future<Integer>> futures = new ArrayList<>();
        int noteSize = notes.size();
        int batches = (int) Math.ceil(noteSize * 1.0 /groupLen);
        //分组超长最大线程限制,则设置分组数为10,计算分组集合尺寸
        if(batches>maxThreadCount){
            batches = maxThreadCount;
            groupLen = (int) Math.ceil(noteSize * 1.0 /batches);
        }
        System.out.println("总长度:"+noteSize+"  批次信息:"+batches+"  分组长度:"+groupLen);
        MultiEndFlag flag = new MultiEndFlag(batches);
        int startIndex, toIndex, maxIndex = notes.size();
        for(int i=0;i<batches;i++){
            startIndex = i * groupLen;
            toIndex = startIndex + groupLen;
            if(toIndex> maxIndex) {
                toIndex = maxIndex;
            }
            List<T> temp = notes.subList(startIndex,toIndex);
            if(temp == null || temp.size()==0) continue;
            futures.add(executorService.submit(new DefaultExecTask(saveService,temp,flag)));
        }
        flag.end();
        //子线程全部等待返回(存在异常,则直接抛向主线程)
        for(Future<Integer> future:futures){
            future.get();
        }
        //所有线程返回后,关闭线程池
        executorService.shutdown();
        return true;
    }
}

四、给出一个调用伪代码。需要注意的一点,子线程开启事务,这里使用@Transactional声明式事务,这要求服务的实体类需要通过spring的bean工厂创建,得到一个动态代理类,以达到支持事务拦截器的目的,保证注解的有效性。

package multi;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import simm.framework.threadutils.multi.DefaultExecTask;
import simm.framework.threadutils.multi.ISaveService;
import simm.framework.threadutils.multi.MultiEndFlag;
import simm.framework.threadutils.multi.MultiExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

/**
 * 损益单保存服务
 */
@Service
public class DemoService implements ISaveService<NoteCheckBalance> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultExecTask.class);
    @Autowired
    private NoteCheckBalanceMapper noteCheckBalanceMapper;

    /**
     * 业务保存
     * @param list
     */
    public void save(List<NoteCheckBalance> list){
        for(NoteCheckBalance item :list){
            noteCheckBalanceMapper.insert(item);
        }
    }
    /**
     * 批量保存事件
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public Integer batchSave(List<NoteCheckBalance> list, MultiEndFlag endFlag, UUID threadId) throws Exception {
        int result = 0;
        try{
            //业务操作
            save(list);
            result = 1;
            //进行waitForEnd 操作,是为了确保所有的线程都最终通知同步协作标志
            endFlag.waitForEnd(threadId ,result);
            //其他线程异常手工回滚
            if(result==1 && !endFlag.isAllSuccess()){
                String message = "子线程未全部执行成功,对线程["+threadId+"]进行回滚";
                throw new Exception(message);
            }
            return result;
        }catch (Exception ex){
            logger.error(ex.toString());
            if(result ==0){
                //本身线程异常抛出异常,通知已经做完(判断是为了防止 与 try块中的通知重复)
                endFlag.waitForEnd(threadId ,result);
            }
            throw ex;
        }
    }

    /**
     * 调用示例
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //调用示例
        MultiExecutor.exec(new DemoService(), new ArrayList<NoteCheckBalance>(),500);
    }
}

参考文章

https://www.cnblogs.com/LipeiNet/p/6475851.html

原文地址:https://www.cnblogs.com/MrSi/p/9690937.html

时间: 2024-10-12 03:05:57

切分大任务成多个子任务(事务),汇总后统一提交或回滚的相关文章

(MYSQL学习笔记4)事务的开启、提交、回滚

使用事务要注意以下三点: 1.在 MySQL 中只有使用了 Innodb 数据库引擎的数据库或表才支持事务. 2.事务处理可以用来维护数据库的完整性,保证成批的 SQL 语句要么全部执行,要么全部不执行. 3.事务用来管理 insert,update,delete 语句 MYSQL 事务处理主要有两种方法: 1.用 BEGIN, ROLLBACK, COMMIT来实现 BEGIN 开始一个事务 ROLLBACK 事务回滚 COMMIT 事务确认 2.直接用 SET 来改变 MySQL 的自动提交

【转】批量复制操作(SqlBulkCopy)的出错处理:事务提交、回滚

原文地址:http://blog.csdn.net/westsource/article/details/6658109 默认情况下,批量复制操作作为独立的操作执行. 批量复制操作以非事务性方式发生,不可能使其回滚. 如果需要在出错时回滚全部批量复制或它的一部分,可以使用 SqlBulkCopy 托管的事务,在现有事务中执行批量复制操作,或者在 System.Transactions Transaction 中登记它. 由于不同批次在不同事务中执行,因此,如果在批量复制操作期间发生错误,则当前批

SQL Server事务执行一半出错是否自动回滚整个事务 【转】

http://www.2cto.com/database/201308/234728.html SQL Server事务执行一半出错是否自动回滚整个事务 大家都知道SQL Server事务是单个的工作单元.如果某一事务成功,则在该事务中进行的所有数据修改均会提交,成为数据库中的永久组成部分.如果事务遇到错误且必须取消或回滚,则所有数据修改均被清除. 所以是不是说事务出错一定会回滚整个事物呢? 先看几个个例子: --createtable create table testrollback(idi

事务场景中,抛出异常被catch后,如果需要回滚,一定要手动回滚事务

Spring使用声明式事务处理,默认情况下,如果被注解的数据库操作方法中发生了unchecked异常,所有的数据库操作将rollback:如果发生的异常是checked异常,默认情况下数据库操作还是会提交的. checked异常: 表示无效,不是程序中可以预测的.比如无效的用户输入,文件不存在,网络或者数据库链接错误.这些都是外在的原因,都不是程序内部可以控制的.必须在代码中显式地处理.比如try-catch块处理,或者给所在的方法加上throws说明,将异常抛到调用栈的上一层. 阿里编码规约示

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerCont

关于调用方有事务,被调用的SP中也有事务,在嵌套SP中回滚代码的报错处理,好文推荐

SQL报错异常:Transaction count after EXECUTE indicates a mismatching number of BEGIN and COMMIT statements. Previous count = 1, current count = 0. --首先明确一点,在SQL中开启事务时,Begin Tran时,@@TRANCOUNT会加1,Commit Tran时@@TRANCOUNT会减1,但是当ROLLBACK TRAN时会把@@TranCount直接设置

mysql 实现事务的提交与回滚

最近要对数据库的数据进行一个定时迁移,为了防止在执行过程sql语句因为某些原因报错而导致数据转移混乱,因此要对我们的脚本加以事务进行控制. 首先我们建一张tran_test表 CREATE TABLE tran_test( f1 VARCHAR(10) NOT NULL, f2 INT(1) DEFAULT NULL, PRIMARY KEY (f1) )ENGINE=INNODB CHARSET=utf8 我想对tran_test插入两条数据,但是为了防止插入中报错,因此我要把插入语句控制在一

Spring学习笔记——Spring事务只对运行时异常回滚

我们在使用Spring时候一般都知道事务在遇到异常的时候会回滚,岂不知Spring的事务默认只有在发生运行时异常即:RunTimeException时才会发生事务,如果一个方法抛出Exception或者Checked异常Spring的事务并不会回滚. 下面我们来看看异常的分类,异常一般分为Checked异常和RunTime异常. CheckedException: Java认为Checked异常都是可以被处理的异常,所以Java程序必须显式的处理Checked异常,如果程序没有处理checked

关于SAP的事务提交和回滚(LUW)

1 Sap的更新的类型 在sap中,可以使用CALL FUNCTION ... IN UPDATE TASK将多个数据更新绑定到一个database LUW中.程序使用COMMIT WORK提交修改请求.在sap中将更新分四种类型: 1.1 Asynchronous Update(异步更新):在这种情况,调用程序提交事务不需要等待Update Work Process完成数据更新处理. 1.2 Updating Asynchronously in Steps(多步异步更新): 这种更新分两步.