Java 7 并发编程实战手册目录
代码下载(https://github.com/Wang-Jun-Chao/java-concurrency)
第二章线程同步基础
2.1简介
多个执行线程共享一个资源的情景,是最常见的并发编程情景之一。在并发应用中常常遇到这样的情景:多个线程读或者写相同的数据,或者访问相同的文件或数据库连接。 为了防止这些共享资源可能出现的错误或数据不一致,我们必须实现一些机制来防止这些错误的发生。
为了解决这些问题,引入了临界区(Critical Section)概念,临界区是一个用以访问共享资源的代码块,这个代码块在同一时间内只允许一个线程执行。
为了帮助编程人员实现这个临界区,Java (以及大多数编程语言)提供了同步机制。 当一个线程试图访问一个临界区时,它将使用一种同步机制来查看是不是己经有其他线程进入临界区。如果没有其他线程进入临界区,它就可以进入临界区;如果已经有线程进入了临界区,它就被同步机制挂起,直到进入的线程离开这个临界区。如果在等待进入临界区的线程不止一个,JVM会选择其中的一个,其余的将继续等待。
本章将逐步讲解如何使用Java语言提供的两种基本同步机制:
? synchronized关键字机制;
? Lock接口及其实现机制.
2.2使用synchronized实现同步方法
Java的最基本的同步方式,即使用synchronized关键字来控制一个方法的并发访问。如果一个对象己用synchronized关键字声明,那么只有一个执行线程被允许访问它。如果其他某个线程试图访问这个对象的其他方法,它将被挂起, 直到第一个线程执行完正在运行的方法。
换句话说,每一个用synchronized关键字声明的方法都是临界区在Java中,同一个对象的临界区,在同一时间只有一个允许被访问。
静态方法则有不同的行为。用synchronized关键字声明的静态方法,同时只能够被一个执行线程访问,但是其他线程可以访问这个对象的非静态方法。必须非常谨慎这一点, 因为两个线程可以同时访问一个对象的两个不同的synchronized方法,即其中一个是静态方法,另一个是非静态方法。如果两个方法都改变了相同的数据,将会出现数据不一致的错误。
范例将使两个线程访问同一个对象。我们有一个银行账号和两个线程,一个线程将转钱到账户中,另一线程将从账户中取钱。如果方法不同步,账户钱数可能不正确。而同步机制则能确保账户的最终余额是正确的。
采用非同步实现的:
package com.concurrency.task;
/**
* 帐户类,模拟一个银行帐户
*/
public class Account {
/**
* 帐户余额
*/
private double balance;
/**
* 获取帐户余额
*
* @return 帐户余额
*/
public double getBalance() {
return balance;
}
/**
* 设置帐户余额
*
* @param balance 帐户余额
*/
public void setBalance(double balance) {
this.balance = balance;
}
/**
* 增加帐户余额
*
* @param amount 增加的余额
*/
public void addAccount(double amount) {
double tmp = balance;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
tmp += amount;
balance = tmp;
}
/**
* 减少帐户余额
*
* @param amount 减少的余额
*/
public void subtractAmount(double amount) {
double tmp = balance;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
tmp -= amount;
balance = tmp;
}
}
package com.concurrency.task;
public class Bank implements Runnable {
/**
* 一个帐户
*/
private Account account;
/**
* 构造函数
*
* @param account 银行帐户
*/
public Bank(Account account) {
this.account = account;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.account.subtractAmount(1000);
}
}
}
package com.concurrency.task;
public class Company implements Runnable {
/**
* 一个帐户
*/
private Account account;
/**
* 构造函数
* @param account 帐户对象
*/
public Company(Account account) {
this.account = account;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.account.addAccount(1000);
}
}
}
package com.concurrency.core;
import com.concurrency.task.Account;
import com.concurrency.task.Bank;
import com.concurrency.task.Company;
public class Main {
public static void main(String[] args) {
// 创建一个帐户对象
Account account = new Account();
// 帐户初始值为1000
account.setBalance(1000);
// 创建一个公司对象,并且让公司对象在一个线程中运行
Company company = new Company(account);
Thread companyThread = new Thread(company);
// 创建一个银行对象,并且让银行对象在一个线程中运行
Bank bank = new Bank(account);
Thread bankThread = new Thread(bank);
// 输出初始的余额
System.out.printf("Account : Initial Balance: %f\n", account.getBalance());
// 启动公司和银行两个线程
companyThread.start();
bankThread.start();
try {
// 等待两个线程的完成
companyThread.join();
bankThread.join();
// 输出最后的余额
System.out.printf("Account : Final Balance: %f\n", account.getBalance());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
图2.2-1 非同步结果
采用同步方法
package com.concurrency.task;
/**
* 帐户类,模拟一个银行帐户
*/
public class Account {
/**
* 帐户余额
*/
private double balance;
/**
* 获取帐户余额
*
* @return 帐户余额
*/
public double getBalance() {
return balance;
}
/**
* 设置帐户余额
*
* @param balance 帐户余额
*/
public void setBalance(double balance) {
this.balance = balance;
}
/**
* 增加帐户余额
*
* @param amount 增加的余额
*/
public synchronized void addAccount(double amount) {
double tmp = balance;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
tmp += amount;
balance = tmp;
}
/**
* 减少帐户余额
*
* @param amount 减少的余额
*/
public synchronized void subtractAmount(double amount) {
double tmp = balance;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
tmp -= amount;
balance = tmp;
}
}
package com.concurrency.task;
public class Bank implements Runnable {
/**
* 一个帐户
*/
private Account account;
/**
* 构造函数
*
* @param account 银行帐户
*/
public Bank(Account account) {
this.account = account;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.account.subtractAmount(1000);
}
}
}
package com.concurrency.task;
public class Company implements Runnable {
/**
* 一个帐户
*/
private Account account;
/**
* 构造函数
* @param account 帐户对象
*/
public Company(Account account) {
this.account = account;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.account.addAccount(1000);
}
}
}
package com.concurrency.core;
import com.concurrency.task.Account;
import com.concurrency.task.Bank;
import com.concurrency.task.Company;
public class Main {
public static void main(String[] args) {
// 创建一个帐户对象
Account account = new Account();
// 帐户初始值为1000
account.setBalance(1000);
// 创建一个公司对象,并且让公司对象在一个线程中运行
Company company = new Company(account);
Thread companyThread = new Thread(company);
// 创建一个银行对象,并且让银行对象在一个线程中运行
Bank bank = new Bank(account);
Thread bankThread = new Thread(bank);
// 输出初始的余额
System.out.printf("Account : Initial Balance: %f\n", account.getBalance());
// 启动公司和银行两个线程
companyThread.start();
bankThread.start();
try {
// 等待两个线程的完成
companyThread.join();
bankThread.join();
// 输出最后的余额
System.out.printf("Account : Final Balance: %f\n", account.getBalance());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
图2.2-2 采用同步方法
在本节中,你已经开发了一个银行账户的模拟应用,它能够对余额进行充值和扣除。 这个程序通过调用100次addAccount()方法对账户进行充值,每次存入1000;然后通过调 用100次subtractAccount()方法对账户余额进行扣除,每次扣除1000;我们期望账户的最终余额与起初余额相等。
范例中通过使用临时变量tmp来存储账户余额,己经制造了一个错误情景:这个临时变量先获取账户余额,然后进行数额累加,之后把最终结果更新为账户余额。此外,范例还通过sleep()方法增加了延时,使得正在执行这个方法的线程休眠10ms,而此时其他某个线程也可能会执行这个方法.因此可能会改变账户余额,引发错误。而synchronized关键字机制避免了这类错误的发生。
如果想査看共享数据的并发访问问题,只需要将addAmounl()和subtractAmount()方法声明中的synchronized关键字删除即可。在没有synchronized关键字的情况下,一个线程读取了账户余额然后进入休眠,这个时候,其他某个线程读取这个账户余额,最终这两个方法都将修改同一个余额,但是其中一个操作不会影响到最终结果。
可以从图2-1看到余额值并不一致。如果多次运行这个程序,你将获取不同的结果。因为JVM并不保证线程的执行顺序,所以每次运行的时候,线程将以不同的顺序读取并且修改账户的余额.造成最终结果也是不同的。
现在,将synchronized关键字恢复到两个方法的声明中,然后运行程序。从图2-2中你可以发现,运行结果跟期望是一致的。即便多次运行这个程序,仍将获取相同的结果。
一个对象的方法采用synchronized关键字进行声明,只能被一个线程访问。如果线程A正在执行一个同步方法syncMethodA(),线程B要执行这个对象 的其他同步方法syncMethodB(),线程B将被阻塞直到线程A访问完。但如果线程B访问的是同一个类的不同对象,那么两个线程都不会被阻塞。
synchronized关键字会降低应用程序的性能,因此只能在并发情景中需要修改共享数据的方法上使用它。如果多个线程访问同一个synchronized方法,则只有一个线程可以 问,其他线程将等待。如果方法声明没有使用synchronized关键字,所有的线程都能在同 一时间执行这个方法,因而总运行时间将降低。如果已知一个方法不会被一个以上线程调 用,则无需使用synchronized关键字声明。
可以递归调用被synchronized声明的方法。当线程访问一个对象的同步方法时,它还可以调用这个对象的其他的同步方法,也包含正在执行的方法,而不必再次去获取这个方法的访问权。
我们可以通过synchronized关键字来保护代码块(而不是整个方法)的访问。应该这样利用synchronized关键字:方法的其余部分保持在synchronized代码块之外,以获取更好的性能。临界区(即同一时间只能被一个线程访问的代码块)的访问应该尽可能的短。 例如在获取一幢楼人数的操作中,我们只使用synchrcrairad关键字来保护对人数更新的指令,并让其他操作不使用共享数据。当这样使用synchnmiMd关键字时,必须把对象引用作为传入参数。同一时间只有一个线程被允许访问这个syndinmized代码。通常来说,我们使用this关键字来引用正在执行的方法所属的对象。
synchronized (this) {
// Java code
}
2.3使用非依赖属性实现同步
当使用synchronized关键字来保护代码块时,必须把对象引用作为传入参数。通常情况下,使用this关键字来引用执行方法所属的对象,也可以使用其他的对象对其进行引用。 一般来说,这些对象就是为这个目的而创建的。例如,在类中有两个非依赖属性,它们被 多个线程共享,你必须同步每一个变量的访问,但是同一时刻只允许一个线程访问一个属性变量,其他某个线程访问另一个属性变量。
在本节中,我们将通过范例学习实现电影院售票场景的编程。这个范例模拟了有两个屏幕和两个售票处的电影院,一个售票处卖出的一张票,只能用于其中一个电影院,不能同时用于两个电影院,因此每个电影院的剩余票数是独立的属性。
package com.concurrency.task;
/**
* 影院类,其有两个影院厅
*/
public class Cinema {
/**
* 保存影院厅1的剩余电影票数
*/
private long vacanciesCinema1;
/**
* 保存影院厅2的剩余电影票数
*/
private long vacanciesCinema2;
/**
* 控制vacanciesCinema1同步访问的对象
*/
private final Object controlCinema1;
/**
* 控制 vacanciesCinema2同步访问的对象
*/
private final Object controlCinema2;
public Cinema() {
controlCinema1 = new Object(); // 初始化同步控制变量
controlCinema2 = new Object(); // 初始化同步控制变量
vacanciesCinema1 = 20; // 设置初始空闲票数
vacanciesCinema2 = 20; // 设置初始空闲票数
}
/**
* 出售影院厅1的门票
*
* @param number 出售的门票张数
* @return true出售成功,false出售失败
*/
public boolean sellTickets1(int number) {
synchronized (controlCinema1) {
if (number < vacanciesCinema1) {
vacanciesCinema1 -= number;
return true;
} else {
return false;
}
}
}
/**
* 出售影院厅2的门票
*
* @param number 出售的门票张数
* @return true出售成功,false出售失败
*/
public boolean sellTickets2(int number) {
synchronized (controlCinema2) {
if (number < vacanciesCinema2) {
vacanciesCinema2 -= number;
return true;
} else {
return false;
}
}
}
/**
* 向售影院厅1退门票
*
* @param number 退的门票张数
* @return true退票成功,总返回true
*/
public boolean returnTickets1(int number) {
synchronized (controlCinema1) {
vacanciesCinema1 += number;
return true;
}
}
/**
* 向售影院厅2退门票
*
* @param number 退的门票张数
* @return true退票成功,总返回true
*/
public boolean returnTickets2(int number) {
synchronized (controlCinema2) {
vacanciesCinema2 += number;
return true;
}
}
/**
* 获取影院厅1剩余的门票数
*
* @return 影院1剩余的门票数
*/
public long getVacanciesCinema1() {
return vacanciesCinema1;
}
/**
* 获取影院厅2剩余的门票数
*
* @return 影院2剩余的门票数
*/
public long getVacanciesCinema2() {
return vacanciesCinema2;
}
}
package com.concurrency.task;
/**
* 售票窗口类,出售1号放映厅的票
*/
public class TicketOffice1 implements Runnable {
/**
* 电影院对象
*/
private Cinema cinema;
/**
* 构造函数
* @param cinema 电影院对象
*/
public TicketOffice1(Cinema cinema) {
this.cinema = cinema;
}
@Override
public void run() {
cinema.sellTickets1(3);
cinema.sellTickets1(2);
cinema.sellTickets2(2);
cinema.returnTickets1(3);
cinema.sellTickets1(5);
cinema.sellTickets2(2);
cinema.sellTickets2(2);
cinema.sellTickets2(2);
}
}
package com.concurrency.task;
/**
* 售票窗口类,出售2号放映厅的票
*/
public class TicketOffice2 implements Runnable {
/**
* 电影院对象
*/
private Cinema cinema;
/**
* 构造函数
*
* @param cinema 电影院对象
*/
public TicketOffice2(Cinema cinema) {
this.cinema = cinema;
}
@Override
public void run() {
cinema.sellTickets2(2);
cinema.sellTickets2(4);
cinema.sellTickets1(2);
cinema.sellTickets1(1);
cinema.returnTickets2(2);
cinema.sellTickets1(3);
cinema.sellTickets2(2);
cinema.sellTickets1(2);
}
}
package com.concurrency.core;
import com.concurrency.task.Cinema;
import com.concurrency.task.TicketOffice1;
import com.concurrency.task.TicketOffice2;
public class Main {
public static void main(String[] args) {
// 创建一个电影院对象
Cinema cinema = new Cinema();
// 创建一个出售一号影院厅票的售票窗口对象,并且让其在一个线程中运行
TicketOffice1 ticketOffice1 = new TicketOffice1(cinema);
Thread thread1 = new Thread(ticketOffice1, "TicketOffice1");
// 创建一个出售二号影院厅票的售票窗口对象,并且让其在一个线程中运行
TicketOffice2 ticketOffice2 = new TicketOffice2(cinema);
Thread thread2 = new Thread(ticketOffice2, "TicketOffice2");
// 启动两个售票窗口线程
thread1.start();
thread2.start();
try {
// 等待两个线程完成
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 输出电影院剩余的票数
System.out.printf("Room 1 Vacancies: %d\n", cinema.getVacanciesCinema1());
System.out.printf("Room 2 Vacancies: %d\n", cinema.getVacanciesCinema2());
}
}
图2.3-1 运行结果
用synchronized关键字保护代码块时,我们使用对象作为它的传入参数。JVM保证同一时间只有一个线程能够访问这个对象的代码保护块(注意我们一直谈论的是对象不是类)。
备注:这个例子使用了一个对象来控制对vacanciesCinema1属性的访问,所以同一时间只有一个线程能够修改这个属性:使用了另一个对象来控制vacanciesCinema2属性的访问,所以同一时间只有一个线程能够修改这个属性。但是,这个例子允许同时运行两个线 程:一个修改vacancesCinemal属性,另一个修改vacanciesCinema2属性。
运行这个范例,可以看到最终结果总是与每个电影院的剩余票数一致。
2.4在同步代码中使用条件
在并发编程中一个典型的问题是生产者-消费者(Producer-Consumer)问题。我们有一个数据缓冲区,一个或者多个数据生产者将把数据存入这个缓冲区,一个或者多个数据消费者将数据从缓冲区中取走。
这个缓冲区是一个共享数据结构,必须使用同步机制控制对它的访问,例如使用 synchronized关键字,但是会受到更多的限制。如果缓冲区是满的,生产者就不能再放入数据,如果缓冲区是空的,消费者就不能读取数据。
对于这些场景,Java在Object类中提供了 wait()、notify()和notifyAll()方法。线程可以在同步代码块中调用wati()方法。如果在同步代码块之外调用wait()方法,JVM将抛出 IllegaMonitorStateException异常。当一个线程调用wait()方法时,JVM将这个线程置入休眠,并且释放控制这个同步代码块的对象,同时允许其他线程执行这个对象控制的其他同步代码块。为了唤醒这个线程,必须在这个对象控制的某个同步代码块中调用notify() 或者notifyAll()方法。
在本节中,我们将通过范例学习实现生产者-消费者问题,这个范例中将使用synchronized 关键字和 wait()、notify()及 notifyAll()方法。
package com.concurrency.task;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
/**
* 事件存储类,生产者会存储事件,消费者会处理存储的事件,一个事件就是一个日期对象
*/
public class EventStorage {
/**
* 最多可以保存的事件数
*/
private int maxSize;
/**
* 存储事件的集合
*/
private List<Date> storage;
/**
* 构造函数
*/
public EventStorage() {
this.maxSize = 10; // 最多可以存储10个事件
this.storage = new LinkedList<Date>(); // 初始化事件存储集合
}
/**
* 同步方法,向事件集合中添加一个事件
*/
public synchronized void set() {
while (this.storage.size() == this.maxSize) { // 如果集合已经满了,就等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.storage.add(new Date()); // 产生事件
System.out.printf("Set: %d\n", storage.size());
notify(); // 唤醒其它线程
}
/**
* 同步方法,使用处理事件集合中的一个事件
*/
public synchronized void get() {
while (this.storage.size() == 0) { // 如果集合为空就等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("Get: %d: %s\n", storage.size(), ((LinkedList<?>) storage).poll()); // 消费一个事件
notify(); // 唤醒其它线程
}
}
package com.concurrency.task;
/**
* 生产者对象,生产事件
*/
public class Producer implements Runnable {
/**
* 事件存储对象
*/
private EventStorage storage;
/**
* 构造函数
*
* @param storage 事件存储对象
*/
public Producer(EventStorage storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.storage.set(); // 产生事件
}
}
}
package com.concurrency.task;
/**
* 消费者对象,消费事件
*/
public class Consumer implements Runnable {
/**
* 事件存储对象
*/
private EventStorage storage;
/**
* 构造函数
*
* @param storage 事件存储对象
*/
public Consumer(EventStorage storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.storage.get(); // 消费事件
}
}
}
package com.concurrency.core;
import com.concurrency.task.Consumer;
import com.concurrency.task.EventStorage;
import com.concurrency.task.Producer;
public class Main {
public static void main(String[] args) {
// 创建一个事件存储对象
EventStorage storage = new EventStorage();
// 创建一个事件生产者对象,并且将其放入到一个线程中运行
Producer producer = new Producer(storage);
Thread thread1 = new Thread(producer);
// 创建一个事件消费者对象,并且将其放入到一个线程中运行
Consumer consumer = new Consumer(storage);
Thread thread2 = new Thread(consumer);
// 启动两线程
thread2.start();
thread1.start();
}
}
图2.4-1 部分运行结果
这个范例的主要部分是数据存储EventStorage类的set()和get()方法。首先,set()方法检査存储列表storage是否还有空间,如果己经满了,就调用wait()方法挂起线程并等待空余空间出现。其次,当其他线程调用notilyAll()方法时,挂起的线程将被唤醒并且再次检查这个条件。notifyAll()并不保证哪个线程会被唤醒。这个过程持续进行直到存储列表有空余空间出现,然后生产者将生成一个新的数据并且存入存储列表storage。
get()方法的行为与之类似。首先,get()方法检査存储列表storage是否还有数据,如果没有,就调用wait()方法挂起线程并等待数据的出现。其次,当其他线程调用notifyAll()方法时,挂起的线程将被唤醒并且再次检查这个条件。这个过程持续进行直到存储列表有数据出现。
备注:必须在while循环中调用wait(),并且不断查询while的条件,直到条件为真的时候才能继续。
2.5使用锁实现同步
Java提供了同步代码块的另一种机制,它是一种比synchronized关键字更强大也更灵活的机制。这种机制基于Lock接口及其实现类(例如ReemrantLock),提供了更多的好处。
?支持更灵活的同步代码块结构。使用synchronized关键字时,只能在同一个 synchronized块结构中获取和释放控制。Lock接口允许实现更复杂的临界区结构(即控制的获取和释放不出现在同一个块结构中)。
?相比synchronized关键字,Lock接口提供了更多的功能。其中一个新功能是 tryLock()方法的实现。这个方法试图获取锁,如果锁已被其他线程获取,它将返冋false 并继续往下执行代码。使用synchronized关键字时,如果线程A试图执行一个同步代码块, 而线程B已在执行这个同步代码块,则线程A就会被挂起直到线程B运行完这个同步代码块。使用锁的tryLock()方法,通过返回值将得知是否有其他线程正在使用这个锁保护的代码块。
?Lock接口允许分离读和写操作,允许多个读线程和只有一个写线程。
?相比synchronized关键字,Lock接口具有更好的性能。
在本节中,我们将学习如何使用锁来同步代码,并且使用Lock接口和它的实现类—— ReentrantLock类来创建一个临界区。这个范例将模拟打印队列。
package com.concurrency.task;
public class Job implements Runnable {
/**
* 打印文档的队列
*/
private PrintQueue printQueue;
/**
* 构造函数
*
* @param printQueue 打印文档的队列
*/
public Job(PrintQueue printQueue) {
this.printQueue = printQueue;
}
@Override
public void run() {
System.out.printf("%s: Going to print a document\n", Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
}
}
package com.concurrency.task;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 打印队列类,模拟一个打印队列事件
*/
public class PrintQueue {
/**
* 用于控制队列访问的锁
*/
private final Lock queueLock = new ReentrantLock();
/**
* 打印一个文档
*
* @param object 要打印的文档对象
*/
public void printJob(Object object) {
queueLock.lock(); // 上锁
try {
long duration = (long) (Math.random() * 10000);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), (duration / 1000));
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
queueLock.unlock(); // 解锁
}
}
}
package com.concurrency.core;
import com.concurrency.task.Job;
import com.concurrency.task.PrintQueue;
public class Main {
public static void main(String[] args) {
// 创建一个打印队列
PrintQueue printQueue = new PrintQueue();
// 创建10个打印线程
Thread thread[] = new Thread[10];
for (int i = 0; i < 10; i++) {
thread[i] = new Thread(new Job(printQueue), "Thread " + i);
}
// 启动线程
for (int i = 0; i < 10; i++) {
thread[i].start();
}
}
}
图2.5-1 运行结果
这个范例的主要部分是打印队列类PrintQueue中的printJob()方法。我们使用锁实现 一个临界区,并且保证同一时间只有一个执行线程访问这个临界区时,必须创建 ReentrantLock对象。在这个临界区的开始,必须通过lock()方法获取对锁的控制。当线程A访问这个方法时,如果 没有其他线程获取对这个锁的控制,lock()方法将让线程A获得得锁并且允许它立刻执行临界区代码。否则,如果其他线程B正在执行这个锁保护的临界区代码,lock()方法将让线程A休眠直到线程B执行完临界区的代码。
在线程离开临界区的时候,我们必须使用unlock()方法来释放它持有的锁,以让其他线程来访问临界区。如果在离开临界区的时候没有调用unlock方法,其他线程将永久地等待,从而导致死锁(Deadlock)情景。如果在临界区使用了try-catch块,不要忘记将unlock方法放入finally部分。
Lock接口(和它的实现类ReentrantLock)还提供了另一个方法来获取锁,即tryLock() 方法。跟lock()方法最大的不同是:线程使用tryLock()不能够获取锁,tryLock()会立即返回,它不会将线程置入休眠。tryLock()方法返回一个布尔值,true表示线程获取了锁,false 表示没有获取锁。
备注:编程人员应该重视tryLock()方法的返回值及其对应的行为。如果这个方法返回 false,则程序不会执行临界区代码。如果执行了,这个应用很可能会出现错误的结果。
ReentrantLock类也允许使用递归调用。如果一个线程获取了锁并且进行了递归调用, 它将继续持有这个锁,因此调用lock()方法后也将立即返回,并且线程将继续执行递归调用。再者,我们还可以调用其他的方法。
必须很小心使用锁,以避免死锁。当两个或者多个线程被阻塞并且它们在等待的锁永远不会被释放时,就会发生死锁。例如,线程A获取了锁X,线程B获取了锁Y,现在线程A试图获取锁Y,同时线程B也试图获取锁X,则两个线程都将被阻塞,而且它们等待的锁永远不会被释放。这个问题就在于两个线程都试图获取对方拥有的锁。
2.6使用读写锁实现同步数据访问
锁机制最大的改进之一就是ReadWriteLock接口和它的唯一实现类ReentrantRead WriteLock。这个类有两个锁,一个是读操作锁,另一个是写操作锁。使用读操作锁时可 以允许多个线程同时访问,但是使用写操作锁时只允许一个线程进行。在一个线程执行写 操作时,其他线程不能够执行读操作。
在本节中,我们将通过范例学习如何使用ReadWriteLock接口编写程序。这个范例将使用ReadWriteLock接口控制对价格对象的访问,价格对象存储了两个产品的价格。
package com.concurrency.task;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 价格信息类,这个类存储了两个价格,一个写者写这个价格,多个读者读这个价格
*/
public class PricesInfo {
/**
* 两个价格g
*/
private double price1;
private double price2;
/**
* 控制价格访问的锁
*/
private ReadWriteLock lock;
/**
* 构造函数,初始化价格和锁
*/
public PricesInfo() {
this.price1 = 1.0;
this.price2 = 2.0;
this.lock = new ReentrantReadWriteLock();
}
/**
* 获取第一个价格
*
* @return 第一个价格
*/
public double getPrice1() {
lock.readLock().lock();
double value = price1;
lock.readLock().unlock();
return value;
}
/**
* 获取第二个价格
*
* @return 第二个价格
*/
public double getPrice2() {
lock.readLock().lock();
double value = price2;
lock.readLock().unlock();
return value;
}
/**
* 设置两个价格
*
* @param price1 第一个价格
* @param price2 第二个价格
*/
public void setPrices(double price1, double price2) {
lock.writeLock().lock();
this.price1 = price1;
this.price2 = price2;
lock.writeLock().unlock();
}
}
package com.concurrency.task;
/**
* 读者类,消费价格
*/
public class Reader implements Runnable {
/**
* 价格信息对象
*/
private PricesInfo pricesInfo;
/**
* 构造函数
*
* @param pricesInfo 价格信息对象
*/
public Reader(PricesInfo pricesInfo) {
this.pricesInfo = pricesInfo;
}
/**
* 核心方法,消费两个价格,并且将他们输出
*/
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.printf("%s: Price 1: %f\n", Thread.currentThread().getName(), pricesInfo.getPrice1());
System.out.printf("%s: Price 2: %f\n", Thread.currentThread().getName(), pricesInfo.getPrice2());
}
}
}
package com.concurrency.task;
/**
* 写者类,产生价格
*/
public class Writer implements Runnable {
/**
* 价格信息对象
*/
private PricesInfo pricesInfo;
/**
* 构造函数
*
* @param pricesInfo 价格信息对象
*/
public Writer(PricesInfo pricesInfo) {
this.pricesInfo = pricesInfo;
}
/**
* 核心方法,写价格
*/
@Override
public void run() {
for (int i = 0; i < 3; i++) {
System.out.printf("Writer: Attempt to modify the prices.\n");
pricesInfo.setPrices(Math.random() * 10, Math.random() * 8);
System.out.printf("Writer: Prices have been modified.\n");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.concurrency.core;
import com.concurrency.task.PricesInfo;
import com.concurrency.task.Reader;
import com.concurrency.task.Writer;
public class Main {
public static void main(String[] args) {
// 创建价格信息对象,用于存储价格
PricesInfo pricesInfo = new PricesInfo();
Reader readers[] = new Reader[5];
Thread threadsReader[] = new Thread[5];
// 创建5个读者并且将其放在不同的线程中远行
for (int i = 0; i < 5; i++) {
readers[i] = new Reader(pricesInfo);
threadsReader[i] = new Thread(readers[i]);
}
// 创建一个写者,并且将其放在一个线程中运行
Writer writer = new Writer(pricesInfo);
Thread threadWriter = new Thread(writer);
// 启动读者写者线程
for (int i = 0; i < 5; i++) {
threadsReader[i].start();
}
threadWriter.start();
}
}
图2.6-1 运行结果
ReentrantReadWriteLock类有两种锁:一种是读操作锁,另一 种是写操作锁。读操作锁是通过ReadWriteLock接口的readLock()方法获取的,这个锁 实现了 Lock接口,所以我们可以使用lock(), unlock()和tryLock()方法。写操作锁是通 过ReadWriteLock接口的writeLock()方法获取的,这个锁同样也实现了 Lock接口,所以我们也可以使用lock()、unlock()和tryLock()方法。编程人员应该确保正确地使用这些锁,使用它们的时候应该符合这些锁的设计初衷。当你获取Lock接口的读锁时,不可以进行修改操作,否则将引起数据不一致的错误。
2.7修改锁的公平性
ReentrantLock和ReentrantReadWriteLock类的构造器都含有一个布尔参数fair,它允许你控制这两个类的行为。默认fair的值是false,它称为非公平模式(Non-Fair Mode)。 在非公平模式下,当有很多线程在等待锁(ReentrantLock和ReentrantReadWriteLock)时,锁将选择它们中的一个来访问临界区,这个选择是没有任何约束的。如果fail的值是true, 则称为公平模式(Fair Mode)。在公平模式下,当有很多线程在等待锁(ReentrantLock和ReentrantReadWriteLock)时,锁将选择它们中的一个来访问临界区,而且选择的是等待时间最长的。这两种模式只适用于lock()和unlock()方法。而Lock接口的tryLock〇方法没有将线程置于休眠,fair属性并不影响这个方法。
在本节中,我们将修改“使用锁实现同步”一节的范例来使用这个属性,并观察公平模式和非公平模式的区别。
package com.concurrency.task;
public class Job implements Runnable {
/**
* 打印文档的队列
*/
private PrintQueue printQueue;
/**
* 构造函数
*
* @param printQueue 打印文档的队列
*/
public Job(PrintQueue printQueue) {
this.printQueue = printQueue;
}
@Override
public void run() {
System.out.printf("%s: Going to print a document\n", Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
}
}
package com.concurrency.task;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 打印队列类,模拟一个打印队列事件
*/
public class PrintQueue {
/**
* 用于控制队列访问的锁,使用公平锁
*/
private final Lock queueLock = new ReentrantLock(true);
/**
* 打印一个文档
*
* @param object 要打印的文档对象
*/
public void printJob(Object object) {
queueLock.lock(); // 上锁
try {
long duration = (long) (Math.random() * 10000);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), (duration / 1000));
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
queueLock.unlock(); // 解锁
}
queueLock.lock(); // 上锁
try {
long duration = (long) (Math.random() * 10000);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), (duration / 1000));
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
queueLock.unlock(); // 解锁
}
}
}
package com.concurrency.core;
import com.concurrency.task.Job;
import com.concurrency.task.PrintQueue;
public class Main {
public static void main(String[] args) {
// 创建一个打印队列
PrintQueue printQueue = new PrintQueue();
// 创建10个打印任务并且将其放入到不同的线程中运行
Thread thread[] = new Thread[10];
for (int i = 0; i < 10; i++) {
thread[i] = new Thread(new Job(printQueue), "Thread " + i);
}
// 每隔0.1s运行一个线程,一个10个线程
for (int i = 0; i < 10; i++) {
thread[i].start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
图2.7-1 使用公平锁运行结果
所有线程创建的间隔是0.1秒。第一个线程请求锁是线程0,然后是线程1,以此类推。 当线程0执行第一个加锁的代码块,其余9个线程将等待获取这个锁,当线程0释放了锁。它立即又请求锁,这个时候就有10个线程试图获取锁。在公平模式下,Lock接口将选择 线程1,因为这个线程等待的时间最久,然后,它选择线程2,然后线程3,以此类推。 在所有线程都执行完第一个被锁保护的代码块之前,它们都没有执行第二个被锁保护的 代码块。
当所有线程执行完第一个加锁代码块之后,又轮到了线程0,然后是线程1,以此类推。现在来看看非公平模式,将传入锁构造器的参数设置为false。在下面的截屏中,你将 看到修改后范例的执行结果。
private final Lock queueLock = new ReentrantLock(false);
图2.7-2 非公平锁运行结果
这里,所有线程是按顺序创建的,每个线程都执行两个被锁保护的代码块。然而,访 问时线程并没有按照创建的先后顺序。如同前面解释的,锁将选择任一个线程并让它访问 锁保护的代码。JVM没有对线程的执行顺序提供保障。
2.8在锁中使用多条件(Multiple Condition)
一个锁可能关联一个或者多个条件,这些条件通过Condition接口声明。目的是允许线程获取锁并且査看等待的某一个条件是否满足,如果不满足就挂起直到某个线程唤醒它们。Condition接口提供了挂起线程和唤起线程的机制。
并发编程中的一个典型问题是生产者-消费者(Producer-Consumer)问题。如本章前面提到的,我们使用一个数据缓冲区,一个或者多个数据生产者(Producer)将数据保存 到缓冲区,一个或者多个数据消费者(Consumer)将数据从缓冲区中取走。
在本节中,我们将通过范例学习并使用锁和条件.来解决生产者-消费者问题。
package com.concurrency.utils;
/**
* 文件模拟类,
*/
public class FileMock {
/**
* 模拟文件的内容
*/
private String[] content;
/**
* 当前需要处理的文件第多少行
*/
private int index;
/**
* 构造函数,随机生成文件的内容
*
* @param size 文件的行数
* @param length 每行文件的字符数
*/
public FileMock(int size, int length) {
this.content = new String[size];
for (int i = 0; i < size; i++) {
StringBuilder builder = new StringBuilder(length);
for (int j = 0; j < length; j++) {
int indice = (int) (Math.random() * 255);
builder.append((char) indice);
}
content[i] = builder.toString();
}
this.index = 0;
}
/**
* 判断是否还有文件的行数需要处理
*
* @return true是,false否
*/
public boolean hasMoreLines() {
return this.index < this.content.length;
}
/**
* 返回下一行的文件内容
*
* @return 有返回文件内容,没有返回false
*/
public String getLine() {
if (this.hasMoreLines()) {
System.out.println("Mock: " + (this.content.length - this.index));
return this.content[this.index++];
}
return null;
}
}
package com.concurrency.utils;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Buffer {
/**
* 集合对象,被当作缓冲使用
*/
private LinkedList<String> buffer;
/**
* 缓冲的最大大小
*/
private int maxSize;
/**
* 控制缓冲访问的锁
*/
private ReentrantLock lock;
/**
* 缓冲中有数据的条件
*/
private Condition lines;
/**
* 缓冲为空的条件
*/
private Condition space;
/**
* 是否追加行
*/
private boolean pendingLines;
/**
* 构造函数,初始化属性
*
* @param maxSize 缓冲最大大小
*/
public Buffer(int maxSize) {
this.maxSize = maxSize;
this.buffer = new LinkedList<>();
this.lock = new ReentrantLock();
this.lines = lock.newCondition();
this.space = lock.newCondition();
this.pendingLines = true;
}
/**
* 向缓冲区中插入一行数据
*
* @param line 一行数据
*/
public void insert(String line) {
lock.lock();
try {
while (this.buffer.size() == this.maxSize) {
this.space.await();
}
this.buffer.offer(line);
System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread().getName(), this.buffer.size());
this.lines.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public String get() {
String line = null;
lock.lock();
try {
while (this.buffer.size() == 0 && hasPendingLines()) {
this.lines.await();
}
if (hasPendingLines()) {
line = this.buffer.poll();
System.out.printf("%s: Line Readed: %d\n", Thread.currentThread().getName(), this.buffer.size());
this.space.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.lock.unlock();
}
return line;
}
/**
* 设置是否追加行
*
* @param pendingLines true追加,false不追加
*/
public void setPendingLines(boolean pendingLines) {
this.pendingLines = pendingLines;
}
/**
* 判断是否有数据可以进行处理
*
* @return true有数据可进行处理,false无数据可以进行处理
*/
public boolean hasPendingLines() {
return this.pendingLines || this.buffer.size() > 0;
}
}
package com.concurrency.task;
import com.concurrency.utils.Buffer;
import com.concurrency.utils.FileMock;
public class Producer implements Runnable {
/**
* 文件模拟对象
*/
private FileMock mock;
/**
* 缓冲对象
*/
private Buffer buffer;
/**
* 构造函数
*
* @param mock 文件模拟对象
* @param buffer 缓冲对象
*/
public Producer(FileMock mock, Buffer buffer) {
this.mock = mock;
this.buffer = buffer;
}
/**
* 核心方法,读取文件中的数据,并且将读取到的数据插入到缓冲区
*/
@Override
public void run() {
this.buffer.setPendingLines(true);
while (this.mock.hasMoreLines()) {
String line = this.mock.getLine();
this.buffer.insert(line);
}
this.buffer.setPendingLines(false);
}
}
package com.concurrency.task;
import com.concurrency.utils.Buffer;
import java.util.Random;
public class Consumer implements Runnable {
/**
* 缓冲对象
*/
private Buffer buffer;
/**
* 构造函数
*
* @param buffer 缓冲对象
*/
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
/**
* 核心方法,当缓冲中有数据时就进行处理
*/
@Override
public void run() {
while (buffer.hasPendingLines()) {
String line = buffer.get();
processLine(line);
}
}
/**
* 模拟处理一行数据,休眠[0,100)毫秒
*
* @param line 一行数据
*/
private void processLine(String line) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.concurrency.core;
import com.concurrency.task.Consumer;
import com.concurrency.task.Producer;
import com.concurrency.utils.Buffer;
import com.concurrency.utils.FileMock;
public class Main {
public static void main(String[] args) {
// 创建一个文件模拟对象,它有101行
FileMock mock = new FileMock(101, 10);
// 创建一个缓冲对象,最多可以缓存20行数据
Buffer buffer = new Buffer(20);
// 创建一个生产者对象,并且让它在一个单独的线程中运行
Producer producer = new Producer(mock, buffer);
Thread threadProducer = new Thread(producer, "Producer");
// 创建三个消费者对象,并且个他们分别在不同的线程中运行
Consumer consumers[] = new Consumer[3];
Thread threadConsumers[] = new Thread[3];
for (int i = 0; i < 3; i++) {
consumers[i] = new Consumer(buffer);
threadConsumers[i] = new Thread(consumers[i], "Consumer " + i);
}
// 启动生产者和消费者线程
threadProducer.start();
for (int i = 0; i < 3; i++) {
threadConsumers[i].start();
}
}
}
图2.8-1 部分运行结果
与锁绑定的所有条件对象都是通过Lock接口声明的nenConditionO方法创建的。在使用条件的时候,必须获取这个条件绑定的锁,所以带条件的代码必须在调用lock对象的lock()方法和unlock()方法之间。
当线程调用条件的await()方法时,它将自动释放这个条件绑定的锁,其他某个线程才可以获取这个锁并且执行相同的操作,或者执行这个锁保护的另一个临界区代码。
备注:当一个线程调用了条件对象的signal()或者signallAll()方法后,一个或者多个在该条件上挂起的线程将被唤醒,但这并不能保证让它们挂起的条件己经满足,所以必须在while循环中调用await(),在条件成立之前不能离开这个循环。如果条件不成立,将再次调用await()。
必须小心使用await()和signal()方法。如果调用了一个条件的await()方法,却从不调用它的signal()方法,这个线程将永久休眠。
因调用await()方法进入休眠的线程可能会被中断,所以必须处理InterruptedException异常。
Condition接口还提供了 await()方法的其他形式。
await(long time, TimeUnit unit),直到发生以下情况之一之前,线程将一直处于休眠状态。
?其他某个线程中断当前线程。
?其他某个线程调用了将当前线程挂起的条件的singal()或signalAll()方法。
?指定的等待时间已经过去。
?通过 TimeUnit 类的常量 DAYS、HOURS、MICROSECONDS、MILLISECONDS、 MINUTES、ANOSECONDS和SECONDS指定的等待时间已经过去,
awaitUninterruptibly():它是不可中断的。这个线程将休眠直到其他某个线程调用了将它挂起的条件的singal()或signalAll()方法。
awaitUntil(Date date):直到发生以下情况之一之前,线程将一直处于休眠状态。
?其它某个线程调用了将它挂起的条件的singal()或者signalAll()方法。
?其他某个线程中断当前线程。
?指定的最后期限到了。
也可以将条件与读写锁ReadLock和Writelock —起使用。
欢迎转载,转载请注明出处http://blog.csdn.net/DERRANTCM/article/details/48128815
版权声明:本文为博主原创文章,未经博主允许不得转载。