Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors

背景

通过做以下一个小的接口系统gate,了解一下mina和java并发包里的东西。A系统为javaweb项目,B为C语言项目,gate是本篇须要完毕的系统。

需求

1. A为集群系统,并发较高,会批量发送给gate消息,而且接受gate返回的消息。

2. gate独立部署,将从A接受到的消息压入队列,与B建立连接后,将每条消息验证签名等工作后,发送给B。须要保证性能;

3. B负责处理消息,并返回处理结果,B为gate提供提供六个port,一个port可有三个长连接(须由gate发送心跳保持长连接,否则超时切断连接)。

实例

项目中用到了两个框架mina2.0.7和axis2。首先,gate须要接收从A发送过来的消息。为保证消息顺序性。压入队列中。为保证性能。将队列中的消息通过不同的连接发送至B,这让我们非常快就想到了多线程中生产者消费者的那张图,而且这是一个生产者。多个消费者,以下我们来看代码。

首先,gate作为服务端,要为A提供一个接口,使用axis2完毕了。关于webservice就不必多说,可看我前面的博客。配置例如以下:

<serviceGroup>
<service name="sendService" scope="application">
    <description>
        SendService
    </description>
    <messageReceivers>
        <messageReceiver mep="http://www.w3.org/2004/08/wsdl/in-only" class="org.apache.axis2.rpc.receivers.RPCInOnlyMessageReceiver"/>
        <messageReceiver mep="http://www.w3.org/2004/08/wsdl/in-out" class="org.apache.axis2.rpc.receivers.RPCMessageReceiver"/>
    </messageReceivers>
    <parameter name="ServiceClass">
        cn.net.easyway.customer.SendService
    </parameter>
</service>
</serviceGroup>

以下是服务实现类:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import cn.net.easyway.nds.MsgConsumer;
import cn.net.easyway.nds.MsgProducer;

/**
 * 为用户管理系统提供服务接口
 * @author yuanfubiao
 *
 */
public class SendService {

	private static Log logger = LogFactory.getLog(SendService.class);

	private static int num = 0;
	//消息队列
	private static LinkedBlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>();
	//生产者线程池
	private static ExecutorService executorProducer = Executors.newFixedThreadPool(20); //创建20个线程。应对并发较高的情况
	//消费者线程池
	private static ExecutorService executorCustomer = Executors.newFixedThreadPool(18); //和连接数相应

	/**
	 * 放入消息
	 * @param list 消息列表
	 */
	public void putMsg(List<String> list){

		//将消息放入队列
		executorProducer.execute(new MsgProducer(msgQueue,list));

		//取出消息:数据量大,启用全部线程
		if(list.size() > 18){
			for(int i=0;i<18;i++){
				executorCustomer.execute(new MsgConsumer(msgQueue));
			}
		}else{
			executorCustomer.execute(new MsgConsumer(msgQueue));
		}
	}
}

Java并发包为我们提供了非常多有用的多线程东西,因此没有必要自己去实现一个队列和线程池。如上面代码我们用到的队列是LinkedBlockingQueue,他为线程安全的堵塞队列。多线程操作时不必为了同步而担心。而且会将进出两边自己主动负载,他实现自BlockingQueue接口。

从jdk中能够看到实现BlockingQueue接口的还有ArrayBlockingQueue,DelayQueue,
LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue;此接口就是提供一个堵塞队列,从api中我们看到例如以下一张图:

Throwsexception:当队列已满。再次加入会抛出错误,取数据也是如此。

Specialvalue:加入或取出时会有一个返回值;

Blocks:是在队列已满或为空时,会一直堵塞;

Time Out:指堵塞到一定时间。线程退出。

当中,另一个并发队列也是作为生产者消费者的首选:ConcurrentLinkedQueue,它是非堵塞队列。肯定就不是出自Blockingqueue接口,而是出自AbstractQueue,因此也就没有put和take方法,使用这个并发队列须要有两点注意:第一。推断是否为空尽量使用isEmpty方法。不要用size()。有人測试过size方法非常耗费时间。第二就是线程问题。尽管ConcurrentLinkedQueue是线程安全的,可是仅仅负责原子性的。就是说当你操作queue.add()
or queue.poll的时候是安全的。当并发量较大时,你在使用queue.isEmpty时还不为空,但就在这空当有可能就运行poll操作。导致队列为空引起异常,可用例如以下代码:

synchronized(queue) {
    if(!queue.isEmpty()) {
       queue.poll();
    }
}

在gate中。我定义了两个线程池。一个是生产者。还有一个是消费者:

//生产者线程池
	private static ExecutorService executorProducer = Executors.newFixedThreadPool(20); //创建20个线程。应对并发较高的情况
	//消费者线程池
	private static ExecutorService executorCustomer = Executors.newFixedThreadPool(18); 

Executors提供了一个工厂方法,用来创建线程池。返回的线程池都实现了ExecutorService接口,能够创建例如以下线程池:

newCachedThreadPool():创建一个可缓存的线程池,调用execute将重用曾经构造的线程。假设如今线程没有可用的,则创建一个新线程加入到池中,终止并从缓存中溢出那些已有60秒未被使用的线程;

newFixedThreadPool(intnThreads):创建固定的线程;

newScheduledThreadPool(intcorePoolSize):创建一个支持定时及周期性的任务运行的线程池;

newSingleThreadExecutor():创建一个单线程的Executor。

启动线程。有两个方法。一个是execute(),还有一个是submit(),后者是有返回值的,会将运行的结果Future返回,关于Future可移步这里

以下就是生产者和消费者代码:

生产者:

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 向队列加入消息
 * @author yuanfubiao
 *
 */
