Java中提供多种方式协调线程的工作。
CountDownLatch:当多个也就是具体的数量等于CountDownLatch初始化参数的时候。线程达到了预期状态或者完成了预期工作时触发事件,其他线程可以等待这个事件来触发自己的后续工作。等待的线程是多个。达到了预期状态的线程会调用CountDownLatch的countDown方法。等待的线程会调用CountDownLatch的await方法。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CountDownLatchDemo { static ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5) ); public static void main(String[] args) throws InterruptedException { int count = 10; final CountDownLatch latch = new CountDownLatch(count); int[] datas = new int[10240]; int step = datas.length / count; for (int i=0; i<count; i++) { int start = i * step; int end = (i+1) * step; if (i == count -1) end = datas.length; tp.execute(new MyRunable(latch, datas, start, end)); } latch.await(); } }
import java.util.concurrent.CountDownLatch; public class MyRunable implements Runnable { CountDownLatch latch; int[] datas; int start; int end; public CountDownLatch getLatch() { return latch; } public void setLatch(CountDownLatch latch) { this.latch = latch; } public int[] getDatas() { return datas; } public void setDatas(int[] datas) { this.datas = datas; } public int getStart() { return start; } public void setStart(int start) { this.start = start; } public int getEnd() { return end; } public void setEnd(int end) { this.end = end; } public MyRunable(CountDownLatch latch, int[] datas, int start, int end) { this.latch = latch; this.datas = datas; this.start = start; this.end = end; } @Override public void run() { latch.countDown(); } }
CyclicBarrier是指循环屏障。它可以协同多个线程,让多个线程在这个屏障前等待。直到所有的线程都达到了这个屏障,再一起继续执行后面的工作。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CyclicBarrierDemo { static ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5) ); public static void main(String[] args) throws InterruptedException { int count = 10; final CyclicBarrier latch = new CyclicBarrier(count); int[] datas = new int[10240]; int step = datas.length / count; for (int i=0; i<count; i++) { int start = i * step; int end = (i+1) * step; if (i == count -1) end = datas.length; tp.execute(new MyRunable1(latch, datas, start, end)); } try { latch.await(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
CyclicBarrier与CountDownLatch都用于线程之间的协调。两者之间的差别在于,CountDownLatch是在多个线程都进行了latch.countDown方法后,才会触发事件,唤醒await在latch上的线程。而执行countDown的线程,行完countDown后会继续自己的线程工作。CyclicBarrier用于同步所有调用await方法的线程,并且等待所有线程都到await方法时,这些线程才继续各自的工作。
Exchanger
不同线程之间的数据交换用Exchanger。当一个线程的方法执行调用exchanger.exchange的时候,会等待其他线程也执行到调用这个方法。然后交换彼此之间的数据
import java.util.ArrayList; import java.util.List; import java.util.concurrent.Exchanger; public class ExchangerDemo { public static void main(String[] args) { final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>(); new Thread(){ public void run(){ List<Integer> list = new ArrayList<Integer>(2); list.add(1); list.add(2); try { list = exchanger.exchange(list); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("thread1:" + list); } }.start(); new Thread(){ public void run(){ List<Integer> list = new ArrayList<Integer>(3); list.add(3); list.add(4); list.add(5); try { list = exchanger.exchange(list); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("thread2:" + list); } }.start(); } }
时间: 2024-10-20 11:35:49