java并行调度框架封装及示例

参考资料:  阿里巴巴开源项目 CobarClient  源码实现。

分享作者:闫建忠

分享时间:2014年5月7日

---------------------------------------------------------------------------------------

并行调度封装类设计: BXexample.java

package org.hdht.business.ordermanager.quartzjob;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.dao.ConcurrencyFailureException;

public class BXexample {

	 private static ExecutorService createCustomExecutorService(int poolSize, final String method) {
	        int coreSize = Runtime.getRuntime().availableProcessors();//返回系统CUP数量
	        if (poolSize < coreSize) {
	            coreSize = poolSize;
	        }
	        ThreadFactory tf = new ThreadFactory() {
	            public Thread newThread(Runnable r) {
	                Thread t = new Thread(r, "thread created at BXexample method [" + method + "]");
	                t.setDaemon(true);
	                return t;
	            }
	        };
	        BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>();
	        final ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, poolSize, 60,
	        		TimeUnit.SECONDS, queueToUse, tf, new ThreadPoolExecutor.CallerRunsPolicy());

	        return executor;
	 }

	public static <T> List<T> getSubListPage(List<T> list, int skip,int pageSize) {
		if (list == null || list.isEmpty()) {
			return null;
		}
		int startIndex = skip;
		int endIndex = skip + pageSize;
		if (startIndex > endIndex || startIndex > list.size()) {
			return null;
		}
		if (endIndex > list.size()) {
			endIndex = list.size();
		}
		return list.subList(startIndex, endIndex);
	}

	public static void BXfunction(Collection<?> paramCollection,final ExectueCallBack ecb){
		//构建执行器
		ExecutorService executor = createCustomExecutorService(Runtime.getRuntime().availableProcessors(), "batchExecuteProjection");
		try {
			//监视器
			final CountDownLatch latch = new CountDownLatch(paramCollection.size());
			final StringBuffer exceptionStaktrace = new StringBuffer();
			Iterator<?> iter = paramCollection.iterator();
			while (iter.hasNext()) {
				final Object entity = iter.next();
				Runnable task = new Runnable() {
					public void run() {
						try {
							ecb.doExectue(entity);
						} catch (Throwable t) {
							exceptionStaktrace.append(ExceptionUtils.getFullStackTrace(t));
						} finally {
							latch.countDown();
						}
					}
				};
				executor.execute(task);//并行调度
			}

			try {
				latch.await();//监视器等待所有线程执行完毕
			} catch (InterruptedException e) {
				//调度异常
				throw new ConcurrencyFailureException(
						"unexpected interruption when re-arranging parameter collection into sub-collections ",e);
			}
			if (exceptionStaktrace.length() > 0) {
				//业务异常
				throw new ConcurrencyFailureException(
						"unpected exception when re-arranging parameter collection, check previous log for details.\n"+ exceptionStaktrace);
			}

		} finally {
			executor.shutdown();//执行器关闭
		}
	}

}

回调接口类设计:ExectueCallBack.java

package org.hdht.business.ordermanager.quartzjob;

public interface ExectueCallBack {
	void doExectue(Object executor) throws Exception;
}

示例(hello 示例)

	public static void main(String[] args) {

		List<String> paramCollection  = new ArrayList<String>();
		paramCollection.add("9");
		paramCollection.add("2");
		paramCollection.add("18");
		paramCollection.add("7");
		paramCollection.add("6");
		paramCollection.add("1");
		paramCollection.add("3");
		paramCollection.add("4");
        paramCollection.add("14");
		paramCollection.add("13");

		int freesize = 3;//当前处理能力

		for(int i=0;i<paramCollection.size();i=i+freesize){

			List<String> tl = BXexample.getSubListPage(paramCollection, i, freesize);

			BXexample.BXfunction(tl,new ExectueCallBack() {
	            public void doExectue(Object executor) throws Exception {
	            	int k = Integer.parseInt((String)executor);

	            	for(int i=0;i<k*10000000;i++){
	                    //执行循环
	            	}
	            	System.out.println(k+":hello world");
	            }
	        });

		}
	}

示例(实际业务应用示例)

/**
		 * 并行调度相关处理
		 *
		 * 按卫星*日期 ,将待处理的任务分解为 卫星+日期 粒度的子任务 添加到paramMapList列表中
		 */
		List<Map<String, Object>> paramMapList = new ArrayList<Map<String, Object>>();
		for (Iterator<OrderParamSatellite> iterator = paramSatellites.iterator(); iterator.hasNext();) {
			OrderParamSatellite paramSatellite = iterator.next();

			paramMapList.addAll(this.getParamMapList(paramSatellite));
		}

		//根据集群最大处理能力,分页处理任务列表,作为list截取的步长

		int fsize = HostServerQueue.getInstance().freeSize();
		for(int i=0;i<paramMapList.size();i=i+fsize){
			List<Map<String, Object>> tl = BXexample.getSubListPage(paramMapList, i,  fsize);
			//并行调度
			BXexample.BXfunction(tl,new ExectueCallBack(){
	            	public void doExectue(Object executor) throws Exception {
	            		ExecuteOrderBTask((Map<String, Object>)executor);
	            	}
	        });

			//动态查找空闲节点数量,即集群最大处理能力
			fsize = HostServerQueue.getInstance().freeSize();
		}

java并行调度框架封装及示例,布布扣,bubuko.com

时间: 2024-08-09 22:00:55

java并行调度框架封装及示例的相关文章

