我们在多线程开发中,可能会出现这种情况。就是一个线程需要另外一个线程满足某某条件才能继续运行,或者需
要其他线程满足好几个条件才能运行,对于这样的多条件的多线程并发,我们如何控制好各个线程之间的关系,使他们
能很好的处理冲突不至于相互出现问题呢,下面我们来介绍一下Java提供的Condition这个接口,这个接口很好的实现了
这种需求。
对于这个问题最经典的例子就是生产者消费者模型,生产者当缓冲区满的时候不生产商品知道缓冲区有空余,消费
者当缓冲区为0 的时候不拿商品,直到生产者向缓冲区放入商品,下面我们使用Conditon这个接口来实现这样的需求。
package com.bird.concursey.charpet4; /** * First, let's implement a class that will simulate a text file. Create a class named FileMock with two attributes: a String array named content and int named index. They will store the content of the file and the line of the simulated file that will be retrieved. * @author bird * 2014年9月21日 下午6:55:31 */ public class FileMock { private String content[]; private int index; /** * Implement the constructor of the class that initializes the content of the file with random characters. * @param size * @param length */ public FileMock(int size, int length) { content = new String[size]; for(int i = 0; i < size; i++) { StringBuilder buffer = new StringBuilder(length); for(int j = 0; j < length; j++) { int indice = (int) (Math.random() * 255); buffer.append((char)indice); } content[i] = buffer.toString(); } index = 0; } /** * Implement the method hasMoreLines() that returns true if the file has more lines to process or false if we have achieved the end of the simulated file. * @return */ public boolean hasMoreLines() { return index < content.length; } /** * Implement the method getLine() that returns the line determined by the index attribute and increases its value. * @return */ public String getLine() { if(this.hasMoreLines()) { System.out.println("Mock: " + (content.length - index)); return content[index++]; } return null; } }
package com.bird.concursey.charpet4; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * implement a class named Buffer that will implement the buffer shared by * producers and consumers. * * @author bird 2014年9月21日 下午6:58:13 */ public class Buffer { private LinkedList<String> buffer; private int maxSize; private ReentrantLock lock; private Condition lines; private Condition space; private boolean pendingLines; /** * Implement the constructor of the class. It initializes all the attributes described previously. * @param maxSize */ public Buffer(int maxSize) { this.maxSize = maxSize; buffer = new LinkedList<String>(); lock = new ReentrantLock(); lines = lock.newCondition(); space = lock.newCondition(); pendingLines = true; } /** * Implement the insert() method. It receives String as a parameter and tries to store it in the buffer. First, it gets the control of the lock. When it has it, it then checks if there is empty space in the buffer. If the buffer is full, it calls the await() method in the space condition to wait for free space. The thread will be woken up when another thread calls the signal() or signalAll() method in the space Condition. When that happens, the thread stores the line in the buffer and calls the signallAll() method over the lines condition. As we'll see in a moment, this condition will wake up all the threads that were waiting for lines in the buffer. * @param line */ public void insert(String line) { lock.lock(); try { while(buffer.size() == maxSize) { space.await(); } buffer.add(line); System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread().getName(),buffer.size()); lines.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } } /** * Implement the get() method. It returns the first string stored in the buffer. First, it gets the control of the lock. When it has it, it checks if there are lines in the buffer. If the buffer is empty, it calls the await() method in the lines condition to wait for lines in the buffer. This thread will be woken up when another thread calls the signal() or signalAll() method in the lines condition. When it happens, the method gets the first line in the buffer, calls the signalAll() method over the space condition and returns String. * @return */ public String get() { String line = null; lock.lock(); try { while((buffer.size() == 0) && (hasPendingLines())) { lines.await(); } if(hasPendingLines()) { line = buffer.poll(); System.out.printf("%s: Line Readed: %d\n",Thread.currentThread().getName(),buffer.size()); space.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } return line; } /** * Implement the setPendingLines() method that establishes the value of the attribute pendingLines. It will be called by the producer when it has no more lines to produce. * @param pendingLines */ public void setPendingLines(boolean pendingLines) { this.pendingLines=pendingLines; } /** * Implement the hasPendingLines() method. It returns true if there are more lines to be processed, or false otherwise. * @return */ public boolean hasPendingLines() { return pendingLines || buffer.size() > 0; } }
package com.bird.concursey.charpet4; public class Producer implements Runnable { private FileMock mock; private Buffer buffer; public Producer(FileMock mock, Buffer buffer) { this.mock = mock; this.buffer = buffer; } /** * Implement the run() method that reads all the lines created in the FileMock object and uses the insert() method to store them in the buffer. Once it finishes, use the setPendingLines() method to alert the buffer that it's not going to generate more lines. */ @Override public void run() { buffer.setPendingLines(true); while(mock.hasMoreLines()) { String line = mock.getLine(); buffer.insert(line); } buffer.setPendingLines(false); } }
package com.bird.concursey.charpet4; import java.util.Random; public class Consumer implements Runnable { private Buffer buffer; public Consumer(Buffer buffer) { this.buffer = buffer; } /** * Implement the run() method. While the buffer has pending lines, it tries to get one and process it. */ @Override public void run() { while(buffer.hasPendingLines()) { String line = buffer.get(); processLine(line); } } /** * Implement the auxiliary method processLine(). It only sleeps for 10 milliseconds to simulate some kind of processing with the line. * @param line */ private void processLine(String line) { Random random = new Random(); try { Thread.sleep(random.nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { FileMock fileMock = new FileMock(100, 10); Buffer buffer = new Buffer(20); Producer producer = new Producer(fileMock, buffer); Thread threadProducer = new Thread(producer, "Producer"); Consumer consumers[] = new Consumer[3]; Thread threadConsumers[] = new Thread[3]; for(int i = 0; i < 3; i++) { consumers[i] = new Consumer(buffer); threadConsumers[i] = new Thread(consumers[i], "consumer " + i); } threadProducer.start(); for(int i = 0; i < 3; i++) { threadConsumers[i].start(); } } }
注意:
When a thread calls the await() method of a condition, it automatically frees the control of the lock, so that another thread can get it and begin the execution of the same, or another critical section protected by that lock.
时间: 2024-10-06 17:47:31