线程基础知识系列(四)线程的同步2 线程通信和Condition变量

本文是系列的第四篇。

线程基础知识系列(三)线程的同步  :同步控制,锁及synchronized

线程基础知识系列(二)线程的管理 :线程的状态,控制,休眠,Interrupt,yield等

线程基础知识系列(一)线程的创建和启动  :线程的创建和启动,join(),daemon线程,Callable任务。

第三篇文章,重点阐述了如何使用锁和同步块对线程间共享可变变量保护,保证只有一个线程可以进入临界区。其实,没有过多的涉及另一个重要的同步概念:线程协作。第三篇中涉及的线程间并没有有效的协调。本篇重点阐述线程间的协调工作,具体包含以下主题。

  1. Wait-Notify机制
  2. 认识Condition
  3. 应用Condition

1.Wait-Notify机制

相关Object API.,一般使用在线程间通信

public class Object {

public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException {
    ...
    wait(timeout);
}
public final void wait() throws InterruptedException {
    wait(0);
}

public final native void notifyAll();
public final native void notify();
}

  1)wait()、notify()和notifyAll()方法是本地方法,并且为final方法,无法被重写。

  2)调用某个对象的wait()方法能让当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁)

  3)调用某个对象的notify()方法能够唤醒一个正在等待这个对象的monitor的线程,如果有多个线程都在等待这个对象的monitor,则只能唤醒其中一个线程;

  4)调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程;

所谓的线程通信,也就是Wait-Notify场景。一般与以下内容相关:

1、状态变量(State Variable)也就是共享可变变量。
当线程需要wait的时候,总是因为一些状态不满足导致的。如往BlockingQueue里加元素队列已满的时候。这里的状态变量就是指“队列元素数目”。

2、条件断言(Condition Predicate)
当线程确定是否进入wait或者从notify中醒来的时候是否继续往下执行,大都要测试状态条件是否满足,如往BlockingQueue里加元素队列已满,于是阻塞,后续其它线程从队列里取走了元素,就通知在等待的线程“队列不是满的了,可以往里加东西了”,这时候在等待的线程就会醒来,然后看看是不是真的队列不为满的状态,如果是,就将元素添加进去,如果不是,就继续等待。这儿的条件断言,就是指“队列中持有元素数目是否等于队列长度。

3、内置条件队列(Condition Queue)
每个对象都有一个内置的条件队列,当一个线程在该对象是调用wait的时候,就会将该线程加入该对象的条件队列。具体可以查看<深入JVM锁机制之一:synchronized>文章,了解下synchronized底层原理。

文章截图

简单强调下:

必须在临界区中调用wait,notify,notifyAll方法。否则报java.lang.IllegalMonitorStateException异常。

如果持有其他锁,调用obj.wait,notify方法,也是不可以的。

错误情形1

package com.ticmy.concurrency;
public class TestWatiNotifyMechanism {
    private static Object obj = new Object();
    public static void main(String[] args) throws Exception {
        //错误代码,无意义,仅测试用
        obj.wait();
        //obj.notify();
    }
}

错误情形2

package com.ticmy.concurrency;
public class TestWatiNotifyMechanism {
    private static Object obj = new Object();
    public static void main(String[] args) throws Exception {
        //错误代码,无意义,仅测试用
        synchronized(TestWatiNotifyMechanism.class) {
            obj.wait();
            //obj.notify();
        }
    }
}

为了更好的说明wait-notify的含义。参考《Java并发编程实战》中的例子

1.BaseBoundedBuffer.jar 有界缓存基类

package net.jcip.examples;

import net.jcip.annotations.*;

/**
 * BaseBoundedBuffer
 * <p/>
 * Base class for bounded buffer implementations
 *
 * @author Brian Goetz and Tim Peierls
 */
@ThreadSafe
public abstract class BaseBoundedBuffer <V> {
    @GuardedBy("this") private final V[] buf;
    @GuardedBy("this") private int tail;
    @GuardedBy("this") private int head;
    @GuardedBy("this") private int count;

    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }

    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0;
        ++count;
    }

    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null;
        if (++head == buf.length)
            head = 0;
        --count;
        return v;
    }

    public synchronized final boolean isFull() {
        return count == buf.length;
    }

    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}

