java concurrent包介绍及使用

说一说java的concurrent包1-concurrent包简介

前面一个系列的文章都在围绕hash展开,今天准备先说下concurrent包,这个系列可能会以使用场景说明为主,concurrent包本身的代码分析可能比较少; 我在这方面的实践经验较为有限,有错误欢迎批评指正

不过前一个系列并未结束,还有一些文章没有放出来,欢迎关注核桃博客

concurrent包是jdk1.5引入的重要的包,主要代码由大牛Doug Lea完成,其实是在jdk1.4时代,由于java语言内置对多线程编程的支持比较基础和有限,所以他写了这个,因为实在太过于优秀,所以被加入到jdk之中;

通常所说的concurrent包基本有3个package组成

java.util.concurrent:提供大部分关于并发的接口和类,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等

java.util.concurrent.atomic:提供所有原子操作的类, 如AtomicInteger, AtomicLong等;

java.util.concurrent.locks:提供锁相关的类, 如Lock, ReentrantLock, ReadWriteLock, Condition等;

concurrent包的优点:

1. 首先,功能非常丰富,诸如线程池(ThreadPoolExecutor),CountDownLatch等并发编程中需要的类已经有现成的实现,不需要自己去实现一套; 毕竟jdk1.4对多线程编程的主要支持几乎就只有Thread, Runnable,synchronized等

2. concurrent包里面的一些操作是基于硬件级别的CAS(compare and swap),就是在cpu级别提供了原子操作,简单的说就可以提供无阻塞、无锁定的算法; 而现代cpu大部分都是支持这样的算法的;

说一说java的concurrent包2-等待多个线程完成执行的CountDownLatch

前面一篇说了concurrent包的基本结构,接下来首先看一下一个非常有用的类,CountDownLatch, 可以用来在一个线程中等待多个线程完成任务的类;

前面一篇说了concurrent包的基本结构,接下来首先看一下一个非常有用的类,CountDownLatch, 可以用来在一个线程中等待多个线程完成任务的类;

通常的使用场景是,某个主线程接到一个任务,起了n个子线程去完成,但是主线程需要等待这n个子线程都完成任务了以后才开始执行某个操作;

下面是一段演示代码

Java代码

