【MINA】OrderedThreadPoolExecutor和UnorderedThreadPoolExecutor的事件监听线程池的选择

mina中有两个线程池概念 1.处理监听建立连接的线程池  2.处理读写事件的线程池

本文中主要探讨读写事件的线程池的选择

这两种都经过实际项目的使用和检测,说说优缺点

早期的项目是用UnorderedThreadPoolExecutor【无序线程池】

特点:线程池管理一个无界阻塞队列,线程在分配事件,并发处理maximumPoolSize个事件,

后面来的事件在没有空闲线程的时候要进入这个无界阻塞队列等待执行

可以看出,他的特点是session之间是没有影响的,从事件的执行效率上要更高一些,但是同session的事件执行顺序很难保证

UnorderedThreadPoolExecutor源码

public UnorderedThreadPoolExecutor(
            int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {

        super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
        if (corePoolSize < 0) {
            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
        }

        if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
            throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
        }

        if (queueHandler == null) {
            queueHandler = IoEventQueueHandler.NOOP;
        }

        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.queueHandler = queueHandler;
    }

@Override
    public void execute(Runnable task) {
        if (shutdown) {
            rejectTask(task);
        }

        checkTaskType(task);

        IoEvent e = (IoEvent) task;
        boolean offeredEvent = queueHandler.accept(this, e);
        if (offeredEvent) {
            getQueue().offer(e);
        }

        addWorkerIfNecessary();

        if (offeredEvent) {
            queueHandler.offered(this, e);
        }
    }

  

后来的新项目使用了OrderedThreadPoolExecutor【有序的线程池】

特点:每个session有自己的事件队列,线程池在分配session,相当于每个session有唯一的线程在处理session中的任务队列,如果没有空闲线程可调度,

那么其他session就只能进入waitingSessions这个阻塞队列等待有空闲线程接管这个session的任务队列

可以看出,session之间是有影响的,但是保证了同session的事件执行是有序的

MyOrderedThreadPoolExecutor代码完全抄OrderedThreadPoolExecutor目地是想加入一个参数

maxQueueSize控制每个session任务队列的最大长度,超出就丢弃后进的任务

public MyOrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
			IoEventQueueHandler eventQueueHandler, int maxQueueSize) {
         // 从这里可以看到,其实现类似于{@link Executors.newCachedThreadPool},不过其可以设置corePoolSize和maximumPoolSize.
        // 不过其execute方法是自实现的,否则如果用父类的则会出现问题,即在任务繁忙的时候会出现任务被拒绝
        // 因为其把任务放到了session的任务队列中.即没有由线程池本身来保存
        // 另外可以看到初始化的corePoolSize和maximumPoolSize分别传了0和1.这是为了更好的处理异常,因为super不能try/catch
		super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,
				new SynchronousQueue<Runnable>(), threadFactory,
				new AbortPolicy());

		if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
			throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
		}

		if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
			throw new IllegalArgumentException("maximumPoolSize: "
					+ maximumPoolSize);
		}

		// Now, we can setup the pool sizes
		super.setCorePoolSize(corePoolSize);
		super.setMaximumPoolSize(maximumPoolSize);

		// The queueHandler might be null.
		if (eventQueueHandler == null) {
			this.eventQueueHandler = IoEventQueueHandler.NOOP;
		} else {
			this.eventQueueHandler = eventQueueHandler;
		}

		this.maxQueueSize = maxQueueSize;
	}

