Java并发编程--BlockingQueue

概述

  BlockingQueue支持两个附加操作的Queue:1)当Queue为空时,获取元素线程被阻塞直到Queue变为非空;2)当Queue满时,添加元素线程被阻塞直到Queue不满。BlockingQueue不允许元素为null,如果入队一个null元素,会抛NullPointerException。常用于生产者消费者模式。

  BlockingQueue对于不能满足条件的操作,提供了四种处理方式:

    1)直接抛异常,抛出异常。如果队列已满,添加元素会抛出IllegalStateException异常;如果队列为空,获取元素会抛出NoSuchElementException异常;

    2)返回一个特殊值(null或false);

    3)在满足条件之前,无限期的阻塞当前线程,当队列满足条件或响应中断退出;

    4)在有限时间内阻塞当前线程,超时后返回失败。

  抛出异常 返回特殊值 阻塞 超时
入队 add(e) offer(e) put(e)  offer(e, time, unit)
出队 remove() poll() take() poll(time, unit)
检查 element() peek()    

  内存一致性效果:当存在其他并发 collection 时,将对象放入 BlockingQueue 之前的线程中的操作 happen-before 随后通过另一线程从 BlockingQueue 中访问或移除该元素的操作。

  JDK提供的阻塞队列:

    ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列,遵循FIFO原则。

    LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列,遵循FIFO原则,默认和最大长度为Integer.MAX_VALUE。

    PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

    DelayQueue:一个使用优先级队列实现的无界阻塞队列。

    SynchronousQueue:一个不存储元素的阻塞队列。

    LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

    LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

使用

  示例:生产者-消费者,BlockingQueue 可以安全地与多个生产者和多个使用者一起使用。

class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while(true) { queue.put(produce()); }    //当队列满时,生产者阻塞等待
        } catch (InterruptedException ex) { ... handle ...}
    }
    Object produce() { ... }
}

//消费者
class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
    try {
        while(true) { consume(queue.take()); }    //当队列空时,消费者阻塞等待
    } catch (InterruptedException ex) { ... handle ...}
    }
    void consume(Object x) { ... }
}

