Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具

锁的实现

锁的实现其实很简单,主要使用Java中synchronized关键字。

public class Lock {

    private volatile boolean isLocked = false;

    private Thread lockingThread = null;

    public synchronized void lock() throws InterruptedExpection {
        while(isLocked){
            wait();
        }
        isLocked = true;
        lockingThread = Thread.currentThread();
    }

    public synchronized void unlock() {
        if(this.lockingThread != Thread.currentThread()){
            throw new IllegalMonitorStateException("Calling thread has not locked this lock");
        }
        isLocked = false;
        lockingThread = null;
        notify();
    }
}

公平锁的实现

上面的锁的实现严格意义上说是会存在线程饥饿现象的(也就是说在多线程竞争的条件下,存在一种极端情况,即某个线程一直阻塞在锁上,永远都是其他线程被优先唤醒,导致自己得不到执行)。下面是公平锁的实现:

/**
 * @Author: Jeysin
 * @Date: 2019/4/16 12:16
 * @Desc: 公平锁的实现,不会存在线程饿死现象。
 * 实现原理:每个线程在不同的对象上调用wait方法,Lock类可以决定调用哪个对象的notify方法,所以可以做到唤醒特定的线程
 */

public class FairLock {

    private volatile boolean isLocked = false;

    private Thread lockingThread = null;

    private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();

    public void lock() throws InterruptedException{
        QueueObject queueObject = new QueueObject();//首先给每个要加锁的线程new一个QueueObject对象
        boolean isLockedForThisThread = true;
        synchronized (this){
            waitingThreads.add(queueObject);//将这个对象添加到链表里,注意用synchronize关键字做并发控制
        }
        while(isLockedForThisThread){
            synchronized (this) {
                //判断一下当前锁是否没有被占用,并且判断当前线程对应的QueueObject是否是链表中的第一个(因为默认链表中第一个线程首先获得锁)
                isLockedForThisThread = isLocked || waitingThreads.get(0) != queueObject;
                if (!isLockedForThisThread) {
                    isLocked = true;
                    waitingThreads.remove(queueObject);
                    lockingThread = Thread.currentThread();
                    return;//链表中第一个线程加锁成功后从链表中移除自身对应的QueueObject对象,并从这条语句返回
                }
            }
            try{
                queueObject.doWait();//其他线程阻塞在这条语句上
            }catch (InterruptedException e){
                synchronized (this){
                    waitingThreads.remove(queueObject);
                    throw e;
                }
            }
        }
    }

    public synchronized void unlock(){
        if(this.lockingThread != Thread.currentThread()){
            throw new IllegalMonitorStateException("Calling thread has not locked this lock");
        }
        isLocked = false;
        lockingThread = null;
        if(waitingThreads.size() > 0){
            waitingThreads.get(0).doNotify();//默认唤醒链表中第一个对象对应的线程,达到公平的目的
        }
    }
}
/**
 * @Author: Jeysin
 * @Date: 2019/4/16 12:20
 * @Desc:
 */

public class QueueObject {

    private boolean isNotified = false;

    public synchronized void doWait() throws InterruptedException{
        while(!isNotified){
            this.wait();
        }
        this.isNotified = false;
    }

    public synchronized void doNotify(){
        this.isNotified = true;
        this.notify();
    }

    @Override
    public boolean equals(Object obj) {
        return this == obj;
    }
}

读写锁的实现

还记得秋招面试美团的时候,二面面试官的第一道编程题就是实现一个读写锁,当时不会Java,用C++写的,还记得当时用的是Linux下的pthread_mutex(也就是互斥量),耗了半个小时死活没有实现出一个读写锁,感觉怎么写都不对,都有点怀疑人生了,毫无疑问那场面试挂掉了。当时我就在想,肯定是一开始思路就错了,pthread_mutex虽然也可以实现一个锁的功能,但是离实现读写锁还是差了太远,一个pthread_mutex肯定是不行的(甚至用两个也不行,别问我是怎么知道的,我在那半个小时的面试里尝试了无数次最后还是不行)。直到最近看了Java版本的一个实现,synchronized加上wait和notify完美解决问题,我才意识到果然是一开始思路就错了,也许当时我用一个pthread_mutex和一个pthread_cond就可以解决问题。现在想来,要实现一个读写锁最关键的地方要有线程的唤醒机制,notify可以做到,pthread_cond也可以做到,但是光用pthread_mutex是不可能做到的。啥也不说了,Java大法好。

/**
 * @Author: Jeysin
 * @Date: 2019/4/16 22:01
 * @Desc: 不可重入的读写锁实现
 */

public class ReadWriteLock {
    private volatile int readers = 0;
    private volatile int writers = 0;
    private volatile int writeRequests = 0;

    public synchronized void lockRead() throws InterruptedException{
        while(writers > 0 || writeRequests > 0){
            this.wait();
        }
        ++readers;
    }