@Test  
public void demoCountDown()  
{  
    int count = 10;  
  
    final CountDownLatch l = new CountDownLatch(count);  
    for(int i = 0; i < count; ++i)  
    {  
        final int index = i;  
        new Thread(new Runnable() {  
  
            @Override  
            public void run() {  
  
                try {  
                    Thread.currentThread().sleep(20 * 1000);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
  
                System.out.println("thread " + index + " has finished...");  
  
                l.countDown();  
  
            }  
        }).start();  
    }  
  
    try {  
        l.await();  
    } catch (InterruptedException e) {  
  
        e.printStackTrace();  
    }  
  
    System.out.println("now all threads have finished");  
  
}

运行的结果

thread 1 has finished...

thread 3 has finished...

thread 4 has finished...

thread 6 has finished...

thread 8 has finished...

thread 0 has finished...

thread 7 has finished...

thread 9 has finished...

thread 2 has finished...

thread 5 has finished...

now all threads have finished

前面10个线程的执行完成顺序会变化,但是最后一句始终会等待前面10个线程都完成之后才会执行

说一说java的concurrent包3-线程安全并且无阻塞的Atomic类

有了CountDownLatch,涉及到多线程同步的演示就比较容易了,接下来我们看下Atomic相关的类, 比如AtomicLong, AtomicInteger等这些;

有了CountDownLatch,涉及到多线程同步的演示就比较容易了,接下来我们看下Atomic相关的类, 比如AtomicLong, AtomicInteger等这些;

简单的说,这些类都是线程安全的,支持无阻塞无锁定的

Java代码

set()

get()

getAndSet()

getAndIncrement()

getAndDecrement()

getAndAdd()

等操作

下面是一个测试代码

Java代码

package com.hetaoblog.concurrent.test;  
  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.atomic.AtomicLong;  
  
import org.junit.Test;  
/** 
 * 
 * by http://www.hetaoblog.com 
 * @author hetaoblog 
 * 
 */  
public class AtomicTest {  
  
    @Test  
    public void testAtomic()  
    {  
        final int loopcount = 10000;  
        int threadcount = 10;  
  
        final NonSafeSeq seq1 = new NonSafeSeq();  
        final SafeSeq seq2 = new SafeSeq();  
  
        final CountDownLatch l = new CountDownLatch(threadcount);  
  
        for(int i = 0; i < threadcount; ++i)  
        {  
            final int index = i;  
            new Thread(new Runnable() {  
  
                @Override  
                public void run() {  
                    for(int j = 0; j < loopcount; ++j)  
                    {  
  
                        seq1.inc();  
                        seq2.inc();  
                    }  
  
                    System.out.println("finished : " + index);  
                    l.countDown();  
  
                }  
            }).start();  
        }  
  
        try {  
            l.await();  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println("both have finished....");  
  
        System.out.println("NonSafeSeq:" + seq1.get());  
        System.out.println("SafeSeq with atomic: " + seq2.get());  
  
    }  
}  
  
class NonSafeSeq{  
    private long count = 0;  
    public void inc()  
    {  
        count++;  
    }  
  
    public long  get()  
    {  
        return count;  
    }  
}  
  
class SafeSeq{  
    private AtomicLong count  = new AtomicLong(0);  
  
    public void inc()  
    {  
        count.incrementAndGet();  
    }  
  
    public long get()  
    {  
        return count.longValue();  
    }  
}

其中NonSafeSeq是作为对比的类,直接放一个private long count不是线程安全的,而SafeSeq里面放了一个AtomicLong,是线程安全的;可以直接调用incrementAndGet来增加

运行代码,可以得到类似这样的结果

finished : 1

finished : 0

finished : 3

finished : 2

finished : 5

finished : 4

finished : 6

finished : 8

finished : 9

finished : 7

both have finished....

NonSafeSeq:91723

SafeSeq with atomic: 100000

可以看到,10个线程,每个线程运行了10,000次,理论上应该有100,000次增加,使用了普通的long是非线程安全的,而使用了AtomicLong是线程安全的;

注意,这个例子也说明,虽然long本身的单个设置是原子的,要么成功要么不成功,但是诸如count++这样的操作就不是线程安全的;因为这包括了读取和写入两步操作;

说一说java的concurrent包4--可以代替synchronized关键字的ReentrantLock

在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,可以加在不同的instance上或者class上,从使用的角度则分别可以加在非静态方法,静态方法,以及直接synchronized(MyObject)这样的用法;

在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,可以加在不同的instance上或者class上,从使用的角度则分别可以加在非静态方法,静态方法,以及直接synchronized(MyObject)这样的用法;

concurrent包提供了一个可以替代synchronized关键字的ReentrantLock,

简单的说你可以new一个ReentrantLock, 然后通过lock.lock和lock.unlock来获取锁和释放锁;注意必须将unlock放在finally块里面,

reentrantlock的好处

1. 是更好的性能,

2. 提供同一个lock对象上不同condition的信号通知

3. 还提供lockInterruptibly这样支持响应中断的加锁过程,意思是说你试图去加锁,但是当前锁被其他线程hold住,然后你这个线程可以被中断;

简单的一个例子:

Java代码

package com.hetaoblog.concurrent.test;  
  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.locks.ReentrantLock;  
  
import org.junit.Test;  
  
public class ReentrantLockDemo {  
  
    @Test  
    public void demoLock()  
    {  
        final int loopcount = 10000;  
        int threadcount = 10;  
  
        final SafeSeqWithLock seq = new SafeSeqWithLock();  
  
        final CountDownLatch l = new CountDownLatch(threadcount);  
  
        for(int i = 0; i < threadcount; ++i)  
        {  
            final int index = i;  
            new Thread(new Runnable() {  
  
                @Override  
                public void run() {  
                    for(int j = 0; j < loopcount; ++j)  
                    {  
  
                        seq.inc();  
  
                    }  
  
                    System.out.println("finished : " + index);  
                    l.countDown();  
  
                }  
            }).start();  
        }  
  
        try {  
            l.await();  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println("both have finished....");  
  
        System.out.println("SafeSeqWithLock:" + seq.get());  
  
    }  
}  
  
class SafeSeqWithLock{  
    private long count = 0;  
  
    private ReentrantLock lock = new ReentrantLock();  
  
    public void inc()  
    {  
        lock.lock();  
  
        try{  
            count++;  
        }  
        finally{  
            lock.unlock();  
        }  
    }  
  
    public long get()  
    {  
        return count;  
    }  
}

同样以前面的类似Sequence的类举例,通过对inc操作加锁,保证了线程安全;

当然,这里get()我没有加锁,对于这样直接读取返回原子类型的函数,我认为不加锁是没问题的,相当于返回最近成功操作的值;

运行结果类似这样,

finished : 7

finished : 2

finished : 6

finished : 1

finished : 5

finished : 3

finished : 0

finished : 9

finished : 8

finished : 4

both have finished....

SafeSeqWithLock:100000

说一说java的concurrent包5--读写锁ReadWriteLock

concurrent包里面还提供了一个非常有用的锁,读写锁ReadWriteLock

concurrent包里面还提供了一个非常有用的锁,读写锁ReadWriteLock

下面是ReadWriteLock接口的说明:

A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.

意思是说读锁可以有很多个锁同时上锁,只要当前没有写锁;

写锁是排他的,上了写锁,其他线程既不能上读锁,也不能上写锁;同样,需要上写锁的前提是既没有读锁,也没有写锁;

两个写锁不能同时获得无需说明,下面一段程序说明下上了读锁以后,其他线程需要上写锁也无法获得

Java代码

@Test  
public void testRWLock_getw_onr()  
{  
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
    final Lock rlock = lock.readLock();  
    final Lock wlock = lock.writeLock();  
  
    final CountDownLatch l  = new CountDownLatch(2);  
  
    // start r thread  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
  
            System.out.println(new Date() + "now to get rlock");  
            rlock.lock();  
  
            try {  
                Thread.currentThread().sleep(20 * 1000);  
            } catch (InterruptedException e) {  
  
                e.printStackTrace();  
            }  
  
            System.out.println(new Date() + "now to unlock rlock");  
            rlock.unlock();  
  
            l.countDown();  
        }  
    }).start();  
  
    // start w thread  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
  
            System.out.println(new Date() + "now to get wlock");  
            wlock.lock();  
  
            System.out.println(new Date() + "now to unlock wlock");  
            wlock.unlock();  
  
            l.countDown();  
        }  
    }).start();  
  
    try {  
        l.await();  
    } catch (InterruptedException e) {  
  
        e.printStackTrace();  
    }  
  
    System.out.println(new Date() + "finished");  
}

这代码在我机器上打印的结果是, 也就是试图获得写锁的线程只有当另外一个线程将读锁释放了以后才可以获得

Tue Feb 28 23:18:13 CST 2012now to get rlock

Tue Feb 28 23:18:13 CST 2012now to get wlock

Tue Feb 28 23:18:33 CST 2012now to unlock rlock

Tue Feb 28 23:18:33 CST 2012now to unlock wlock

Tue Feb 28 23:18:33 CST 2012finished

ReadWriteLock的实现是ReentrantReadWriteLock,

有趣的是,在一个线程中,读锁不能直接升级为写锁,但是写锁可以降级为读锁;

这意思是,如果你已经有了读锁,再去试图获得写锁,将会无法获得, 一直堵住了;

但是如果你有了写锁,再去试图获得读锁,没问题;

下面是一段降级的代码,

Java代码

