Java并发原语——线程、互斥与同步

本文将介绍:

  • Java线程基本操作(创建、等待等)
  • Java线程同步原语(同步、互斥)

如果你对以上话题已了如指掌,请止步。

Java线程基本操作

Java的线程API以java.lang.Thread类提供,线程的基本操作被封装为为Thread类的方法,其中常用的方法是:

  方法 说明
void start() 启动线程
void join() 等待线程结束

创建(启动)线程

Java中,创建线程的过程分为两步:

  1. 创建可执行(Runnable)的线程对象;
  2. 调用它的start()方法;

可执行的线程对象,即可以调用start()启动的线程对象;而创建可执行的线程对象有两种方法:

  1. 继承(extends)Thread类,重载run()方法;
  2. 实现(implements)Runnable接口(实现run()方法);

两种创建线程的对象的代码实例如下:

继承Thread类

继承Thread类创建线程,如下:

class ExtendsThread extends Thread {
	@Override
	public void run() {
		for (int i = 0; i < 100; ++i) {
			System.out.print("*");
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

public class TestExtendsThread {
	public static void main(String[] args) {
		// 1.创建线程对象
		Thread backThread = new ExtendsThread(); 

		// 2.启动线程
		backThread.start(); 

		for(int i=0; i < 100; ++i) {
			System.out.print("#");
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
	

该程序打印出的*和#是交替的;这说明backThread的run()和主线程同时在执行!当然,如果一个线程的代码不是多次重复使用,可以将该线程写成“匿名内部类”的形式:

public class TestExtendsThread {
	public static void main(String[] args) {
		new Thread() {
			public void run() {
				for (int i = 0; i < 100; ++i) {
					System.out.print("*");
					try {
						Thread.sleep(100);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}.start();

		for (int i = 0; i < 100; ++i) {
			System.out.print("#");
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
	

实现Runnable接口

Java中创建线程对象的另一种方法是:实现Runnable接口,再用具体类的实例作为Thread的参数构造线程,代码如下:

class RunnableImpl implements Runnable {
	@Override
	public void run() {
		for(int i=0; i < 100; ++i) {
			System.out.print("*");
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

public class TestImlementsRunnable {
	public static void main(String[] args) {
		Runnable callback = new RunnableImpl();
		Thread backThread = new Thread(callback);
		backThread.start(); // 启动线程

		for(int i=0; i < 100; ++i) {
			System.out.print("#");
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
	

类似地,RunnableImpl若是不被复用,也可写成“匿名内部类”的形式:

public class TestImlementsRunnable {

	public static void main(String[] args) {
		new Thread(new Runnable() {
					@Override
					public void run() {
						for(int i=0; i < 100; ++i) {
							System.out.print("*");
							try {
								Thread.sleep(100);
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
					}
				}).start(); 

		for(int i=0; i < 100; ++i) {
			System.out.print("#");
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}
	

这两种方法都实现了run()方法,而Thread的start()方法会调用传入的Runnable对象的run()方法(或是调用自己的run方法)。 run()在这里的作用就是为新线程提供一个入口,或者说run描述了新线程将来要“干什么”;相当于一些C库的回调函数。

等待线程结束

Thread的join()方法提供了“等待线程结束”的功能,Java的主线程默认会等待其他线程的结束。Thread.join()提供的是:一个线程等待另一个线程的功能;例如,在main方法(主线程)中调用 backThread.join();则主线程将会在调用处等待,直到backThread执行完毕。如下代码是典型的start和join的使用顺序:

// in main()
Runnable r = new Runnable() {
    public void run() {
        // ...
    }
};
Thread back = new Thread(r);

back.start();
back.join();
	

这段代码对应的序列图如下:

start()的作用是启动一个线程(程序执行流),使得调用处的执行流程一分为二;而join()的作用则与start相反,使得两个执行流程“合二为一”,如下图所示:

两个线程和几个方法执行时间的先后关系,执行流程先“一分为二”和“合二为一”。

互斥

Java的互斥语义由synchronized关键字提供,具体有两种:

  1. synchronized代码块
  2. synchronized方法

下面分别介绍。

为什么需要互斥?

由于本文的定位为多线程编程入门,所以顺便介绍一下为什么会有互斥问题。

猜测下面的程序的输出:

public class NonAtomic {

	static int count = 0;

	public static void main(String[] args) {
		Thread back = new Thread() {
			@Override
			public void run() {
				for(int i=0; i<10000; ++i) {
					++count;
				}
			}
		};

		back.start();

		for(int i=0; i<10000; ++i) {
			++count;
		}

		try {
			back.join(); // wait for back thread finish.
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		System.out.println(count);
	}

}
	

这个程序并不能像想象中的那样输出20000,而总是小了一些。为什么会这样?因为++count;操作并不是“原子性”的,即不是一条指令就能完成的功能。在多数体系结构上,实现内存中的整数“自增”操作至少需要三步:

  1. 从内存中读数据到寄存器
  2. 在寄存器内加一
  3. 写回内存

一种可能的两个线程同时执行“自增”的情形如下:

在这幅图中,A、B两个线程同时对value执行“自增”,预期的value值应该是11,而实际的value值却是10。

由此可见,要保证多线程环境下“自增”操作的正确性,就必须保证以上三个操作“一次性执行”而不被其他线程干扰,这就是所谓的“原子性”。

synchronized代码块

synchronized代码块的形式如下:

	synchronized(obj)
	{
		// do something.
	}
	

这段代码保证了花括号内代码的“原子性”,就是说两个线程同时执行这一代码块的时候会表现出“要么都不执行,要么全部执行”的特性,即“互斥执行”。两个使用同一obj的synchronized代码块也同样具有“互斥执行”的特性。

只需将上面的NonAtomic稍作修改:

// static int count = 0; 后加一行:
static Object lock = new Object(); 

// ++count改为:
synchronized(lock) {
	++count;
}
	

就能保证程序的输出为20000。

synchronized方法

synchronized代码块通常是方法内的一部分,如果整个方法体都用synchronized锁定,且对象this,如果整个方法体都需要用synchronized(this)锁定,那么也可以用synchronized关键字修饰这个方法。

就是说,这个方法:

	public synchronized void someMethod() {
		// do something...
	}
	

等价于:

	public void someMethod() {
		synchronized(this) {
			// do something...
		}
	}
	

同步

通俗地说,“同步”就是保证两个线程事件的时序(先后)关系,这在多线程环境下非常有用。例如,两个线程A, B正在执行一系列工作Ai, Bi,现在想要使得Ap发生在Bq之前,就需要使用“同步原语”:

支持“同步”操作的调用叫做“同步原语”,在多数《操作系统》教材中,这种原语通常被定义为条件变量(condition variable)。

Java的同步原语为java.lang.Object类的几个方法:

  1. wait() 等待通知,该调用会阻塞当前线程。
  2. notify() 发出通知,如果有多个线程阻塞在该obj上,该调用会唤醒一个(阻塞)等待该obj的线程。
  3. notifyAll()发出通知,如果有多个线程阻塞在该obj上,该调用会唤醒所有(阻塞)等待该obj的线程。

notify()通常用于通知“有资源可用”;例如,生产者——消费者模型中,缓冲区为空时,消费者线程等待新产品的到来,此时生产者线程生产一个产品后可用notify()通知消费者线程。

notifyAll()通常用于通知“状态改变”,例如,一个多线程测试程序中,多个后台线程被创建后,全都等待主线程发出“开始测试”的命令,此时主线程可用notifyAll()通知各个测试线程。

例如如下代码,模拟运动员起跑过程:首先,发令员等待个运动员就绪;然后发令员一声枪响,所有运动员起跑;

public class TestStartRunning {

	static final int NUM_ATHLETES = 10; 

	static int readyCount = 0;
	static Object ready = new Object();
	static Object start = new Object();

	public static void main(String[] args) {
		Thread[] athletes = new Thread[NUM_ATHLETES];

		// 创建运动员
		for (int i = 0; i < athletes.length; ++i) {
			final int num = i;
			athletes[i] = new Thread() {
				@Override
				public void run() {
					System.out.println(Thread.currentThread().getName() + " ready!");

					synchronized (ready) {
						++readyCount;
						ready.notify(); // 通知发令员,“I'm ready!”
					}

					// 等待发令枪响
					try {
						synchronized (start) {
							start.wait();
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + " go!");
				}
			};
		}

		// 运动员上场
		for (int i = 0; i < athletes.length; ++i)
			athletes[i].start();

		// 主线程充当裁判员角色
		try {
			synchronized (ready) {
				// 等待所有运动员就位
				while (readyCount < athletes.length) {
					ready.wait();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}

		System.out.println(Thread.currentThread().getName() + " START!");
		synchronized (start) {
			start.notifyAll(); // 打响发令枪
		}
	}
}

信号丢失

wait/notify/notifyAll提供了一种线程间事件通知的方式,但这种通知并不能被有效的“记住”;所以,就存在通知丢失(notify missing)的可能——发出通知的线程先notify,接收通知的线程后wait,此时这个事先发出的通知就会丢失。在POSIX规范上,叫做信号丢失;由于现在的多数操作系统(LINUX,Mac,Unix)都遵循POSIX;所以“信号丢失”这个词使用的更广泛。

如下是一个演示通知丢失的代码:

public class TestNotifyMissing {
	static Object cond = new Object();
	public static void main(String[] args) {
		new Thread() {
			public void run() {
				try {
					Thread.sleep(1000); 

					System.out.println("[back] wait for notify...");
					synchronized (cond) {
						cond.wait();
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("[back] wakeup");
			}
		}.start();

		System.out.println("[main] notify");
		synchronized (cond) {
			cond.notify();
		}
	}
}

这个程序不能正常退出,后台线程因为错过了主线程发出的通知而一直在后台等待,程序也不会输出“[back] wake up”。

通俗地说,wait/notify只是一种口头交流,如果你没有听到,就会错过(而不像邮件、公告板,你收到通知的时间可以比别人发出的时间晚)。

如何避免通知丢失呢?由于notify本身不具备“记忆”,所以可以使用额外的变量作为“公告板”;在notify之前修改这个“公告板”;这样,即便其他线程调用wait的时间晚于notify的时间,也能看到写在“公共板”上的通知。

这同时也解释了另外一个语言设计上的问题:为什么Java的wait和notify端都必须要用synchronized锁定?首先,这不是语法级别的规定,不这么写也能编译通过,只是运行时会抛异常;这是JVM的一种运行时安全检查机制,这种机制是在提醒我们——应该使用额外的变量来防止产生通知丢失。例如刚才的NotifyMissing只需稍作修改就能够正常结束

public class TestNotifyMissingSolution {
	static boolean notified = false; // +++++
	static Object cond = new Object();

	public static void main(String[] args) {
		new Thread() {
			public void run() {
				try {
					Thread.sleep(1000);

					System.out.println("[back] wait for notify...");
					synchronized (cond) {
						while(!notified) // +++++
							cond.wait();
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("[back] wakeup");
			}
		}.start();

		System.out.println("[main] notify");
		synchronized (cond) {
			notified = true; // +++++
			cond.notify();
		}
		System.out.println("[main] notified");
	}
}

虚假唤醒

在例子TestNotifyMissingSolution中,cond.wait()前添加if(!notified),也能够正常运行;但这种做法与文档中给出的while(...)不同,文档中同时指出了虚假唤醒(Spurious Wakeup)的概念。虚假唤醒在《Programming with POSIX Threads》中的解释是::当一个线程wait在某个条件变量上,这个条件变量上没发生broadcast(相当于notifyAll)或signal(相当于notify)调用,wait也又可能返回。虚假唤醒听起来很奇怪,但是在多核系统上,使条件唤醒完全可预测可能导致多数条件变量操作变慢。"

为了防止虚假唤醒,需要在wait返回后继续检查某个条件是否达成,所有通常wait端的条件写为while而不是if,在Java中通常是:

// 等待线程:
synchronized(cond) {
	while(!done) {
		cond.wait();
	}
}

// 唤醒线程:
doSth();
synchronized(cond) {
	done = true;
	cond.notify();
}

总结

在<操作系统>的概念中,提供“互斥语义”的叫互斥器(Mutex),提供同步语义的叫条件变量(Condition Variable)。而在Java中,synchronized关键字和java.lang.Object提供了互斥量(mutex)语义,java.lang.Object的wait/notify/notifyAll则提供了条件变量语义。

另外,多线程环境下对象的回收是十分困难的,Java运行环境的垃圾回收(Garbage Collection,GC)功能减轻了程序员的负担。

参考

Java 1.6 apidocs Thread,http://tool.oschina.net/uploads/apidocs/jdk-zh/java/lang/Thread.html

《Java Concurrency in Practice》(中译本名为《Java并发实践》)

Spurious Wakeup -- Wikipedia,http://en.wikipedia.org/wiki/Spurious_wakeup

多线程编程中条件变量和虚假唤醒(spurious wakeup)的讨论,http://siwind.iteye.com/blog/1469216

时间: 2024-08-10 21:27:46

Java并发原语——线程、互斥与同步的相关文章

java并发编程线程安全

编写线程安全的代码实质就是管理对状态的访问,而且通常是共享的.可变的状态,对象的状态就是数据,存储在状态变量中,比如实例域,或者静态域,同时还包含了其它附属的域,例如hashmap的状态一部分存储到对象本身中,但同时也存储到很多mqp.entry中对象中,一个对象的状态还包含了任何会对他外部可见行为产生影响的数据. 所谓共享是指一个对象可以被多个线程访问, 所谓可变:是指变量的值在其生命周期内可以改变, 真正目的:在不可控制的并发访问中保护数据 线程安全必要条件: 1:对象是否被两个或以上的线程

线程互斥与同步

能解决下面的问题,基本上就能理解线程互斥与同步了. 子线程循环10次,主线程循环100次,接着子线程循环10,主线程循环100次.如此往复循环50次. 1 package cn.lah.thread; 2 3 public class TraditionalThreadCommunication { 4 5 public static void main(String[] args) { 6 7 final Business business = new Business(); 8 new Th

JAVA 并发编程-线程同步通信技术(Lock和Condition)(十)

在之前的博客中已经介绍过线程同步通信技术<JAVA 并发编程-传统线程同步通信技术(四)>,上篇是使用的synchronized,wait,notify来实现,今天我们使用的是Lock和Condition,下面我们结合两者对比来学习. 简单的Lock锁应用: /** * 简单Lock的应用 * @author hejingyuan * */ public class LockTest { public static void main(String[] args) { new LockTest

Java并发:线程间同步-条件队列和同步工具类

转载请注明出处: jiq?钦's technical Blog - 季义钦 线程之间的同步,除了互斥(前面介绍的互斥锁)之外,还存在协作关系,下面我们就介绍一下java线程间常见的一些协作方式. 一.内置条件队列 正如每个Java对象都可以作为一个内置锁,每个对象也可以作为一个条件队列,称为内置条件队列,Object.wait().notify()/notifyAll()构成了内置条件队列的API. 需要注意的是,调用任何对象X的内置条件队列的API都必须要先获得该对象X的内置锁. 1.API介

Java并发3-多线程面试题

1) 什么是线程? 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位.程序员可以通过它进行多处理器编程,你可以使用多线程对运算密集型任务提速. 2) 线程和进程有什么区别? 线程是进程的子集,一个进程可以有很多线程,每条线程并行执行不同的任务.不同的进程使用不同的内存空间,而所有的线程共享一片相同的内存空间.别把它和栈内存搞混,每个线程都拥有单独的栈内存用来存储本地数据. 3) 如何在Java中实现线程? 在语言层面有两种方式.java.lang.Thread

java并发:线程池、饱和策略、定制、扩展

一.序言 当我们需要使用线程的时候,我们可以随时新建一个线程,这样实现起来非常简便,但在某些场景下存在缺陷:如果需要同时执行多个任务(即并发的线程数量很多),频繁地创建线程会降低系统的效率,因为创建和销毁线程均需要一定的时间.线程池可以使线程得到复用,所谓线程复用就是线程在执行完一个任务后并不被销毁,该线程可以继续执行其他的任务. 二.Executors提供的线程池 Executors是线程的工厂类,也可以说是一个线程池工具类,Executors提供的线程都是通过参数设置来实现不同的线程池机制.

Java 并发:线程中断-interrupt

一直以为执行了interrupt方法就可以让线程结束,并抛出InterruptedException. 今天看了Java并发编程实战的第七章发现并不是这么回事,在这章的开头就提到 要使任务和线程能安全.快速.可靠地停止下来,并不是一件容易的事.Java没有提供任何机制来安全地终止线程.但它提供了(Interruption),这是一种协作机制,能够使一个线程终止另一个线程的当前工作 如上提到的是协作,而不是强制.因为如果需要被中断的线程任务实现没有准守这样的协作约定,那么其他线程就没有办法通过in

java并发学习--线程池(一)

关于java中的线程池,我一开始觉得就是为了避免频繁的创建和销毁线程吧,先创建一定量的线程,然后再进行复用.但是要具体说一下如何做到的,自己又说不出一个一二三来了,这大概就是自己的学习习惯流于表面,不经常深入的结果吧.所以这里决定系统的学习一下线程池的相关知识. 自己稍微总结了一下,学习一些新的知识或者技术的时候,大概都可以分为这么几个点: 1.为什么会有这项技术,用原来的方法有什么问题. 2.这项新技术具体是怎么解决这个问题的(这时可能就要涉及到一些具体的知识点和编码了) 3.是不是使用这项技

Java并发编程——线程池的使用

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,