1. The Callable interface
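Submitting a Callable to a thread pool returns a Future. Through the Future you can retrieve the task's result (get(), get(long timeout, TimeUnit unit)), cancel the task (cancel(boolean mayInterruptIfRunning)), and query its state (isCancelled(), isDone()).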
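The Future methods listed above can be tried out with a minimal sketch like the following. The class name FutureDemo and both tasks are hypothetical and are not part of the original code; only the java.util.concurrent calls are standard.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureDemo {

    /** Submits a quick task and waits for its result with a timeout. */
    static int quickResult(ExecutorService pool) throws Exception {
        Callable<Integer> quick = () -> 6 * 7;
        Future<Integer> future = pool.submit(quick);
        // get(timeout, unit) blocks at most this long, then throws TimeoutException.
        return future.get(5, TimeUnit.SECONDS);
    }

    /** Submits a long-running task, cancels it, and reports the resulting state. */
    static boolean cancelSlowTask(ExecutorService pool) {
        Callable<Boolean> slow = () -> {
            Thread.sleep(60_000); // interrupted if cancel(true) arrives while sleeping
            return true;
        };
        Future<Boolean> future = pool.submit(slow);
        // true = interrupt the worker thread if the task has already started.
        future.cancel(true);
        // A cancelled task counts as done, so both flags are expected to be true.
        return future.isCancelled() && future.isDone();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            System.out.println(quickResult(pool));
            System.out.println(cancelSlowTask(pool));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Calling get() on a cancelled Future throws CancellationException, so it is usually worth checking isCancelled() before asking for the result.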
1. Implement the Callable interface.
2. Create a thread pool and submit the task to it.
For example:
Future<?> processFuture = submitTask(new ProcessPbxPhoneThread(queue, phoneHandler, runningLock, runningFlagMap));
// Note: log, currentTime(String) and logSpentTime(String) belong to the enclosing class and are not shown here.
private class ProcessPbxPhoneThread implements Callable<Boolean> {
    private BlockingQueue<PbxPhone> queue;
    private IPbxNotificationHandler<PbxPhone> phoneHandler;
    private ReadWriteLock runningLock;
    private Map<String, Boolean> runningFlagMap;
    private int processedPhoneCount = 0;
    private long currentTime;

    public ProcessPbxPhoneThread(BlockingQueue<PbxPhone> queue,
            IPbxNotificationHandler<PbxPhone> phoneHandler,
            ReadWriteLock runningLock,
            Map<String, Boolean> runningFlagMap) {
        this.queue = queue;
        this.phoneHandler = phoneHandler;
        this.runningFlagMap = runningFlagMap;
        this.runningLock = runningLock;
    }

    @Override
    public Boolean call() throws Exception {
        while (true) {
            // Blocks until a phone is available in the queue.
            PbxPhone phone = queue.take();

            // Every 100 phones, log the processing speed in phones per minute.
            if (processedPhoneCount % 100 == 0) {
                long now = System.currentTimeMillis();
                if (currentTime != 0) {
                    double speed = processedPhoneCount * 1000.0 * 60 / (now - currentTime);
                    log.debug("ProcessPbxPhoneThread phone process speed:{}*m", speed);
                }
                currentTime = now;
                processedPhoneCount = 0;
            }
            processedPhoneCount++;

            if (null != phone.getLines()) {
                currentTime("invokePhoneHander");
                PbxNotification<PbxPhone> notification = new PbxNotification<PbxPhone>(
                        phoneHandler.getNotificationType(),
                        NotificationOperation.INSERT,
                        phone.getUuid().toString(),
                        phone);
                if (!phoneHandler.objectChanged(notification)) {
                    logSpentTime("invokePhoneHander");
                    // The handler rejected the notification: clear the running flag and stop.
                    // The lock is acquired before the try block so that unlock() only runs if lock() succeeded.
                    runningLock.writeLock().lock();
                    try {
                        runningFlagMap.put("runningFlag", false);
                        break;
                    } finally {
                        runningLock.writeLock().unlock();
                    }
                } else {
                    logSpentTime("invokePhoneHander");
                }
            } else {
                // A phone without lines ends the loop.
                break;
            }
        }
        return true;
    }
}
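The loop above keeps calling queue.take() and stops when it receives an element that ends the stream (there, a phone whose getLines() is null). The same pattern can be sketched in isolation as follows. This is a simplified illustration, not the original class: QueueDrainDemo, Consumer and the END marker are hypothetical names, and plain strings stand in for PbxPhone objects.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueDrainDemo {

    // Hypothetical end-of-stream marker; compared by identity, not by value.
    static final String END = new String("END");

    /** Takes items until the END marker arrives, then returns how many were handled. */
    static class Consumer implements Callable<Integer> {
        private final BlockingQueue<String> queue;

        Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public Integer call() throws InterruptedException {
            int handled = 0;
            while (true) {
                String item = queue.take(); // blocks until an item is available
                if (item == END) {
                    break;
                }
                handled++;
            }
            return handled;
        }
    }

    /** Feeds itemCount items plus the marker to a consumer and returns its count. */
    static int run(int itemCount) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> result = pool.submit(new Consumer(queue));
            for (int i = 0; i < itemCount; i++) {
                queue.put("item-" + i);
            }
            queue.put(END);
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(3));
    }
}
```

Because take() blocks indefinitely, a consumer written this way only finishes when the producer sends the marker or the task is cancelled with interruption.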
submitTask submits the task through the wrapped thread pool:
protected Future<?> submitTask(Callable<?> task) {
    return this.cucmDriverFactory.getExecutorService().submit(task);
}
The thread pool is created as follows:
@Override
public void initialize() {
    executorService = Executors.newCachedThreadPool(new DefaultThreadFactory("CucmDriverFactor-"));
}

@Override
public ExecutorService getExecutorService() {
    // Re-create the pool if it has been shut down.
    if (executorService.isShutdown() || executorService.isTerminated()) {
        initialize();
    }
    return this.executorService;
}
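The getter above assumes that initialize() has already been called and that only one thread touches the field at a time. A more defensive variant might look like the sketch below. PoolHolder is a hypothetical class, not the original factory, and it uses the default thread factory for brevity. The isTerminated() check is omitted because an ExecutorService is never terminated unless it was shut down first.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolHolder {

    private ExecutorService executorService;

    /**
     * Returns a usable pool, creating a new one if none exists yet
     * or if the previous one has been shut down.
     * synchronized keeps two callers from creating two pools at once.
     */
    public synchronized ExecutorService getExecutorService() {
        if (executorService == null || executorService.isShutdown()) {
            executorService = Executors.newCachedThreadPool();
        }
        return executorService;
    }

    public static void main(String[] args) {
        PoolHolder holder = new PoolHolder();
        ExecutorService first = holder.getExecutorService();
        first.shutdownNow();
        ExecutorService second = holder.getExecutorService();
        System.out.println(first != second); // a fresh pool replaced the shut-down one
        second.shutdownNow();
    }
}
```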
The ThreadFactory is created as follows:
/**
 * The thread factory, with name prefix. Copied from Executors.DefaultThreadFactory
 */
public static class DefaultThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

public DefaultThreadFactory(String namePrefix) {
SecurityManager s =

Time: 2024-10-18 10:23:03