1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask

CountDownLatch的2个用途:

1. 所有线程都到达相同的起跑线后,再一起开始跑(并非同时开始,而是队列中一个唤醒另一个)【此情况需到达起跑线后再调用await()等待其他线程】

2. 所有线程都到达终点(执行完)后,再一起庆祝 (并非同时开始,而是队列中一个唤醒另一个)【此情况需到达起终点后再调用await()等待其他线程】

package com.study.concurrent_utils;

import java.util.concurrent.CountDownLatch;

public class Test_CountDownLatch {

	/*
	 * 没隔1s开启一个线程,共开启6个线程
	 * 若希望6个线程 同时 执行某一操作
	 * 可以用CountDownLatch实现
	 */
	public static void test01() throws InterruptedException {
		CountDownLatch ctl = new CountDownLatch(6);

		for (int i = 0; i < 6; i++) {
			new Thread() {
				@Override
				public void run() {
					ctl.countDown();
					try {
						ctl.await();
						// 6个线程都启动执行到此处时,打印如下
						System.out.println("here I am...");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}.start();
			Thread.sleep(1000L);
		}
	}

	/*
	 * 开启6个线程,6个线程都执行完后,才执行某个操作
	 * 可以用CountDownLatch来实现
	 */
	public static void test02() throws InterruptedException {
		JamesCountDownLatch ctl = new JamesCountDownLatch(6);

		for (int i = 0; i < 6; i++) {
			new Thread() {
				@Override
				public void run() {
					System.out.println("after print...");
					ctl.countDown();
				}
			}.start();
			Thread.sleep(1000L);
		}

		ctl.await();
		// 6条线程都执行完后同时打印这句话
		System.out.println("main thread do something ...");

	}

	public static void main(String args[]) throws InterruptedException {
		test02();
	}
}

手写CountDownLatch(基于AQS)

countDown()方法:释放共享锁,首先会尝试释放共享锁(其实际是做CAS操作将state减1,如果state减到了0,返回true),如果返回true,说明读锁已释放完,则将等待队列头部线程唤醒。

await()方法:获取共享锁,首先会尝试获取共享锁(其实际操作,获取并判断state值:return getState()==0 ? 1: -1;),若state不是0,即所有线程还没到齐,集体活动还不能开始,此时将其加入等待队列,并且开始自旋,不断判断自己是不是队列头部,即下一个开始跑的是不是自己,是的话就再次尝试获取共享锁,若失败就将自己挂起,若成功即从等待队列移除,并唤醒下一个要获取共享锁的线程。

package com.study.concurrent_utils;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class JamesCountDownLatch {

	private Sync sync;

	public JamesCountDownLatch(int count) {
		sync = new Sync(count);
	}

	public void countDown() {
		sync.releaseShared(1);
	}

	public void await() {
		sync.acquireShared(1);
	}

	class Sync extends AbstractQueuedSynchronizer {
		public Sync(int count) {
			setState(count);
		}

		@Override
		protected int tryAcquireShared(int arg) {
			// 只有当state变为0时,加锁成功
			return getState() == 0 ? 1 : -1;
		}

		/*
		 * countdown的方法
		 */
		@Override
		protected boolean tryReleaseShared(int arg) {
			for (;;) {
				int c = getState();
				if (c == 0)
					return false;
				int nextc = c - arg;
				// 用CAS操作,讲count减一
				if (compareAndSetState(c, nextc)) {
					// 当state=0时,释放锁成功,返回true
					return nextc == 0;
				}
			}
		}
	}
}

手写Semaphore(基于AQS)

package com.study.concurrent_utils;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class JamesSemaphore {

	private Sync sync;

	public JamesSemaphore(int state) {
		sync = new Sync(state);
	}

	public void acquire() {
		sync.acquireShared(1);
	}

	public void release() {
		sync.releaseShared(1);
	}

	class Sync extends AbstractQueuedSynchronizer {
		int state;

		public Sync(int state) {
			this.state = state;
		}

		@Override
		protected int tryAcquireShared(int arg) {
			for (;;) {
				int available = getState();
				int remaining = available - arg;
				if (remaining < 0 || compareAndSetState(available, remaining))
					return remaining;
			}
		}

		@Override
		protected boolean tryReleaseShared(int arg) {
			for (;;) {
				int current = getState();
				int next = current + arg;
				if (next < current) // overflow
					throw new Error("Maximum permit count exceeded");
				if (compareAndSetState(current, next))
					return true;
			}
		}
	}
}

package com.study.concurrent_utils;

import java.util.concurrent.CyclicBarrier;

public class TestCyclicBarrier {
	public static void main(String[] args) throws InterruptedException {
		CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
			@Override
			public void run() {
				System.out.println(">>>>3个已满,走起<<<");
			}
		});

		for (int i = 0; i < 30; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						cyclicBarrier.await();
						System.out.println(Thread.currentThread() + ":start...");
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}).start();
			Thread.sleep(1000L);
		}
	}
}
>>>>3个已满,走起<<<
Thread[Thread-2,5,main]:start...
Thread[Thread-0,5,main]:start...
Thread[Thread-1,5,main]:start...
>>>>3个已满,走起<<<
Thread[Thread-5,5,main]:start...
Thread[Thread-3,5,main]:start...
Thread[Thread-4,5,main]:start...
>>>>3个已满,走起<<<
Thread[Thread-8,5,main]:start...
Thread[Thread-6,5,main]:start...
Thread[Thread-7,5,main]:start...