    public synchronized void unlockRead(){
        --readers;
        this.notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException{
        ++writeRequests;
        while(readers > 0 || writers > 0){
            wait();
        }
        --writeRequests;
        ++writers;
    }

    public synchronized void unlockWrite(){
        --writers;
        notifyAll();
    }
}

顺带附上一个可重入版本的读写锁实现:

/**
 * @Author: Jeysin
 * @Date: 2019/4/16 22:33
 * @Desc: 可重入读写锁的实现
 */

public class ReentrantReadWriteLock {

    private Map<Thread, Integer> readingThreadsMap = new HashMap<Thread, Integer>();

    private volatile int writers = 0;

    private volatile int writeRequests = 0;

    private volatile Thread writingThread = null;

    public synchronized void lockRead() throws InterruptedException{
        Thread callingThread = Thread.currentThread();
        while(!canGrantReadAccess(callingThread)){
            wait();
        }
        readingThreadsMap.put(callingThread,getAccessCount(callingThread) + 1);
    }

    public synchronized void unlockRead(){
        Thread callingThread = Thread.currentThread();
        int count = getAccessCount(callingThread);
        if(count == 1){
            readingThreadsMap.remove(callingThread);
        }else {
            readingThreadsMap.put(callingThread, count-1);
        }
        notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException{
        ++writeRequests;
        Thread callingThread = Thread.currentThread();
        while(!canGrantWriteAccess(callingThread)){
            wait();
        }
        --writeRequests;
        ++writers;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite(){
        --writers;
        if(writers == 0){
            writingThread = null;
        }
        notifyAll();
    }

    private boolean canGrantWriteAccess(Thread callingThread){
        if(readingThreadsMap.size() > 0){
            return false;
        }
        if(writers > 0 && writingThread != callingThread){
            return false;
        }
        return true;
    }

    private boolean canGrantReadAccess(Thread callingThread){
        if(writers > 0){
            return false;
        }
        if(readingThreadsMap.get(callingThread) != null){
            return true;
        }
        if(writeRequests > 0){
            return false;
        }
        return true;
    }

    private Integer getAccessCount(Thread callingThread){
        Integer count = readingThreadsMap.get(callingThread);
        if(count == null){
            return 0;
        }
        return count;
    }
}

信号量

信号量的实现同样也可以借用synchronized关键字,不得不说,synchronized大法好啊~

/**
 * @Author: Jeysin
 * @Date: 2019/4/18 15:16
 * @Desc: 信号量的实现
 */

public class Semaphore {

    private volatile boolean signal = false;

    public synchronized void take(){
        this.signal = true;
        this.notify();
    }

    public synchronized void release() throws InterruptedException{
        while(!this.signal){
            wait();
        }
        this.signal = false;
    }
}
/**
 * @Author: Jeysin
 * @Date: 2019/4/18 15:21
 * @Desc: 有上限的信号量的实现
 */

public class BoundedSemaphore {

    private volatile int signal = 0;

    private volatile int bound = 0;

    public BoundedSemaphore(int bound){
        this.bound = bound;
    }

    public synchronized void take() throws InterruptedException{
        while(this.signal == this.bound){
            wait();
        }
        ++signal;
        notify();
    }

    public synchronized void release() throws InterruptedException{
        while(signal == 0){
            wait();
        }
        --signal;
        notify();
    }
}

阻塞队列

/**
 * @Author: Jeysin
 * @Date: 2019/4/18 15:43
 * @Desc: 阻塞队列的实现
 */

public class BlockQueue {

    private List queue = new LinkedList();

    private volatile int limit = 10;

    public BlockQueue(int limit){
        this.limit = limit;
    }

    public synchronized void enqueue(Object object) throws InterruptedException{
        while(this.queue.size() > limit){
            wait();
        }
        if(this.queue.size() == 1){
            notifyAll();
        }
        queue.add(object);
    }

    public synchronized Object dequeue() throws InterruptedException{
        while(this.queue.size() == 0){
            wait();
        }
        if(this.queue.size() == limit){
            notifyAll();
        }
        return this.queue.remove(0);
    }
}

线程池

有了阻塞队列,线程池的实现就很简单了

/**
 * @Author: Jeysin
 * @Date: 2019/4/18 16:07
 * @Desc: 线程池的实现
 */

public class ThreadPool {

    private BlockingQueue<Runnable> taskQueue = null;

    private List<PoolThread> threads = new ArrayList<PoolThread>();

    private volatile boolean isStopped = false;

    public ThreadPool(int threadNums, int maxTaskNums){
        this.taskQueue = new LinkedBlockingQueue<Runnable>(maxTaskNums);
        for(int i=0; i<threadNums; ++i){
            threads.add(new PoolThread(taskQueue));
        }
        for(PoolThread poolThread : threads){
            poolThread.start();
        }
    }

    public synchronized void execute(Runnable task){
        if(this.isStopped){
            throw new IllegalStateException("Thread pool is stopped");
        }
        this.taskQueue.add(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThread poolThread : threads){
            poolThread.toStop();
        }
    }
}
/**
 * @Author: Jeysin
 * @Date: 2019/4/18 16:09
 * @Desc:
 */

public class PoolThread extends Thread {

    private BlockingQueue<Runnable> taskQueue = null;

    private volatile boolean isStopped = false;

    public PoolThread(BlockingQueue<Runnable> queue){
        this.taskQueue = queue;
    }

    @Override
    public void run() {
        while(!isStopped){
            try{
                Runnable runnable = taskQueue.take();
                runnable.run();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public synchronized void toStop(){
        isStopped = true;
        this.interrupt();
    }
}

参考文章:
http://tutorials.jenkov.com/java-concurrency/index.html

原文地址:https://www.cnblogs.com/jeysin/p/10840069.html

时间: 2024-11-07 13:33:51

Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具的相关文章

可重入锁 公平锁 读写锁

1.可重入锁 如果锁具备可重入性,则称作为可重入锁. ========================================== (转)可重入和不可重入 2011-10-04 21:38 这种情况出现在多任务系统当中,在任务执行期间捕捉到信号并对其进行处理时,进程正在执行的指令序列就被信号处理程序临时中断.如果从信号处理程序返回,则继续执行进程断点处的正常指令序列,从重新恢复到断点重新执行的过程中,函数所依赖的环境没有发生改变,就说这个函数是可重入的,反之就是不可重入的.众所周知,在进

06 锁:可重入锁 公平锁 读写锁

1.可重入锁 如果锁具备可重入性,则称作为可重入锁. 像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制: 基于线程的分配,而不是基于方法调用的分配. 举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2, 此时线程不必重新去申请锁,而是可以直接执行方法method2. class MyClass { public synch

多线程并发编程之显示锁ReentrantLock和读写锁

在Java5.0之前,只有synchronized(内置锁)和volatile. Java5.0后引入了显示锁ReentrantLock. ReentrantLock概况 ReentrantLock是可重入的锁,它不同于内置锁, 它在每次使用都需要显示的加锁和解锁, 而且提供了更高级的特性:公平锁, 定时锁, 有条件锁, 可轮询锁, 可中断锁. 可以有效避免死锁的活跃性问题.ReentrantLock实现了 Lock接口: public interface Lock { //阻塞直到获得锁或者中

锁的封装 读写锁、lock

最近由于项目上面建议使用读写锁,而去除常见的lock锁.然后就按照需求封装了下锁.以简化锁的使用.但是开发C#的童鞋都知道lock关键字用起太方便了,但是lock关键字不支持超时处理.很无奈,为了实现类似lock的功能.于是通过使用using关键字和IDisposable实现了自己的锁方法 class Program { static void Main(string[] args) { ReadWriteUtilTest(); MonitorUtilTest(); Console.ReadLi

并发包独占锁ReentrantLock与读写锁ReentrantReadWriteLock

两个锁都是依赖AQS实现的,方法基本是Sync的封装,主要看Sync的设计实现, 一.可重入独占锁ReentrantLock 1.静态内部抽象类Sync //继承AQS abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 延迟到子类实现(公平锁与非公平锁) */ abstra

架构师养成记--14.重入锁ReentrantLock 和 读写锁 ReentrantReadWriteLock

ReentrantLock 有嗅探锁定和多路分支等功能,其实就是synchronized,wait,notify的升级. this锁定当前对象不方便,于是就有了用new Object()来作为锁的解决方案,后面jdk干脆就提供了一个Lock类. 伪代码: Lock lock = new ReentrantLock();//新建一个lock Condition condition = lock.newCondition();//获取条件 method1(){ try{ lock.lock(); 代

Java中的公平锁和非公平锁实现详解

在ReentrantLock中包含了公平锁和非公平锁两种锁,通过查看源码可以看到这两种锁都是继承自Sync,而Sync又继承自AbstractQueuedSynchronizer,而AbstractQueuedSynchronizer又继承自AbstractOwnableSynchronizer,下面是类的继承关系图: 其中AbstractOwnableSynchronizer是提供了设置占用当前锁的线程信息的方法,主要的锁的实现还是在AbstractQueuedSynchronizer中实现的

Python 线程----线程方法,线程事件,线程队列,线程池,GIL锁,协程,Greenlet

主要内容: 线程的一些其他方法 线程事件 线程队列 线程池 GIL锁 协程 Greenlet Gevent 一. 线程(threading)的一些其他方法 from threading import Thread import threading import time def work(): time.sleep(1) print("子线程对象>>>", threading.current_thread()) # 子线程对象 print("子线程名称>

了解信号量Semaphore和线程池的差异

信号量 其实本质上是锁,Lock是单锁,信号量是指定多把锁,也就是说通过信号量指定多个数线程可以访问相同资源,一般情况下读操作可以有多个,但写操作同时只有一个 信号量模块 semaphore # 使用起来和普通锁没 什么区别,但这个是比锁更加粗粒度锁,锁的是线程 # 在线程实例前加锁,把锁传递进线程,在线程结束时候释放锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from threading import Thread