CountDownLatch的2个用途:
1. 所有线程都到达相同的起跑线后,再一起开始跑(并非同时开始,而是队列中一个唤醒另一个)【此情况需到达起跑线后再调用await()等待其他线程】
2. 所有线程都到达终点(执行完)后,再一起庆祝 (并非同时开始,而是队列中一个唤醒另一个)【此情况需到达起终点后再调用await()等待其他线程】
package com.study.concurrent_utils; import java.util.concurrent.CountDownLatch; public class Test_CountDownLatch { /* * 没隔1s开启一个线程,共开启6个线程 * 若希望6个线程 同时 执行某一操作 * 可以用CountDownLatch实现 */ public static void test01() throws InterruptedException { CountDownLatch ctl = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread() { @Override public void run() { ctl.countDown(); try { ctl.await(); // 6个线程都启动执行到此处时,打印如下 System.out.println("here I am..."); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); Thread.sleep(1000L); } } /* * 开启6个线程,6个线程都执行完后,才执行某个操作 * 可以用CountDownLatch来实现 */ public static void test02() throws InterruptedException { JamesCountDownLatch ctl = new JamesCountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread() { @Override public void run() { System.out.println("after print..."); ctl.countDown(); } }.start(); Thread.sleep(1000L); } ctl.await(); // 6条线程都执行完后同时打印这句话 System.out.println("main thread do something ..."); } public static void main(String args[]) throws InterruptedException { test02(); } }
手写CountDownLatch(基于AQS)
countDown()方法:释放共享锁,首先会尝试释放共享锁(其实际是做CAS操作将state减1,如果state减到了0,返回true),如果返回true,说明读锁已释放完,则将等待队列头部线程唤醒。
await()方法:获取共享锁,首先会尝试获取共享锁(其实际操作,获取并判断state值:return getState()==0 ? 1: -1;),若state不是0,即所有线程还没到齐,集体活动还不能开始,此时将其加入等待队列,并且开始自旋,不断判断自己是不是队列头部,即下一个开始跑的是不是自己,是的话就再次尝试获取共享锁,若失败就将自己挂起,若成功即从等待队列移除,并唤醒下一个要获取共享锁的线程。
package com.study.concurrent_utils; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class JamesCountDownLatch { private Sync sync; public JamesCountDownLatch(int count) { sync = new Sync(count); } public void countDown() { sync.releaseShared(1); } public void await() { sync.acquireShared(1); } class Sync extends AbstractQueuedSynchronizer { public Sync(int count) { setState(count); } @Override protected int tryAcquireShared(int arg) { // 只有当state变为0时,加锁成功 return getState() == 0 ? 1 : -1; } /* * countdown的方法 */ @Override protected boolean tryReleaseShared(int arg) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - arg; // 用CAS操作,讲count减一 if (compareAndSetState(c, nextc)) { // 当state=0时,释放锁成功,返回true return nextc == 0; } } } } }
手写Semaphore(基于AQS)
package com.study.concurrent_utils; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class JamesSemaphore { private Sync sync; public JamesSemaphore(int state) { sync = new Sync(state); } public void acquire() { sync.acquireShared(1); } public void release() { sync.releaseShared(1); } class Sync extends AbstractQueuedSynchronizer { int state; public Sync(int state) { this.state = state; } @Override protected int tryAcquireShared(int arg) { for (;;) { int available = getState(); int remaining = available - arg; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } @Override protected boolean tryReleaseShared(int arg) { for (;;) { int current = getState(); int next = current + arg; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } } }
package com.study.concurrent_utils; import java.util.concurrent.CyclicBarrier; public class TestCyclicBarrier { public static void main(String[] args) throws InterruptedException { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() { @Override public void run() { System.out.println(">>>>3个已满,走起<<<"); } }); for (int i = 0; i < 30; i++) { new Thread(new Runnable() { @Override public void run() { try { cyclicBarrier.await(); System.out.println(Thread.currentThread() + ":start..."); } catch (Exception e) { e.printStackTrace(); } } }).start(); Thread.sleep(1000L); } } }
>>>>3个已满,走起<<< Thread[Thread-2,5,main]:start... Thread[Thread-0,5,main]:start... Thread[Thread-1,5,main]:start... >>>>3个已满,走起<<< Thread[Thread-5,5,main]:start... Thread[Thread-3,5,main]:start... Thread[Thread-4,5,main]:start... >>>>3个已满,走起<<< Thread[Thread-8,5,main]:start... Thread[Thread-6,5,main]:start... Thread[Thread-7,5,main]:start...
手写CyclicBarrier(基于ReentrantLock)
ReentrantLock的Condition就是一个等待队列,ReentrantLock是一个可重入锁
package com.study.concurrent_utils; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class JamesCyclicBarrier { private final ReentrantLock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); // 记录当前这个批次有多少个 private int count = 0; // 记录批次的大小 private final int parties; // 分代 private Object generation = new Object(); public JamesCyclicBarrier(int parties) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; } // 进入下一个分代 public void nextGeneration() { condition.signalAll(); count = 0; generation = new Object(); } public void await() { // 实现排队,需要将线程放到等待队列 // 还需要将线程挂起 // final ReentrantLock lock = this.lock; lock.lock(); try { // 记录当前的generation,相当于记录当前批次的id final Object g = generation; int index = ++count; // 批次已经达到parties, if (index == parties) { // 进入下一个批次 nextGeneration(); return; } // 若未达到批次,就进入等待 for (;;) { try { condition.await(); } catch (InterruptedException e) { } if (g != generation) { return; } } } finally { lock.unlock(); } } }
Future/Runnable
package com.study.futuretask; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.locks.LockSupport; public class Demo3_CallableTest { public static void main(String args[]) throws InterruptedException, ExecutionException { CallTask cTask = new CallTask(); JamesFutureTask<String> fTask = new JamesFutureTask<String>(cTask); // 执行第一次 Thread th = new Thread(fTask); th.start(); System.out.println("begain to get..."); String result = fTask.get(); System.out.println(result); // 执行第二次,失败 Thread th1 = new Thread(fTask); th1.start(); } } class CallTask implements Callable<String> { @Override public String call() throws Exception { LockSupport.parkNanos(1000 * 1000 * 1000 * 5L); System.out.println("done..."); return "James"; } }
手写FutureTask
package com.study.futuretask; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; public class JamesFutureTask<T> implements Runnable { // future只能执行一次 private volatile int state = NEW; private static final int NEW = 0; private static final int RUNNING = 1; private static final int FINISED = 2; public JamesFutureTask(Callable<T> task) { this.callable = task; } // 程序执行的结果 private T result; // 要自行的task Callable<T> callable; // 获取结果的线层等待队列 LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(100); // 执行当前FutureTask的线程,用CAS进行争抢 AtomicReference<Thread> runner = new AtomicReference<>(); @Override public void run() { // 判断当前对象的状态,如果是New就执行,如果 if (state != NEW || !runner.compareAndSet(null, Thread.currentThread())) return; state = RUNNING; try { result = callable.call(); } catch (Exception e) { e.printStackTrace(); } finally { state = FINISED; } // 方法执行完,唤醒所有线程 while (true) { Thread waiter = waiters.poll(); if (waiter == null) break; LockSupport.unpark(waiter); } } public T get() { if (state != FINISED) { waiters.offer(Thread.currentThread()); } while (state != FINISED) { LockSupport.park(); } return result; } }
原文地址:https://www.cnblogs.com/yfzhou528/p/11300248.html
时间: 2024-11-05 19:40:29