@Test  
public void testRWLock_downgrade()  
{  
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
    Lock rlock = lock.readLock();  
    Lock wlock = lock.writeLock();  
  
    System.out.println("now to get wlock");  
  
    wlock.lock();  
    System.out.println("now to get rlock");  
    rlock.lock();  
  
    System.out.println("now to unlock wlock");  
  
    wlock.unlock();  
  
    System.out.println("now to unlock rlock");  
    rlock.unlock();  
  
    System.out.println("finished");  
  
}

可以正常打印出

now to get wlock

now to get rlock

now to unlock wlock

now to unlock rlock

finished

下面是一段升级的代码,

Java代码

@Test  
    public void testRWLock_upgrade()  
    {  
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
        Lock rlock = lock.readLock();  
        Lock wlock = lock.writeLock();  
  
        System.out.println("now to get rlock");  
        rlock.lock();  
  
        System.out.println("now to get wlock");  
        wlock.lock();  
  
        System.out.println("now to unlock wlock");  
        wlock.unlock();  
  
        System.out.println("now to unlock rlock");  
        rlock.unlock();  
  
        System.out.println("finished");  
  
    }

只能打印出下面两句,后面就一直挂住了

now to get rlock

now to get wlock

说一说java的concurrent包6–java里面的线程基础类Thread

有网友建议我在介绍concurrent包之前先介绍下jdk1.5之前的多线程知识,这是个相当不错的想法, 这篇就先介绍下Thread类;

有网友建议我在介绍concurrent包之前先介绍下jdk1.5之前的多线程知识,这是个相当不错的想法, 这篇就先介绍下Thread类;

Thread类是java中的线程,几乎所有的多线程都在Thread这个类的基础之后展开;

下面介绍这个类的基本用法,Thread类的最基本函数就是run函数

public void run()

简单的说来,基本的创建一个完成自己功能的线程可以继承Thread类,然后override这个run方法, 如下所示

Java代码

public class ThreadDemo {  
  
    @Test  
    public void testThread()  
    {  
        SimpleThread t = new SimpleThread();  
        t.start();  
  
    }   
  
}  
class SimpleThread extends Thread{  
  
    @Override  
    public void run() {  
  
        System.out.println( Thread.currentThread().getName() + " is running  ");  
    }  
}

通常在run方法里面实现自己要做的功能,这里简单的打印了了一句话, 运行结果是

Thread-0 is running

启动一个线程就是new一个自己的Thread对象,然后调用其中的start方法启动这个线程;注意, run()方法运行结束之后这个线程的生命周期就结束了;

上面举的例子是说启动一个线程就去完成一个任务,有的时候我们需要一个线程始终在跑,定期执行一些任务,然后在某个时刻停止这个线程的运行; 那么可以有类似下面的一段代码:

Java代码

public class ThreadDemo {  
  
    public static void main(String[] args)  
    {  
        PeriodicalRunningThread t = new PeriodicalRunningThread();  
        t.start();  
  
        System.out.println("main thread is going to sleep...");  
        try {  
            Thread.currentThread().sleep(20 * 1000);  
  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println(new Date() + " now to stop PeriodicalRunningThread");  
        t.setRunning(false);  
  
    }  
  
}   
  
class PeriodicalRunningThread extends Thread{  
  
    private volatile boolean running = true;  
  
    @Override  
    public void run() {  
  
        while(running)  
        {  
            System.out.println(new Date() + " " + Thread.currentThread().getName() +  " is running " + new Date());  
  
            try {  
                Thread.currentThread().sleep(5 * 1000);  
  
            } catch (InterruptedException e) {  
  
                e.printStackTrace();  
            }  
        }  
  
        System.out.println(new Date() + " " + Thread.currentThread().getName() + " will end");  
    }  
  
    public void setRunning(boolean running) {  
        this.running = running;  
    }  
  
}

这段代码的打印结果是:

main thread is going to sleep…

Wed Feb 29 21:10:39 CST 2012 Thread-0 is running Wed Feb 29 21:10:39 CST 2012

Wed Feb 29 21:10:44 CST 2012 Thread-0 is running Wed Feb 29 21:10:44 CST 2012

Wed Feb 29 21:10:49 CST 2012 Thread-0 is running Wed Feb 29 21:10:49 CST 2012

Wed Feb 29 21:10:54 CST 2012 Thread-0 is running Wed Feb 29 21:10:54 CST 2012

Wed Feb 29 21:10:59 CST 2012 now to stop PeriodicalRunningThread

Wed Feb 29 21:10:59 CST 2012 Thread-0 will end

这里通过一个volatile的boolean值来作为标识表示这个线程的停止;

关于这里的volatile关键字的使用,如有兴趣可以先看这个,核桃博客也会在这个系列的后续文章中对这个关键字做说明

http://www.ibm.com/developerworks/cn/java/j-jtp06197.html

这样,在这个running标识为true的时候,该线程一直在跑,但是完成一段任务后会sleep一段时间,然后继续执行;

说一说java的concurrent包7–Thread和Runnable

这篇还是Thread和Runnable的基础

这篇还是Thread和Runnable的基础

在前面一篇的代码里面已经介绍了Thread类的其他几个常用的方法,

1. sleep函数,作用是让当前线程sleep一段时间,单位以毫秒计算;

public static void sleep(long millis)

2. 静态方法Thread.currentThread(), 得到当前线程

public static Thread currentThread()

3. getName方法,得到当前线程名称

public final String getName()

这个名称可以在构造Thread的时候传入, 也可以通过setName()方法设置;这个在多线程调试的时候是比较有用的,设置当前线程名,然后在log4j的输出字符串格式里面加入%t,就可以在日志中打印当前线程名称,方便看到当前的日志是从哪里来的;

现在介绍下多线程里面另外一个重要的接口Runnable, 这个接口表示可以被一个线程执行的任务,事实上Thread类也实现了这个Runnable接口;

这个接口只有一个函数, 实现者只要在里面调用代码就可以了

void run()

同时, Thread类有个构造函数是传入一个Runnable实现的;

常用的一个用法就是通过匿名内部类来创建线程执行简单任务,避免写太多的类,外部需要的变量可以通过加final修饰符后传入, 代码例子如下:

Java代码

public static void testThreadWithRunnable()  
{  
    final String word = "hello,world";  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
            System.out.println(word);  
  
        }  
    }).start();  
}  
  
