委托是创建线程安全类的一个最有效策略:只需让现有的线程安全类管理所有的状态即可
Java类库包含丰富的并发基础构建模块,如线程安全的容器以及各种用于协调多个相互协作的线程控制流的同步工具类
- 1. 同步容器类
这些类的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁类保护复合操作如:迭代或者条件运算(若没有则添加)。
不过同步容器类是通过其自身的锁来保护它的每个方法,因此通过获得容器类的锁,可以在客户端构造原子操作。
public static Object getLast(Vector list){ synchronized(list){ int lastIndex = list.size() - 1; return list.get(lastIndex); } }
通过在客户端加锁可以解决不可靠的迭代问题,但要牺牲一些伸缩性。在迭代期间导致其他线程无法访问,降低了并发性
//在调用size和get之间存在并发访问 synchronized(vector){ for(int i=0; i<vector.size(); i++){ doSomething(vector.get(i)); } }
同步容器类的迭代器并没有考虑并发修改的问题,且它们表现出的行为是‘及时失败’,即当它们发现容器在迭代过程中被修改时会抛出ConcurrentModificationException。
要想避免ConcurrentModificationException,就必须在迭代过程中持有容器的锁。
如果不想在迭代期间对容器加锁,一种替代的方法就是克隆容器(克隆过程仍然要对容器加锁),在副本上进行迭代。
List<String> list = Collections.synchronizedList(new ArrayList<String>()); //可能抛出ConcurrentModificationException for(String s : list){ doSomething(s); }
调用容器的toString,hsahVode,equals,containsAll,removeAll,retainAll等方法时,以及把容器作为参数的构造函数,都会间接的对容器进行迭代。
所有这些迭代操作都可能抛出ConcurrentModificationException。
- 2. 并发容器
同步容器将所有对容器状态的访问都串行化,以实现他们的线程安全性,这种方法的代价是验证降低并发性。
Java 5.0提供了多种并发容器来改进同步容器的性能。
- 2.1 Queue用来临时保存一组等待处理的元素。它提供几种实现包括:ConcurentLinkedQueue,传统的先进先出队列
BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作。即如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素;如果队列已满,那么插入元素的操作将一直阻塞,直到队列中出现可用的空间。在“生产者-消费者”模式中,阻塞队列是常用的方式。
- 2.2 ConcurrentHashMap
同步容器类在执行每个操作期间都持有一个锁。在一些操作中,例如:HashMap.get或List.contains可能包含大量的工作,当遍历散列桶或链表来查找某个特定的对象时,必须在许多元素上调用equals(equals本身包含一定的计算量)。在基于散列的容器中,如果hashCode不能很均匀地分布散列值,那么容器中的元素就不会均匀地分布在整个容器中。某些情况下,某个糟糕的散列函数还会把一个散列表变成线性链表。当遍历很长的链表并且在某些或者全部元素上调用equals方法时,会花费相当长的时间,这段时间内其他线程都不能访问该容器。
与HashMap一样,ConcurrentHashMap也是基于散列的Map,但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。ConcurrentHashMap并不是将每个方法都在同一个锁上同步使得每次只能有一个线程访问容器,而是用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。在这种机制中,任意数量的读取线程可以并发的访问Map,执行读取操作的线程和执行写入操作的线程可以并发的访问Map,并且一定数量的写入线程可以并发地修改Map。ConcurrentHashMap带来的结果是,在并发访问环境下将实现更高的吞吐量,而在单线程环境中只损失非常小的性能。
ConcurrentHashMap与其他并发容器一起增强了同步容器类:他们提供的迭代器不会抛出ConcurrentModificationException,因此不需要再迭代过程中对容器加锁。ConcurrentHashMap返回的迭代器具有弱一致性,弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但不保证)在迭代器被构造后将修改操作反映给容器。
尽管有这些改进,但仍然有一些需要权衡的因素。对于一些需要在整个Map上进行计算的方法,例如:size和isEmpty,这些方法的语义被略微减弱了以反映容器的并发特性,由于size返回的结果在计算时可能已经过期了,它实际只是一个近似值,但这是允许的,虽然看上去令人有些不安,但事实上size和isEmpty这样的方法在并发环境下的用处很小,因为它们的返回值总在不断变化。从而这些操作的需求被弱化了,以换取对其他更重要操作的性能优化,如get,put,containsKey,remove等。
由于ConcurrentHashMap不能被加锁来执行独占访问,因此无法使用客户端加锁来创建新的原子操作,但一些常见的复合操作,若没有则添加putIfAbsent,若相等则去除,若相等则替换等,都已经实现为原子操作并在ConcurrentMap中声明。
- 2.3 CopyOnWriteArrayList
CopyOnWriteArrayList(CopyOnWriteArraySet类似) 用于替代同步List,在某些情况下提供更好的并发性能,并且在迭代期间不需要对容器进行加锁和复制。
写入时复制容器的线程安全性在于,只要正确地发布一个事实不可变对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于数组的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰。容器返回的迭代器不会抛出ConCurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改所带来的影响。
显然,每当修改容器时都会复制底层数组,这需要一定的开销,尤其当容器的规模较大时。因此,仅当迭代操作远远多于修改操作时,才应该使用“写入时复制”容器。
- 2.4 阻塞队列、生产者-消费者模式
阻塞队列提供可阻塞的put和take方法,以及定时的offer和poll方法。如果队列已经满了,那么put方法阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到 有元素可用。队列可用有界也可用无界,无界队列永远都不会充满,因此无界队列的put方法永远也不会阻塞。
阻塞队列支持生产者-消费者模式。该模式将要完成的工作与执行工作两个过程分离开来,这样简化了开发过程,消除了生产者类和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载的管理,因为这两个过程在处理数据的速率上有所不同。
一种最常见的生产者-消费者模式就是线程池与工作队列的组合,在Executor任务执行框架中体现了这种模式。
BlockingQueue的几种实现:
LinkedBlockigQueue和ArrayBlockingQueue是FIFO队列,二者区别分别与LinkedList与ArrayList类似,但比同步List拥有更好的并发性。
PriorityBlockingQueue是一个按优先级排序的队列,既可以根据元素的自然顺序来比较元素(元素实现了Comparable方法),也可以使用Comparator来比较。
SynchronousQueue 并不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。这种区别就好像将文件直接交给同事还是将文件放到他的邮箱中希望他能尽快拿到文件。因为没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
- 示例:桌面搜索
/** * 在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。 * 在Indexer中给出了一个消费者任务,即从队列中取出文件名称并对它们建立索引。 * 将文件遍历与建立索引分解为独立操作,每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流,因此每个功能的代码都更加简单清晰。 * 生产者和消费者可以并发地执行。 * 如果一个是I/O密集型,另一个是CPU密集型,那么并发执行的吞吐率要高于串行执行的吞吐率。 * 如果二者并行度不同,那么耦合在一起的结果会把整体并行度降为二者中更小的并行度。 * 由于每个程序都在各自的线程中运行,消费者线程永远不会退出,因而程序无法终止。 */ public class FileCrawler implements Runnable{ private final BlockingQueue<File> fileQueue; private final FileFilter fileFilter; private final File root; //... public void run() { try{ crawl(root); }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException{ File[] entries = root.listFiles(fileFilter); if(entries != null){ for(File entry : entries){ if(entry.isDirectory()){ crawl(entry); }else if(!alreadyIndexed(entry)){ fileQueue.put(entry); } } } } } class Indexer implements Runnable{ private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue){ this.queue = queue; } public void run(){ try{ while(true){ indexFile(queue.take()); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } }
#笔记内容来自《java并发编程实战》
原文地址:https://www.cnblogs.com/shanhm1991/p/9899045.html