JAVA并行框架Fork/Join(一):简介和代码示例

一.背景 虽然目前处理器核心数已经发展到很大数目,但是按任务并发处理并不能完全充分的利用处理器资源,因为一般的应用程序没有那么多的并发处理任务.基于这种现状,考虑把一个任务拆分成多个单元,每个单元分别得到执行,最后合并每个单元的结果. Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.它非常类似于HADOOP提供的MapReduce框架,只是MapReduce的任务可以针对集群内的所有计算节点,可以

java计划任务调度框架quartz结合spring实现调度的配置实例代码分享

点击链接加入群[JavaEE(SSH+IntelliJIDE+Maven)]:http://jq.qq.com/?_wv=1027&k=L2rbHv 一:quartz简介 OpenSymphony 的Quartz提供了一个比较完美的任务调度解决方案. Quartz 是个开源的作业调度框架,定时调度器,为在 Java 应用程序中进行作业调度提供了简单却强大的机制. Quartz中有两个基本概念:作业和触发器.作业是能够调度的可执行任务,触发器提供了对作业的调度 二:quartz spring配置详

分布式开源调度框架TBSchedule原理与应用

主要内容: 第一部分 TBSchedule基本概念及原理 1. 概念介绍 2. 工作原理 3. 源代码分析 4. 与其它开源调度框架对照 第二部分 TBSchedule分布式调度演示样例 1. TBSchedule源代码下载 2. 引入源代码Demo开发演示样例 3. 控制台配置任务调度 4. selectTasks方法參数说明 5. 创建调度策略參数说明 6. 创建任务參数说明 第一部分 TBSchedule基本概念及原理 1. 概念介绍 TBSchedule是一个支持分布式的调度框架.能让一

并行编程框架 ForkJoin

本文假设您已经了解一般并行编程知识,了解Java concurrent部分如ExecutorService等相关内容. 虽说是Java的ForkJoin并行框架,但不要太在意Java,其中的思想在其它语言环境也是同样适用的.因为并发编程在本质上是一样的.就好像如何找到优秀的Ruby程序员?其实要找的只是一个优秀的程序员.当然,如果语言层面直接支持相关的语义会更好. 引言 Java 语言从一开始就支持线程和并发性语义.Java5增加的并发工具又解决了一般应用程序的并发需求,Java6.Java7又

详解应对平台高并发的分布式调度框架TBSchedule

tbschedule是一款非常优秀的高性能分布式调度框架,非常高兴能分享给大家.这篇文章是我结合多年tbschedule使用经验和研读三遍源码的基础上完成的,期间和阿里空玄有过不少技术交流,非常感谢空玄给予的大力支持.我写这篇文章的目的一是出于对tbschedule的一种热爱,二是现在是一个资源共享.技术共享的时代,希望把它展现给大家(送人玫瑰,手留余香),能给大家的工作带来帮助. 一.tbschedule初识 时下互联网和电商领域,各个平台都存在大数据.高并发的特点,对数据处理的要求越来越高,

JavaSE中线程与并行API框架学习笔记——线程为什么会不安全?

前言:休整一个多月之后,终于开始投简历了.这段时间休息了一阵子,又病了几天,真正用来复习准备的时间其实并不多.说实话,心里不是非常有底气. 这可能是学生时代遗留的思维惯性--总想着做好万全准备才去做事.当然,在学校里考试之前当然要把所有内容学一遍和复习一遍.但是,到了社会里做事,很多时候都是边做边学.应聘如此,工作如此,很多的挑战都是如此.没办法,硬着头皮上吧. 3.5 线程的分组管理 在实际的开发过程当中,可能会有多个线程同时存在,这对批量处理有了需求.这就有点像用迅雷下载电视剧,假设你在同时

业调度框架_Quartz

什么是Quartz Quartz是一个完全由Java编写的开源作业调度框架,为在Java应用程序中进行作业调度提供了简单却强大的机制.Quartz允许开发人员根据时间间隔来调度作业.它实现了作业和触发器的多对多的关系,还能把多个作业与不同的触发器关联.简单地创建一个org.quarz.Job接口的Java类,Job接口包含唯一的方法:     public void execute(JobExecutionContext context) throws JobExecutionException

Java Junit测试框架

Java    Junit测试框架 1.相关概念 ? JUnit:是一个开发源代码的Java测试框架,用于编写和运行可重复的测试.它是用于单元测试框架体系xUnit的一个实例(用于java语言).主要用于白盒测试,回归测试. ? 白盒测试:把测试对象看作一个打开的盒子,程序内部的逻辑结构和其他信息对测试人 员是公开的. ? 回归测试:软件或环境的修复或更正后的再测试,自动测试工具对这类测试尤其有用. ? 单元测试:最小粒度的测试,以测试某个功能或代码块.一般由程序员来做,因为它需要知道内部程序设

JAVA并行程序基础

JAVA并行程序基础 一.有关线程你必须知道的事 进程与线程 在等待面向线程设计的计算机结构中,进程是线程的容器.我们都知道,程序是对于指令.数据及其组织形式的描述,而进程是程序的实体. 线程是轻量级的进程,是程序执行的最小单位.(PS:使用多线程去进行并发程序的设计,是因为线程间的调度和切换成本远小于进程) 线程的状态(Thread的State类): NEW–刚刚创建的线程,需要调用start()方法来执行线程: RUNNABLE–线程处于执行状态: BLOCKED–线程遇到synchroni