并发程序测试的要点
- 吞吐量
- 响应性
- 可伸缩性
正确性测试
首先需要一个可供测试的程序做为栗子。就是下面这个了。一个固定长度的 队列,其中定义可阻塞的put和take方法,并通过两个计数器进行控制。
import java.util.concurrent.Semaphore; public class BoundedBuffer<E> { private final Semaphore availableItems, availableSpaces; private final E[] Items; private int putPosition = 0, takePosition = 0; public BoundedBuffer(int capacity) { availableItems = new Semaphore(0); availableSpaces = new Semaphore(capacity); Items = (E[]) new Object[capacity]; } public boolean isEmpty() { return availableItems.availablePermits() == 0; } public boolean isFull() { return availableSpaces.availablePermits() == 0; } public void put(E x) throws InterruptedException { availableSpaces.acquire(); doInsert(x); availableItems.release(); } public E take() throws InterruptedException { availableItems.acquire(); E item = doExtra(); availableSpaces.release(); return item; } private synchronized void doInsert(E x) { // TODO Auto-generated method stub int i = putPosition; Items[i] = x; putPosition = (++i == Items.length) ? 0 : i; } private synchronized E doExtra() { int i = takePosition; E x = Items[i]; Items[i] = null; takePosition = (++i == Items.length) ? 0 : i; return x; } }
基本的单元测试
使用JUnit就可以了
import static org.junit.Assert.*; import org.junit.Test; public class BoundedBufferTest { @Test public void test() { testIsEmptyWhenConstructed(); try { testIsFullAfterPuts(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } void testIsEmptyWhenConstructed() { BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10); assertTrue(bb.isEmpty()); assertFalse(bb.isFull()); } void testIsFullAfterPuts() throws InterruptedException { BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10); for (int i = 0; i < 10; i++) { bb.put(i); } assertFalse(bb.isEmpty()); assertTrue(bb.isFull()); } }
主要是测试边界情况。
对阻塞操作的测试
测试代码尝试从空的缓存中take一个元素,如果能成功那么就测试失败了。然后再等待一段时间 ,再中断该线程。如果获取线程正确的在take中阻塞,那么将抛出interruptedException。捕获到异常的catch块将此试为测试成功并让线程退出,然后主测试线程尝试与获取线程合并,通过调用isAlive方法验证Join方法是否成功返回。如果获取线程可以中断线程,那么 join能很快完成。
void testTakeBlocksWhenEmpty() { final BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10); Thread taker = new Thread() { @Override public void run() { // TODO Auto-generated method stub try { int unused = bb.take(); fail(); } catch (InterruptedException success) { } } }; try { taker.start(); Thread.sleep(LOCKUP_DETECT_TIMEOUT); taker.interrupt(); taker.join(LOCKUP_DETECT_TIMEOUT); assertFalse(taker.isAlive()); } catch (Exception unException) { fail(); } }
安全性测试
测试在数据竞争条件下是否会发生错误。这就需要一个并发的测试程序。或许比编写本身要测试的类更加困难。
通过一个对顺序敏感的校验和计算函数来计算 所有入列的元素以及出列元素的检验和,并进行比较。如果二者相等,那么测试就是成功的。如果只有一个生产者一个消费者,那么 这种方法能发挥最大的作用。因为它不仅能测试出是否取出了正确的元素,而且还能测试出元素被取出的顺序是否正确、
如果要将这种方法扩展到多生产者多消费者的情况时,就需要一个对元素入列和出列顺序不敏感的校验和函数。从而在测试程序运行完以后,可以将多个检验和以不同的顺序组合起来,如果不是这样,多个线程就需要访问 同一个共享的检验和变量 ,因此就需要同步,这将成为一个并发的瓶颈。
要确保测试程序能够正确地测试所有要点,就一定不能让编译器可以预先猜测到检验 和的值。那么会对许多 其他 的测试造成影响。由于大多数随机类生成器都是线程安全的。并且会带来额外的同步开销。所以还不如用一个简单的伪随机函数 。
static int XorShift(int y) { y ^= (y << 6); y ^= (y >>> 21); y ^= (y << 7); return y; }
参数用nanotime();我想用上面这个伪随机数的作用可能就是拖延一点时间 ,因为nanotime是获取纳秒级别的时间 。通过运算拖延一点时间 后,再产生的数据必然不一样。所以就基本达到了获得不同的数据的需求。
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; import org.junit.Test; public class PutTakeTest { private static final ExecutorService pool = Executors.newCachedThreadPool(); private final AtomicInteger putSum = new AtomicInteger(0); private final AtomicInteger takeSum = new AtomicInteger(0); private final CyclicBarrier barrier; private final BoundedBuffer<Integer> bb; private final int Ntrials, nPairs; public PutTakeTest(int capacity, int ntrials, int nPairs) { this.bb = new BoundedBuffer<Integer>(capacity); Ntrials = ntrials; this.nPairs = nPairs; this.barrier = new CyclicBarrier(nPairs * 2 + 1); } public static void main(String[] args) { new PutTakeTest(10, 100000, 10).test(); pool.shutdown(); } void test() { try { for (int i = 0; i < nPairs; i++) { pool.execute(new Producer()); pool.execute(new Consumer()); } barrier.await(); barrier.await(); assertEquals(putSum.get(), takeSum.get()); } catch (Exception e) { // TODO: handle exception throw new RuntimeException(e); } } class Producer implements Runnable { @Override public void run() { // TODO Auto-generated method stub try { int seed = (this.hashCode() ^ (int) System.nanoTime()); int sum = 0; barrier.await(); for (int i = Ntrials; i > 0; i--) { bb.put(seed); sum += seed; seed = XorShift(seed); } putSum.getAndAdd(sum); barrier.await(); } catch (Exception e) { // TODO: handle exception throw new RuntimeException(e); } } } class Consumer implements Runnable { @Override public void run() { // TODO Auto-generated method stub try { barrier.await(); int sum = 0; for (int i = Ntrials; i > 0; i--) { sum += bb.take(); } takeSum.getAndAdd(sum); barrier.await(); } catch (Exception e) { // TODO: handle exception throw new RuntimeException(e); } } } static int XorShift(int y) { y ^= (y << 6); y ^= (y >>> 21); y ^= (y << 7); return y; } }
这种测试应该放在多处理器的系统上运行。要最大程序地检测出一些对执行时序敏感的数据竞争,那么测试中的线程数量应该多于CPU数量,这样在任意时刻都会有一些线程在运行,而另一些被交换出去,从而可以检查线程是交替行为的可预测性。