聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore

前几篇分析了一下AQS的原理和实现。这篇拿Semaphore信号量做样例看看AQS实际是怎样使用的。

Semaphore表示了一种能够同一时候有多个线程进入临界区的同步器,它维护了一个状态表示可用的票据,仅仅有拿到了票据的线程尽能够进入临界区,否则就等待。直到获得释放出的票据。

Semaphore经常使用在资源池中来管理资源。当状态仅仅有1个0两个值时,它退化成了一个相互排斥的同步器。类似锁。

以下来看看Semaphore的代码。

它维护了一个内部类Sync来继承AQS,定制tryXXX方法来使用AQS。

我们之前提到过AQS支持独占和共享两种模式,Semaphore明显就是共享模式。它支持多个线程能够同一时候进入临界区。所以Sync扩展了Shared相关的方法。

能够看到Sync的主要操作都是对状态的无锁改动,它不须要处理AQS队列相关的操作。在聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四) 我们说了AQS提供了tryXXX接口给子类扩展,相当于给子类一个机会,能够自己处理状态,决定是否入同步队列。

1. nonfailTryAcquireShared()非公平的tryAcquire,它立马改动了票据状态,而不须要管是否有先来的线程正在等待,而一旦有可用的票据,就直接获得了锁,不须要进入AQS的队列等待同步。

2. tryReleaseShared()方法负责释放共享状态的资源,它仅仅改动了票据状态。由AQS的releaseShared()方法来负责唤醒在AQS队列等待的线程

3. reducePermits()和drainPermits()方法都是直接改动了状态,从而限制可用的资源

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

Sync也是一个抽象类,详细的实现是NonfailSync和FairSync。代表了非公平实现和公平实现。在上一篇已经提到,所谓的非公平仅仅是说在获取资源时开了一个口子。能够让后来的线程不须要管在AQS队列中的先来的线程来获取资源。而一旦获取失败,就得进入AQS队列等待,而AQS队列是先来先服务的FIFO队列。

能够看到,NonfailSync和FairSync仅仅是在tryAcquireShared方法的实现上不同,其它都是一样的。

