自己动手写线程池

一、线程池源码如下

1、阻塞任务队列 BlockingQueue

public interface BlockingQueue<E> {
  boolean offer(E e);
  public E take();
}

阻塞任务队列实现类  LinkedBlockingQueue

import java.util.concurrent.atomic.AtomicInteger;

public class LinkedBlockingQueue<E> implements BlockingQueue<E> {
  private final AtomicInteger count = new AtomicInteger();
  private final int capacity;

  transient Node<E> head;
  private transient Node<E> last;

  public LinkedBlockingQueue(int capacity) {
    this.capacity = capacity;
    last = head = new Node<E>(null);
  }
  //向阻塞队列的尾部添加一个任务
  @Override
  public boolean offer(E e) {
    //如果已达到队列的容量,则拒接添加,返回false
    if(count.get() == capacity) {
      return false;
    }
    Node<E> node = new Node<E>(e);
    last = last.next = node;
    count.getAndIncrement();
    return true;
  }

  //从阻塞队列的头部取出一个任务
  @Override
  public E take() {
    Node<E> h = head.next;
    if(h == null) {
      return null;
    }
    E x = h.item;
    head = h;
    return x;
  }

  static class Node<E>{
    E item;
    Node<E> next;

    Node(E x){
      item = x;
    }
  }
}

2、创建线程的工厂  ThreadFactory

public interface ThreadFactory {
  Thread newThread(Runnable r);
}

创建线程的工厂实现类    MyThreadFactory

import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadFactory implements ThreadFactory {
  private final AtomicInteger threadNumber = new AtomicInteger(1);

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r, "demo" + threadNumber.getAndIncrement());
    return t;
  }
}

3、线程池接口   ExecutorService

public interface ExecutorService {
  void execute(Runnable command);
}

线程池实现类    ThreadPoolExecutor

import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolExecutor implements ExecutorService{
  private final HashSet<Worker> workers = new HashSet<Worker>();
  private static final AtomicInteger ctl = new AtomicInteger(0);

  private final BlockingQueue<Runnable> workQueue;
  private volatile ThreadFactory threadFactory;
  private volatile int corePoolSize;
  private volatile int maximumPoolSize;

  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory) {
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.threadFactory = threadFactory;
  }

  @Override
  public void execute(Runnable command) {
    //如果线程池当前线程数小于核心线程数,则调用addWorker方法增加一个工作线程
    if(ctl.get() < corePoolSize) {
      if(addWorker(command, true)) {
        return;
      }
    }
    //否则尝试将该任务添加到任务队列
    if(workQueue.offer(command)) {
      workQueue.offer(command);
    }else{//如果添加该任务到任务队列失败则说明任务队列已满,新开启线程执行该任务
      //如果新开启线程失败
      if(!addWorker(command, false)) {
        System.out.println("任务" + command + "被线程池拒绝");
      }
    }
  }

  private boolean addWorker(Runnable firstTask, boolean core) {
    int c = ctl.get();
    if(c >= (core ? corePoolSize : maximumPoolSize)) {
      return false;
    }
    ctl.compareAndSet(c, c+1);

    Worker w = new Worker(firstTask);
    final Thread t = w.thread;
    workers.add(w);
    t.start();

    return true;
  }

  private final class Worker implements Runnable{
    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
      this.firstTask = firstTask;
      this.thread = threadFactory.newThread(this);
    }

    @Override
    public void run() {
      Runnable task = this.firstTask;
      while (task != null || (task = workQueue.take()) != null) {
        task.run();
        task = null;
      }
    }
  }
}

4、创建线程池的工具类   Executors

public class Executors {
  public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, new LinkedBlockingQueue<Runnable>(2), new MyThreadFactory());
  }
}

二、测试线程池的功能

