最近把《java并发编程实战》-Java Consurrency in Practice 重温了一遍,把书中提到的一些常用工具记录于此:
一、闭锁(门栓)- CountDownLatch
/** * 闭锁测试(菩提树下的杨过 http://yjmyzz.cnblogs.com/) * * @throws InterruptedException */ @Test public void countdownLatch() throws InterruptedException { CountDownLatch startLatch = new CountDownLatch(1); //类似发令枪 CountDownLatch endLatch = new CountDownLatch(10);//这里的数量,要与线程数相同 for (int i = 0; i < 10; i++) { Thread t = new Thread(() -> { try { startLatch.await(); //先等着,直到发令枪响,防止有线程先run System.out.println(Thread.currentThread().getName() + " is running..."); Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { endLatch.countDown(); //每个线程执行完成后,计数 } }); t.setName("线程-" + i); t.start(); } long start = System.currentTimeMillis(); startLatch.countDown();//发令枪响,所有线程『开跑』 endLatch.await();//等所有线程都完成 long end = System.currentTimeMillis(); System.out.println("done! exec time => " + (end - start) + " ms"); }
线程-1 is running...
线程-5 is running...
线程-8 is running...
线程-4 is running...
线程-3 is running...
线程-0 is running...
线程-2 is running...
线程-9 is running...
线程-7 is running...
线程-6 is running...
done! exec time => 13 ms
public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore semaphore; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<T>()); this.semaphore = new Semaphore(bound); } public boolean add(T t) throws InterruptedException { if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) { return false; } ; boolean added = false; try { added = set.add(t); return added; } finally { if (!added) { semaphore.release(); } } } public boolean remove(Object o) { boolean removed = set.remove(o); if (removed) { semaphore.release(); } return removed; } } @Test public void semaphoreTest() throws InterruptedException { BoundedHashSet<String> set = new BoundedHashSet<>(5); for (int i = 0; i < 6; i++) { if (set.add(i + "")) { System.out.println(i + " added !"); } else { System.out.println(i + " not add to Set!"); } } }
0 added !
1 added !
2 added !
3 added !
4 added !
5 not add to Set!
public class Worker extends Thread { private CyclicBarrier cyclicBarrier; public Worker(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } private void step1() { System.out.println(this.getName() + " step 1 ..."); } private void step2() { System.out.println(this.getName() + " step 2 ..."); } public void run() { step1(); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } step2(); } } @Test public void cyclicBarrierTest() throws InterruptedException, BrokenBarrierException { CyclicBarrier cyclicBarrier = new CyclicBarrier(11); for (int i = 0; i < 10; i++) { Worker w = new Worker(cyclicBarrier); w.start(); } cyclicBarrier.await(); }
这里我们假设有一个worder线程,里面有2步操作,要求所有线程完成step1后,才能继续step2. 执行结果如下:
Thread-0 step 1 ...
Thread-1 step 1 ...
Thread-2 step 1 ...
Thread-3 step 1 ...
Thread-4 step 1 ...
Thread-5 step 1 ...
Thread-6 step 1 ...
Thread-7 step 1 ...
Thread-8 step 1 ...
Thread-9 step 1 ...
Thread-9 step 2 ...
Thread-0 step 2 ...
Thread-3 step 2 ...
Thread-4 step 2 ...
Thread-6 step 2 ...
Thread-2 step 2 ...
Thread-1 step 2 ...
Thread-8 step 2 ...
Thread-7 step 2 ...
Thread-5 step 2 ...
@Test public void exchangerTest() { Exchanger<String> exchanger = new Exchanger<>(); Thread t1 = new Thread(() -> { String temp = "AAAAAA"; System.out.println("thread 1 交换前:" + temp); try { temp = exchanger.exchange(temp); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread 1 交换后:" + temp); }); Thread t2 = new Thread(() -> { String temp = "BBBBBB"; System.out.println("thread 2 交换前:" + temp); try { temp = exchanger.exchange(temp); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread 2 交换后:" + temp); }); t1.start(); t2.start(); }
thread 1 交换前:AAAAAA
thread 2 交换前:BBBBBB
thread 2 交换后:AAAAAA
thread 1 交换后:BBBBBB
@Test public void futureTaskTest() throws ExecutionException, InterruptedException, TimeoutException { Callable<String> callable = () -> { System.out.println("很耗时的操作处理中。。。"); Thread.sleep(5000); return "done"; }; FutureTask<String> futureTask = new FutureTask<>(callable); System.out.println("就绪。。。"); new Thread(futureTask).start(); System.out.println("主线程其它处理。。。"); System.out.println(futureTask.get()); System.out.println("处理完成!"); System.out.println("-----------------"); System.out.println("executor 就绪。。。"); ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(callable); System.out.println(future.get(10, TimeUnit.SECONDS)); }
executor 就绪。。。
@Test public void blockingQueueTest() throws InterruptedException { final BlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>(5); Thread producer = new Thread() { public void run() { Random rnd = new Random(); while (true) { try { int i = rnd.nextInt(10000); blockingDeque.put(i + ""); System.out.println(this.getName() + " 产生了一个数字:" + i); Thread.sleep(rnd.nextInt(50));//模拟生产者快速生产 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }; producer.setName("producer 1"); Thread consumer = new Thread() { public void run() { while (true) { Random rnd = new Random(); try { String i = blockingDeque.take(); System.out.println(this.getName() + " 消费了一个数字:" + i); Thread.sleep(rnd.nextInt(10000));//消费者模拟慢速消费 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }; consumer.setName("consumer 1"); producer.start(); consumer.start(); while (true) { Thread.sleep(100); } }
producer 1 产生了一个数字:6773
consumer 1 消费了一个数字:6773
producer 1 产生了一个数字:4456
producer 1 产生了一个数字:8572
producer 1 产生了一个数字:5764
producer 1 产生了一个数字:2874
producer 1 产生了一个数字:780 # 注意这里就已经堵住了,直到有消费者消费一条数据,才能继续生产
consumer 1 消费了一个数字:4456
producer 1 产生了一个数字:4193