/**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

再来看看Semaphore自己提供的方法,

1.支持可中断和不可中断的获取/释放

2.支持限时获取

3.支持tryXX获取/释放

4. 支持同一时候获取/释放多个资源

能够看到Semaphore的实现都是基于AQS的方法来作的,单个资源的获取/释放操作都是请求1个资源,所以參数传递的是1,多个资源获取传递了一个int个数。

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

public void release() {
        sync.releaseShared(1);
    }

public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

以下用一个实例来測试一下Semaphore的功能。

1. 创建一个有两个票据的Semaphore

2. 创建20个线程来竞争运行race()方法

3. 在race()方法里先打印一句等待获取资源的话,再获取资源,获得资源后打印一句话,最后释放资源,释放资源前打印一句话

package com.lock.test;

import java.util.concurrent.Semaphore;

public class SemaphoreUsecase {
	private Semaphore semaphore = new Semaphore(2);

	public void race(){
		System.out.println("Thread " + Thread.currentThread().getName() + " is waiting the resource");
		semaphore.acquireUninterruptibly();
		try{
			System.out.println("Thread " + Thread.currentThread().getName() + " got the resource");
			try {
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}finally{
			System.out.println("Thread " + Thread.currentThread().getName() + " is releasing the resource");
			semaphore.release();
		}
	}

	public static void main(String[] args){
		final SemaphoreUsecase usecase = new SemaphoreUsecase();

		for(int i = 0; i < 10; i++){
			Thread t = new Thread(new Runnable(){

				@Override
				public void run() {
					usecase.race();
				}

			}, String.valueOf(i));
			t.start();
		}
	}
}

測试结果:

能够看到先来的两个线程先获得了资源,后来的线程都在等待,当有线程释放资源之后,等待的线程才会去获得资源,直到都获得/释放资源

Thread 0 is waiting the resource
Thread 0 got the resource
Thread 2 is waiting the resource
Thread 2 got the resource
Thread 1 is waiting the resource
Thread 4 is waiting the resource
Thread 3 is waiting the resource
Thread 5 is waiting the resource
Thread 6 is waiting the resource
Thread 7 is waiting the resource
Thread 8 is waiting the resource
Thread 9 is waiting the resource
Thread 2 is releasing the resource
Thread 0 is releasing the resource
Thread 1 got the resource
Thread 4 got the resource
Thread 1 is releasing the resource
Thread 4 is releasing the resource
Thread 3 got the resource
Thread 5 got the resource
Thread 3 is releasing the resource
Thread 5 is releasing the resource
Thread 6 got the resource
Thread 7 got the resource
Thread 7 is releasing the resource
Thread 6 is releasing the resource
Thread 8 got the resource
Thread 9 got the resource
Thread 8 is releasing the resource
Thread 9 is releasing the resource
时间: 2024-10-08 15:22:36

聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore的相关文章

聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

这篇说说java.util.concurrent.atomic包里的类,总共12个.网上有非常多文章解析这几个类.这里挑些重点说说. 这12个类能够分为三组: 1. 普通类型的原子变量 2. 数组类型的原子变量 3. 域更新器 普通类型的原子变量的6个, 1. 当中AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference分别相应boolean, int,  long, object完毕主要的原子操作 2. AtomicMarkableRe

聊聊高并发(二十七)解析java.util.concurrent各个组件(九) 理解ReentrantLock可重入锁

这篇讲讲ReentrantLock可重入锁,JUC里提供的可重入锁是基于AQS实现的阻塞式可重入锁.这篇 聊聊高并发(十六)实现一个简单的可重入锁 模拟了可重入锁的实现.可重入锁的特点是: 1. 是互斥锁,基于AQS的互斥模式实现,也就是说同时只有一个线程进入临界区,唤醒下一个线程时也只能释放一个等待线程 2. 可重入,通过设置了一个字段exclusiveOwnerThread来标示当前获得锁的线程.获取锁操作是,如果当前线程是已经获得锁的线程,那么获取操作成功.把当前状态作为获得锁次数的计数器

聊聊高并发(三十)解析java.util.concurrent各个组件(十二) 理解CyclicBarrier栅栏

这篇讲讲CyclicBarrier栅栏,从它的名字可以看出,它是可循环使用的.它的功能和CountDownLatch类似,也是让一组线程等待,然后一起开始往下执行.但是两者还是有几个区别 1. 等待的对象不同.CountDownLatch的一组线程等待的是一个事件,或者说是一个计数器归0的事件.而CyclicBarrier等待的对象是线程,只有线程都到齐了才往下执行 2. 使用方式不同,这个也是由等待的对象不同引起的,CountDownLatch需要调用await()来让线程等待,调用count

谈论高并发(三十)解析java.util.concurrent各种组件(十二) 认识CyclicBarrier栅栏

这次谈话CyclicBarrier栅栏,如可以从它的名字可以看出,它是可重复使用. 它的功能和CountDownLatch类别似,也让一组线程等待,然后开始往下跑起来.但也有在两者之间有一些差别 1. 不同的对象等.CountDownLatch组线程等待的是一个事件.或者说是一个计数器归0的事件.而CyclicBarrier等待的对象是线程,仅仅有线程都到齐了才往下运行 2. 使用方式不同,这个也是由等待的对象不同引起的,CountDownLatch须要调用await()来让线程等待.调用cou

聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源代码分析

ThreadPoolExecutor是Executor运行框架最重要的一个实现类.提供了线程池管理和任务管理是两个最主要的能力.这篇通过分析ThreadPoolExecutor的源代码来看看怎样设计和实现一个基于生产者消费者模型的运行器. 生产者消费者模型 生产者消费者模型包括三个角色:生产者,工作队列,消费者.对于ThreadPoolExecutor来说, 1. 生产者是任务的提交者,是外部调用ThreadPoolExecutor的线程 2. 工作队列是一个堵塞队列的接口,详细的实现类能够有非

聊聊高并发(四十一)解析java.util.concurrent各个组件(十七) 任务的异步执行和状态控制

聊聊高并发(三十九)解析java.util.concurrent各个组件(十五) 理解ExecutorService接口的设计这篇说了ExecutorService接口扩展了Executor接口,在执行任务的基础上,提供了执行框架生命周期的管理,任务的异步执行,批量任务的执行的能力.AbstractExecutorService抽象类实现了ExecutorService接口,提供了任务异步执行和批量执行的默认实现.这篇说说任务的异步执行和状态控制 说明一点,使用Executor框架执行任务的方式

聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何实现的锁降级.但是下面几个问题没说清楚,这篇补充一下 1. 释放锁时的优先级问题,是让写锁先获得还是先让读锁先获得 2. 是否允许读线程插队 3. 是否允许写线程插队,因为读写锁一般用在大量读,少量写的情况,如果写线程没有优先级,那么可能造成写线程的饥饿 关于释放锁后是让写锁先获得还是让读锁先获得,

聊聊高并发(二十六)解析java.util.concurrent各个组件(八) 理解CountDownLatch闭锁

CountDownLatch闭锁也是基于AQS实现的一种同步器,它表示了"所有线程都等待,直到锁打开才继续执行"的含义.它和Semaphore的语意不同, Semaphore的获取和释放操作都会修改状态,都可能让自己或者其他线程立刻拿到锁.而闭锁的获取操作只判断状态是否为0,不修改状态本身,闭锁的释放操作会修改状态,每次递减1,直到状态为0. 所以正常情况下,闭锁的获取操作只是等待,不会立刻让自己获得锁,直到释放操作把状态变为0. 闭锁可以用来实现很多场景,比如: 1. 某个服务依赖于

聊聊高并发(十五)实现一个简单的读-写锁(共享-排他锁)

读写锁是数据库中很常见的锁,又叫共享-排他锁,S锁和X锁.读写锁在大量读少量写的情况下有很高的效率优势. 读写锁是基于普通的互斥锁构建出来的更复杂的锁,它有两个基本特点: 1. 当任一线程持有读锁或写锁时,不允许其他线程再持有写锁 2. 当任一线程持有写锁时,不允许其他线程再持有读锁 也就是说,写锁是排他的,只要有一个线程持有写锁,就不允许其他线程再上锁.读锁是共享的,可以有多个线程持有读锁,但不允许同时持有写锁. 读锁和写锁还存在一个锁升级的问题,比如一个线程先持有了读锁,想升级成写锁,这时候