漫谈并发编程(五):线程之间的协作

编写多线程程序需要进行线程协作,前面介绍的利用互斥来防止线程竞速是来解决线程协作的衍生危害的。编写线程协作程序的关键是解决线程之间的协调问题,在这些任务中,某些可以并行执行,但是某些步骤需要所有的任务都结束之后才能开动。

wait()与notifyAll()

wait()使你可以等待某个条件发生变化,wait()会在等待外部世界产生变化的时候将任务挂起,并且只有在notify()或notifyAll()发生时,即表示发生了某些感兴趣的事物,这个任务才会被唤醒并去检查所产生的变化。

调用sleep()的时候锁并没有被释放,调用yield()也属于这种情况,理解这一点很关键。另一方面,当一个任务在方法里遇到了对wait()的调用的时候,线程的执行被挂起,对象上的锁被释放。因此wait()释放锁,这就意味着另一个任务可以获得这个锁,因此在该对象中的其他synchronized方法可以在wait()期间被调用,而其他的方法通常将会产生改变,而这种改变正是使被挂起的任务重新唤醒所感兴趣的变化。

有两种形式的wait()。第一种版本接受毫秒数为参数,含义与sleep()方法里的参数的意思相同,都是指"在此期间暂停"。但是与sleep()不同的是,对于wait()而言:

  1. 在wait()期间对象锁是释放的
  2. 可以通过notify()、notifyAll(),或者令时间到期,从wait()中恢复执行。

第二种,也是更常见形式的wait()不接收任何参数。这种wait()将无限等待下去,直到线程接收到notify()或者notifyAll()消息。

可以想象,wait()、notify()、notifyAll()一定是基于某个"东西",把自身状态附加上去,来实现这种通知及状态的变化。考虑设计方式:1. 这种东西可以单独被定义出来。 2. 在Object中提供该"东西"的实现。  明显第二种方式要轻松方便许多,迁移性更强。其次,这种东西可能不是线程安全的,所以需要锁来支持。使用synchronized来进行同步的保护是理所应当,因为"东西"的实现就在Object中,其次使用synchronized的好处是一定程度可以避免由于锁不一致的情况下产生的wait()及notifyAll的不对应,wait()在一把锁中释放了锁,和notifyAll在另一把锁进行操作毫无相关。

java要求只能在同步控制方法或同步控制块里调用wait()、notify()和notifyAll()。

下面演示一个例子,一个是将蜡涂到Car上,一个是抛光它。抛光任务在涂蜡任务完成之前,是不能执行其工作的,而涂蜡任务在涂另一层蜡之前,必须等待抛光任务完成。WaxOn和WaxOff都使用了Car对象,该对象在这些任务等待条件变化时候,使用wait()和notifyAll()来挂起和重新启动这些任务:

class Car {
     private boolean waxOn = false;
     public synchronized void waxed() {
          waxOn = true;
          notifyAll( );
     }
     public synchronized void buffed( ) {
          waxOn = false;
          notifyAll( );
     }
     public synchronized void waitForWaxing( )  throws InterruptedException{
          while(waxOn == false)
               wait( );
     }
     public synchronized void waitForBuffing( ) throws InterruptedException {
          while(waxOn == true)
               wait( );
     }
}

class WaxOn implements Runnable {
     private Car car;
     public WaxOn(Car c) { car = c;}
     public void run() {
          try {
               while(!Thread.interrupted()) {
                    System.out.print(" Wax on!");
                    TimeUnit.MILLISECONDS.sleep(200);
                    car.waxed();
                    car.waitForBuffing();
               }
          } catch (InterruptedException e) {
               System.out.println("Exiting via interrupt");
          }
          System.out.println("Ending Wax On task");
     }
}

class WaxOff implements Runnable {
     private Car car;
     public WaxOff(Car c) {car = c;}
     public void run( ) {
          try {
               while(!Thread.interrupted()) {
                    car.waitForWaxing();
                    System.out.print("Wax Off");
                    TimeUnit.MILLISECONDS.sleep(200);
                    car.buffed();
               }
          } catch(InterruptedException e) {
               System.out.println("Exiting via interrupt");
          }
          System.out.println("Ending Wax Off task");
     }
}

