Semaphore(信号量)使用来控制通知访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
我们可以这么理解Semaphore,比如一个厕所只有6个坑,同时只能满足6个人上厕所(变态除外),其他人想蹲坑,只能排队等待,如果有人从厕所出来,后面的一个人就可以进去。在这个例子中人就是线程,蹲坑表示线程在执行,离开表示线程执行完毕,而坑的数量就表示Semaphore的个数。
一.Semaphore的应用场景
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。简单实现如下:
public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); //只有10个数据库链接,这里创建10个信号量 private static Semaphore semaphore = new Semaphore(10); public static void main(String[] args){ boolean shutDownThreadPool = false; for(int index = 0; index < THREAD_COUNT ;index++){ threadPool.execute(new Runnable() { @Override public void run() { try { //获取一个信号 semaphore.acquire(); //执行操作 System.out.println("wait for write data..."); Thread.sleep(1000); //释放信号 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }); if(index == THREAD_COUNT-1){ shutDownThreadPool = true; } } while(!shutDownThreadPool){ threadPool.shutdown(); } } }
上述代码中虽然创建了30个线程,但是同时只能有是个线程在并发的执行。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。
二.Semaphore的其他方法
int availablePermits():返回当前可用的许可证数量
int getQueueLength():获取正在等待获取许可证的线程数量
boolean hasQueuedThreads():获取是否还有等待获取许可证的线程
void reducePermits(int reduction):减少reduction个许可证