class Setup {
    void main() {
        BlockingQueue q = new SomeQueueImplementation();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

实现原理

  当队列满时,生产者会一直阻塞,当消费者从队列中取出元素时,如何通知生产者队列可以继续,以ArrayBlockingQueue和LinkedBlockingQueue为例,分析源代码如何实现阻塞队列。它们的阻塞机制都是基于Lock和Condition实现,其中LinkedBlockingQueue还用到了原子变量类。

  ArrayBlockingQueue

    域

 1 /** The queued items */
 2 final Object[] items;
 3 /** items index for next take, poll, peek or remove */
 4 int takeIndex;
 5 /** items index for next put, offer, or add */
 6 int putIndex;
 7 /** Number of elements in the queue */
 8 int count;
 9 /*
10  * Concurrency control uses the classic two-condition algorithm
11  * found in any textbook.
12  */
13
14 /** Main lock guarding all access */
15 final ReentrantLock lock;
16 /** Condition for waiting takes */
17 private final Condition notEmpty;
18 /** Condition for waiting puts */
19 private final Condition notFull;

      由ArrayBlockingQueue的域可以看出,使用循环数组存储队列中的元素,两个索引takeIndex和putIndex分别指向下一个要出队和入队的数组位置,线程间的通信是使用ReentrantLock和两个Condition实现的。

    put(e)&take() 阻塞

      当不满足入队或出队条件时,当前线程阻塞等待。即当队列满时,生产者会一直阻塞直到被唤醒,当队列空时,消费者会一直阻塞直到被唤醒。

      入队(put)

 1 //在队列的尾部(当前putIndex指定的位置)插入指定的元素
 2 public void put(E e) throws InterruptedException {
 3     checkNotNull(e);
 4     final ReentrantLock lock = this.lock;
 5     lock.lockInterruptibly();    //可响应中断获取锁
 6     try {
 7         while (count == items.length)    //如果队列满,在入队条件notFull的等待队列上等待。
 8                                         //这里使用While循环而非if判断,目的是防止过早或意外的通知,只有条件符合才能推出循环
 9             notFull.await();
10         insert(e);
11     } finally {
12         lock.unlock();    //释放锁,唤醒同步队列中的后继节点
13     }
14 }
15 //为保证操作线程安全,此方法必须在获取锁的前提下才能被调用
16 private void insert(E x) {
17     items[putIndex] = x;
18     putIndex = inc(putIndex);
19     ++count;            //元素数量+1
20     notEmpty.signal();    //唤醒出队条件的等待队列上的线程
21 }
22 //将i增1,当++i等于数组的最大容量时,将i置为0。即通过循环数组的方式
23 final int inc(int i) {
24     return (++i == items.length) ? 0 : i;
25 }

      从源码可以看出,入队的大致步骤如下:

        1)首先获取锁,如果获取锁失败,当前线程可能自旋获取锁或被阻塞直到获取到锁,否则执行2);

        2)循环判断队列是否满,如果满,那么当前线程被阻塞到notFull条件的等待队列中,并释放锁,等待被唤醒;

        3)当队列非满或从await方法中返回(此时当前线程从等待队列中被唤醒并重新获取到锁)时,执行插入元素操作。

        4)入队完成后,释放锁,唤醒同步队列中的后继节点。

      出队(take)

 1 public E take() throws InterruptedException {
 2     final ReentrantLock lock = this.lock;
 3     lock.lockInterruptibly();    //可响应中断获取锁
 4     try {
 5         while (count == 0)    //如果队列为空,在出队条件notEmpty的等待队列中等待
 6             notEmpty.await();
 7         return extract();
 8     } finally {
 9         lock.unlock();    //释放锁
10     }
11 }
12 //在当前takeIndex指定的位置取出元素,此方法必须在获取锁的前提下才能被调用
13 private E extract() {
14     final Object[] items = this.items;
15     E x = this.<E>cast(items[takeIndex]);    //强制类型转换
16     items[takeIndex] = null;
17     takeIndex = inc(takeIndex);    //出队索引同样采用循环的方式增1
18     --count;
19     notFull.signal();    //唤醒入队条件的等待队列中的线程
20     return x;
21 }

      从源码可以看出,出队的大致步骤如下:

        1)首先获取锁,如果获取锁失败,当前线程可能自旋获取锁或被阻塞直到获取到锁成功。

        2)获取锁成功,循环判断队列是否为空,如果为空,那么当前线程被阻塞到 notEmpty 条件的等待队列中,并释放锁,等待被唤醒;

        3)当队列非空或从await方法中返回(此时当前线程从等待队列中被唤醒并重新获取到锁)时,执行取出元素操作。

        4)出队完成后,释放锁,唤醒同步队列的后继节点,

    offer(e)&poll() 返回特殊值

      当不能满足入队或出队条件时,返回特殊值。当队列满时,入队会失败,offer方法直接返回false,反之入队成功,返回true;当队列空时,poll方法返回null。

      入队(offer)

 1 public boolean offer(E e) {
 2     checkNotNull(e);
 3     final ReentrantLock lock = this.lock;
 4     lock.lock();    //获取锁
 5     try {
 6         if (count == items.length)    //如果队列满,与put阻塞当前线程不同的是,offer方法直接返回false
 7             return false;
 8         else {
 9             insert(e);
10             return true;
11         }
12     } finally {
13         lock.unlock();    //释放锁
14     }
15 }

      出队(poll)

 1 //出队
 2 public E poll() {
 3     final ReentrantLock lock = this.lock;
 4     lock.lock();    //获取锁
 5     try {
 6         return (count == 0) ? null : extract();    //如果队列空,与take阻塞当前线程不同的是,poll方法返回null
 7     } finally {
 8         lock.unlock();    //释放锁
 9     }
10 }

    add(e)&remove() 抛异常

      当不能满足入队或出队条件时,直接抛出异常。当队列满时,入队失败,抛IllegalStateException("Queue full");当队列空时,remove方法抛NoSuchElementException()异常。

      入队(add)

 1 public boolean add(E e) {
 2     return super.add(e);
 3 }
 4
 5 //抽象类AbstractQueue提供的方法
 6 public boolean add(E e) {
 7     //如果offer返回true,那么add方法返回true;如果offer返回false,那么add方法抛IllegalStateException("Queue full")异常
 8     if (offer(e))
 9         return true;
10     else
11         throw new IllegalStateException("Queue full");
12 }

      出队(remove)