public class WaxOMatic {
     public static void main(String[] args) throws Exception{
          Car car = new Car();
          ExecutorService exec = Executors.newCachedThreadPool();
          exec.execute(new WaxOff(car));
          exec.execute(new WaxOn(car));
          TimeUnit.SECONDS.sleep(5);
          exec.shutdownNow();
     }
}

前面的示例强调你必须用一个检查感兴趣的条件的while循环包围wait()。这很重要,因为:

  • 你可能有多个任务出于相同的原因在等待一个锁,而第一个唤醒任务可能已经改变这种状况(即使你没有这么做,有人也会通过继承你的类去这么做)。如果属于这种情况,那么这个任务应该被再次挂起,直至其感兴趣的条件发生变化。
  • 也有可能某些任务处于不同的原因在等待你的对象上锁(在这种情况下必须使用(notifyAll))。在这种情况下,你需要检查是否已经由正确的原因唤醒,如果不是,就再次调用wait()。

notify()与notifyAll()

因为在技术上,可能会有多个任务在单个Car对象上处于wait()状态,因此调用notifyAll()比调用notify()要更安全。但是,上面程序的结构只会有一个任务处于wait()状态,因此你可以使用notify()来代替notifyAll()。

使用notify()而不是notifyAll()是一种优化。使用notify()时,在众多等待同一个锁的任务中只有一个会被唤醒,因此如果你希望使用notify()就必须保证被唤醒的是恰当的任务。另外,为了使用notify(),所有任务必须等待相同的条件,因为如果你有多个任务在等待不同的条件,那么你就不会知道是否唤醒的恰当的任务。如果使用notify(),当条件发生变化时,必须只有一个任务能从中受益。最后,这些限制对所有可能存在的子类都必须总是起作用的。如果这些规则中有任何一条不满足,那么你就必须使用notifyAll()而不是notify()。

用wait()和notifyAll()实现生产者消费者问题

使用wait()和notifyAll()时一定要注意不能两层嵌套synchronized,如果使用了两层,则外层的sycnhronized加的锁无法释放。而且需要注意的是不能使用Lock来限制资源的访问,因为wait时无法释放该锁。如果还要限制在notifyAll时不能notifyAll到同类,那么实现这个问题还是有难度的。

下面贴上一个自己一个粗陋的实现,各位朋友有漂亮代码的也可以贴上来交流下。

class Meal {
}

class WaitPerson implements Runnable {
     private String name;
     private Restaurant restaurant;

     public WaitPerson(String name, Restaurant res) {
          this.name = name;
          this.restaurant = res;
     }

     @Override
     public void run() {
          try {
               while (!Thread.interrupted()) {
                    synchronized (restaurant.waitPersons) {
                         while (restaurant.meals.size() < 1) {
                              restaurant.waitPersons.wait();
                         }
                    }
                    synchronized (restaurant.chefs) {
                         if (restaurant.meals.size() >= 1) {
                              restaurant.meals.poll();
                              restaurant.chefs.notifyAll();
                              System.out.println(name + " consumed a meal !");
                         }
                    }
               }
          } catch (InterruptedException e) {
               System.out.println(name + " is ended via InterruptedException !");
               return;
          }
          System.out.println(name + " is ended via InterruptedException !");
     }
}

class Chef implements Runnable {
     private String name;
     private Restaurant restaurant;

     public Chef(String name, Restaurant res) {
          this.name = name;
          this.restaurant = res;
     }