@Override
	public void execute(Runnable task) {
		if (shutdown) {
			rejectTask(task);
		}

		// Check that it‘s a IoEvent task
		checkTaskType(task);

		IoEvent event = (IoEvent) task;

		// Get the associated session
		IoSession session = event.getSession();

		// Get the session‘s queue of events
		SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
		Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
         //超过任务队列上限就丢弃任务,目前定的值时20
		if (tasksQueue.size() > this.maxQueueSize) {
			LOGGER.error(com.youxigu.dynasty2.i18n.MarkupMessages.getString("MyOrderedThreadPoolExecutor_0"), session.getRemoteAddress(),
					this.maxQueueSize);
			rejectTask(task);
			return;
		}
		boolean offerSession;

		// propose the new event to the event queue handler. If we
		// use a throttle queue handler, the message may be rejected
		// if the maximum size has been reached.
		boolean offerEvent = eventQueueHandler.accept(this, event);

		if (offerEvent) {
			// Ok, the message has been accepted
			synchronized (tasksQueue) {
				// Inject the event into the executor taskQueue
				tasksQueue.offer(event);

				if (sessionTasksQueue.processingCompleted) {
					sessionTasksQueue.processingCompleted = false;
					offerSession = true;
				} else {
					offerSession = false;
				}

				if (LOGGER.isDebugEnabled()) {
					print(tasksQueue, event);
				}
			}
		} else {
			offerSession = false;
		}

		if (offerSession) {
			// As the tasksQueue was empty, the task has been executed
			// immediately, so we can move the session to the queue
			// of sessions waiting for completion.
			waitingSessions.offer(session);
		}

		addWorkerIfNecessary();

		if (offerEvent) {
			eventQueueHandler.offered(this, event);
		}
	}

  

1.为什么要选用后者那

在业务层通常是有locker控制并发,但是还是可能产生并发事件,导致数据错乱的问题,

1>同session的事件执行顺序,不能保证,

2>程序员写业务时对locker的使用不规范,多对象修改时考虑不全,但是单线程有序执行,就可以解决这些问题,但对不同session导致数据交叉是没作用的,那个还得依赖locker

2.考虑两个问题

1>session队列的长度要可控,所以要改造一下,见上面MyOrderedThreadPoolExecutor 红色代码部分

2>线程初始数量的上限的定义选值

3>ExecutorFilter提供了业务逻辑的执行线程,可以将应用层业务逻辑通过配置该filter在配置的线程池内执行

        /**
	 * 消息执行器线程池的最小线程数
	 */
       private int corePoolSize = Runtime.getRuntime().availableProcessors() * 4;

	/**
	 * 消息执行器线程池的最大线程数,读写分开是为了防止大量的读请求阻塞回写
	 */
	private int maxPoolSize = Runtime.getRuntime().availableProcessors() * 24 + 1;

	/**
	 * 消息执行器线程池超过corePoolSize的Thread存活时间;秒
	 */
	private long keepAliveTime = 300;

      /**
      * useOrderedPool=true的时候有效,指定一个session的请求队列的最大长度,超过这个长度的请求丢弃.
      */
     private int orderedPoolMaxQueueSize = 20;

略。。。。。

ThreadPoolExecutor executor = new MyOrderedThreadPoolExecutor(corePoolSize,
					maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
					new NamingThreadFactory("WolfServerThread"),
					ioEventQueueHandler, orderedPoolMaxQueueSize);

         // 这里是预先启动corePoolSize个处理线程
		executor.prestartAllCoreThreads();

         //这里指定的监听事件就会由这里指定的线程池来处,而不在是processr线程来处理
		chain.addLast("exec", new ExecutorFilter(executor,
				IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED,
				IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
				IoEventType.SESSION_OPENED));

一篇好文章讲ExecutorFilter      http://www.blogjava.net/landon/archive/2014/02/03/409521.html  

时间: 2024-08-30 03:40:13

【MINA】OrderedThreadPoolExecutor和UnorderedThreadPoolExecutor的事件监听线程池的选择的相关文章

屏幕触摸事件监听,判断上下左右的操作行为,判断方法缩小的操作行为

在手机屏幕上能够实现的人机交互行为,大致包括点击按钮,拉动滑动块,物体缩放,上下左右拉动等. 手机屏幕触摸事件的监听方法: 1.首先要设置一块布局区域,frameLayout/LinearLayout等都可以,并为布局设置id: 2.在Activity中声明相应的布局类型,并通过findViewById()方法找到该布局,然后为该布局区域设置setOnTouchListener()方法,就能监听在相应屏幕触摸操作 实现屏幕触摸事件监听的代码: private LinearLayout Land;

JavaScript-4.5 事件大全,事件监听---ShinePans

