Java_并发线程_Condition

1.概述

使用Condition应在Lock的前提下,请先参见Java_并发线程_Lock、ReadWriteLock一文。在synchronized同步代码块中使用了obj的锁对象,然后通过obj.notify()和obj.wait()来配合处理多线程的问题。然而,同样lock和condition配合使用同样可以完成同样的功能,condition只有配合lock使用才有意义,只不过lock更加的灵活,使用的格式如下。

//lock 与 Condition
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition1 = lock.newCondition();
{
	lock.lock();
	try{
		//
		condition1.await();//condition1应在与其对应的lock区间被调用,等待其它线程调用 condition1.signal();
		//
	}finally{
		lock.unlock();
	}
}

2.await()与signal()原理分析

static final class Node {
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
}
public class ConditionObject implements Condition, java.io.Serializable {
	/** First node of condition queue. */
	private transient Node firstWaiter;
	/** Last node of condition queue. */
	private transient Node lastWaiter;

	//...
}

Condition内部维护了一个Node的双向链表,调用condition.await(),则会新创建一个Node节点,lastWaiter指向这个新创建的节点对象;每次调用signal(),则会通过firstWaiter从链表的前面拿到Node对象,并将firstWaiter指向当前Node.nextWaiter对象。然后对这个Node对象进行操作判断。

(1).await()

作用:当前线程休眠停止调度;是否锁;置于队列等待signal()

    /**
     * 如果当前线程中断,则抛出InterruptedException
     * 阻塞直到 调用了signal和线程被中断
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();

        //新创建node对象,放入链表尾
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //判断是否在同步队列
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }

        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

(2).signal()

    /**
     * Moves the longest-waiting thread, if one exists, from the
     * wait queue for this condition to the wait queue for the
     * owning lock.
     *
     * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
     *         returns {@code false}
     */
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

3.自定义阻塞队列

class BoundedBuffer {
	final Lock lock = new ReentrantLock();
	final Condition notFull = lock.newCondition();
	final Condition notEmpty = lock.newCondition();

	final Object[] items = new Object[100];
	int putptr, takeptr, count;

	public void put(Object x) throws InterruptedException {
		lock.lock();
		try {
			while (count == items.length)
				//等待没有慢
				notFull.await();
			items[putptr] = x;
			if (++putptr == items.length)
				putptr = 0;
			++count;
			//已经不为空
			notEmpty.signal();
		} finally {
			lock.unlock();
		}
	}

	public Object take() throws InterruptedException {
		lock.lock();
		try {
			while (count == 0)
				//等待不为空
				notEmpty.await();
			Object x = items[takeptr];
			if (++takeptr == items.length)
				takeptr = 0;
			--count;
			//已经不为满
			notFull.signal();
			return x;
		} finally {
			lock.unlock();
		}
	}
}

4.应用实例

	private static ReentrantLock lock = new ReentrantLock();
	private static Condition condition1 = lock.newCondition();

	public static void main(String[] args) {
		new Thread() {
			@Override
			public void run() {
				lock.lock();
				try {
					//
					System.out.println(Thread.currentThread().getName() + ", locked");
					try {
						condition1.await();
						System.out.println(Thread.currentThread().getName() + ", awaited");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + ", will finally");
				} finally {
					System.out.println(Thread.currentThread().getName() + ", finally");
					lock.unlock();
					System.out.println(Thread.currentThread().getName() + ", unlocked");
				}
			}
		}.start();
		new Thread() {
			@Override
			public void run() {
				lock.lock();
				try {
					System.out.println(Thread.currentThread().getName() + ", locked");
					condition1.signal();//但是没有释放锁,等待lock.unlock()后,condition1对应的线程才被唤醒,和synchronized一样
					System.out.println(Thread.currentThread().getName() + ", will finally");
				} finally {
					System.out.println(Thread.currentThread().getName() + ", finally");

					try {
						Thread.sleep(10);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + ", unlocked");
					lock.unlock();
				}
			}
		}.start();
	}
	/*
	 *	Thread-1, locked
		Thread-2, locked
		Thread-2, will finally
		Thread-2, finally
		Thread-2, unlocked
		Thread-1, awaited
		Thread-1, will finally
		Thread-1, finally
		Thread-1, unlocked
	 */
时间: 2024-10-10 00:46:35

Java_并发线程_Condition的相关文章

Java_并发线程_Semaphore、CountDownLatch、CyclicBarrier、Exchanger

1.Semaphore 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用.Java并发库Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可.比如在Windo

Java_并发线程_Futrue、FutureTask、Callable

1.Futrue public interface Future<V> //Future 表示异步计算的结果 ExecutorService threadPool = Executors.newSingleThreadExecutor(); Future<String> future = threadPool.submit(new Callable<String>() { public String call() throws Exception { Thread.sl

Java_并发线程_CompletionService

1.CompletionService源码分析 CompletionService内部实现还是维护了一个可阻塞的队列,通过代理设计模式,从而操作队列. /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and a * {@link LinkedBlockingQueue} as a completion queue. * * @param executo

c++11 条件变量 生产者-消费者 并发线程

http://baptiste-wicht.com/posts/2012/04/c11-concurrency-tutorial-advanced-locking-and-condition-variables.html ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 struc

Java 并发 线程同步

Java 并发 线程同步 @author ixenos 同步 1.异步线程本身包含了执行时需要的数据和方法,不需要外部提供的资源和方法,在执行时也不关心与其并发执行的其他线程的状态和行为 2.然而,大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取,这将产生同步问题(可见性和同步性的丢失) 比如两个线程同时执行指令account[to] += amount,这不是原子操作,可能被处理如下: a)将account[to]加载到寄存器 b)增加amount c)将结果写回acco

Java 并发 线程属性

Java 并发 线程属性 @author ixenos 线程优先级 1.每当线程调度器有机会选择新线程时,首先选择具有较高优先级的线程 2.默认情况下,一个线程继承它的父线程的优先级 当在一个运行的线程A里,创建另一个线程B的时候,那么A是父线程,B是子线程.当在一个运行的线程A里,创建线程B,然后又创建了线程C,这时候虽然B比C创建早,可是B并不是C的父线程,而A是B和C的父线程. 3.线程的优先级高度依赖于系统,当虚拟机依赖于宿主机平台的线程实现机制时,Java线程的优先级被映射到宿主机平台

Java 并发 线程的优先级

Java 并发 线程的优先级 @author ixenos 低优先级线程的执行时刻 1.在任意时刻,当有多个线程处于可运行状态时,运行系统总是挑选一个优先级最高的线程执行,只有当线程停止.退出或者由于某些原因不执行的时候,低优先级的线程才可能被执行 2.两个优先级相同的线程同时等待执行时,那么运行系统会以round-robin的方式选择一个线程执行(即轮询调度,以该算法所定的)(Java的优先级策略是抢占式调度!) 3.被选中的线程可因为一下原因退出,而给其他线程执行的机会: 1) 一个更高优先

控制每次线程池的并发线程的最大个数

[本人原创],欢迎交流和分形技术,转载请附上如下内容: 作者:itshare [转自]http://www.cnblogs.com/itshare/ 1. 实验目的:       使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍.并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕,才会有系统线程取出等待队列中的逻辑线程,进行CPU运算. 2.  解决问题:     <a>如果不考虑服务器实际可支持的最大并行

使用CountDownLatch和CyclicBarrier处理并发线程

闲话不说,首先看一段代码: { IValueCallback remoteCallback = new IValueCallback.Stub() { <strong><span style="color:#ff0000;">(B)</span></strong> public void onReceiveValue(final Bundle value) throws RemoteException { synchronized (sy