     @Override
     public void run() {
          try {
               while (!Thread.interrupted()) {
                    synchronized (restaurant.chefs) {
                         while (restaurant.meals.size() > 10) {
                         restaurant.chefs.wait();
                    }
               }
                    synchronized (restaurant.waitPersons) {
                         if (restaurant.meals.size() <= 10) {
                              restaurant.meals.add(new Meal());
                              restaurant.waitPersons.notifyAll();
                              System.out.println(name + " produced a meal !");
                         }
                    }
               }
          } catch (InterruptedException e) {
               System.out.println(name + " is ended via InterruptedException !");
               return;
          }
          System.out.println(name + " is ended via InterruptedException !");
     }
}

public class Restaurant {
     public Queue<Meal> meals = new ConcurrentLinkedQueue<Meal>();
     public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>();
     public List<Chef> chefs = new ArrayList<Chef>();

     public static void main(String[] args) throws InterruptedException {
          Restaurant res = new Restaurant();
          ExecutorService exec = Executors.newCachedThreadPool();
          Chef chef1 = new Chef("chef1", res);
          Chef chef2 = new Chef("chef2", res);
          res.chefs.add(chef1);
          res.chefs.add(chef2);
          exec.execute(chef1);
          exec.execute(chef2);
          WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res);
          WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res);
          res.waitPersons.add(waitPerson1);
          res.waitPersons.add(waitPerson2);
          exec.execute(waitPerson1);
          exec.execute(waitPerson2);
          // TimeUnit.MILLISECONDS.sleep(3000);
          // exec.shutdownNow();
     }
}

上面这个程序可以证明出来是线程安全的。不过使用这种方式实在是太晦涩了,生产者消费者问题的机制需要我们去控制,实际上,java并发类库为我们提供了这种模型的实现,我们待会会用阻塞队列来重写这个问题。

使用显式的Lock和Condition对象

我们可以显式的使用Condition对象来替代我前面提到的"东西",使用这种方式将更加灵活,且有更清晰的辩识度,但会增加程序中对象的数量。你可以通过在Condition上调用await()来挂起一个任务。当外部条件发生变化,意味着某个任务应该继续执行时,你可以通过调用signal()来通知这个任务,从而唤醒一个任务,或者调用signalAll()来唤醒所有在这个Condition上被其自身挂起的任务。

下面我们利用此工具重写前面例子中的Car类。

class Car {
     private boolean waxOn = false;
     private Lock lock = new ReentrantLock();
     private Condition condition = lock.newCondition();
     public  void waxed() {
          lock.lock();
          try {
               waxOn = true;
               condition.signalAll();
          } finally {
               lock.unlock();
          }
     }
     public void buffed( ) {
          lock.lock();
          try {
               waxOn = false;
               condition.signalAll();
          } finally {
               lock.unlock();
          }
     }
     public void waitForWaxing( )  throws InterruptedException{
          lock.lock();
          try{
               while(waxOn == false)
               condition.await();
          } finally {
               lock.unlock();
          }
     }
     public  void waitForBuffing( ) throws InterruptedException {
          lock.lock();
          try {
               while(waxOn == true)
               condition.await( );
          } finally {
               lock.unlock();
          }
     }
}

使用BlockingQueue来解决生产者消费者问题

java帮我们抽象了生产者消费者问题,我们可以使用同步队列来解决任务协作的问题,同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的实现。你通常可以使用LinkedBlockingQueue,它是一个无界队列,还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。

如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时恢复消费者任务。阻塞队列可以解决非常大量的问题,而其方式与wait()和notifyAll()相比,则简单并可靠太多。

下面利用阻塞队列实现了上面的餐厅问题。

class Meal {
}

class WaitPerson implements Runnable {
     private String name;
     private RestaurantBlookingQueue restaurant;
     public WaitPerson(String name, RestaurantBlookingQueue res) {
          this.name = name;
          this.restaurant = res;
     }

     @Override
     public void run() {
          try {
               while (!Thread.interrupted()) {
                    restaurant.meals.take();
                    System.out.println(name + "taked a Meal");
                    Thread.sleep(100);
               }
          } catch (InterruptedException e) {
               System.out.println(name + " is ended via InterruptedException !");
               return;
          }
          System.out.println(name + " is ended via InterruptedException !");
     }
}