public static void main(String[] args)  
{  
    //periodicalThreadTest();  
  
    testThreadWithRunnable();  
  
}

上面的代码会打印

hello,world

说一说java的concurrent包8–用在一个lock上的多个Condition

concurrent系列的前一篇说到说一说java的concurrent包7–thread和runnable,现在继续,今天介绍下Condtion这个接口,可以用在一个lock上的多个不同的情况;

在jdk的线程同步代码中,无论的synchronized关键字,或者是lock上的await/signal等,都只能在一个锁上做同步通知;

假设有3个线程,要对一个资源做同步,一般只能有一个锁来做同步通知操作,那么通知的时候无法做到精确的通知3个线程中的某一个的;

因为你调用了wait()/notify()的时候,具体的调度是jvm决定的;

但是有的时候的确需要需要对一个锁做多种不同情况的精确通知, 比如一个缓存,满了和空了是两种不同的情况,可以分别通知取数据的线程和放数据的线程;

Condition的基本使用如下:

* Condition是个接口,基本的方法就是await()和signal()方法;

* Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()

* 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以

* 和Object.wait()方法一样,每次调用Condition的await()方法的时候,当前线程就自动释放了对当前锁的拥有权

当然,Condition其实是个接口,上面说的这几点,在实现Condition的时候可以自由控制一点;但是jdk的javadoc说了,如果有啥特别的实现,必须要清楚的说明的;

下一节我会结合具体的代码来介绍下Condition的使用;

说一说java的concurrent包9–Condition的代码例子BoundedBuffer

面说了Condition的基本含义,今天这篇说下Condition的一个代码例子;

javadoc里面对Condition有一个绝佳的例子,BoundedBuffer类,就是一个线程安全的有界限的缓存;非常巧妙的利用了Condition,根据来通知不同的线程做不同的事情;

下面先看下具体代码:

Java代码

class BoundedBuffer {  
  
   final Lock lock = new ReentrantLock();  
  
   final Condition notFull  = lock.newCondition();   
  
   final Condition notEmpty = lock.newCondition();   
  
  
  
   final Object[] items = new Object[100];  
  
   int putptr, takeptr, count;  
  
  
  
   public void put(Object x) throws InterruptedException {  
  
     lock.lock();  
  
     try {  
  
       while (count == items.length)   
  
         notFull.await();  
  
       items[putptr] = x;   
  
       if (++putptr == items.length) putptr = 0;  
  
       ++count;  
  
       notEmpty.signal();  
  
     } finally {  
  
       lock.unlock();  
  
     }  
  
   }  
  
  
  
   public Object take() throws InterruptedException {  
  
     lock.lock();  
  
     try {  
  
       while (count == 0)   
  
         notEmpty.await();  
  
       Object x = items[takeptr];   
  
       if (++takeptr == items.length) takeptr = 0;  
  
       --count;  
  
       notFull.signal();  
  
       return x;  
  
     } finally {  
  
       lock.unlock();  
  
     }  
  
   }   
  
 }

代码意思不复杂,一个有界的buffer,里面是个数组,可以往里面放数据和取数据;

由于该buffer被多个线程共享,所以每次放和取操作的时候都用一个lock保护起来;

每次取数据(take)的时候,

a. 如果当前个数是0(用一个count计数), 那么就调用notEmpty.await等待,锁就释放了;

b. 取数据的索引专门有一个,每次向前一步; 如果到头了就从0开始循环使用

c.如果有数据,那就取一个数据,将count减1,同时调用notfull.signal(),

每次放数据(put)的时候

a.如果count和length相等,也就是满了,那就调用notFull.await等待,释放了锁; 等待有一些take()调用完成之后才会进入

b. 放数据也有一个索引putptr, 放入数据; 如果到头了也从0开始循环使用

c. 调用notempty.signal(); 如果有线程在take()的时候await住了,那么就会被通知到,可以继续进行操作

说一说java的concurrent包10–Condition和BoundedBuffer的测试代码

前面一篇说了Condition和BoundedBuffer的基本代码,下面写一个简单的程序测试下这个BoundedBuffer;

前面一篇说了Condition和BoundedBuffer的基本代码,下面写一个简单的程序测试下这个BoundedBuffer;

这段程序的目的是测试先put()后take()的操作,

1. 我将BoundedBuffer的大小设置成5,同时在每次进入notFull和notEmpty的await()的时候打印一下表示当前线程正在等待;

2. 先开启10个线程做put()操作,预计有5个线程可以完成,另外5个会进入等待

3. 主线程sleep10秒中,然后启动10个线程做take()操作;

这个时候,首先第一个take()必然成功完成,在这之前等待的5个put()线程都不会被唤醒, 接下来的事情就不好说了;

剩下的5个put()线程和9个take()线程中的任何一个都可能会被jvm调度;

比如可能出现

a. 开始take()的时候,有5个连续的take()线程完成操作; 然后又进入put()和take()交替的情况

b. 第一个take()之后,立刻会有一个put()线程被notFull().signal()唤醒; 然后继续有take()和put()交替的情况;

其中take()线程也可能进入notEmpty.await()操作;

但是任何时候,未完成的take()线程始终>=未完成的put()线程, 这个也是很自然的;

Java代码

package com.hetaoblog.concurrent.test;  
  
  
  
import java.util.Date;  
  
import java.util.concurrent.CountDownLatch;  
  
import java.util.concurrent.locks.Condition;  
  
import java.util.concurrent.locks.Lock;  
  
import java.util.concurrent.locks.ReentrantLock;  
  
  
  
import org.junit.Test;  
  
  
  
public class BoundedBufferTest {  
  
      
  
      
  
      
  