1 //抽象类AbstractQueue提供的方法
2 public E remove() {
3     E x = poll();
4     if (x != null)
5         return x;
6     else
7         throw new NoSuchElementException();
8 }

    offer&poll 超时

      使用Condition的超时等待机制实现,当不满足条件时,只在有限的时间内阻塞,超过超时时间仍然不满足条件才返回false或null。

      入队(offer(E e, long timeout, TimeUnit unit))

 1 public boolean offer(E e, long timeout, TimeUnit unit)
 2     throws InterruptedException {
 3
 4     checkNotNull(e);
 5     long nanos = unit.toNanos(timeout);    //转换为纳秒
 6     final ReentrantLock lock = this.lock;
 7     lock.lockInterruptibly();
 8     try {
 9         while (count == items.length) {
10             if (nanos <= 0)
11                 return false;
12             nanos = notFull.awaitNanos(nanos);    //与offer直接返回false不同,此处使用Condition的超时等待机制实现,超过等待时间如果仍然不满足条件才返回false
13         }
14         insert(e);
15         return true;
16     } finally {
17         lock.unlock();
18     }
19 }

      出队(poll(long timeout, TimeUnit unit))

 1 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 2     long nanos = unit.toNanos(timeout);
 3     final ReentrantLock lock = this.lock;
 4     lock.lockInterruptibly();
 5     try {
 6         while (count == 0) {
 7             if (nanos <= 0)
 8                 return null;
 9             nanos = notEmpty.awaitNanos(nanos);
10         }
11         return extract();
12     } finally {
13         lock.unlock();
14     }
15 }

  LinkedBlockingQueue

    Executors创建固定大小线程池的代码,就使用了LinkedBlockingQueue来作为任务队列。

    域

 1 /** The capacity bound, or Integer.MAX_VALUE if none */
 2 private final int capacity;    //队列最大容量,默认为Integer.MAX_VALUE
 3 /** Current number of elements */
 4 private final AtomicInteger count = new AtomicInteger(0);    //当前元素数量,原子类保证线程安全
 5 /**
 6  * Head of linked list.
 7  * Invariant: head.item == null
 8  */
 9 private transient Node<E> head;    //队列的首节点,head节点是个空节点,head.item == null,实际存储元素的第一个节点是head.next
10 /**
11  * Tail of linked list.
12  * Invariant: last.next == null
13  */
14 private transient Node<E> last;    //队列的尾节点
15 /** Lock held by take, poll, etc */
16 private final ReentrantLock takeLock = new ReentrantLock();    //出队锁
17 /** Wait queue for waiting takes */
18 private final Condition notEmpty = takeLock.newCondition();    //出队条件
19 /** Lock held by put, offer, etc */
20 private final ReentrantLock putLock = new ReentrantLock();    //入队锁
21 /** Wait queue for waiting puts */
22 private final Condition notFull = putLock.newCondition();    //入队条件

      Node类:

 1 static class Node<E> {
 2     E item;    //元素
 3     /**
 4      * One of:
 5      * - the real successor Node
 6      * - this Node, meaning the successor is head.next
 7      * - null, meaning there is no successor (this is the last node)
 8      */
 9     Node<E> next;    //后继节点,LinkedBlockingQueue使用的是单向链表
10     Node(E x) { item = x; }
11 }

      由LinkedBlockingQueue的域可以看出,它使用链表存储元素。线程间的通信也是使用ReentrantLock和Condition实现的,与ArrayBlockingQueue不同的是,LinkedBlockingQueue在入队和出队操作时分别使用两个锁putLock和takeLock。

      思考问题一:为什么使用两把锁?

      为了提高并发度和吞吐量,使用两把锁,takeLock只负责出队,putLock只负责入队,入队和出队可以同时进行,提高入队和出队操作的效率,增大队列的吞吐量。LinkedBlockingQueue队列的吞吐量通常要高于ArrayBlockingQueue队列,但是在高并发条件下可预测性降低。

      思考问题二:ArrayBlockingQueue中的count是一个普通的int型变量,LinkedBlockingQueue的count为什么是AtomicInteger类型的?

      因为ArrayBlockingQueue的入队和出队操作使用同一把锁,对count的修改都是在处于线程获取锁的情况下进行操作,因此不会有线程安全问题。而LinkedBlockingQueue的入队和出队操作使用的是不同的锁,会有对count变量并发修改的情况,所以使用原子变量保证线程安全。

      思考问题三:像notEmpty、takeLock、count域等都声明为final型,final成员变量有什么特点?

      1)对于一个final变量,如果是基本数据类型的变量,则其数值一旦在初始化之后便不能更改;如果是引用类型的变量,则在对其初始化之后便不能再让其指向另一个对象。

      2)对于一个final成员变量,必须在定义时或者构造器中进行初始化赋值,而且final变量一旦被初始化赋值之后,就不能再被赋值了。只要对象是正确构造的(被构造对象的引用在构造函数中没有“逸出”),那么不需要使用同步(指lock和volatile的使用)就可以保证任意线程都能看到这个final域在构造函数中被初始化之后的值。

    初始化

1 //指定容量,默认为Integer.MAX_VALUE
2 public LinkedBlockingQueue(int capacity) {
3     if (capacity <= 0) throw new IllegalArgumentException();
4     this.capacity = capacity;
5     last = head = new Node<E>(null);    //构造元素为null的head节点,并将last指向head节点
6 }

    put&take 阻塞

      阻塞式入队(put)

 1 public void put(E e) throws InterruptedException {
 2     if (e == null) throw new NullPointerException();
 3     // Note: convention in all put/take/etc is to preset local var 预置本地变量,例如入队锁赋给局部变量putLock
 4     // holding count negative to indicate failure unless set.
 5     int c = -1;
 6     Node<E> node = new Node(e);    //构造新节点
 7     //预置本地变量putLock和count
 8     final ReentrantLock putLock = this.putLock;
 9     final AtomicInteger count = this.count;
10     putLock.lockInterruptibly();    //可中断获取入队锁
11     try {
12         /*
13          * Note that count is used in wait guard even though it is
14          * not protected by lock. This works because count can
15          * only decrease at this point (all other puts are shut
16          * out by lock), and we (or some other waiting put) are
17          * signalled if it ever changes from capacity. Similarly
18          * for all other uses of count in other wait guards.
19          */
20         while (count.get() == capacity) {
21             notFull.await();
22         }
23         enqueue(node);    //在队尾插入node
24         c = count.getAndIncrement();    //count原子方式增1,返回值c为count增长之前的值
25         if (c + 1 < capacity)    //如果队列未满,通知入队线程(notFull条件等待队列中的线程)
26             notFull.signal();
27     } finally {
28         putLock.unlock();    //释放入队锁
29     }
30     //如果入队该元素之前队列中元素数量为0,那么通知出队线程(notEmpty条件等待队列中的线程)
31     if (c == 0)
32         signalNotEmpty();
33 }
34
35 //通知出队线程(notEmpty条件等待队列中的线程)
36 private void signalNotEmpty() {
37     final ReentrantLock takeLock = this.takeLock;
38     takeLock.lock();    //获取出队锁,调用notEmpty条件的方法的前提
39     try {
40         notEmpty.signal();    //唤醒一个等待出队的线程
41     } finally {
42         takeLock.unlock();    //释放出队锁
43     }
44 }
45
46 private void enqueue(Node<E> node) {
47     // assert putLock.isHeldByCurrentThread();
48     // assert last.next == null;
49     last = last.next = node;
50 }

      思考问题一:为什么要再声明一个final局部变量指向putLock和count,直接使用成员变量不行吗?

      直接使用成员变量:每次调用putLock的方法,都需要先通过this指针找到Heap中的Queue实例,然后在根据Queue实例的putLock域引用找到Lock实例,最后才能调用Lock的方法(即将相应的方法信息组装成栈帧压入栈顶)。声明一个final局部变量指向putLock:先通过this指针找到Heap中的Queue实例,将Queue实例的putLock域存储的Lock实例的地址赋给局部变量putLock,以后需要调用putLock的方法时,直接使用局部变量putLock引用就可以找到Lock实例。简化了查找Lock实例的过程。count变量也是同样的道理。个人理解应该是为了提升效率。

      思考问题二:使用两把锁怎么保证元素的可见性?

      例如:入队线程使用put方法在队列尾部插入一个元素,怎么保证出队线程能看到这个元素?ArrayBlockingQueue的入队和出队使用同一个锁,所以没有可见性问题。

      在LinkedBlockingQueue中,每次一个元素入队, 都需要获取putLock和更新count,而出队线程为了保证可见性,需要获取fullyLock(fullyLock方法用于一些批量操作,对全局加锁)或者获取takeLock,然后读取count.get()。因为volatile对象的写操作happen-before读操作,也就是写线程先写的操作对随后的读线程是可见的,volatile相当于一个内存屏障,volatile后面的指令不允许重排序到它之前,而count是原子整型类,是基于volatile变量和CAS机制实现。所以就保证了可见性,写线程修改count-->读线程读取count-->读线程。

      思考问题三:在put方法中,为什么唤醒出队线程的方法signalNotEmpty()要放在释放putLock锁(putLock.unlock())之后?同样,take也有同样的疑问?

      避免死锁的发生,因为signalNotEmpty()方法中要获取takeLock锁。如果放在释放putLock之前,相当于在入队线程需要先获取putLock锁,再获取takeLock锁。例如:当入队线程先获取到putLock锁,并尝试获取takeLock锁,出队线程获取到takeLock锁,并尝试获取putLock锁时,就会产生死锁。

      思考问题四:什么是级联通知?

      比如put操作会调用notEmpty的notify,只会唤醒一个等待的读线程来take,take之后如果发现还有剩余的元素,会继续调用notify,通知下一个线程来获取。

      阻塞式出队(take)  

 1 public E take() throws InterruptedException {
 2     E x;
 3     int c = -1;
 4     final AtomicInteger count = this.count;
 5     final ReentrantLock takeLock = this.takeLock;
 6     takeLock.lockInterruptibly();    //可中断获取出队锁
 7     try {
 8         while (count.get() == 0) {    //如果队列为空,阻塞线程同时释放锁
 9             notEmpty.await();
10         }
11         x = dequeue();    //从队列头弹出元素
12         c = count.getAndDecrement(); //count原子式递减
13         //c>1说明本次出队后,队列中还有元素
14         if (c > 1)
15             notEmpty.signal();    //唤醒一个等待出队的线程
16     } finally {
17         takeLock.unlock();    //释放出队锁
18     }
19     //c == capacity说明本次出队之前是满队列,唤醒一个等待NotFull的线程
20     if (c == capacity)
21         signalNotFull();
22     return x;
23 }
24
25 //唤醒一个等待NotFull条件的线程
26 private void signalNotFull() {
27     final ReentrantLock putLock = this.putLock;
28     putLock.lock();
29     try {
30         notFull.signal();
31     } finally {
32         putLock.unlock();
33     }
34 }
35
36 //从队列头弹出元素
37 private E dequeue() {
38     // assert takeLock.isHeldByCurrentThread();
39     // assert head.item == null;
40     Node<E> h = head;
41     Node<E> first = h.next;
42     h.next = h; // help GC
43     head = first;
44     E x = first.item;
45     first.item = null;
46     return x;
47 }

    需要注意的操作

      1)remove

      删除指定元素 全局加锁

 1 public boolean remove(Object o) {   //正常用不到remove方法,queue正常只使用入队出队操作.
 2     if (o == null) return false;
 3     fullyLock();                    // 两个锁都锁上了,禁止进行入队出队操作.
 4     try {
 5         for (Node<E> trail = head, p = trail.next;
 6              p != null;             // 按顺序一个一个检索,直到p==null
 7              trail = p, p = p.next) {
 8             if (o.equals(p.item)) { //找到了,就删除掉
 9                 unlink(p, trail);   //删除操作是一个unlink方法,意思是p从LinkedList链路中解除.
10                 return true;        //返回删除成功
11             }
12         }
13         return false;
14     } finally {
15         fullyUnlock();
16     }
17 }

      2)contains

      判断是否包含指定的元素 全局加锁

 1 public boolean contains(Object o) {     //这种需要检索的操作都是对全局加锁的,很影响性能,要小心使用!
 2     if (o == null) return false;
 3     fullyLock();
 4     try {
 5         for (Node<E> p = head.next; p != null; p = p.next)
 6             if (o.equals(p.item))
 7                 return true;
 8         return false;
 9     } finally {
10         fullyUnlock();
11     }
12 }

      全局加锁和解锁的方法作为公共的方法供其他需要全局锁的方法调用,避免由于获取锁的顺序不一致导致死锁。另外fullyLock和fullyUnlock两个方法对锁的操作要相反。

 1 /**
 2  * Lock to prevent both puts and takes.
 3  */
 4 void fullyLock() {
 5     putLock.lock();
 6     takeLock.lock();
 7 }
 8
 9 /**
10  * Unlock to allow both puts and takes.
11  */
12 void fullyUnlock() {
13     takeLock.unlock();
14     putLock.unlock();
15 }

      3)迭代器Iterator

      弱一致性,不会不会抛出ConcurrentModificationException异常,不会阻止遍历的时候对queue进行修改操作,可能会遍历到修改操作的结果.

  LinkedBlockingQueue和ArrayBlockingQueue对比

    ArrayBlockingQueue由于其底层基于数组,并且在创建时指定存储的大小,在完成后就会立即在内存分配固定大小容量的数组元素,因此其存储通常有限,故其是一个“有界“的阻塞队列;而LinkedBlockingQueue可以由用户指定最大存储容量,也可以无需指定,如果不指定则最大存储容量将是Integer.MAX_VALUE,即可以看作是一个“无界”的阻塞队列,由于其节点的创建都是动态创建,并且在节点出队列后可以被GC所回收,因此其具有灵活的伸缩性。但是由于ArrayBlockingQueue的有界性,因此其能够更好的对于性能进行预测,而LinkedBlockingQueue由于没有限制大小,当任务非常多的时候,不停地向队列中存储,就有可能导致内存溢出的情况发生。

    其次,ArrayBlockingQueue中在入队列和出队列操作过程中,使用的是同一个lock,所以即使在多核CPU的情况下,其读取和操作的都无法做到并行,而LinkedBlockingQueue的读取和插入操作所使用的锁是两个不同的lock,它们之间的操作互相不受干扰,因此两种操作可以并行完成,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。

参考资料

  (LinkedBlockingQueue源码分析)http://www.jianshu.com/p/cc2281b1a6bc

  (ArrayBlockingQueue源码分析)http://www.jianshu.com/p/9a652250e0d1

  (源码分析-LinkedBlockingQueue) http://blog.csdn.net/u011518120/article/details/53886256

  (阻塞队列LinkedBlockingQueue源码分析)http://blog.csdn.net/levena/article/details/78322573

  《Java并发编程的艺术》

时间: 2024-10-07 17:45:52

Java并发编程--BlockingQueue的相关文章

java并发编程实战学习(3)--基础构建模块

转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法.如果队列已经满了,那么put方法将阻塞直到空间可用:如果队列为空,那么take方法将阻塞直到有元素可用.队列可以是有界的也可以是无界的. 如果生产者生成工作的速率比消费者处理工作的速率款,那么工作项会在队列中累计起来,最终好紧内存.同样,put方法的阻塞特性也极大地简化了生产者的编码.如果使用有界队列,当队列充满时,生产者将阻

Java并发编程笔记 并发概览

并发概览 >>同步 如何同步多个线程对共享资源的访问是多线程编程中最基本的问题之一.当多个线程并发访问共享数据时会出现数据处于计算中间状态或者不一致的问题,从而影响到程序的正确运行.我们通常把这种情况叫做竞争条件(race condition),把并发访问共享数据的代码叫做关键区域(critical section).同步就是使得多个线程顺序进入关键区域从而避免竞争条件的发生. >>线程安全性 编写线程安全的代码的核心是要对状态访问操作进行管理,尤其是对共享的和可变的状态访问. 线

JAVA并发编程J.U.C学习总结

前言 学习了一段时间J.U.C,打算做个小结,个人感觉总结还是非常重要,要不然总感觉知识点零零散散的. 有错误也欢迎指正,大家共同进步: 另外,转载请注明链接,写篇文章不容易啊,http://www.cnblogs.com/chenpi/p/5614290.html 本文目录如下,基本上涵盖了J.U.C的主要内容: JSR 166及J.U.C Executor框架(线程池. Callable .Future) AbstractQueuedSynchronizer(AQS框架) Locks & C

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java 并发编程之任务取消(八)

处理非正常的线程中止 当单线程的控制台程序由于 发生了一个未捕获的异常而终止时,程序将停止运行,并产生与程序正常输出非常不同的栈追踪信息,这种情况是很容易理解的.然而,如果并发程序中的某个线程发生故障,那么通常不会如此明显.在控制台中可能会输出栈追踪信息,但没有人会观察控制台.此外,当线程发生故障时,应用程序可能看起来仍然 在工作,所以这个失败很可能被忽略.下面要讲的问题就是监测并防止在程序中"遗漏"线程的方法 . 导致线程提前死亡的最主要原因就是RuntimeException. 我

《Java并发编程实战》第八章 线程池的使用 读书笔记

一.在任务与执行策略之间的隐性解耦 有些类型的任务需要明确地指定执行策略,包括: . 依赖性任务.依赖关系对执行策略造成约束,需要注意活跃性问题.要求线程池足够大,确保任务都能放入. . 使用线程封闭机制的任务.需要串行执行. . 对响应时间敏感的任务. . 使用ThreadLocal的任务. 1. 线程饥饿死锁 线程池中如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,这种现象称为线程饥饿死锁. 2. 运行时间较长的任务 Java提供了限时版本与无限时版本.例如Thread

12、Java并发编程:阻塞队列

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

《Java并发编程实战》要点笔记及java.util.concurrent 的结构介绍

买了<java并发编程实战>这本书,看了好几遍都不是很懂,这个还是要在实战中找取其中的要点的,后面看到一篇文章笔记做的很不错分享给大家!! 原文地址:http://blog.csdn.net/cdl2008sky/article/details/26377433 Subsections  1.线程安全(Thread safety) 2.锁(lock) 3.共享对象 4.对象组合 5.基础构建模块 6.任务执行 7.取消和关闭 8.线程池的使用 9.性能与可伸缩性 10.并发程序的测试 11.显

Java并发编程(六) 一个日志服务的例子

日志服务需要提供的功能有: 可以从外部安全地开启和关闭日志服务: 可以供多个线程安全地记录日志消息: 在日志服务关闭后,可以把剩余未记录的消息写入日志文件: public class LogService { private final BlockingQueue<String> msgQueue; //阻塞的消息队列保存日志消息 private final PrintWrite writer; //写消息到日志文件 private final LoggerThread logThread;