java.util.concurrent 包含许多线程安全、高性能的并发构建块。换句话讲,创建 java.util.concurrent 的目的就是要实现 Collection 框架对数据结构所执行的并发操作。通过提供一组可靠的、高性能并发构建块,开发人员可以提高并发类的线程安全、可伸缩性、性能、可读性和可靠性。
JDK 5.0 中的并发改进可以分为三组:
1. JVM 级别更改。大多数现代处理器对并发对某一硬件级别提供支持,通常以 compare-and-swap (CAS)指令形式。CAS 是一种低级别的、细粒度的技术,它允许多个线程更新一个内存位置,同时能够检测其他线程的冲突并进行恢复。它是许多高性能并发算法的基础。在 JDK 5.0 之前,Java 语言中用于协调线程之间的访问的惟一原语是同步,同步是更重量级和粗粒度的。公开 CAS 可以开发高度可伸缩的并发 Java 类。这些更改主要由 JDK 库类使用,而不是由开发人员使用。
2. 低级实用程序类 -- 锁定和原子类。使用 CAS 作为并发原语,ReentrantLock 类提供与 synchronized 原语相同的锁定和内存语义,然而这样可以更好地控制锁定(如计时的锁定等待、锁定轮询和可中断的锁定等待)和提供更好的可伸缩性(竞争时的高性能)。大多数开发人员将不再直接使用 ReentrantLock 类,而是使用在 ReentrantLock 类上构建的高级类。
3. 高级实用程序类。这些类实现并发构建块,每个计算机科学文本中都会讲述这些类 -- 信号、互斥、闩锁、屏障、交换程序、线程池和线程安全集合类等。大部分开发人员都可以在应用程序中用这些类,来替换许多(如果不是全部)同步、wait() 和 notify() 的使用,从而提高性能、可读性和正确性。
什么是线程?
进程:独立运行的程序,在某种程度上相互隔离。
线程有时称为轻量级进程。与进程一样,它们拥有通过程序运行的独立的并发路径,并且每个线程都有自己的程序计数器,称为堆栈和本地变量。然而,线程存在于进程中,它们与同一进程内的其他线程共享内存、文件句柄以及进程状态。
今天,几乎所有操作系统都支持线程,允许执行多个可独立调度的线程,以便共存于一个进程中。因为一个进程中的所有线程是在同一个地址空间中执行的,所以多个线程可以同时访问相同对象,并且它们从同一堆栈中分配对象。虽然这使线程更易于与其他线程共享信息,但也意味着您必须确保线程之间不相互干涉。
正确使用线程能带来诸多好处,其中包括更好的资源利用、简化开发、高吞吐量、更易响应的用户界面以及能执行异步处理。
线程的优点:
1. 使用多处理器。 多处理器(MP)系统变得越来越便宜,并且分布越来越广泛。因为调度的基本单位通常是线程,所以不管有多少处理器可用,一个线程的应用程序一次只能在一个处理器上运行。在设计良好的程序中,通过更好地利用可用的计算机资源,多线程能够提高吞吐量和性能。
2. 简化建模。 有效使用线程能够使程序编写变得更简单,并易于维护
3. 异步或后台处理。 服务器应用程序可以同时服务于许多远程客户机。如果应用程序从 socket 中读取数据,并且没有数据可以读取,那么对 read() 的调用将被阻塞,直到有数据可读。在单线程应用程序中,这意味着当某一个线程被阻塞时,不仅处理相应请求要延迟,而且处理所有请求也将延迟。然而,如果每个 socket 都有自己的 IO 线程,那么当一个线程被阻塞时,对其他并发请求行为没有影响。
一些特殊场景:
a)Servlet 和 JavaServer Page 技术
Servlet 容器可以创建多个线程,在多个线程中同时调用给定 servlet,从而处理多个请求。因此 servlet 类必须是线程安全的。
b)RMI
远程方法调用(remote method invocation,RMI)工具允许调用其他 JVM 中运行的操作。实现远程对象最普遍的方法是扩展 UnicastRemoteObject。例示 UnicastRemoteObject 时,它是通过 RMI 调度器注册的,该调度器可能创建一个或多个线程,将在这些线程中执行远程方法。因此,远程类必须是线程安全的。
正如所看到的,即使应用程序没有明确创建线程,也会发生许多可能会从其他线程调用类的情况。幸运的是,java.util.concurrent 中的类可以大大简化编写线程安全类的任务。
例子 -- 非线程安全 servlet
下列 servlet 看起来像无害的留言板 servlet,它保存每个来访者的姓名。然而,该 servlet 不是线程安全的,而这个 servlet 应该是线程安全的。问题在于它使用 HashSet 存储来访者的姓名,HashSet 不是线程安全的类。
当我们说这个 servlet 不是线程安全的时,是说它所造成的破坏不仅仅是丢失留言板输入。在最坏的情况下,留言板数据结构都可能被破坏并且无法恢复。
[html] view plaincopyprint?
- <span style="font-size:14px;">public class UnsafeGuestbookServlet extends HttpServlet {
- private Set visitorSet = new HashSet();
- protected void doGet(HttpServletRequest httpServletRequest,
- HttpServletResponse httpServletResponse) throws ServletException, IOException {
- String visitorName = httpServletRequest.getParameter(‘NAME‘);
- if (visitorName != null)
- visitorSet.add(visitorName);
- }
- }</span>
通过将 visitorSet 的定义更改为下列代码,可以使该类变为线程安全的:
private Set visitorSet = Collections.synchronizedSet(new HashSet());
线程安全集合
JDK 1.2 中引入的 Collection 框架是一种表示对象集合的高度灵活的框架,它使用基本接口 List、Set 和 Map。通过 JDK 提供每个集合的多次实现(HashMap、Hashtable、TreeMap、WeakHashMap、HashSet、TreeSet、Vector、ArrayList、LinkedList 等等)。其中一些集合已经是线程安全的(Hashtable 和 Vector),通过同步的封装工厂(Collections.synchronizedMap()、synchronizedList() 和 synchronizedSet()),其余的集合均可表现为线程安全的。
java.util.concurrent 包添加了多个新的线程安全集合类(ConcurrentHashMap、CopyOnWriteArrayList 和 CopyOnWriteArraySet)。这些类的目的是提供高性能、线程安全的基本集合类。
java.util 中的线程集合仍有一些缺点。例如,在迭代锁定时,通常需要将该锁定保留在集合中,否则,会有抛出 ConcurrentModificationException 的危险。(这个特性有时称为条件线程安全;有关的更多说明,请参阅参考资料。)此外,如果从多个线程频繁地访问集合,则常常不能很好地执行这些类。java.util.concurrent 中的新集合类允许通过在语义中的少量更改来获得更高的并发。
JDK 5.0 还提供了两个新集合接口 -- Queue 和 BlockingQueue。Queue 接口与 List 类似,但它只允许从后面插入,从前面删除。通过消除 List 的随机访问要求,可以创建比现有 ArrayList 和 LinkedList 实现性能更好的 Queue 实现。因为 List 的许多应用程序实际上不需要随机访问,所以Queue 通常可以替代 List,来获得更好的性能。
CopyOnWriteArrayList 和 CopyOnWriteArraySet
可以用两种方法创建线程安全支持数据的 List -- Vector 或封装 ArrayList 和 Collections.synchronizedList()。java.util.concurrent 包添加了名称繁琐的 CopyOnWriteArrayList。为什么我们想要新的线程安全的List类?为什么Vector还不够?
最简单的答案是与迭代和并发修改之间的交互有关。使用 Vector 或使用同步的 List 封装器,返回的迭代器是 fail-fast 的,这意味着如果在迭代过程中任何其他线程修改 List,迭代可能失败。
Vector的非常普遍的应用程序是存储通过组件注册的监听器的列表。当发生适合的事件时,该组件将在监听器的列表中迭代,调用每个监听器。为了防止ConcurrentModificationException,迭代线程必须复制列表或锁定列表,以便进行整体迭代,而这两种情况都需要大量的性能成本。
CopyOnWriteArrayList类通过每次添加或删除元素时创建支持数组的新副本,避免了这个问题,但是进行中的迭代保持对创建迭代器时的当前副本进行操作。虽然复制也会有一些成本,但是在许多情况下,迭代要比修改多得多,在这些情况下,写入时复制要比其他备用方法具有更好的性能和并发性。
如果应用程序需要 Set 语义,而不是 List,那么还有一个 Set 版本 -- CopyOnWriteArraySet。
ConcurrentHashMap
正如已经存在线程安全的 List 的实现,您可以用多种方法创建线程安全的、基于 hash 的 Map -- Hashtable,并使用 Collections.synchronizedMap() 封装 HashMap。JDK 5.0 添加了 ConcurrentHashMap 实现,该实现提供了相同的基本线程安全的 Map 功能,但它大大提高了并发性。
Hashtable 和 synchronizedMap 所采取的获得同步的简单方法(同步 Hashtable 中或者同步的 Map 封装器对象中的每个方法)有两个主要的不足。首先,这种方法对于可伸缩性是一种障碍,因为一次只能有一个线程可以访问 hash 表。同时,这样仍不足以提供真正的线程安全性,许多公用的混合操作仍然需要额外的同步。虽然诸如 get() 和 put() 之类的简单操作可以在不需要额外同步的情况下安全地完成,但还是有一些公用的操作序列,例如迭代或者 put-if-absent(空则放入),需要外部的同步,以避免数据争用。
Hashtable 和 Collections.synchronizedMap, 通过同步每个方法获得线程安全。这意味着当一个线程执行一个 Map 方法时,无论其他线程要对 Map 进行什么样操作,都不能执行,直到第一个线程结束才可以。
对比来说,ConcurrentHashMap 允许多个读取几乎总是并发执行,读和写操作通常并发执行,多个同时写入经常并发执行。结果是当多个线程需要访问同一 Map 时,可以获得更高的并发性。
在大多数情况下,ConcurrentHashMap 是 Hashtable或 Collections.synchronizedMap(new HashMap()) 的简单替换。然而,其中有一个显著不同,即 ConcurrentHashMap 实例中的同步不锁定映射进行独占使用。实际上,没有办法锁定 ConcurrentHashMap 进行独占使用,它被设计用于进行并发访问。为了使集合不被锁定进行独占使用,还提供了公用的混合操作的其他(原子)方法,如 put-if-absent。ConcurrentHashMap 返回的迭代器是弱一致的,意味着它们将不抛出ConcurrentModificationException ,将进行‘合理操作‘来反映迭代过程中其他线程对 Map 的修改。
队列
原始集合框架包含三个接口:List、Map 和 Set。List 描述了元素的有序集合,支持随即访问 -- 可以在任何位置添加、提取或删除元素。
LinkedList 类经常用于存储工作元素(等待执行的任务)的列表或队列。然而,List 提供的灵活性比该公用应用程序所需要的多得多,这个应用程序通常在后面插入元素,从前面删除元素。但是要支持完整 List 接口则意味着 LinkedList 对于这项任务不像原来那样有效。Queue 接口比 List 简单得多,仅包含 put() 和 take() 方法,并允许比 LinkedList 更有效的实现。
Queue 接口还允许实现来确定存储元素的顺序。ConcurrentLinkedQueue 类实现先进先出(first-in-first-out,FIFO)队列,而 PriorityQueue 类实现优先级队列(也称为堆),它对于构建调度器非常有用,调度器必须按优先级或预期的执行时间执行任务。
[html] view plaincopyprint?
- interface Queue extends Collection {
- boolean offer(E x);
- E poll();
- E remove() throws NoSuchElementException;
- E peek();
- E element() throws NoSuchElementException;
- }
实现 Queue 的类是:
1. LinkedList 已经进行了改进来实现 Queue。
2. PriorityQueue 非线程安全的优先级对列(堆)实现,根据自然顺序或比较器返回元素。
3. ConcurrentLinkedQueue 快速、线程安全的、无阻塞 FIFO 队列。
线程创建
通过例示从 Thread 获得的对象并调用 Thread.start() 方法来创建线程。可以用两种方法创建线程:通过扩展 Thread 和覆盖 run() 方法,或者通过实现 Runnable 接口和使用 Thread(Runnable) 构造函数:
[html] view plaincopyprint?
- class WorkerThread extends Thread {
- public void run() { /* do work */ }
- }
- Thread t = new WorkerThread();
- t.start();
- 或者:
- Thread t = new Thread(new Runnable() {
- public void run() { /* do work */ }
- }
- t.start();
如何不对任务进行管理
大多数服务器应用程序(如 Web 服务器、POP 服务器、数据库服务器或文件服务器)代表远程客户机处理请求,这些客户机通常使用 socket 连接到服务器。对于每个请求,通常要进行少量处理(获得该文件的代码块,并将其发送回 socket),但是可能会有大量(且不受限制)的客户机请求服务。
用于构建服务器应用程序的简单化模型会为每个请求创建新的线程。下列代码段实现简单的 Web 服务器,它接受端口 80 的 socket 连接,并创建新的线程来处理请求。不幸的是,该代码不是实现 Web 服务器的好方法,因为在重负载条件下它将失败,停止整台服务器。
[html] view plaincopyprint?
- class UnreliableWebServer {
- public static void main(String[] args) {
- ServerSocket socket = new ServerSocket(80);
- while (true) {
- final Socket connection = socket.accept();
- Runnable r = new Runnable() {
- public void run() {
- handleRequest(connection);
- }
- };
- // Don‘t do this!
- new Thread(r).start();
- }
- }
- }
当服务器被请求吞没时,UnreliableWebServer 类不能很好地处理这种情况。每次有请求时,就会创建新的类。根据操作系统和可用内存,可以创建的线程数是有限的。
不幸的是,您通常不知道限制是多少 -- 只有当应用程序因为 OutOfMemoryError 而崩溃时才发现。
如果足够快地在这台服务器上抛出请求的话,最终其中一个线程创建将失败,生成的 Error 会关闭整个应用程序。当一次仅能有效支持很少线程时,没有必要创建上千个
线程,无论如何,这样使用资源可能会损害性能。创建线程会使用相当一部分内存,其中包括有两个堆栈(Java 和 C),以及每线程数据结构。如果创建过多线程,其中
每个线程都将占用一些 CPU 时间,结果将使用许多内存来支持大量线程,每个线程都运行得很慢。这样就无法很好地使用计算资源。
使用线程池解决问题
为任务创建新的线程并不一定不好,但是如果创建任务的频率高,而任务平均持续时间低,我们可以看到每项任务创建一个新的线程将产生性能(如果负载不可预知,还有稳定性)问题。
如果不是每项任务创建一个新的线程,则服务器应用程序必须采取一些方法来限制一次可以处理的请求数。这意味着每次需要启动新的任务时,它不能仅调用下列代码。
new Thread(runnable).start()
管理一大组小任务的标准机制是组合工作队列和线程池。工作队列就是要处理的任务的队列,前面描述的 Queue 类完全适合。线程池是线程的集合,每个线程都提取公用工作队列。当一个工作线程完成任务处理后,它会返回队列,查看是否有其他任务需要处理。如果有,它会转移到下一个任务,并开始处理。
线程池为线程生命周期间接成本问题和资源崩溃问题提供了解决方案。通过对多个任务重新使用线程,创建线程的间接成本将分布到多个任务中。作为一种额外好处,因为请求到达时,线程已经存在,从而可以消除由创建线程引起的延迟。因此,可以立即处理请求,使应用程序更易响应。而且,通过正确调整线程池中的线程数,可以强制超出特定限制的任何请求等待,直到有线程可以处理它,它们等待时所消耗的资源要少于使用额外线程所消耗的资源,这样可以防止资源崩溃。
Executor 框架
java.util.concurrent 包中包含灵活的线程池实现,但是更重要的是,它包含用于管理实现 Runnable 的任务的执行的整个框架。该框架称为 Executor 框架。
Executor 接口相当简单。它描述将运行 Runnable 的对象:
[html] view plaincopyprint?
- public interface Executor {
- void execute(Runnable command);
- }
任务运行于哪个线程不是由该接口指定的,这取决于使用的 Executor 的实现。
java.util.concurrent 中的大多数 Executor 实现还实现 ExecutorService 接口,这是对 Executor 的扩展,它还管理执行服务的生命周期。这使它们更易于管理,并向生命可能比单独 Executor 的生命更长的应用程序提供服务。
[html] view plaincopyprint?
- public interface ExecutorService extends Executor {
- void shutdown();
- List shutdownNow();
- boolean isShutdown();
- boolean isTerminated();
- boolean awaitTermination(long timeout,
- TimeUnit unit);
- // other convenience methods for submitting tasks
- }
java.util.concurrent 包含多个 Executor 实现,每个实现都实现不同的执行策略。什么是执行策略?执行策略定义何时在哪个线程中运行任务,执行任务可能消耗的资源级别(线程、内存等等),以及如果执行程序超载该怎么办。
执行程序通常通过工厂方法示例,而不是通过构造函数。Executors 类包含用于构造许多不同类型的 Executor 实现的静态工厂方法:
1. Executors.newCachedThreadPool() 创建不限制大小的线程池,但是当以前创建的线程可以使用时将重新使用那些线程。如果没有现有线程可用,
将创建新的线程并将其添加到池中。使用不到 60 秒的线程将终止并从缓存中删除。
2. Executors.newFixedThreadPool(int n) 创建线程池,其重新使用在不受限制的队列之外运行的固定线程组。在关闭前,所有线程都会因为执行
过程中的失败而终止,如果需要执行后续任务,将会有新的线程来代替这些线程。
3. Executors.newSingleThreadExecutor() 创建单一工作线程,与 Swing 事件线程非常相似。保证顺序执行任务,在任何给定时间,不会有多个任务处于活动状态。
之前的例子可以做简单调整,只需将 Thread.start() 调用替换为向 Executor 提交任务即可:
[html] view plaincopyprint?
- class ReliableWebServer {
- Executor pool = Executors.newFixedThreadPool(7);
- public static void main(String[] args) {
- ServerSocket socket = new ServerSocket(80);
- while (true) {
- final Socket connection = socket.accept();
- Runnable r = new Runnable() {
- public void run() {
- handleRequest(connection);
- }
- };
- pool.execute(r);
- }
- }
- }