    @Test  
  
    public void testPutTake()  
  
    {  
  
          
  
        final BoundedBuffer bb = new BoundedBuffer();  
  
          
  
          
  
        int count = 10;  
  
        final CountDownLatch c = new CountDownLatch(count * 2);  
  
          
  
          
  
        System.out.println(new Date() + " now try to call put for " + count );  
  
          
  
        for(int i = 0; i < count ; ++i)  
  
        {  
  
            final int index = i;  
  
            try {  
  
                Thread t = new Thread(new Runnable() {  
  
                      
  
                    @Override  
  
                    public void run() {  
  
                          
  
                        try {  
  
                            bb.put(index);  
  
                            System.out.println(new Date() + "  put finished:  " + index);  
  
                        } catch (InterruptedException e) {  
  
                              
  
                            e.printStackTrace();  
  
                        }  
  
                          
  
                        c.countDown();  
  
                    }  
  
                });  
  
  
  
                t.start();  
  
                  
  
                  
  
            } catch (Exception e) {  
  
                  
  
                e.printStackTrace();  
  
            }  
  
        }  
  
          
  
        try {  
  
            System.out.println(new Date() + " main thread is going to sleep for 10 seconds");  
  
            Thread.sleep(10 * 1000);  
  
        } catch (InterruptedException e1) {  
  
              
  
            e1.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " now try to take for count: " + count);  
  
          
  
          
  
        for(int i =0; i < count; ++i)  
  
        {  
  
              
  
            Thread t= new Thread(new Runnable() {  
  
                  
  
                @Override  
  
                public void run() {  
  
                      
  
                    try {  
  
                          
  
                          
  
                        Object o = bb.take();  
  
                          
  
                        System.out.println(new Date() + " take get: " + o);  
  
                    } catch (InterruptedException e) {  
  
                          
  
                        e.printStackTrace();  
  
                    }  
  
                      
  
                      
  
                      
  
                      
  
                      
  
                      
  
                    c.countDown();  
  
                      
  
                }  
  
            });  
  
              
  
            t.start();  
  
        }  
  
          
  
        try {  
  
              
  
            System.out.println(new Date() + ": main thread is to wait for all threads");  
  
            c.await();  
  
              
  
              
  
        } catch (InterruptedException e) {  
  
              
  
            e.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " all threads finished");  
  
          
  
    }  
  
  
  
}  
  
class BoundedBuffer {  
  
       final Lock lock = new ReentrantLock();  
  
       final Condition notFull  = lock.newCondition();   
  
       final Condition notEmpty = lock.newCondition();   
  
  
  
       final Object[] items = new Object[5];  
  
       int putptr, takeptr, count;  
  
  
  
       public void put(Object x) throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == items.length)   
  
           {  
  
               System.out.println(new Date() + " put  is to wait....");  
  
             notFull.await();  
  
           }  
  
           items[putptr] = x;   
  
           if (++putptr == items.length) putptr = 0;  
  
           ++count;  
  
           notEmpty.signal();  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }  
  
  
  
       public Object take() throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == 0)  
  
           {  
  
               System.out.println(new Date() + " take is going to wait..");  
  
             notEmpty.await();  
  
           }  
  
           Object x = items[takeptr];   
  
           if (++takeptr == items.length) takeptr = 0;  
  
           --count;  
  
           notFull.signal();  
  
           return x;  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }   
  
     }

下面是这段程序在我机器上的运行结果:

这是其中一个执行结果,正好对应前面说的情况a, 5个take()先完成;这里出现了take()线程调用notEmpty.await()的情况

Thu Mar 15 21:15:13 CST 2012 now try to call put for 10

Thu Mar 15 21:15:13 CST 2012 put finished: 0

Thu Mar 15 21:15:13 CST 2012 put finished: 2

Thu Mar 15 21:15:13 CST 2012 put finished: 3

Thu Mar 15 21:15:13 CST 2012 put finished: 1

Thu Mar 15 21:15:13 CST 2012 main thread is going to sleep for 10 seconds

Thu Mar 15 21:15:13 CST 2012 put finished: 4

Thu Mar 15 21:15:13 CST 2012 put is to wait....

Thu Mar 15 21:15:13 CST 2012 put is to wait....

Thu Mar 15 21:15:13 CST 2012 put is to wait....

Thu Mar 15 21:15:13 CST 2012 put is to wait....

Thu Mar 15 21:15:13 CST 2012 put is to wait....

Thu Mar 15 21:15:23 CST 2012 now try to take for count: 10

Thu Mar 15 21:15:23 CST 2012 take get: 3

Thu Mar 15 21:15:23 CST 2012 take get: 2

Thu Mar 15 21:15:23 CST 2012 take get: 1

Thu Mar 15 21:15:23 CST 2012 take get: 0

Thu Mar 15 21:15:23 CST 2012 take get: 4

Thu Mar 15 21:15:23 CST 2012 put finished: 5

Thu Mar 15 21:15:23 CST 2012: main thread is to wait for all threads

Thu Mar 15 21:15:23 CST 2012 take is going to wait..

Thu Mar 15 21:15:23 CST 2012 take get: 5

Thu Mar 15 21:15:23 CST 2012 put finished: 6

Thu Mar 15 21:15:23 CST 2012 put finished: 8

Thu Mar 15 21:15:23 CST 2012 put finished: 7

Thu Mar 15 21:15:23 CST 2012 put finished: 9

Thu Mar 15 21:15:23 CST 2012 take get: 6

Thu Mar 15 21:15:23 CST 2012 take get: 7

Thu Mar 15 21:15:23 CST 2012 take get: 8

Thu Mar 15 21:15:23 CST 2012 take get: 9

Thu Mar 15 21:15:23 CST 2012 all threads finished

这是另一个执行结果:

Thu Mar 15 21:02:49 CST 2012 now try to call put for 10

Thu Mar 15 21:02:49 CST 2012 put finished: 3