2.BoundedBuffer.java ,有界缓存实现

package net.jcip.examples;

import net.jcip.annotations.*;

/**
 * BoundedBuffer
 * <p/>
 * Bounded buffer using condition queues
 *
 * @author Brian Goetz and Tim Peierls
 */
@ThreadSafe
        public class BoundedBuffer <V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer() {
        this(100);
    }

    public BoundedBuffer(int size) {
        super(size);
    }

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull())
            wait();
        doPut(v);
        notifyAll();
    }

    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
        while (isEmpty())
            wait();
        V v = doTake();
        notifyAll();
        return v;
    }

    // BLOCKS-UNTIL: not-full
    // Alternate form of put() using conditional notification
    public synchronized void alternatePut(V v) throws InterruptedException {
        while (isFull())
            wait();
        boolean wasEmpty = isEmpty();
        doPut(v);
        if (wasEmpty)
            notifyAll();
    }
}

由于逻辑简单,不多说了。 可以想一下为什么使用notifyAll().从中,可以对号入座,

状态变量在哪?条件断言在哪?条件队列,因为使用synchronized保证的同步,所以是JVM底层实现的。如果你有心的话,应该注意一个情况:wait操作,都出现在循环中

为什么要在循环中wait?有以下几个原因。
1、一个对象锁可能用于保护多个状态变量,当它们都需要wait-notify操作时,如果不将wait放到while中就有问题。例如,某对象锁obj保护两种状态变量a和b,当a的条件断言不成立时发生了wait操作,当b的条件断言不成立时也发生了wait操作,两个线程被加入到obj对应的条件队列中,现在若改变状态变量a的某操作发生,在obj上调用了notifyAll操作,obj对应的条件队列里的所有线程均被唤醒,之前等待a的某个或几个线程去判断a的条件断言可能成立了,但b对应的条件断言肯定仍不成立,而此时等待b的线程也被唤醒了,所以需要循环判断b的条件断言是否满足,如果不满足,继续wait。
2、多个线程wait的同一个状态的条件断言。如BlockingQueue场景下,当前队列是空的,多个线程要从里面取元素,于是都wait了。此时另一个线程往里面添加了一个元素,调用了notifyAll操作,唤醒了所有线程,但只有一个线程能拿到那个新加进来的元素,继续走下去,其它的仍需等待。
3、虚假唤醒。在没有被通知、中断或超时的情况下,线程自动苏醒了。虽然这种情况在实践中很少发生,但是必须通过循环检测条件是否满足的方式来防止其发生,如果不满足该条件,则继续等待。

2.认识Condition

Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition1的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,在阻塞队列那一篇博文中就讲述到了,阻塞队列实际上是使用了Condition来模拟线程间协作。
    Condition是个接口,基本的方法就是await()和signal()方法;
    Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
    调用Condition的await()和signal()方法,都必须在lock保护之内
 Conditon中的await()对应Object的wait();
 Condition中的signal()对应Object的notify();
 Condition中的signalAll()对应Object的notifyAll()。

强调:这里的Condition可以对应条件断言(Condition Predicate),一并理解。需要说到的是,它与内置锁、内置条件队列的区别

1、内置锁对象只有一个条件队列,而显式锁可以通过newCondition方法创建多个条件队列,这样就可以避免不同的条件断言关联同一个条件队列造成的问题。
2、如同Lock比内置锁更灵活一样,显式的条件队列也提供了更多的方法供调用(如等待的时候不可被中断的awaitUninterruptibly方法),更多方法参见java.util.concurrent.locks.Condition的JAVA API。
3、Condition也有wait、notify方法,它们从Object类继承而来,一般实际中不会调用这些方法(要调用这些方法必须持有Condition对象的锁,而不是Lock的锁定)以避免混淆。
4、Condition可以继承Lock的公平策略。如new ReentrantLock的时候传入的公平策略参数。当公平策略为true的时候,signal的时候,Condition中的线程唤醒顺序是FIFO的。

至于是选择显式的条件队列还是内置的,如同内置锁和Lock一样,取决于应用是否需要使用内置条件队列无法提供而显式条件队列提供了的特性。如果已使用了Lock,那么使用Condition是自然而然的事情。