手写CyclicBarrier(基于ReentrantLock)

ReentrantLock的Condition就是一个等待队列,ReentrantLock是一个可重入锁

package com.study.concurrent_utils;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class JamesCyclicBarrier {

	private final ReentrantLock lock = new ReentrantLock();
	private final Condition condition = lock.newCondition();

	// 记录当前这个批次有多少个
	private int count = 0;

	// 记录批次的大小
	private final int parties;

	// 分代
	private Object generation = new Object();

	public JamesCyclicBarrier(int parties) {
		if (parties <= 0)
			throw new IllegalArgumentException();
		this.parties = parties;
	}

	// 进入下一个分代
	public void nextGeneration() {
		condition.signalAll();
		count = 0;
		generation = new Object();
	}

	public void await() {
		// 实现排队,需要将线程放到等待队列
		// 还需要将线程挂起
		//
		final ReentrantLock lock = this.lock;
		lock.lock();
		try {
			// 记录当前的generation,相当于记录当前批次的id
			final Object g = generation;

			int index = ++count;
			// 批次已经达到parties,
			if (index == parties) {
				// 进入下一个批次
				nextGeneration();
				return;
			}

			// 若未达到批次,就进入等待
			for (;;) {
				try {
					condition.await();
				} catch (InterruptedException e) {

				}
				if (g != generation) {
					return;
				}
			}

		} finally {
			lock.unlock();
		}
	}
}

Future/Runnable

package com.study.futuretask;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.LockSupport;

public class Demo3_CallableTest {
	public static void main(String args[]) throws InterruptedException, ExecutionException {
		CallTask cTask = new CallTask();
		JamesFutureTask<String> fTask = new JamesFutureTask<String>(cTask);

		// 执行第一次
		Thread th = new Thread(fTask);
		th.start();

		System.out.println("begain to get...");
		String result = fTask.get();
		System.out.println(result);

		// 执行第二次,失败
		Thread th1 = new Thread(fTask);
		th1.start();
	}
}

class CallTask implements Callable<String> {

	@Override
	public String call() throws Exception {
		LockSupport.parkNanos(1000 * 1000 * 1000 * 5L);
		System.out.println("done...");
		return "James";
	}
}

手写FutureTask

package com.study.futuretask;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class JamesFutureTask<T> implements Runnable {

	// future只能执行一次
	private volatile int state = NEW;
	private static final int NEW = 0;
	private static final int RUNNING = 1;
	private static final int FINISED = 2;

	public JamesFutureTask(Callable<T> task) {
		this.callable = task;
	}

	// 程序执行的结果
	private T result;

	// 要自行的task
	Callable<T> callable;

	// 获取结果的线层等待队列
	LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(100);

	// 执行当前FutureTask的线程,用CAS进行争抢
	AtomicReference<Thread> runner = new AtomicReference<>();

	@Override
	public void run() {
		// 判断当前对象的状态,如果是New就执行,如果
		if (state != NEW || !runner.compareAndSet(null, Thread.currentThread()))
			return;
		state = RUNNING;

		try {
			result = callable.call();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			state = FINISED;
		}

		// 方法执行完,唤醒所有线程
		while (true) {
			Thread waiter = waiters.poll();
			if (waiter == null)
				break;
			LockSupport.unpark(waiter);
		}

	}

	public T get() {
		if (state != FINISED) {
			waiters.offer(Thread.currentThread());
		}

		while (state != FINISED) {
			LockSupport.park();
		}

		return result;
	}
}