Thu Mar 15 21:02:49 CST 2012 put finished: 1

Thu Mar 15 21:02:49 CST 2012 put finished: 0

Thu Mar 15 21:02:49 CST 2012 put finished: 2

Thu Mar 15 21:02:49 CST 2012 put finished: 4

Thu Mar 15 21:02:49 CST 2012 put is to wait....

Thu Mar 15 21:02:49 CST 2012 put is to wait....

Thu Mar 15 21:02:49 CST 2012 put is to wait....

Thu Mar 15 21:02:49 CST 2012 main thread is going to sleep for 10 seconds

Thu Mar 15 21:02:49 CST 2012 put is to wait....

Thu Mar 15 21:02:49 CST 2012 put is to wait....

Thu Mar 15 21:02:59 CST 2012 now try to take for count: 10

Thu Mar 15 21:02:59 CST 2012 take get: 1

Thu Mar 15 21:02:59 CST 2012 take get: 0

Thu Mar 15 21:02:59 CST 2012 take get: 3

Thu Mar 15 21:02:59 CST 2012 take get: 4

Thu Mar 15 21:02:59 CST 2012: main thread is to wait for all threads

Thu Mar 15 21:02:59 CST 2012 take is going to wait..

Thu Mar 15 21:02:59 CST 2012 take is going to wait..

Thu Mar 15 21:02:59 CST 2012 put finished: 5

Thu Mar 15 21:02:59 CST 2012 take get: 2

Thu Mar 15 21:02:59 CST 2012 take get: 5

Thu Mar 15 21:02:59 CST 2012 take is going to wait..

Thu Mar 15 21:02:59 CST 2012 take is going to wait..

Thu Mar 15 21:02:59 CST 2012 put finished: 7

Thu Mar 15 21:02:59 CST 2012 put finished: 6

Thu Mar 15 21:02:59 CST 2012 put finished: 8

Thu Mar 15 21:02:59 CST 2012 put finished: 9

Thu Mar 15 21:02:59 CST 2012 take get: 7

Thu Mar 15 21:02:59 CST 2012 take get: 6

Thu Mar 15 21:02:59 CST 2012 take get: 8

Thu Mar 15 21:02:59 CST 2012 take get: 9

Thu Mar 15 21:02:59 CST 2012 all threads finished

执行结果2:

Thu Mar 15 21:14:30 CST 2012 now try to call put for 10

Thu Mar 15 21:14:30 CST 2012 main thread is going to sleep for 10 seconds

Thu Mar 15 21:14:30 CST 2012 put finished: 8

Thu Mar 15 21:14:30 CST 2012 put finished: 6

Thu Mar 15 21:14:30 CST 2012 put finished: 2

Thu Mar 15 21:14:30 CST 2012 put finished: 0

Thu Mar 15 21:14:30 CST 2012 put finished: 4

Thu Mar 15 21:14:30 CST 2012 put is to wait....

Thu Mar 15 21:14:30 CST 2012 put is to wait....

Thu Mar 15 21:14:30 CST 2012 put is to wait....

Thu Mar 15 21:14:30 CST 2012 put is to wait....

Thu Mar 15 21:14:30 CST 2012 put is to wait....

Thu Mar 15 21:14:40 CST 2012 now try to take for count: 10

Thu Mar 15 21:14:40 CST 2012 take get: 8

Thu Mar 15 21:14:40 CST 2012 take get: 6

Thu Mar 15 21:14:40 CST 2012 take get: 4

Thu Mar 15 21:14:40 CST 2012 take get: 2

Thu Mar 15 21:14:40 CST 2012: main thread is to wait for all threads

Thu Mar 15 21:14:40 CST 2012 take get: 0

Thu Mar 15 21:14:40 CST 2012 take is going to wait..

Thu Mar 15 21:14:40 CST 2012 take is going to wait..

Thu Mar 15 21:14:40 CST 2012 take is going to wait..

Thu Mar 15 21:14:40 CST 2012 put finished: 1

Thu Mar 15 21:14:40 CST 2012 put finished: 5

Thu Mar 15 21:14:40 CST 2012 put finished: 3

Thu Mar 15 21:14:40 CST 2012 put finished: 9

Thu Mar 15 21:14:40 CST 2012 take get: 1

Thu Mar 15 21:14:40 CST 2012 put finished: 7

Thu Mar 15 21:14:40 CST 2012 take get: 5

Thu Mar 15 21:14:40 CST 2012 take get: 3

Thu Mar 15 21:14:40 CST 2012 take get: 7

Thu Mar 15 21:14:40 CST 2012 take get: 9

Thu Mar 15 21:14:40 CST 2012 all threads finished

在几次不同的执行中,始终可以观察到任何时候,未完成的take()线程数>= 未完成的put()线程; 在未完成的线程数相等的情况下,即使jvm首先调度到了take()线程,也会进入notEmpty.await()释放锁,进入等待

说一说java的concurrent包11–Condition和BoundedBuffer的测试代码2

前面一篇说了一个Condition和BoundedBuffer的测试代码例子,前面测试的是先put()再take()的操作,这篇说一下先take()再put()的操作;

前面一篇说了一个Condition和BoundedBuffer的测试代码例子,前面测试的是先put()再take()的操作,这篇说一下先take()再put()的操作;

当然,必须先要说明的是,这篇和前面这篇在打印日志的时候其实是有错误的,这个错误在前面一篇并不明显,不会导致明显的问题;

但是同样的原因导致现在这个先take()再put()的操作会出现明显的错误,看上去会显得不可思议;

具体情况留到下一篇详细说明,这里先上测试目的,测试代码和运行结果;

同时说明多线程编程需要非常谨慎,否则极易出错

测试目的:

1. 我将BoundedBuffer的大小设置成5,同时在每次进入notFull和notEmpty的await()的时候打印一下表示当前线程正在等待;

2. 先开启10个线程做take()操作,由于开始BoundedBuffer里面没有东西,所以10个线程全部调用await进入等待