官方API中提供的例子,如下

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer<T> {
     final Lock lock = new ReentrantLock();
     final Condition notFull  = lock.newCondition();
     final Condition notEmpty = lock.newCondition();
     final T[] items =(T[]) new Object[10];
     int putptr, takeptr, count;
     public void put(T x) throws InterruptedException {
       lock.lock();
       try {
         while (count == items.length) {
             notFull.await();
         }
         items[putptr] = x;
         if (++putptr == items.length) putptr = 0;
         ++count;
         notEmpty.signal();
       } finally {
         lock.unlock();
       }
     }
  
     public T take() throws InterruptedException {
       lock.lock();
       try {
         while (count == 0) {
             notEmpty.await();
         }
         T x = items[takeptr];
         if (++takeptr == items.length) takeptr = 0;
         --count;
         notFull.signal();
         return x;
       } finally {
         lock.unlock();
       }
     }
   }

可以和上文中的BoundedBuffer对比阅读。

两者的最大不同:除了同步形式不同,更主要的是方法一,涉及到竞态的地方都用了synchronized,这样做,虽然保证了线程安全,但降低了并发。

3.应用Condition

Condition到底有啥用,相信有人和我一样,感到很困惑。不妨做个例子,使用Lock,但不借助Condition的情况,是什么样的。为了更好说明Condition的使用场景,例子最好要有多个竞态条件。

下面就使用Condition来完成一个线程面试题:有三个线程分别打印A、B、C,请用多线程编程实现,在屏幕上循环打印10次ABCABC…

3.1不使用Condition版本

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConditionExample {
    private static   Object lock = new Object();
    /** 当前线程的名字 */
    private static volatile char currentThreadName = ‘A‘;
    private static class ThreadA implements Runnable {
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock){
                    {
                        while (currentThreadName != ‘A‘) {
                            try {
                            /*
                             * 如果当前线程名字不是A,那么ThreadA就处理等待状态
                             */
                                lock.wait();
                            } catch (InterruptedException e) {

                            }
                        }
                    /*
                     * 打印出第几遍以及A信息
                     */
                        System.out.println(String.format("第%d遍", i + 1));
                        System.out.println("A");

                    /*
                     * 将当前线程名置为B, 然后通知ThreadB执行
                     */
                        currentThreadName = ‘B‘;
                        lock.notifyAll();
                    }
                }
            }
        }

    }

    private static class ThreadB implements Runnable {
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock){
                    {
                        while (currentThreadName != ‘B‘) {
                            try {
                            /*
                             * 如果当前线程名字不是A,那么ThreadA就处理等待状态
                             */
                                lock.wait();
                            } catch (InterruptedException e) {

                            }
                        }
                        System.out.println("B");

                    /*
                     * 将当前线程名置为B, 然后通知ThreadB执行
                     */
                        currentThreadName = ‘C‘;
                        lock.notifyAll();
                    }
                }
            }
        }

    }

    private static class ThreadC implements Runnable {
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock){
                    {
                        while (currentThreadName != ‘C‘) {
                            try {
                            /*
                             * 如果当前线程名字不是A,那么ThreadA就处理等待状态
                             */
                                lock.wait();
                            } catch (InterruptedException e) {

                            }
                        }
                        System.out.println("C");

                    /*
                     * 将当前线程名置为B, 然后通知ThreadB执行
                     */
                        currentThreadName = ‘A‘;
                        lock.notifyAll();
                    }
                }
            }
        }

    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        service.execute(new ThreadA());
        service.execute(new ThreadB());
        service.execute(new ThreadC());
//        new Thread(new Task("C")).start();
    }
}

