示例代码可以从github上获取 https://github.com/git-simm/simm-framework.git
一、业务场景:
系统中存在一个盘库的功能,用户一次盘库形成一两万条的盘库明细单,一次性提交给服务器进行处理。服务器性能比较优越,平均也得运行30秒左右。性能上需要进行优化。
二、处理方案:
做过代码分析后,发现单线程逻辑没有什么优化空间。开始考虑引入多线程处理模型,用10个子线程进行任务切分处理。切分子线程问题需要考虑事务的一致性。10个子线程对应10个事务,需要保证所有事务一起提交或一起回滚。这里使用synchronized(wait,notifyall)机制做线程协作。
三、代码实现:
3.1、添加一个多线程协作标志类,用于做子线程运行状态统计,通知子线程做事务提交还是回滚的操作;
package simm.framework.threadutils.multi; import java.util.UUID; /** * 多线程结束标志 * 2018.09.22 by simm */ public class MultiEndFlag { private volatile boolean fired = false; //是否执行成功 private volatile boolean isAllSuccess = false; private volatile int threadCount = 0; private volatile int failCount = 0; /** * 初始化子线程的总数 * @param count */ public MultiEndFlag(int count){ threadCount = count; } public boolean isAllSuccess() { return isAllSuccess; } /** * 等待全部结束 * @param threadId * @param result */ public synchronized void waitForEnd(UUID threadId,int result){ //统计失败的线程个数 if(result==0){ failCount++; } threadCount--; while (!fired){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 执行结束通知 */ public synchronized void go(){ fired = true; //结果都显示成功 isAllSuccess = (failCount == 0); notifyAll(); } /** * 等待结束 */ public void end(){ while (threadCount > 0){ waitFunc(50); } System.out.println("线程全部执行完毕通知"); go(); } /** * 等待 */ private void waitFunc(long millis){ try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.2、提供一个数据保存服务的接口定义,一个默认的子线程任务执行类(需要接收数据保存服务实现,业务数据,协作标志变量);
package simm.framework.threadutils.multi; import java.util.List; import java.util.UUID; /** * 保存服务接口 * 2018.09.22 by simm * @param <T> */ public interface ISaveService<T> { /** * 子线程批量保存方法 * @param list * @param endFlag * @param threadId * @return * @throws Exception */ Integer batchSave(List<T> list, MultiEndFlag endFlag, UUID threadId) throws Exception; }
package simm.framework.threadutils.multi; import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; /** * 默认的执行任务 * 2018.09.22 by simm */ public class DefaultExecTask<T> implements Callable<Integer> { private List<T> list; private ISaveService saveService; private MultiEndFlag endFlag; private UUID threadId; /** * 盘库子任务 * @param saveService * @param notes * @param flag */ public DefaultExecTask(ISaveService saveService, List<T> notes, MultiEndFlag flag){ this.saveService = saveService; this.list = notes; this.endFlag = flag; this.threadId = UUID.randomUUID(); } @Override public Integer call() throws Exception { return saveService.batchSave(this.list,this.endFlag,this.threadId); } }
3.3、实现最核心的线程池分发子线程,并汇总结果通知子线程事务做最终的提交或回滚。线程池使用定长池 newFixedThreadPool,子线程使用futureTask,可接收返回值和异常信息。
package simm.framework.threadutils.multi; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 多线程切分执行器 * 2018.09.22 by simm */ public class MultiExecutor { private static int maxThreadCount = 10; /** * 执行方法(分批创建子线程) * @param saveService * @param notes * @param groupLen * @return * @throws ExecutionException * @throws InterruptedException */ public static <T> Boolean exec(ISaveService saveService,List<T> notes,int groupLen) throws ExecutionException, InterruptedException { if(notes==null || notes.size()==0) return true; //创建一个线程池,最大10个线程 ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount); List<Future<Integer>> futures = new ArrayList<>(); int noteSize = notes.size(); int batches = (int) Math.ceil(noteSize * 1.0 /groupLen); //分组超长最大线程限制,则设置分组数为10,计算分组集合尺寸 if(batches>maxThreadCount){ batches = maxThreadCount; groupLen = (int) Math.ceil(noteSize * 1.0 /batches); } System.out.println("总长度:"+noteSize+" 批次信息:"+batches+" 分组长度:"+groupLen); MultiEndFlag flag = new MultiEndFlag(batches); int startIndex, toIndex, maxIndex = notes.size(); for(int i=0;i<batches;i++){ startIndex = i * groupLen; toIndex = startIndex + groupLen; if(toIndex> maxIndex) { toIndex = maxIndex; } List<T> temp = notes.subList(startIndex,toIndex); if(temp == null || temp.size()==0) continue; futures.add(executorService.submit(new DefaultExecTask(saveService,temp,flag))); } flag.end(); //子线程全部等待返回(存在异常,则直接抛向主线程) for(Future<Integer> future:futures){ future.get(); } //所有线程返回后,关闭线程池 executorService.shutdown(); return true; } }
四、给出一个调用伪代码。需要注意的一点,子线程开启事务,这里使用@Transactional声明式事务,这要求服务的实体类需要通过spring的bean工厂创建,得到一个动态代理类,以达到支持事务拦截器的目的,保证注解的有效性。
package multi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import simm.framework.threadutils.multi.DefaultExecTask; import simm.framework.threadutils.multi.ISaveService; import simm.framework.threadutils.multi.MultiEndFlag; import simm.framework.threadutils.multi.MultiExecutor; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; /** * 损益单保存服务 */ @Service public class DemoService implements ISaveService<NoteCheckBalance> { private static final Logger logger = LoggerFactory.getLogger(DefaultExecTask.class); @Autowired private NoteCheckBalanceMapper noteCheckBalanceMapper; /** * 业务保存 * @param list */ public void save(List<NoteCheckBalance> list){ for(NoteCheckBalance item :list){ noteCheckBalanceMapper.insert(item); } } /** * 批量保存事件 */ @Transactional(rollbackFor = Exception.class) @Override public Integer batchSave(List<NoteCheckBalance> list, MultiEndFlag endFlag, UUID threadId) throws Exception { int result = 0; try{ //业务操作 save(list); result = 1; //进行waitForEnd 操作,是为了确保所有的线程都最终通知同步协作标志 endFlag.waitForEnd(threadId ,result); //其他线程异常手工回滚 if(result==1 && !endFlag.isAllSuccess()){ String message = "子线程未全部执行成功,对线程["+threadId+"]进行回滚"; throw new Exception(message); } return result; }catch (Exception ex){ logger.error(ex.toString()); if(result ==0){ //本身线程异常抛出异常,通知已经做完(判断是为了防止 与 try块中的通知重复) endFlag.waitForEnd(threadId ,result); } throw ex; } } /** * 调用示例 * @param args * @throws ExecutionException * @throws InterruptedException */ public static void main(String[] args) throws ExecutionException, InterruptedException { //调用示例 MultiExecutor.exec(new DemoService(), new ArrayList<NoteCheckBalance>(),500); } }
参考文章
https://www.cnblogs.com/LipeiNet/p/6475851.html
原文地址:https://www.cnblogs.com/MrSi/p/9690937.html
时间: 2024-10-12 03:05:57