绑定事件 <input type="bubtton" onclick="javascript:alert('I am clicked');"> 处理事件 <script language="JavaScript" for="对象" event="事件"> ... (事件处理代码) ... </script> 鼠标事件举例 <script language="

[基础控件]---状态切换控件CompoundButton及其子类CheckBox、RadioButton、ToggleButton、switch事件监听与场景使用

一.事件监听 对于普通的Button,对其进行事件监听Google官方给出了常见的三种监听方式:1.对每一个button设置事件监听器button.setOnClickListener(View.OnclickListener  listener);此种方法当button按钮较多时代码显得多.乱.不够简洁明了. 2.在Activity中实现接口View.OnclickListener,然后重写void onClick(View v)方法,在方法中通过switch(v.getId())予以区分不同

Windows 8 应用程序前后台切换事件监听

在一些情况下,我们需要监听应用程序切换到后台或者从后台切换至前台的事件,从而进行相关处理操作.支付宝应用锁屏(IOS,Android平台)的处理中就需要监听此事件,在用户将应用切换至后台一段时间后再切换至前台的情况下就需要弹出锁屏页面. 下图给出Windows 应用商店应用的生命周期图,应用前后台切换就是在运行和挂起直接进行切换,关于生命周期的详细介绍可以参阅官方文档:http://msdn.microsoft.com/zh-cn/library/windows/apps/hh464925.as

Java中的事件监听机制

鼠标事件监听机制的三个方面: 1.事件源对象: 事件源对象就是能够产生动作的对象.在Java语言中所有的容器组件和元素组件都是事件监听中的事件源对象.Java中根据事件的动作来区分不同的事件源对象,动作发生在哪个组件上,那么该组件就是事件源对象 2.事件监听方法: addMouseListener(MouseListener ml) ;该方法主要用来捕获鼠标的释放,按下,点击,进入和离开的动作:捕获到相应的动作后,交由事件处理类(实现MouseListener接口)进行处理. addAction

关于v4包的Fragment过渡动画的事件监听无响应问题解决

项目中部分功能模块采用了单Activity+多Fragment模式,当Fragment切换时,需要在过渡动画执行完后做一些操作,通常就是在自己封装的FragmentBase中重写onCreateAnimation方法,创建一个Animation对象,并添加动画的事件监听.而最近升级了v4包后,突然发现添加的动画事件监听无响应了.通过查看源码,发现在v4包中关于Fragment管理类FragmentManagerImpl中,在获取Animation对象后,也添加了对动画的监听事件,也就覆盖了我自己

UI事件监听的击穿

什么是UI事件监听的击穿 在游戏视图中,有两个UI界面叠在一起的时候,单击一个空白处,却触发了被覆盖在下层了UI界面中的单击事件,这就是单击击穿了上层界面. 假设场景中放置了一个箱子,单击箱子会触发一个开箱事件,如果单击一个UI,恰好UI在视觉上将箱子覆盖了,那么它也许就会触发箱子的单击事件. 如何避免和解决UI事件监听的击穿 第一种方法:用一层BoxCollider覆盖,进行遮挡. 在界面底板上Attach一个BoxCollider. 第二种方法:使用EventMask Unity的Camer

java事件监听

获取事件监听需要获取实现ActionListener接口的方法, public class SimpleEvent extends JFrame{    private JButton jb=new JButton("我是按钮,点击我");    public SimpleEvent(){        setLayout(null);        setVisible(true);        setSize(200, 100);        setDefaultCloseOp

(12)JavaScript之[事件][事件监听]

事件 1 /** 2 * 事件: 3 * onload 和 onunload 事件在用户进入或离开页面时被触发 4 * 5 * onchange事件常结合对输入字段的验证来使用 6 * onmouseover 和 onmouseout 事件可用于在用户的鼠标移至 HTML 元素上方或移出元素时触发函数. 7 * 8 * onmousedown, onmouseup 以及 onclick 构成了鼠标点击事件的所有部分.首先当点击鼠标按钮时, 9 * 会触发 onmousedown 事件,当释放鼠标