public class MsgProducer implements Runnable {

	private LinkedBlockingQueue<String> msgQueue;

	private List<String> message;

	public MsgProducer(LinkedBlockingQueue<String> queue,List<String> msg) {
		this.msgQueue = queue;
		this.message = msg;
	}

	@Override
	public void run() {
		Iterator<String> iter = message.iterator();
		while(iter.hasNext()){
			String msg = iter.next();
			try {
				msgQueue.put(msg);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

消费者:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;

import nds.framework.security.NDSMD5;

import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.session.IoSession;

/**
 * 从消息队列取出消息
 * @author yuanfubiao
 *
 */
public class MsgConsumer implements Runnable{

	private static Log logger = LogFactory.getLog(MsgConsumer.class);
	private LinkedBlockingQueue<String> msgQueue;

	public MsgConsumer(LinkedBlockingQueue<String> queue) {
		this.msgQueue = queue;
	}

	@Override
	public void run() {
		while(!msgQueue.isEmpty()){

			String msg = null;
			try {
				msg = msgQueue.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			if(null == msg){
				return;
			}

			//增加时间
			SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
			String now = format.format(new Date());
			String prefix = msg.substring(0, 19);
			String suffix = msg.substring(33, msg.length());
			String packet = prefix.trim() + now.trim() + suffix.trim();

			//签名部分忽略
		        //TODO
			String newStr = packet  // 签名 + signature.toUpperCase().trim();
			//关于mina,可见我下篇文章
			IoSession session = SessionPool.getSession(newStr.substring(13, 15));
			logger.info("发送数据:" + newStr);
			session.write(newStr);

			try {
				Thread.sleep(1000); //等待一秒
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
		}
	}
}

源代码下载:http://download.csdn.net/detail/stubbornpotatoes/7438435

好文推荐:http://blog.csdn.net/defonds/article/details/44021605

时间: 2024-10-13 16:20:22

Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors的相关文章

java并发包小结(一)

java.util.concurrent 包含许多线程安全.高性能的并发构建块.换句话讲,创建 java.util.concurrent 的目的就是要实现 Collection 框架对数据结构所执行的并发操作.通过提供一组可靠的.高性能并发构建块,开发人员可以提高并发类的线程安全.可伸缩性.性能.可读性和可靠性. JDK 5.0 中的并发改进可以分为三组:    1. JVM 级别更改.大多数现代处理器对并发对某一硬件级别提供支持,通常以 compare-and-swap (CAS)指令形式.C

java并发包&amp;线程池原理分析&amp;锁的深度化

      java并发包&线程池原理分析&锁的深度化 并发包 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素进行快速随机访问.数组的缺点是每个元素之间不能有间隔,当数组大小不满足时需要增加存储能力,就要讲已经有数组的数据复制到新的存储空间中.当从ArrayList的中间位置插入或者删除元素时,需要对数组进行复制.移动.代价比较高.因此,它适合随机查找和遍历,不适合插入和删除. 2.Vector与Arra

【java基础】Java并发包

Java并发包 一.并发包的结构 java并发包中共分为五类: 1.集合框架:包括队列和并发集合 2.同步辅助类 3.线程池 4.Lock锁 5.原子类 二.详解部分 1.同步辅助类详解部分 (1)CountDownLatch 举例:有三个工作,只有三个工作都完成,任务才算执行完成. 1 import java.util.concurrent.CountDownLatch; 2 3 /** 4 * 5 * @author qiuxiang 6 * 7 */ 8 public class Coun

Java并发包总结

Java 并发包 并发包中除了提供高性能的线程安全的集合对象外,还提供了很多并发场景需要的原子操作类,例如AtomicInteger,另外还提供了一些用于避免并发时资源冲突的Lock及Condition类. ConcurrentHashMap 线程安全的HashMap的实现. 维护的是一个Segment对象数组,segment继承ReentrantLock 方法 意义 ConcurrentHashMap() put(Object key,Object value) ConcurrentHashM

java并发包提供的三种常用并发队列实现

java并发包中提供了三个常用的并发队列实现,分别是:ConcurrentLinkedQueue.LinkedBlockingQueue和ArrayBlockingQueue. ConcurrentLinkedQueue使用的是CAS原语无锁队列实现,是一个异步队列,入队速度很快,出队进行了加锁,性能稍慢: LinkedBlockingQueue也是阻塞队列,入队和出队都用了加锁,当队空的时候线程会暂时阻塞:当队空的时候线程会暂时阻塞: ArrayBlockingQueue是初始容器固定的阻塞队

【学习】004 java并发包&amp;线程池原理分析&amp;锁的深度化

并发包[jdk1.7] 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素进行快速随机访问.数组的缺点是每个元素之间不能有间隔,当数组大小不满足时需要增加存储能力,就要讲已经有数组的数据复制到新的存储空间中.当从ArrayList的中间位置插入或者删除元素时,需要对数组进行复制.移动.代价比较高.因此,它适合随机查找和遍历,不适合插入和删除. 2.Vector与ArrayList一样,也是通过数组实现的,不同的是它

java.util.concurrent BlockingQueue详解

什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入 add(e) offer(e) put(e) o

java并发包小结(二)

接上一篇 java并发包小结(一):http://blog.csdn.net/aalansehaiyang52/article/details/8877579 Future 接口Future 接口允许表示已经完成的任务.正在执行过程中的任务或者尚未开始执行的任务.通过 Future 接口,可以尝试取消尚未完成的任务,查询任务已经完成还是取消了,以及提取(或等待)任务的结果值.FutureTask 类实现了 Future,并包含一些构造函数,允许将 Runnable 或 Callable(会产生结

Java 中的BlockingQueue

前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一