3. 主线程sleep10秒中,然后启动10个线程做put()操作;

在第一个put()完成之后,接下来应该会有部分put()线程和take()线程先后完成;

理论上,

a. 任何一个元素的put()都会发生在take()之前;

b. 如果X表示某个操作成功的次数,在X(put)-X(take)<5的时候,put线程不会进入等待状态

下面是测试代码:

Java代码

    @Test  
  
    public void testTakePut()  
  
    {  
  
  
  
        final BoundedBuffer bb = new BoundedBuffer();  
  
          
  
          
  
        int count = 10;  
  
        final CountDownLatch c = new CountDownLatch(count * 2);  
  
          
  
  
  
        System.out.println(new Date() + " first try to call take for count: " + count);  
  
        for(int i =0; i < count; ++i)  
  
        {  
  
            final int index = i;  
  
            Thread t= new Thread(new Runnable() {  
  
                  
  
                @Override  
  
                public void run() {  
  
                      
  
                    try {  
  
                          
  
                        Thread.currentThread().setName(" TAKE " + index);  
  
                          
  
                          
  
                        Object o = bb.take();  
  
                          
  
                        System.out.println(new Date() + " " + " take get: " + o );  
  
                    } catch (InterruptedException e) {  
  
                          
  
                        e.printStackTrace();  
  
                    }  
  
                      
  
                      
  
                      
  
                      
  
                      
  
                      
  
                    c.countDown();  
  
                      
  
                }  
  
            });  
  
              
  
            t.start();  
  
        }  
  
          
  
        try {  
  
            System.out.println(new Date() + " main thread is going to sleep for 10 seconds");  
  
            Thread.sleep(10 * 1000);  
  
        } catch (InterruptedException e1) {  
  
              
  
            e1.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " now try to call put for " + count );  
  
          
  
        for(int i = 0; i < count ; ++i)  
  
        {  
  
            final int index = i;  
  
            try {  
  
                Thread t = new Thread(new Runnable() {  
  
                      
  
                    @Override  
  
                    public void run() {  
  
                          
  
                          
  
                        Thread.currentThread().setName(" PUT " + index);  
  
                          
  
                        try {  
  
                            bb.put(index);  
  
                            System.out.println(new Date() + " " + "  put finished:  " + index );  
  
                        } catch (InterruptedException e) {  
  
                              
  
                            e.printStackTrace();  
  
                        }  
  
                          
  
                        c.countDown();  
  
                    }  
  
                });  
  
  
  
                t.start();  
  
                  
  
                  
  
            } catch (Exception e) {  
  
                  
  
                e.printStackTrace();  
  
            }  
  
        }  
  
          
  
          
  
        try {  
  
              
  
            System.out.println(new Date() + ": main thread is to wait for all threads");  
  
            c.await();  
  
              
  
              
  
        } catch (InterruptedException e) {  
  
              
  
            e.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " all threads finished");  
  
    }  
  
  
  
  
  
class BoundedBuffer {  
  
       final Lock lock = new ReentrantLock();  
  
       final Condition notFull  = lock.newCondition();   
  
       final Condition notEmpty = lock.newCondition();   
  
  
  
       final Object[] items = new Object[5];  
  
       int putptr, takeptr, count;  
  
  
  
       public void put(Object x) throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == items.length)   
  
           {  
  
               System.out.println(new Date() + " " + Thread.currentThread().getName() + " put  is to wait....: " + System.currentTimeMillis());    
  
             notFull.await();  
  
               
  
           }  
  
           items[putptr] = x;   
  
           if (++putptr == items.length) putptr = 0;  
  
           ++count;  
  
             
  
           notEmpty.signal();  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }  
  
  
  
       public Object take() throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == 0)  
  
           {  
  
               System.out.println(new Date() + " " + Thread.currentThread().getName() + " take is going to wait.. " + System.currentTimeMillis());    
  
             notEmpty.await();  
  
               
  
           }  
  
           Object x = items[takeptr];   
  
           if (++takeptr == items.length) takeptr = 0;  
  
           --count;  
  
             
  
             
  
             
  
           notFull.signal();  
  
           return x;  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }   
  
     }

运行结果1:

Fri Mar 16 20:50:10 CST 2012 first try to call take for count: 10

Fri Mar 16 20:50:10 CST 2012 TAKE 0 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 TAKE 1 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 TAKE 2 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 TAKE 3 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 TAKE 5 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 main thread is going to sleep for 10 seconds

Fri Mar 16 20:50:10 CST 2012 TAKE 4 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 TAKE 7 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 TAKE 6 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 TAKE 9 take is going to wait..

Fri Mar 16 20:50:10 CST 2012 TAKE 8 take is going to wait..

Fri Mar 16 20:50:20 CST 2012 now try to call put for 10

Fri Mar 16 20:50:20 CST 2012: main thread is to wait for all threads

Fri Mar 16 20:50:20 CST 2012 PUT 7 put finished: 7

Fri Mar 16 20:50:20 CST 2012 PUT 9 put finished: 9

Fri Mar 16 20:50:20 CST 2012 PUT 8 put finished: 8

Fri Mar 16 20:50:20 CST 2012 PUT 3 put is to wait....

Fri Mar 16 20:50:20 CST 2012 PUT 1 put is to wait....

Fri Mar 16 20:50:20 CST 2012 PUT 5 put finished: 5

Fri Mar 16 20:50:20 CST 2012 PUT 4 put is to wait....

Fri Mar 16 20:50:20 CST 2012 TAKE 0 take get: 8

Fri Mar 16 20:50:20 CST 2012 TAKE 2 take get: 9

Fri Mar 16 20:50:20 CST 2012 TAKE 3 take get: 0

Fri Mar 16 20:50:20 CST 2012 TAKE 5 take get: 6

Fri Mar 16 20:50:20 CST 2012 TAKE 4 take get: 5

Fri Mar 16 20:50:20 CST 2012 PUT 2 put finished: 2