public class ThreadTest {
  public static void main(String[] args) {
    /**
    * 1、使用Executors创建线程池
    */
    // ExecutorService executors = Executors.newFixedThreadPool(2);
    /**
    * 2、直接new ThreadPoolExecutor,并指定corePoolSize和maximumPoolSize不一样
    */
    ExecutorService executors = new ThreadPoolExecutor(2, 4,
          new LinkedBlockingQueue<Runnable>(3), new MyThreadFactory());

    executors.execute(new Runnable() {
      @Override
      public void run() {
        int i = 0;
        while(i++ < 5) {
          System.out.println(Thread.currentThread().getName() + "-- 111");
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });
    executors.execute(new Runnable() {
      @Override
      public void run() {
        int i = 0;
        while(i++ < 5) {
          System.out.println(Thread.currentThread().getName() + "-- 222");
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });
    executors.execute(new Runnable() {
      @Override
      public void run() {
        int i = 0;
        while(i++ < 5) {
          System.out.println(Thread.currentThread().getName() + "-- 333");
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });
    executors.execute(new Runnable() {
      @Override
      public void run() {
        int i = 0;
        while(i++ < 5) {
          System.out.println(Thread.currentThread().getName() + "-- 444");
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });
    executors.execute(new Runnable() {
      @Override
      public void run() {
        int i = 0;
        while(i++ < 5) {
          System.out.println(Thread.currentThread().getName() + "-- 555");
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });
    executors.execute(new Runnable() {
      @Override
      public void run() {
        int i = 0;
        while(i++ < 5) {
          System.out.println(Thread.currentThread().getName() + "-- 666");
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });
    executors.execute(new Runnable() {
      @Override
      public void run() {
        int i = 0;
        while(i++ < 5) {
          System.out.println(Thread.currentThread().getName() + "-- 777");
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });
    executors.execute(new Runnable() {
      @Override
      public void run() {
        int i = 0;
        while(i++ < 5) {
          System.out.println(Thread.currentThread().getName() + "-- 888");
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

    System.out.println("主函数结束");
  }
}

从输出结果可以看出,启动了4个线程,并且有线程执行了多个任务,有任务被拒绝

原文地址:https://www.cnblogs.com/jiangwangxiang/p/9246770.html

时间: 2024-10-07 20:18:09

自己动手写线程池的相关文章

手写线程池 (一)

前言准备 1.jdk线程池的使用:https://www.cnblogs.com/jtfr/p/10187419.html 2.线程池核心:线程的复用. 运行的线程是线程池的核心,被添加的任务需要实现过Runnable接口,主要是保证有run方法.运行时候 对象.run() . 一.手写线程池注意要点 1.线程池需要添加任务,任务是放置在一个队列(FIFO)当中,具体只要保证FIFO,或优先级保证(Map集合)先执行.2.线程池运行,需要一个容器存放创建的线程,可数组或集合,可以自己设计思考.3

图解线程池工作机制,手写线程池?

ThreadPoolExecutor构造函数的各个参数说明 public ThreadPoolExecutor(int corePoolSize,//线程池中核心线程数 int maximumPoolSize,//允许的最大线程数 long keepAliveTime,//线程空闲下来后,存活的时间,这个参数只在> corePoolSize才有用 TimeUnit unit,//存活时间的单位值 BlockingQueue<Runnable> workQueue,//保存任务的阻塞队列

手写线程池

Executors.newSingleThreadExecutor(): 只有一个线程的线程池,因此所有提交的任务是顺序执行 Executors.newCachedThreadPool(): 线程池里有很多线程需要同时执行,老的可用线程将被新的任务触发重新执行, 如果线程超过60秒内没执行,那么将被终止并从池中删除 Executors.newFixedThreadPool(): 拥有固定线程数的线程池,如果没有任务执行,那么线程会一直等待 Executors.newScheduledThread

死磕 java线程系列之自己动手写一个线程池(续)

(手机横屏看源码更方便) 问题 (1)自己动手写的线程池如何支持带返回值的任务呢? (2)如果任务执行的过程中抛出异常了该怎么处理呢? 简介 上一章我们自己动手写了一个线程池,但是它是不支持带返回值的任务的,那么,我们自己能否实现呢?必须可以,今天我们就一起来实现带返回值任务的线程池. 前情回顾 首先,让我们先回顾一下上一章写的线程池: (1)它包含四个要素:核心线程数.最大线程数.任务队列.拒绝策略: (2)它具有执行无返回值任务的能力: (3)它无法处理有返回值的任务: (4)它无法处理任务

Tomcat线程池,更符合大家想象的可扩展线程池

因由 说起线程池,大家可能受连接池的印象影响,天然的认为,它应该是一开始有core条线程,忙不过来了就扩展到max条线程,闲的时候又回落到core条线程,如果还有更高的高峰,就放进一个缓冲队列里缓冲一下. 有些整天只和SSH打交道的同学,可能现在还是这样认为的. 无情的现实就是,JDK只有两种典型的线程池,FixedPool 与 CachedPool: FixedPool固定线程数,忙不过来的全放到无限长的缓冲队列里. CachedPool,忙不过来时无限的增加临时线程,闲时回落,没有缓冲队列.

异步线程池的实现(一)-------具体实现方法

本篇是这个内容的第一篇,主要是写:遇到的问题,和自己摸索实现的方法.后面还会有一篇是总结性地写线程池的相关内容(偏理论的). 一.背景介绍 朋友的项目开发到一定程度之后,又遇到了一些问题:在某些流程中的一些节点,由于是串联执行的.上一步要等下一步执行完毕:或者提交数据之后要等待后台其他系统处理完成之后,才能返回结果.这样就会导致,请求发起方不得不一直等待结果,用户体验很不好:从项目优化来说,模块与模块之间构成了强耦合,这也是不利于以后扩展的,更不用说访问量上来之后,肯定会抓瞎的问题.所以,我就着

用阻塞队列和线程池简单实现生产者和消费者场景

本例子仅仅是博主学习阻塞队列和后的一些小实践,并不是真正的应用场景! 生产者消费者场景是我们应用中最常见的场景,我们可以通过ReentrantLock的Condition和对线程进行wait,notify同通信来实现生产者和消费者场景,前者可以实现多生产者和多消费者模式,后者仅可以实现一生产者,一消费者模式. 今天我们就利用阻塞队列来实现下生产者和消费者模式(里面还利用了线程池). 看过我关于阻塞队列博文的朋友已经知道,阻塞队列其实就是由ReentrantLock实现的! 场景就不描述了,为简单

提升爬虫效率之线程池

一.使用Flask模拟阻塞,利用线程池爬取数据 #模拟Flask #pip install flask from flask import Flask,render_template #返回一个模板文件需要导入render_tamplate from time import sleep app = Flask(__name__) @app.route('/bobo') def index_1(): return 'hello world' @app.route('/jay') def index

线程池ThreadPoolExecutor源码分析(一)

1.线程池简介 1.1 线程池的概念: 线程池就是首先创建一些线程,它们的集合称为线程池.使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务. 1.2 线程池的工作机制 在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程. 一