在 API 文档中是这样描述 Semaphore 的:
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的数量进行计数,并采取相应的行动。
例如,下面的类使用信号量控制线程并发的数量:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Demonstrates limiting how many tasks run a guarded section at the same time
 * by gating them with a fair counting {@link Semaphore}.
 */
public class TestSemaphore {

    /** Number of permits, i.e. the maximum number of tasks inside the guarded section. */
    private static final int MAX_CONCURRENT = 3;

    /** Number of tasks submitted to the pool. */
    private static final int TASK_COUNT = 10;

    /**
     * Submits {@link #TASK_COUNT} tasks to a cached thread pool. Each task takes a
     * permit, prints the current occupancy, sleeps for a random time below three
     * seconds, gives the permit back and prints the occupancy again.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        final Semaphore sp = new Semaphore(MAX_CONCURRENT, true);

        for (int i = 0; i < TASK_COUNT; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        sp.acquire();
                    } catch (InterruptedException e) {
                        // No permit was obtained: restore the interrupt flag and bail
                        // out, otherwise the release() below would add a permit that
                        // was never taken and raise the limit above MAX_CONCURRENT.
                        Thread.currentThread().interrupt();
                        return;
                    }

                    String threadName = Thread.currentThread().getName();
                    try {
                        System.out.println(sp.availablePermits());
                        System.out.println("线程 " + threadName + "进入,已有"
                                + (MAX_CONCURRENT - sp.availablePermits()) + "并发");
                        try {
                            Thread.sleep((long) (Math.random() * 3000));
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the pool; still leave normally.
                            Thread.currentThread().interrupt();
                        }
                        System.out.println("线程 " + threadName + "即将离开 ");
                    } finally {
                        // Always hand the permit back, even if the body above throws.
                        sp.release();
                    }
                    System.out.println("线程 " + threadName + "离开 ,已有"
                            + (MAX_CONCURRENT - sp.availablePermits()) + "并发");
                }
            };
            pool.execute(runnable);
        }

        // Accept no further tasks; already-submitted tasks still run to completion,
        // after which the pool's threads terminate and the JVM can exit.
        pool.shutdown();
    }
}
再例如可以通过信号量来控制线程访问资源:
import java.util.concurrent.Semaphore;

/**
 * Bounded-buffer demo: one thread "downloads" blocks into a ring of
 * {@code BUFFER_COUNT} slots while another "writes" them out. Two counting
 * semaphores track the number of empty and filled slots.
 */
public class DownloadThread {

    /** Number of slots in the ring buffer. */
    private static final int BUFFER_COUNT = 100;

    /** Slot the producer fills next; only touched by the thread running {@link #proA()}. */
    private static int inIndex = 0;

    /** Slot the consumer drains next; only touched by the thread running {@link #proB()}. */
    private static int outIndex = 0;

    /** Blocks handed to the consumer so far. Written by the producer only, read by the consumer. */
    private static volatile int producedBlocks = 0;

    /** Blocks written out so far; only touched by the thread running {@link #proB()}. */
    private static int consumedBlocks = 0;

    /**
     * Set by the producer from the result of {@link #getBlockFromNet(int)}; becomes
     * {@code true} with the last block. Volatile so the consumer thread sees the update.
     */
    public static volatile boolean g_downloadComplete;

    /** Permits = filled slots waiting to be written. */
    private static final Semaphore fullSlots = new Semaphore(0);

    /** Permits = free slots available to the producer. */
    private static final Semaphore emptySlots = new Semaphore(BUFFER_COUNT);

    /**
     * Placeholder for fetching one block: spins through a short counting loop.
     *
     * @param in_index index of the slot being filled
     * @return {@code true} when {@code in_index} is the last slot index
     *         ({@code BUFFER_COUNT - 1}) or beyond, which the producer treats as
     *         "download complete"; {@code false} otherwise
     */
    public static boolean getBlockFromNet(int in_index) {
        int i = 0;
        while (i < 10000) {
            i++;
        }
        return in_index >= BUFFER_COUNT - 1;
    }

    /**
     * Placeholder for writing one block: spins through a counting loop.
     *
     * @param out_index index of the slot being written (not used by the placeholder)
     */
    public static void writeBlockToDisk(int out_index) {
        int i = 0;
        while (i < 100000) {
            i++;
        }
    }

    /**
     * Clears the completion flag and starts the consumer thread followed by the
     * producer thread. Returns without waiting for either.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        g_downloadComplete = false;
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                proA();
            }
        });
        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                proB();
            }
        });
        threadB.start();
        threadA.start();
    }

    /**
     * Producer loop: waits for a free slot, fetches a block into it, publishes the
     * slot to the consumer, and stops after the block for which
     * {@link #getBlockFromNet(int)} reports completion. If interrupted while
     * waiting for a slot, restores the interrupt flag and returns.
     */
    public static void proA() {
        while (true) {
            try {
                // Blocks while the buffer is full; no availablePermits() polling needed.
                emptySlots.acquire();
            } catch (InterruptedException e) {
                // No slot was reserved, so do not touch the buffer.
                Thread.currentThread().interrupt();
                return;
            }

            boolean complete = getBlockFromNet(inIndex);
            inIndex = (inIndex + 1) % BUFFER_COUNT;

            // Single writer, so the read-modify-write is safe. The counter is
            // published BEFORE the flag: a consumer that sees the flag as true is
            // then guaranteed to see the final block count as well.
            producedBlocks = producedBlocks + 1;
            g_downloadComplete = complete;

            fullSlots.release();
            System.out.println("download a block " + inIndex);
            if (complete) {
                break;
            }
        }
    }

    /**
     * Consumer loop: waits for a filled slot, writes it out, frees the slot, and
     * stops once the producer has finished and every produced block has been
     * written. If interrupted while waiting for a block, restores the interrupt
     * flag and returns.
     */
    public static void proB() {
        while (true) {
            try {
                // Blocks while the buffer is empty.
                fullSlots.acquire();
            } catch (InterruptedException e) {
                // No block was claimed, so there is nothing to write.
                Thread.currentThread().interrupt();
                return;
            }

            writeBlockToDisk(outIndex);
            outIndex = (outIndex + 1) % BUFFER_COUNT;
            consumedBlocks++;

            emptySlots.release();
            System.out.println("write a block " + outIndex);

            // Read the flag first, then the counter (see ordering note in proA()).
            if (g_downloadComplete && consumedBlocks == producedBlocks) {
                break;
            }
        }
    }
}
时间: 2024-10-14 13:08:39