Fri Mar 16 20:50:20 CST 2012 PUT 3 put finished: 3

Fri Mar 16 20:50:20 CST 2012 PUT 1 put finished: 1

Fri Mar 16 20:50:20 CST 2012 TAKE 7 take get: 2

Fri Mar 16 20:50:20 CST 2012 TAKE 6 take get: 3

Fri Mar 16 20:50:20 CST 2012 TAKE 9 take get: 1

Fri Mar 16 20:50:20 CST 2012 TAKE 8 take get: 4

Fri Mar 16 20:50:20 CST 2012 PUT 6 put finished: 6

Fri Mar 16 20:50:20 CST 2012 PUT 0 put finished: 0

Fri Mar 16 20:50:20 CST 2012 PUT 4 put finished: 4

Fri Mar 16 20:50:20 CST 2012 TAKE 1 take get: 7

Fri Mar 16 20:50:20 CST 2012 all threads finished

注意到红色部分:

第一个加为红色是因为按照打印结果,put()只完成了3次,就开始有put()进入等待了,而BoundedBuffer的大小是5,理论上应该没有满的!

第二个加为红色是因为元素4竟然先被take,然后再被put! 显然程序有地方出错了!具体原因分析,欢迎关注核桃博客:)

时间: 2024-10-06 13:49:01

java concurrent包介绍及使用的相关文章

Java5对线程处理的新操作-concurrent包介绍

上节中简单介绍了传统的jdk中的线程的概念,本节中接着介绍下jdk5之后对线程处理有哪些改变. 首先,介绍下java.util.concurrent包下有个字包atomic(原子的)包,其中的一些类提供原子性操作类,分别是: 1 AtomicBoolean, 2 AtomicInteger,AtomicIntegerArray,AtomicIntegerFieldUpdater<T>, 3 AtomicLong,AtomicLongArray,AtomicLongFieldUpdater<

java concurrent包的实现原理

由于java的CAS同时具有 volatile 读和volatile写的内存语义,因此Java线程之间的通信现在有了下面四种方式: A线程写volatile变量,随后B线程读这个volatile变量. A线程写volatile变量,随后B线程用CAS更新这个volatile变量. A线程用CAS更新一个volatile变量,随后B线程用CAS更新这个volatile变量. A线程用CAS更新一个volatile变量,随后B线程读这个volatile变量. Java的CAS会使用现代处理器上提供的

利用java concurrent 包实现日志写数据库的并发处理

一.概述 在很多系统中,往往需要将各种操作写入数据库(比如客户端发起的操作). 最简单的做法是,封装一个公共的写日志的api,各个操作中调用该api完成自己操作日志的入库.但因为入数据库效率比较低,如果每个操作自己入库,则会影响响应速度.而且当操作并发度很高时,往往同时有多个线程在写数据库,也会对系统有影响. 考虑的解决方案是,这个api并不实际完成入库,而是将每个操作日志信息写到一个公共的缓存中,然后应用系统起了一个独立的线程(一直运行)在后台进行入库.如果当前缓存中有记录,就写库,没有记录,

java concurrent包常用类小结

concurrent包是常用多线程的相关包,最近由于开发sdn程序,对于多线程使用比以前多了很多,现简单总结下. 第一类  原子类:用在多个线程共同操作一个计数的情况 AtomicLong AtomicInteger 第二类 lock和condition condition是从lock中得到的,所以在使用时,在执行了lock.lock()后才进行condition的操作,condition常用的两个方法await和signal. 常用在多个线程操作一个共同的资源,一个线程执行结束后,另一个线程才

带你玩转java多线程系列 “道篇” 多线程的优势及利用util.concurrent包测试单核多核下多线程的效率

java多线程 “道篇” - 多线程的优势及用concurrent包测试单核多核下多线程的效率 1 超哥对于多线程自己的理解 2 测试代码 3 CountDownLatch这个同步辅助类科普 4 如何把电脑设置成单核 5 测试结果 1 超哥对于多线程自己的理解 超哥的理解:对于多线程,无非是对于顺序执行下任务的一种抽取和封装,将原来顺序执行的任务单独拿出来放到线程类的run方法中,通过线程类的start方法进行执行,对于多线程访问共同资源时,我们需要加锁,也就是只有某个线程在拥有锁的时候,才能够

《java.util.concurrent 包源码阅读》09 线程池系列之介绍篇

concurrent包中Executor接口的主要类的关系图如下: Executor接口非常单一,就是执行一个Runnable的命令. public interface Executor { void execute(Runnable command); } ExecutorService接口扩展了Executor接口,增加状态控制,执行多个任务返回Future. 关于状态控制的方法: // 发出关闭信号,不会等到现有任务执行完成再返回,但是现有任务还是会继续执行, // 可以调用awaitTe

《java.util.concurrent 包源码阅读》 结束语

<java.util.concurrent 包源码阅读>系列文章已经全部写完了.开始的几篇文章是根据自己的读书笔记整理出来的(当时只阅读了部分的源代码),后面的大部分都是一边读源代码代码,一边写文章. 由于水平有限,在阅读源代码的时候,分析得也比较浅显,也有很多地方自己也没有研究明白,文章有的地方显得语焉不详,只能请各位多多见谅了. 后面会继续写一些关于Java并发编程的文章,希望各位多多指教. 这里整理了一个简单的目录,包含了本系列所有文章的链接: <java.util.concurr

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

java中concurrent包内容

有BlockingQueue及其相关的类,跟阻塞队列有关系. ConcurrentHashMap,ConcurrentLinkedQueue等,这些是相关集合的线程同步版本. CopyOnWriteArrayList,也是一种并发用的容器,当我们改变这个数组的时候,先复制一个副本,修改这个副本,再复制回去.这样就实现了读写分离,适用于读多写少的并发场景. CountDownLatch,这个类适用于这种情况:多个线程同时工作,然后其中几个可以随意并发执行,但有一个线程需要等其他线程工作结束后,才能