3.2 使用Condition的例子(代码来自http://mouselearnjava.iteye.com/blog/1948437)

package my.thread.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

/**
 * 题目:有三个线程分别打印A、B、C,请用多线程编程实现,在屏幕上循环打印10次ABCABC…
 * 
 * 本程序采用Lock和Condition来实现。
 * 
 * @author Eric
 * 
 */
public class ConditionExample {

    private Lock lock = new ReentrantLock();

    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();

    /** 当前线程的名字 */
    private char currentThreadName = ‘A‘;

    private static final Logger logger = Logger
            .getLogger("my.thread.test.OrderPrintTest");

    public static void main(String[] args) {

        ConditionExample ce = new ConditionExample();

        ExecutorService service = Executors.newFixedThreadPool(3);
        service.execute(ce.new ThreadA());
        service.execute(ce.new ThreadB());
        service.execute(ce.new ThreadC());

        service.shutdown();
    }

    private class ThreadA implements Runnable {
        public void run() {

            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (currentThreadName != ‘A‘) {
                        try {
                            /*
                             * 如果当前线程名字不是A,那么ThreadA就处理等待状态
                             */
                            conditionA.await();
                        } catch (InterruptedException e) {
                            logger.severe(e.getLocalizedMessage());
                        }
                    }

                    /*
                     * 打印出第几遍以及A信息
                     */
                    System.out.println(String.format("第%d遍", i + 1));
                    System.out.println("A");

                    /*
                     * 将当前线程名置为B, 然后通知ThreadB执行
                     */
                    currentThreadName = ‘B‘;
                    conditionB.signal();

                } finally {
                    lock.unlock();
                }
            }
        }

    }

    private class ThreadB implements Runnable {
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (currentThreadName != ‘B‘) {
                        try {
                            /*
                             * 如果当前线程名字不是B,那么ThreadB就处理等待状态
                             */
                            conditionB.await();
                        } catch (InterruptedException e) {
                            logger.severe(e.getLocalizedMessage());
                        }
                    }

                    /*
                     * 打印信息B
                     */
                    System.out.println("B");

                    /*
                     * 将当前线程值置为C 并通过ThreadC来执行
                     */
                    currentThreadName = ‘C‘;
                    conditionC.signal();

                } finally {
                    lock.unlock();
                }
            }

        }

    }

    private class ThreadC implements Runnable {

        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (currentThreadName != ‘C‘) {
                        try {
                            /*
                             * 如果当前线程名字不是C,那么ThreadC就处理等待状态
                             */
                            conditionC.await();
                        } catch (InterruptedException e) {
                            logger.severe(e.getLocalizedMessage());
                        }
                    }

                    /*
                     * 打印信息C
                     */
                    System.out.println("C");
                    System.out.println();

                    /*
                     * 将当前线程值置为A 并通过ThreadA来执行
                     */
                    currentThreadName = ‘A‘;
                    conditionA.signal();

                } finally {
                    lock.unlock();
                }

            }
        }
    }
}

个人认为:使用Condition:一个Condition,对应一个竞态条件,实现起来很简单,可以实现针对性通知。额外Condition提供了一些额外功能,如awaitUninterruptibly

中文API:http://www.cjsdn.net/Doc/JDK60/java/util/concurrent/locks/Condition.html

资源

http://www.ticmy.com/?p=219

http://www.cnblogs.com/dolphin0520/p/3920385.html

时间: 2024-10-25 02:25:20

线程基础知识系列(四)线程的同步2 线程通信和Condition变量的相关文章

线程基础知识系列(五)认识volatile

线程基础知识系列(四)线程的同步2  :线程的notify-wait通信机制,以及Condition条件变量 线程基础知识系列(三)线程的同步  :同步控制,锁及synchronized 线程基础知识系列(二)线程的管理 :线程的状态,控制,休眠,Interrupt,yield等 线程基础知识系列(一)线程的创建和启动  :线程的创建和启动,join(),daemon线程,Callable任务. 本篇文章主要讨论的关键字是volatile. volatile使用场景 volatile介绍 vol

线程基础知识系列(三)线程的同步

本文是系列的第三篇,前面2篇,主要是针对单个线程如何管理,启动等,没有过多涉及多个线程是如何协同工作的. 线程基础知识系列(二)线程的管理 :线程的状态,控制,休眠,Interrupt,yield等 线程基础知识系列(一)线程的创建和启动  :线程的创建和启动,join(),daemon线程,Callable任务. 本文的主要内容 何谓线程安全? 何谓共享可变变量? 认识synchronized关键字 认识Lock synchronized vs Lock 1.何谓线程安全 多线程是把双刃剑,带

线程基础知识系列(二)线程的管理

