一、线程池源码如下
1、阻塞任务队列 BlockingQueue
public interface BlockingQueue<E> {
boolean offer(E e);
public E take();
}
阻塞任务队列实现类 LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger;
public class LinkedBlockingQueue<E> implements BlockingQueue<E> {
private final AtomicInteger count = new AtomicInteger();
private final int capacity;
transient Node<E> head;
private transient Node<E> last;
public LinkedBlockingQueue(int capacity) {
this.capacity = capacity;
last = head = new Node<E>(null);
}
//向阻塞队列的尾部添加一个任务
@Override
public boolean offer(E e) {
//如果已达到队列的容量,则拒接添加,返回false
if(count.get() == capacity) {
return false;
}
Node<E> node = new Node<E>(e);
last = last.next = node;
count.getAndIncrement();
return true;
}
//从阻塞队列的头部取出一个任务
@Override
public E take() {
Node<E> h = head.next;
if(h == null) {
return null;
}
E x = h.item;
head = h;
return x;
}
static class Node<E>{
E item;
Node<E> next;
Node(E x){
item = x;
}
}
}
2、创建线程的工厂 ThreadFactory
public interface ThreadFactory {
Thread newThread(Runnable r);
}
创建线程的工厂实现类 MyThreadFactory
import java.util.concurrent.atomic.AtomicInteger;
public class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "demo" + threadNumber.getAndIncrement());
return t;
}
}
3、线程池接口 ExecutorService
public interface ExecutorService {
void execute(Runnable command);
}
线程池实现类 ThreadPoolExecutor
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolExecutor implements ExecutorService{
private final HashSet<Worker> workers = new HashSet<Worker>();
private static final AtomicInteger ctl = new AtomicInteger(0);
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
//如果线程池当前线程数小于核心线程数,则调用addWorker方法增加一个工作线程
if(ctl.get() < corePoolSize) {
if(addWorker(command, true)) {
return;
}
}
//否则尝试将该任务添加到任务队列
if(workQueue.offer(command)) {
workQueue.offer(command);
}else{//如果添加该任务到任务队列失败则说明任务队列已满,新开启线程执行该任务
//如果新开启线程失败
if(!addWorker(command, false)) {
System.out.println("任务" + command + "被线程池拒绝");
}
}
}
private boolean addWorker(Runnable firstTask, boolean core) {
int c = ctl.get();
if(c >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
ctl.compareAndSet(c, c+1);
Worker w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
t.start();
return true;
}
private final class Worker implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = threadFactory.newThread(this);
}
@Override
public void run() {
Runnable task = this.firstTask;
while (task != null || (task = workQueue.take()) != null) {
task.run();
task = null;
}
}
}
}
4、创建线程池的工具类 Executors
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, new LinkedBlockingQueue<Runnable>(2), new MyThreadFactory());
}
}
二、测试线程池的功能
public class ThreadTest {
public static void main(String[] args) {
/**
* 1、使用Executors创建线程池
*/
// ExecutorService executors = Executors.newFixedThreadPool(2);
/**
* 2、直接new ThreadPoolExecutor,并指定corePoolSize和maximumPoolSize不一样
*/
ExecutorService executors = new ThreadPoolExecutor(2, 4,
new LinkedBlockingQueue<Runnable>(3), new MyThreadFactory());
executors.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++ < 5) {
System.out.println(Thread.currentThread().getName() + "-- 111");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++ < 5) {
System.out.println(Thread.currentThread().getName() + "-- 222");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++ < 5) {
System.out.println(Thread.currentThread().getName() + "-- 333");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++ < 5) {
System.out.println(Thread.currentThread().getName() + "-- 444");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++ < 5) {
System.out.println(Thread.currentThread().getName() + "-- 555");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++ < 5) {
System.out.println(Thread.currentThread().getName() + "-- 666");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++ < 5) {
System.out.println(Thread.currentThread().getName() + "-- 777");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++ < 5) {
System.out.println(Thread.currentThread().getName() + "-- 888");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
System.out.println("主函数结束");
}
}
从输出结果可以看出,启动了4个线程,并且有线程执行了多个任务,有任务被拒绝
原文地址:https://www.cnblogs.com/jiangwangxiang/p/9246770.html