class Chef implements Runnable {
     private String name;
     private RestaurantBlookingQueue restaurant;

     public Chef(String name, RestaurantBlookingQueue res) {
          this.name = name;
          this.restaurant = res;
     }

     @Override
     public void run() {
          try {
               while (!Thread.interrupted()) {
                    restaurant.meals.put(new Meal());
                    System.out.println(this.name + "made a meal");
                    Thread.sleep(100);
               }
          } catch (InterruptedException e) {
               System.out.println(name + " is ended via InterruptedException !");
               return;
          }
          System.out.println(name + " is ended via InterruptedException !");
     }
}

public class RestaurantBlookingQueue {
     public BlockingQueue<Meal> meals = new ArrayBlockingQueue<Meal>(10);
     public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>();
     public List<Chef> chefs = new ArrayList<Chef>();

     public static void main(String[] args) throws InterruptedException {
          RestaurantBlookingQueue res = new RestaurantBlookingQueue();
          ExecutorService exec = Executors.newCachedThreadPool();
          Chef chef1 = new Chef("chef1", res);
          Chef chef2 = new Chef("chef2", res);
          res.chefs.add(chef1);
          res.chefs.add(chef2);
          exec.execute(chef1);
          exec.execute(chef2);
          WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res);
          WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res);
          res.waitPersons.add(waitPerson1);
          res.waitPersons.add(waitPerson2);
          exec.execute(waitPerson1);
          exec.execute(waitPerson2);

     // TimeUnit.MILLISECONDS.sleep(3000);
     // exec.shutdownNow();
     }
}

任务间使用管道进行输入/输出

通过输入/输出在线程间进行通信通常很有用。提供线程功能的类库以"管道"的形式对线程的输入/输出提供了支持。它们在Java输入/输出类库中的对应物就是PipedWriter类(允许任务向管道写)和PipedReader类(允许不同任务从同一个管道读)。这个模型可以看成是"生产者-消费者"问题的变体。管道基本是一个阻塞队列,存在于多个引入BlookingQueue之前的Java版本。

class Sender implements Runnable {
     private Random rand = new Random(47);
     private PipedWriter out = new PipedWriter();
     public PipedWriter getPipedWriter( ) {return out;}
     public void run( ) {
          try {
               while(true) {
                    for(char c = 'A' ; c <= 'z'; c++) {
                         out.write(c);
                         TimeUnit.MILLISECONDS.sleep( rand.nextInt(500));
                    }
               }
          } catch (IOException e) {
               System.out.println(e + " Sender write exception");
          } catch (InterruptedException e) {
               System.out.println(e + " Sender sleep exception");
          }
     }
}

class Receiver implements Runnable {
     private PipedReader in;
     public Receiver(Sender sender) throws IOException {
          in = new PipedReader(sender.getPipedWriter());
     }
     public void run( ) {
          try {
               while(true) {
                    System.out.print("Read: "+(char)in.read() + ", ");
               }
          } catch (IOException e) {
               System.out.println(e + " Receiver read exception");
          }
     }
}

public class PipedIO {
     public static void main(String []args) throws Exception {
          Sender sender = new Sender( );
          Receiver receiver = new Receiver( sender );
          ExecutorService exec = Executors.newCachedThreadPool();
          exec.execute(sender);
          exec.execute(receiver);
          TimeUnit.SECONDS.sleep( 4 );
          exec.shutdownNow();
     }
}

时间: 2024-08-23 23:51:36

漫谈并发编程(五):线程之间的协作的相关文章

Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)

Java并发编程系列[未完]: Java 并发编程:核心理论 Java并发编程:Synchronized及其实现原理 Java并发编程:Synchronized底层优化(轻量级锁.偏向锁) Java 并发编程:线程间的协作(wait/notify/sleep/yield/join) 一.线程的状态 Java中线程中状态可分为五种:New(新建状态),Runnable(就绪状态),Running(运行状态),Blocked(阻塞状态),Dead(死亡状态). New:新建状态,当线程创建完成时为新