原文地址:https://www.cnblogs.com/yfzhou528/p/11300248.html

时间: 2024-11-05 19:40:29

1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask的相关文章

25.大白话说java并发工具类-CountDownLatch,CyclicBarrier,Semaphore,Exchanger

1. 倒计时器CountDownLatch 在多线程协作完成业务功能时,有时候需要等待其他多个线程完成任务之后,主线程才能继续往下执行业务功能,在这种的业务场景下,通常可以使用Thread类的join方法,让主线程等待被join的线程执行完之后,主线程才能继续往下执行.当然,使用线程间消息通信机制也可以完成.其实,java并发工具类中为我们提供了类似"倒计时"这样的工具类,可以十分方便的完成所说的这种业务场景. 为了能够理解CountDownLatch,举一个很通俗的例子,运动员进行跑

java同步并发工具类CountDownLatch、CyclicBarrier和Semaphore

闭锁CountDownLatch 闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态.闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过.当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态.闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如: 确保某个计算在其需要的所有资源都被初始化之后才继续执行.二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而

Java并发工具类 - CountDownLatch

Java并发工具类 - CountDownLatch 1.简介 CountDownLatch是Java1.5之后引入的Java并发工具类,放在java.util.concurrent包下面 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html 官方API. CountDownLatch能够使一个或多个线程等待其他线程完成各自的工作后再执行:CountDownLatch是JDK 5+里面

【Java并发工具类】Semaphore

前言 1965年,荷兰计算机科学家Dijkstra提出的信号量机制成为一种高效的进程同步机制.这之后的15年,信号量一直都是并发编程领域的终结者.1980年,管程被提出,成为继信号量之后的在并发编程领域的第二个选择.目前几乎所有的语言都支持信号量机制,Java也不例外.Java中提供了Semaphore并发工具类来支持信号量机制.下面我们就来了解Java实现的信号量机制. 首先介绍信号量模型,然后介绍如何使用,最后使用信号量来实现一个限流器. 信号量模型 信号量模型图(图来自参考[1]): 信号

同步工具类 CountDownLatch 和 CyclicBarrier

在开发中,一些异步操作会明显加快执行速度带来更好的体验,但同时也增加了开发的复杂度,想了用好多线程,就必须从这些方面去了解 线程的 wait() notify() notifyall() 方法 线程异步返回 Future ThreadLocal 类 线程池 ThreadPoolExecutor 同步工具类 CountDownLatch,CyclicBarrier,Semaphore,Phaser,Exchanger 估计上面每一个对于 2~3 年的 java 同学来说都是恶梦,比较难以理解,本文

JUC常用同步工具类——CountDownLatch,CyclicBarrier,Semaphore

在 JUC 下包含了一些常用的同步工具类,今天就来详细介绍一下,CountDownLatch,CyclicBarrier,Semaphore 的使用方法以及它们之间的区别. 一.CountDownLatch 先看一下,CountDownLatch 源码的官方介绍. 意思是,它是一个同步辅助器,允许一个或多个线程一直等待,直到一组在其他线程执行的操作全部完成. public CountDownLatch(int count) { if (count < 0) throw new IllegalAr

Java并发编程-线程的并发工具类

Fork-Join 什么是分而治之?规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解动态规范工作密取workStealing Fork/Join使用的标准范式 常用的并发工具类CountDownLatch作用:是一组线程等待其他的线程完成工作以后在执行,加强版joinawait用来等待,countDown负责计数器的减一CyclicBarrier让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏

并发编程(2)--线程的并发工具类

1.线程的并发工具类 Fork-Join 什么是分而治之? 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解 动态规范 工作密取 workStealing Fork/Join使用的标准范式 下面演示第一种用法:由于上下文切换的原因,所以性能上有可能不如单线程效果好. package com.xiangxue.ch2.forkjoin.sum; import java.util.Random; /** *

并发工具类:CountDownLatch、CyclicBarrier、Semaphore

在多线程的场景下,有些并发流程需要人为来控制,在JDK的并发包里提供了几个并发工具类:CountDownLatch.CyclicBarrier.Semaphore. 一.CountDownLatch 1 import java.util.concurrent.CountDownLatch; 2 3 4 public class CountDownLatchTest 5 { //设置N为2 6 static CountDownLatch c = new CountDownLatch(2); 7 p