本篇是线程基础知识系列的第二篇,主要简单江夏线程管理相关知识点. 线程基础知识系列(一)线程的创建和启动:说明了线程的2种创建和启动,join(),daemon线程,Callable 任务. 本文的主要内容 线程的状态 线程的优先级 sleep vs wait 线程的流程控制 Interrupt yield让出你的CPU 1.线程的状态 以<线程基础知识系列(一)线程的创建和启动>这张图,是程序的运行时线程信息截图.有main线程,user Threads,daemon Threads.现在咱

C#基础知识系列四(运算符汇总)

本文来自:http://www.cnblogs.com/aehyok/p/3504995.html 前言  本节主要来讲C#中的各种运算符.主要包括is运算符.as运算符.checked和unchecked运算符.sizeof运算符.空接合运算符(??).&和&&.移位运算符.增量和减量运算符.条件运算符(三元运算符).命名空间别名限定符. 正文  1.is运算符 is运算符可以检查对象是否与特定的类型兼容.比如下例中要检查变量是否与object类型兼容: int i=0; if(

线程基础知识

什么是线程: 在一个程序里的一个执行路线就叫做线程(thread).更准确的定义是:线程是"一个进程内部的控制序列" 一切进程至少都有一个执行线程 进程与线程 进程是资源竞争的基本单位 线程是程序执行的最小单位 线程共享进程数据,但也拥有自己的一部分数据 线程ID 一组寄存器 栈 errno 信号状态 优先级 fork和创建新线程的区别 当一个进程执行一个fork调用的时候,会创建出进程的一个新拷贝,新进程将拥有它自己的变量和它自己的PID.这个新进程的运行时间是独立的,它在执行时几乎

IM开发基础知识补课(四):正确理解HTTP短连接中的Cookie、Session和Token

本文引用了简书作者“骑小猪看流星”技术文章“Cookie.Session.Token那点事儿”的部分内容,感谢原作者. 1.前言 众所周之,IM是个典型的快速数据流交换系统,当今主流IM系统(尤其移动端IM)的数据流交换方式都是Http短连接+TCP或UDP长连接来实现.Http短连接主要用于从服务器读取各种持久化信息:比如用户信息.聊天历史记录.好友列表等等,长连接则是用于实时的聊天消息或指令的接收和发送. 作为IM系统中不可或缺的技术,Http短连的重要性无可替代,但Http作为传统互联网信

HTTP基础知识(四)

超链接:HTTP基础知识(一) HTTP基础知识(二) HTTP基础知识(三) HTTP基础知识(四) 四.返回结果的HTTP状态码 1.状态码的职责:当客户端向服务端发送请求时,描述返回的请求结果. 2.状态码的类别: 3.2XX系列状态码(成功) (1)200 OK 此状态码表示从客户端发来的请求在服务端被正常处理了. 当成功进入百度网时,状态码就会显示200 (2)204 No Content 此状态码表示服务器接受的请求已成功处理,但在返回的响应报文中不含实体的主体部分. 一般在只需要从

JAVA基础知识系列---线程与进程

进程与线程 进程:每个进程都有独立的代码和数据空间(进程上下文),进程间的切换会有较大的开销,一个进程包含1--n个线程. 线程:同一类线程共享代码和数据空间,每个线程有独立的运行栈和程序计数器(PC),线程切换开销小. 1.1多线程与多进程 多进程是指操作系统能同时运行多个任务(程序). 多线程是指在同一程序中有多个顺序流在执行. 1.2线程的状态转换 1.新状态:线程对象已经创建,还没有在其上调用start()方法. 2.可运行状态:当线程有资格运行,但调度程序还没有把它选定为运行线程时线程

Java__线程---基础知识全面实战---坦克大战系列为例

今天想将自己去年自己编写的坦克大战的代码与大家分享一下,主要面向学习过java但对java运用并不是很熟悉的同学,该编程代码基本上涉及了java基础知识的各个方面,大家可以通过练习该程序对自己的java进行一下实战. 每个程序版本代码中,都附有相关注释,看完注释大家就可以对本程序设计有个很明显的思路.真的很有趣,想对java重新温习的同学完全不必再对厚厚的java基础书籍进行阅读了,在跟着本次代码练习并分析后,大家一定会对java各方面基础知识 尤其是线程的知识有更深一步的了解!!! 本次坦克大