java并发系列(二)-----线程之间的协作(wait、notify、join、CountDownLatch、CyclicBarrier)

在java中,线程之间的切换是由操作系统说了算的,操作系统会给每个线程分配一个时间片,在时间片到期之后,线程让出cpu资源,由其他线程一起抢夺,那么如果开发想自己去在一定程度上(因为没办法100%控制它)让线程之间互相协作.通信,有哪些方式呢? wait.notify.notifyAll 1.void wait( ) 导致当前的线程等待,直到其他线程调用此对象的notify( ) 方法或 notifyAll( ) 方法 2.void wait(long timeout) 导致当前的线程等待,直到

并发编程之线程共享和协作(一)

更多Android架构进阶视频学习请点击:https://space.bilibili.com/474380680本篇文章将从以下几个内容来阐述线程共享和协作: [基础概念之CPU核心数.线程数,时间片轮转机制解读][线程之间的共享][线程间的协作] 一.基础概念 CPU核心数.线程数两者的关系:cpu的核心数与线程数是1:1的关系,例如一个8核的cpu,支持8个线程同时运行.但在intel引入超线程技术以后,cpu与线程数的关系就变成了1:2.此外在开发过程中并没感觉到线程的限制,那是因为cp

19、Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition

Java并发编程:线程间协作的两种方式:wait.notify.notifyAll和Condition 在前面我们将了很多关于同步的问题,然而在现实中,需要线程之间的协作.比如说最经典的生产者-消费者模型:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权.因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去.因此,一般情况下,当队列满时,会让生产者交出对

Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition

在前面我们将了很多关于同步的问题,然而在现实中,需要线程之间的协作.比如说最经典的生产者-消费者模型:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权.因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去.因此,一般情况下,当队列满时,会让生产者交出对临界资源的占用权,并进入挂起状态.然后等待消费者消费了商品,然后消费者通知生产者队列有空间了.同样地,当

第十五章、并发编程之线程

目录 第十五章.并发编程之线程 1.什么是线程 2. 进程和线程的区别 3. 开启线程的两种方式 函数开启 类开启 4.子线程与子进程创建速度 5.子线程共享数据的证明 6.线程的join方法 单个子线程 多个子线程 思考 7.了解进程的join 8. 线程的其他相关用法 第十五章.并发编程之线程 1.什么是线程 纠正概念:进程其实不是个执行单位,进程是一个资源单位,每个进程内自带一个线程,线程才是cpu上的执行单位 抽象理解: 进程是指在系统中正在运行的一个应用程序:线程是系统分配处理器时间资

漫谈并发编程(一) - 并发简介

并发编程是每个程序员进阶的必修之课,想写一个安全稳定,性能强劲的并发程序可没那么容易.我将在未来的日子里,与大家分享一个并发小白成长路上的所思所想.并发编程的思想是通的,但是例子得要是具现的,在该系列中将使用java语言用以演示. 此文作为为漫谈并发编程系列的第一篇,由于本人喜欢先论理再论事,而非先论事再论理,所以就以一篇对并发的文字描述开头了. 并发编程由来 早年的计算机中没有操作系统,在某个时间段内只支持运行一个程序,并且这个程序能访问计算机的所有资源.在这个程序完全执行完后,再执行下一个程

漫谈并发编程(一) - 并发简单介绍

并发编程是每一个程序猿进阶的必修之课,想写一个安全稳定,性能强劲的并发程序可没那么easy.我将在未来的日子里,与大家分享一个并发小白成长路上的所思所想.并发编程的思想是通的,可是样例得要是具现的,在该系列中将使用java语言用以演示. 此文作为为漫谈并发编程系列的第一篇,探本溯源,以一篇对并发的文字描写叙述开头. 并发编程由来 早年的计算机中没有操作系统,在某个时间段内仅仅支持运行一个程序,而且这个程序能訪问计算机的全部资源.在这个程序全然运行完后,再运行下一个程序. 在此时,引入并发编程的优

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool