Java 并发编程 - 3

JDK 1.5 之前的同步容器

JDK 1.5 之前, 主要包括:

  • 同步容器 (Vector 和 Hashtable)
  • 同步包装类 (Collections.synchronizedXxx)

这些类的共同特征是, 公共方法都是由 synchronized 来修饰的, 以限制一次只能有一个线程能访问容器.

同步容器中出现的问题

复合操作

老的容器自身并不支持复合操作, 包括:

  1. 迭代(反复获取元素, 直到获得容器中的最后一个元素)
  2. 导航(navigation, 根据一定的顺序寻找下一个元素)
  3. 条件运算(check-then-act)

好在老的容器类遵循一个支持 客户端加锁 的同步策略. 来解决复合运算的问题:

  • 解决迭代和导航:

    synchronized(list) { // 确保调用 size() 后, list 大小不会改变
        for (int i = 0; i < list.size(); ++i) {
            doSomething(list[i]);
        }
    }
  • 解决条件运算:
    synchronized(list) { // 确保调用 size() 后, list 大小不会改变
        int lastIndex = list.size() - 1;
        list.remove(lastIndex);
    }

这样做的弊端是:

做任何操作都要锁住整个容器, 效率低, 容易出错.

迭代器 和 ConcurrentModificationException

Collection进行迭代的标准方法是使用 Iterator, 无论是显式使用还是 通过 JDK 1.5 之后的 for-each 语法.

在 迭代 的时候, 仍有其他线程在并发修改容器的可能性, 使用迭代器仍不可避免地需要在迭代期间对容器加锁.

迭代器在并发修改的时候, 策略是 及时失败(fail-fast) 的: 当发现迭代器被修改后(如: add 和 remove), 会抛出一个未检查的 ConcurrentModificationException.

以 ArrayList 为例子, 其父类 AbstractList 内部有一个字段名为 modCount 的计数器. 任何改变 List 大小的操作都需要改变 modCount 这个值.

这个值会被用来在迭代或者时, 检查有没有修改容器, 套路是这样的:

修改时:

    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
    }
    // Add or Remove
    // .......
    expectedModCount = modCount;

迭代:

    public E prev/next() {
        if (modCount != expectedModCount)
            throw new ConcurrentModificationException();
        }
        // Other.....
    }

Note: ConcurrentModificationException 也可以出现单线程的代码中, 比如当在迭代期间调用 remove 方法

隐藏的迭代器

有时候, 一些操作会隐含的调用迭代器, 比如:

  1. 调用 toString() 方法, 尤其是写 log 时, 有

    log("Set:" + set);

    这样的语句.

  2. hashCode 和 equals 方法, 以下是 HashTable 的 hashCode 和 equals 方法:
    public synchronized boolean equals(Object o) {
        if (o == this)
        return true;
    
        if (!(o instanceof Map))
            return false;
        Map<?,?> t = (Map<?,?>) o;
        if (t.size() != size())
            return false;
    
        try {
            Iterator<Map.Entry<K,V>> i = entrySet().iterator();
            while (i.hasNext()) {
                Map.Entry<K,V> e = i.next();
                K key = e.getKey();
                V value = e.getValue();
                if (value == null) {
                    if (!(t.get(key)==null && t.containsKey(key)))
                        return false;
                } else {
                    if (!value.equals(t.get(key)))
                        return false;
                }
            }
        } catch (ClassCastException unused)   {
            return false;
        } catch (NullPointerException unused) {
            return false;
        }
    
        return true;
    }
    
    public synchronized int hashCode() {
        int h = 0;
        if (count == 0 || loadFactor < 0)
            return h;  // Returns zero
    
        loadFactor = -loadFactor;  // Mark hashCode computation in progress
        Entry<?,?>[] tab = table;
        for (Entry<?,?> entry : tab) {
            while (entry != null) {
                h += entry.hashCode();
                entry = entry.next;
            }
        }
    
        loadFactor = -loadFactor;  // Mark hashCode computation complete
    
        return h;
    }
  3. 另外 containAllremoveAll 和 retainAll 也会产生迭代.

JDK 1.5 之后的容器

JDK 1.5 后, 新增加了:

  • ConcurrentHashMap, 来替代同步的 Map 实现, 增加了 put-if-absent, 替换和条件删除
  • CopyOnWriteArrayList, 是 List 相应的同步实现
  • Queue, 用来临时保存正在等待进一步处理的一系列元素, 实现包括
    • ConcurrentLinkedQueue, 一个传统的 FIFO 队列
    • PriorityQueue, 一个(非并发)居右优先级顺序的队列
  • BlockingQueue, 拓展自 Queue, 增加了可阻塞的插入和获取操作.
    • 如果队列是空的, 那么获取操作会被阻塞直到有元素存在;
    • 如果队列是满的, 那么插入操作会被阻塞直到有有元素被取出.

JDK 1.6 后, 新增加了

  1. Deque 和 BlockingDeque, 分别扩展了 Queue 和 BlockingQueue:
    • Deque 接口, 实现类是 ArrayDeque, 不阻塞
    • BlockingDeque 接口, 实现类是 LinkedBlockingDeque, 阻塞.
  2. ConcurrentSkipListMap 和 ConcurrentSkipListSet, 作为 SortedMap 和 SortedSet 的并发替代品

Note: 从一个空的Queue中取元素, 并不会阻塞, 而是返回 null

ConcurrentHashMap

在 ConcurrentHashMap 之前, HashTable 和 SynchronizedMap 都是通过给整个方法加 synchronized 来达到同步的, 这样限制某一时刻只有一个线程可以访问容器.

ConcurrentHashMap 使用一个更加细化的锁机制, 名叫分离锁. 这个机制允许更深层次的共享访问:

  • 任意数量的读线程可以并发访问 Map.
  • 读者和写者可以并发访问 Map.
  • 有限数量的写线程可以并发修改 Map.

由于并发环境中, Map 的大小通常是动态的, size 和 isEmpty 返回的只是个估算值(可能返回后接着过期).

支持的复合操作:

  1. put-if-absent
  2. remove-if-equal
  3. replace-if-equal

CopyOnWriteArrayList

写入时复制(COW)容器的线程安全原理:

只要不可边对象被正确发布, 那么访问它将不需要更多的同步.

因此, 每次添加/修改一个元素, 容器内就会新创建一个新的数组, 容器底层的数组会指向这个新数组. 旧数组仍然被使用, 直到没有引用后被 GC 回收.

由于 COW 复制数组有开销, 所以 COW 适用于容器迭代操作远远高于对容器修改的频率.

FAQ: Arrays.copyOf 和 System.arraycopy 区别?

Arrays.copyOf 不仅会复制元素, 还会创建新的数组. System.arrayCopy 拷贝到一个现有数组, Arrays.copyOf 实现中用了 System.arrayCopy;

阻塞队列和生产者-消费者模式

生产者-消费者设计分离了 "识别需要完成的工作" 和 "执行工作". 该模式不会发现一个工作便立即处理, 而是把工作置入一个任务清单中:

  • 生产者不需要知道消费者的身份或者数量, 甚至根本没有消费者.
  • 消费者也不需要知道生产者是谁, 以及是谁给它们安排的工作.
  • 生产者和消费者的关系是相对的, 消费者可以成为下一个任务队列的生产者

最常见的生产者-消费者设计是: 线程池和工作队列的结合

在设计初期就使用阻塞队列建立对资源的管理, 提早做这件事情会比日后再修复容易的多.

Blocking queue 提供了可阻塞的 put 和 take 方法. 常见的实现有:

  1. LinkedBlockedQueue, FIFO, 链表实现, 队列首 take, 队列尾 put.
  2. ArrayBlockingQueue, FIFO, 数组实现, 可以在 putIndex(队列尾) 插入, 从 takeIndex(队列首) 取出.
  3. PriorityBlockingQueue, 根据 Comparator 排序顺序取出
  4. SynchronousQueue, 生产线程直接和消费线程对接, 如果生产线程找不到消费者或反之, 则, put 和 take 会一直阻止. 只有在消费者充足的时候比较适合, 他们总能为下一个任务做好准备.

双端队列和窃取工作

双端队列用来实现 窃取工作(work stealing) 模式.

在传统的 生产者-消费者 设计中, 所有的消费者只共享一个工作队列.

而在 窃取工作 设计中, 每一个消费者都有一个自己的双端队列. 如果一个消费者完成了自己双端队列中的全部工作, 它可以偷取其他消费者的双端队列的 末尾 任务(其他消费者仍然从队列  取任务).

因为工作者线程并不会竞争一个共享的任务队列, 所以 窃取工作 模式比传统的 生产者-消费者 设计有更好的伸缩性.

阻塞和可中断的方法

阻塞: 线程被挂起, 状态变为BLOCKEDWAITING 或是 TIMED_WAITING等待直到一个事件发生才能继续进行.

BlockingQueue 的 put 和 take 方法会抛出一个受检查的 InterruptedException, 这个异常说明这是个阻塞方法, 可以被中断来提前结束阻塞.

处理中断的方法:

  • 传递 InterruptedException. 传递给调用者, 可以对其中特定活动进行简洁地清理后, 再抛出.
  • 恢复中断. 当代码是 Runnable的一部分时, 必须捕获 InterruptedException. 并且, 在当前线程中调用 interrupt 重新设置中断状态(抛出异常会清理中断标志位), 这样调用栈中更高层代码可以发现中断已经发生.
    try {
        processTask(queue.take());
    } catch (InterruptedException e) {
        // 恢复中断状态
        Thread.currentThread().interrupt();
    }

Synchronizer

Synchronizer 是一个对象, 它根据本身的状态调节线程的控制流. 主要类型有:

  1. 信号量(semaphore)
  2. 关卡(barrier)
  3. 闭锁(latch)

他们的特性: 封装状态, 这些状态绝对着线程执行到某一点时是通过还是被迫等待.

闭锁 latch

直到 闭锁 到达 终点状态 之前, 门一直是关闭的, 没有线程能够通过, 在 终点状态 到来的时候, 门开了, 允许所有线程通过. 一旦到了终点状态, 他就 不能 再改变状态了.

用例:

  1. 确保一个计算不会执行, 直到它需要的资源初始化.
  2. 确保一个服务不会开始, 直到它依赖的其他服务都已经开始.
  3. 所有玩家等待就绪, 再开始.

FutureTask

FutureTask 描述了一个抽象的可携带结果的计算. FutureTask的计算通过 Callable 实现.

Callable 等价于一个可携带结果的 RunnableCallable 有三种状态:

  1. 等待
  2. 运行
  3. 完成(包括正常结束, 取消 和 异常)

要获取 FutureTask 的结果, 可以调用 get() 方法. 调用 get() 时, 有两种情况:

  1. 若已经完成, 则直接获取结果
  2. 若还未完成, 则阻塞, 直至任务完成返回结果或者抛出异常.

FutureTask 保证了计算结果将计算线程安全的传递到当前线程.

假如FutureTask执行的任务有异常抛出, 则异常会被封装在 ExecutionException 里. 以下代码可以从 ExecutionException 中取出异常:

    try {
        futureTask.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceOf XXXException) {
            // 自己想要捕获的异常
        } else {
            throw launderThrowable(cause);
        }
    }

    public static RuntimeException launderThrowable(Throwable cause) {
        if (t instanceOf RuntimeException) {
            return (RuntimeException)t;
        } else if (t instanceOf Error) {

        } else {
            throw new IllegalStateException("Not unchecked", t);
        }
    }

信号量 (Semaphore)

计数信号量用来控制能够同时访问某种资源的活动的数量, 或者同时执行某一操作的数量.

使用计数信号量之前需要先构造一个, 构造时可以将许可集(permit)总数传递进去. 在使用计数信号量时, 要先尝试获取(acquire)一个许可, 假如此时有剩余许可则继续执行, 若没有, 则 阻塞. 使用完之后, 要手动释放(release)一个许可.

用处:

  1. 构造一个定长的池.
  2. 构建有界阻塞容器.

关卡 (CyclicBarrier)

关卡用来阻塞一组线程, 直到 所有线程 达到一个条件. 就像一些家庭成员指定商场的一个集合地点:"我们每个人6:00在麦当劳见, 到了以后不见不散, 之后我们再决定接下来做什么".

关卡 与 闭锁 的不同:

关卡: 等待的是其他线程, 可以重复被使用 闭锁: 等待的是事件, 只能使用一次

当一个线程到达关卡点时, 调用 awaitawait 会被阻塞, 直到所有线程都到达关卡点.

  • 如果所有线程都到达了关卡点, 关卡就被成功地突破, 所有线程会被释放.
  • 如果对 await 的调用超时, 或者阻塞中的线程被中断, 那么关卡就被认为是 失败 的.
    • 若某一个线程调用有时限的 await, 那么当这个线程 await 超时, 这个线程会抛出 TimeoutException 异常, 其他调用 barrior.await() 的线程会抛出 BrokenBarrierException;

如果成功地通过关卡, await 为每一个线程返回一个唯一的到达索引号, 可以用来 "选举" 产生一个领导, 在下一次迭代中承担一些特殊工作.

CyclicBarrier 也允许你向构造函数传递一个 关卡行为(Barrier action), 这是一个 Runnable, 当成功通过关卡的时候, 会(在 某一个 子任务线程中) 执行, 但是在阻塞线程被释放之前是不能执行的.

Exchanger

Exchanger 是关卡的另一种形式, 它是一种两步关卡, 在关卡点会交换数据.

时间: 2024-08-06 18:46:36

Java 并发编程 - 3的相关文章

Java并发编程:Concurrent锁机制解析

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

Java并发编程:Callable、Future和FutureTask(转)

Java并发编程:Callable.Future和FutureTask 在前面的文章中我们讲述了创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦. 而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果. 今天我们就来讨论一下Callabl

Java并发编程 Volatile关键字解析

volatile关键字的两层语义 一旦一个共享变量(类的成员变量.类的静态成员变量)被volatile修饰之后,那么就具备了两层语义: 1)保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的. 2)禁止进行指令重排序. 根据volatile的语义,我们可以看到,volatile主要针对的是并发三要素(原子性,可见性和有序性)中的后两者有实际优化作用. 可见性: 线程本身并不直接与主内存进行数据的交互,而是通过线程的工作内存来完成相应的操作.

Java并发编程

synchronized是Java中的关键字,在并发编程中被称为内置锁或者监视器锁.当用它来修饰一个方法或者一个代码块的时候能够保证同一时刻最多只有一个线程执行该段代码. Java的内置锁相当于一种互斥锁,最多只有一个线程能够持有这种锁,故而由这个锁保护的同步代码块会以原子方式执行,多个线程在执行该代码时就不会相互干扰. 但由于被锁保护的同步块代码是以串行形式来访问的,即多个线程以独占的方式访问对象,而这也导致如果被锁保护的同步代码块的作用范围过大,会导致并发不良. 这里有必要简单讲一下内置锁的

6、Java并发编程:volatile关键字解析

Java并发编程:volatile关键字解析 volatile这个关键字可能很多朋友都听说过,或许也都用过.在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果.在Java 5之后,volatile关键字才得以重获生机. volatile关键字虽然从字面上理解起来比较简单,但是要用好不是一件容易的事情.由于volatile关键字是与Java的内存模型有关的,因此在讲述volatile关键之前,我们先来了解一下与内存模型相关的概念和知识,然后分析了volatil

7、Java并发编程:深入剖析ThreadLocal

Java并发编程:深入剖析ThreadLocal 想必很多朋友对ThreadLocal并不陌生,今天我们就来一起探讨下ThreadLocal的使用方法和实现原理.首先,本文先谈一下对ThreadLocal的理解,然后根据ThreadLocal类的源码分析了其实现原理和使用需要注意的地方,最后给出了两个应用场景. 以下是本文目录大纲: 一.对ThreadLocal的理解 二.深入解析ThreadLocal类 三.ThreadLocal的应用场景 若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者

java并发编程实战学习(3)--基础构建模块

转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法.如果队列已经满了,那么put方法将阻塞直到空间可用:如果队列为空,那么take方法将阻塞直到有元素可用.队列可以是有界的也可以是无界的. 如果生产者生成工作的速率比消费者处理工作的速率款,那么工作项会在队列中累计起来,最终好紧内存.同样,put方法的阻塞特性也极大地简化了生产者的编码.如果使用有界队列,当队列充满时,生产者将阻

Java 并发编程之任务取消(九)

Jvm关闭 jvm可正常关闭也可强行关闭,正常关闭有多种触发方式: 当最后一个正常(非守护,下面会讲到什么是守护线程)线程结束时 当调用system.exit时,或者通过其他特定于平台的方法关闭时(例如发送了SIGINT信号或键入Ctrl-c) 通过其他特定平台的方法关闭jvm,调用Runtime.halt或者在操作系统当中杀死JVM进程(例如发送sigkill)来强行关闭jvm. 关闭钩子 在正常关闭中,jvm首先调用所有已注册的关闭钩子,关闭钩子是指通过 Runtime.addShutdow

Java并发编程笔记 并发概览

并发概览 >>同步 如何同步多个线程对共享资源的访问是多线程编程中最基本的问题之一.当多个线程并发访问共享数据时会出现数据处于计算中间状态或者不一致的问题,从而影响到程序的正确运行.我们通常把这种情况叫做竞争条件(race condition),把并发访问共享数据的代码叫做关键区域(critical section).同步就是使得多个线程顺序进入关键区域从而避免竞争条件的发生. >>线程安全性 编写线程安全的代码的核心是要对状态访问操作进行管理,尤其是对共享的和可变的状态访问. 线

Java并发编程:锁的释放

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd