Exchanger有两个用户,当一(A)方调用exchange方法之后,就开始等待,直到另一(B)方开始调用exchange方法。两个exchange可以认为是原子性的。
public class Core { public static void main(String[] args) { List<String> buffer1 = new ArrayList<>(); List<String> buffer2 = new ArrayList<>(); Exchanger<List<String>> exchanger = new Exchanger<>(); Producer producer = new Producer(buffer1, exchanger); Consumer consumer = new Consumer(buffer2, exchanger); Thread threadProducer = new Thread(producer); Thread threadConsumer = new Thread(consumer); threadProducer.start(); threadConsumer.start(); } }
public class Producer implements Runnable { private List<String> buffer; private final Exchanger<List<String>> exchanger; public Producer(List<String> buffer, Exchanger<List<String>> exchanger) { this.buffer = buffer; this.exchanger = exchanger; } @Override public void run() { int cycle = 1; for (int i = 0; i < 19; i++) { System.out.printf("Producer: Cycle %d\n", cycle); for (int j = 0; j < 8; j++) { String message = "Event " + ((i * 19) + j); System.out.printf("Producer: %s\n", message); buffer.add(message); } try { buffer = exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Producer: " + buffer.size()); cycle++; } } }
public class Consumer implements Runnable { private List<String> buffer; private final Exchanger<List<String>> exchanger; public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) { this.buffer = buffer; this.exchanger = exchanger; } @Override public void run() { int cycle = 1; for (int i = 0; i < 19; i++) { System.out.printf("Consumer: Cycle %d\n", cycle); try { buffer = exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Consumer: Cycle %d, total %d\n ", cycle, buffer.size()); for (int j = 0; j < 8; j++) { String message = buffer.get(0); System.out.printf("Consumer: Cycle %d %s\n", cycle, message); buffer.remove(0); } cycle++; } } }
时间: 2024-10-06 05:37:39