线程通信

线程通信

一 使用Synchronized的线程

1.当线程在系统内运行时,线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但java也提供了一些机制来保证线程协调运行。
Object类提供了wait(),notify()和notifyAll()三个方法,这三个方法属于Object类,但是必须由同步监视器来调用,可以分为以下两种情况:
(1)对于使用synchronized修饰的同步方法,因为该类的默认实例this就是同步监视器,所以可以在同步方法中直接调用这三个方法
(2)对于使用synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,所以必须使用该对象调用这三个方法

2.wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。无时间参数的wait会一直等待,直到其他线程通知,带毫秒的wait会等到指定时间后自动苏醒。调用wait()方法的当前线程会释放对该同步监视器的锁定。
notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都是在此同步监视器上等待,则会选择唤醒其中一个线程,选择是任意的。
notifyAll():唤醒在此同步监视器上等待的所有线程。

3.假设在系统中有两个线程,分别代表存款者和取钱者,系统要求存款者和取钱者不断重复存钱、取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱,不允许存款者连续两次存钱,也不允许取钱者连续两次取钱。

程序中可以通过一个旗标来标识账户中是否已有存款,当旗标为false时,账户中没有存款,存款者线程可以向下执行,当存款者存款后,旗标为true,并调用notify()或notifyAll()方法来唤醒其他线程,并调用wait()让存款者线程等待,取款者的操作类似。

Account.java

public class Account {
    private String accountNo;
    private double balance;
    private boolean flag=false;
    public Account(String accountNo,double balance){
        this.accountNo=accountNo;
        this.balance=balance;
    }

    //因为账户余额不可以随便更改,所以只为balance提供getter方法
    public double getBalance() {
        return balance;
    }

    public String getAccountNo() {
        return accountNo;
    }

    public void setAccountNo(String accountNo) {
        this.accountNo = accountNo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Account account = (Account) o;

        return accountNo.equals(account.accountNo);

    }

    @Override
    public int hashCode() {
        return accountNo.hashCode();
    }

    public synchronized void draw(double drawAmount){
        try{
            if(!flag){
                wait();
            }else{
                System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
                balance-=drawAmount;
                System.out.println("账户余额为:"+balance);
                flag=false;
                notifyAll();
            }
        }catch(InterruptedException ex){
            ex.printStackTrace();
        }
    }

    public synchronized void depoist(double depoistAmount){
        try{
            if(flag){
                wait();
            }else{
                System.out.println(Thread.currentThread().getName()+"存款:"+depoistAmount);
                balance+=depoistAmount;
                System.out.println("账户余额为:"+balance);
                flag=true;
                notifyAll();
            }
        }catch(InterruptedException ex){
            ex.printStackTrace();
        }
    }
}

DrawThread.java

public class DrawThread extends Thread {
    private Account account;
    private double drawAmount;

    public DrawThread(String name, Account account, double drawAmount) {
        super(name);
        this.account = account;
        this.drawAmount = drawAmount;
    }
    public void run(){
        for(int i=0;i<100;i++) {
            account.draw(drawAmount);
        }
    }
}

DepositThread.java

public class DepositThread extends Thread {
    private Account account;
    private double depositAmount;
    public DepositThread(String name, Account account, double depositAmount) {
        super(name);
        this.account = account;
        this.depositAmount = depositAmount;
    }
    public void run(){
        for(int i=0;i<100;i++){
            account.depoist(depositAmount);
        }
    }
}

DrawTest.java

public class DrawTest {
    public static void main(String[] args){
        Account acct=new Account("1234567",0);
        new DrawThread("取钱者",acct,800).start();
        new DepositThread("存款者甲",acct,800).start();
        new DepositThread("存钱者乙",acct,800).start();
        new DepositThread("存钱者丙",acct,800).start();
    }
}

结果:
存款者甲存款:800.0
账户余额为:800.0
取钱者取钱:800.0
账户余额为:0.0

从结果可以发现,三个存款者线程随机的向账户中存款,只有一个取款者取钱。但是程序最后被阻塞无法继续向下执行,这是因为三个存款者线程共有300此存款操作,而一个取钱者线程只有100次取钱操作,程序最后被阻塞,如下:

存钱者丙存款:800.0
账户余额为:800.0

这里的阻塞并不是死锁,只是取钱者线程已经执行完毕,而存款者线程还在等待其他线程来取钱而已,并不是等待其他线程释放同步监视器。

二 使用Condition控制线程通信

1.如果程序中直接使用Lock对象来保证同步,则系统中不存在隐式的同步监视器,就不能使用wait(),notify(),notifyAll()进行线程通信了。Java提供了一个Condition来保持协调,使用Condition可以让那些已经得到Lock对象却无法继续执行的线程释放Lock锁,也可以唤醒其他处于等待的线程。Condition实例被绑定在了一个Lock对象上,只要调用Lock对象的newCondition()方法即可。

2.Condition提供了如下三个方法:
await():同wait()方法,但是该方法有更多的变体,如long awaitNanos(long nanosTimeout),void awaitUninterruptibly(),awaitUtil(Date deadline)等
signal():同notify()
signalAll():同notifyAll()

3.Account.java

public class Account {
    private final Lock lock=new ReentrantLock();
    private final Condition cond=lock.newCondition();
    private String accountNo;
    private double balance;
    private boolean flag=false;
    public Account(String accountNo,double balance){
        this.accountNo=accountNo;
        this.balance=balance;
    }

    //因为账户余额不可以随便更改,所以只为balance提供getter方法
    public double getBalance() {
        return balance;
    }

    public String getAccountNo() {
        return accountNo;
    }

    public void setAccountNo(String accountNo) {
        this.accountNo = accountNo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Account account = (Account) o;

        return accountNo.equals(account.accountNo);

    }

    @Override
    public int hashCode() {
        return accountNo.hashCode();
    }

    public synchronized void draw(double drawAmount){
        lock.lock();
        try{
            if(!flag){
                cond.await();
            }else{
                System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
                balance-=drawAmount;
                System.out.println("账户余额为:"+balance);
                flag=false;
                cond.signalAll();
            }
        }catch(InterruptedException ex){
            ex.printStackTrace();
        }finally{
            lock.unlock();
        }
    }

    public synchronized void deposit(double depositAmount){
        lock.lock();
        try{
            if(flag){
                cond.await();
            }else{
                System.out.println(Thread.currentThread().getName()+"存款:"+depositAmount);
                balance+=depositAmount;
                System.out.println("账户余额为:"+balance);
                flag=true;
                cond.signalAll();
            }
        }catch(InterruptedException ex){
            ex.printStackTrace();
        }finally{
            lock.unlock();
        }
    }
}

其他的几个java文件不变,运行结果是一样的。

三 使用阻塞队列(BlockingQueue)控制线程通信
1.BlockingQueue是Queue的子接口,但它的主要用途并不是作为容器,而是作为线程同步的工具。它的一个特征是:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。程序的两个线程交替向BlockingQueue中放入、取出元素,即可很好的控制线程的通信。

2.提供了两个支持阻塞的方法:
put(E e):试图把E元素放入BlockingQueue中,如果队列的元素已满,则阻塞该线程;
take():尝试从BlockingQueue的头部取出元素,如果队列的元素已空,则阻塞该线程;

3.BlockingQueue继承了Queue接口,可以使用Queue接口中的方法,归纳起来分为三组:
(1)在队列尾部插入元素,包括add(E e),offer(E e),put(E e),当队列已满时,分别会抛出异常、返回false、阻塞队列;
(2)在队列头部删除并返回删除的元素,包括remove(),poll()和take(),当队列已空时,分别会抛出异常、返回false、阻塞队列;
(3)在队列头部取出但不删除元素,包括element()和peek()方法,当队列已空时,着三个方法分别抛出异常、返回false

4.Java7之后又新增了一些阻塞队列,包含5个实现类:
(1)ArrayBlockingQueue:基于数组实现的BlockingQueue队列;
(2)LinkedBlockingQueue:基于链表实现的BlockingQueue队列;
(3)PriorityBlockingQueue:它并不是标准的阻塞队列,当调用remove()扥方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。判断元素的大小可根据元素(实现Comparable接口)的本身大小来自然排序,也可以使用Comparator进行定制排序。
(4)SynchronousQueue:同步队列,对该队列的存取操作必须交替进行;
(5)DelayQueue:它是一个特殊的BlockingQueue,底层基于PriorityBlockingQueue实现,并要求所有的集合元素都要实现Delay接口。

5.举例

public class BlockingQueueTest{
    public static void main(String[] args) throws Exception{
        BlockingQueue<String> bq=new ArrayBlockingQueue<String>(2);
        bq.put("java");
        bq.put("study");
        bq.put("javaee");  //阻塞线程
    }
}

四 线程池
1.系统启动一个新线程的成本是比较高的,因为涉及到与操作系统的交互,使用线程池可以很好的提高性能,尤其是当程序中需要创建大量生存期很短的线程时,更要考虑使用线程池。线程池在系统启动时就创建大量的空闲的线程,程序将一个Runnable对象或Callable对象传给线程池,线程池就会启动一个线程来执行它的run()或call()方法,当方法结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一个Runnable对象的run()或call()方法。使用线程池还可以控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃。

2.Java5之后支持内建线程池,通过一个Executors工厂类来产生线程池,该工厂类包含以下几个静态工厂方法来创建线程池:
(1)newCachedThreadPool():创建一个具有缓存功能的线程池
(2)newFixedThreadPool( int nThreads):创建一个可重用的、具有固定线程数的线程池
(3)newSingleThreadPool():创建一个只有单线程的线程池
(4)newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。
(6)newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。
(7)ExecutorService newWorkStealingPool(int parallelism):创建持有足够的线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争
(8)ExecutorService newWorkStealingPool():上一个方法的简化版本,如果当前机器有4个CPU,则目标并行级别被设置为4,也就是相当于前一个方法传入4作为参数
注意:前三个方法返回一个ExecutorService对象,该对象代表一个线程池,可以执行Runnable或Callable对象所代表的线程;中间两个方法返回ScheduledExecutorService线程池,它可以在指定延迟后执行线程任务;后两个是Java8新增的,充分利用了多CPU并行的能力,这两个方法生成的work stealing池,相当于后台线程池,如果所有的后台线程都死亡了,work stealing池中的线程会自动死亡。

3.ExecutorService里提供了如下三个方法:
(1)Future<?> submit(Runnable task): 将一个Runnable对象提交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务。这里的Future对象代表Runnable任务的返回值,但run()方法无返回值,所以在run()方法执行结束后返回null。但可以调用Future的isDone(),isCancelled()来获得Runnable对象的执行状态。
(2)<T> Future<T> submit(Runnable task, T result): 这里的result显式指定线程执行结束的返回值,所以Future对象将在run()方法执行结束后返回result
(3)<T> Future<T> submit(Callable<T> task): 这里提交的是Callable对象

4.ScheduledExecutorService提供了4个方法:
(1)ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit):指定callable任务将在delay延迟后执行。
(2)ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit):指定command任务将在delay延迟后执行。
(3)ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit): 指定command任务将在delay延迟后执行,而且以设定频率重复执行,也就是说,在initialDelay后开始执行,一次在initialDelay+period,initialDelay+2*period...处重复执行。
(4)ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):
创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都会存在给定的延迟。如果任务在某次执行时遇到异常,就会取消后续执行;否则,只会通过程序来显示取消或终止该任务。

5.用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列,线程池不再接收新任务,但会将所有已提交任务执行完成;也可以调用shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

6.举例

public class ThreadPoolTest {
    public static void main(String[] args) throws Exception{
        ExecutorService pool= Executors.newFixedThreadPool(6);
        Runnable target= ()->{
            for(int i=0;i<100;i++){
                System.out.println(Thread.currentThread().getPriority()+"的i值为:"+i);
            }
        };
        //向线程池中提交两个线程
        pool.submit(target)
        pool.submit(target);
        pool.shutdown();
    }
}

五 ThreadLocal类

1.java为ThreadLocal类增加了泛型支持,使用ThreadLocal类可以简化多线程编程时的并发访问,使用这个工具类可以很简捷的隔离多线程程序的竞争资源。

2.ThreadLocal,是Thread Local Variable (线程局部变量) 的意思,它的功能很简单,就是为每一个使用该变量的线程都提供一个变量值的副本,使每一个线程都可以独立的改变自己的副本,而不会与其他线程的副本冲突。从线程的角度看,就好像每一个线程都完全拥有该变量一样。

3.ThreadLocal类只提供了三个public方法:
T get():返回此线程局部变量中当前线程副本中的值
void remove():删除此线程局部变量中当前线程的值
void set(T value):设置此线程局部变量中当前线程副本中的值

4.ThreadLocalTest.java

class Account{
    //定义一个ThreadLocal类型的变量,该变量将是一个线程局部变量,每个线程都会保留该变量的一个副本
    private ThreadLocal<String> name=new ThreadLocal<>();
    public Account(String str){
        this.name.set(str);
        //用于访问当前线程的name的副本
        System.out.println("---"+this.name.get());
    }
    public String getName(){
        return name.get();
    }
    public void setName(String str){
        this.name.set(str);
    }
}
class MyTest extends Thread{
    private Account account;

    public MyTest(String name, Account account) {
        super(name);
        this.account = account;
    }
    public void run(){
        for(int i=0;i<10;i++){
            if(i==6) {
                account.setName(getName());
            }
            System.out.println(account.getName()+"账户的i值:"+i);
        }
    }
}
public class ThreadLocalTest {
    public static void main(String[] args){
        //虽然两个线程共享同一个账户,即只有一个账户名,但由于账户名是ThreadLocal类型的,所以每个线程都完全拥有各自的账户名副本,因此在i==6之后,将看到两个线程访问同一个账户时出现不同的账户名
        Account at=new Account("初始名");
        new MyTest("线程甲",at).start();
        new MyTest("线程乙",at).start();
    }
}

结果:

---初始名
null账户的i值:0
null账户的i值:0
null账户的i值:1
null账户的i值:1
null账户的i值:2
null账户的i值:2
null账户的i值:3
null账户的i值:3
null账户的i值:4
null账户的i值:4
null账户的i值:5
null账户的i值:5
线程甲账户的i值:6
线程乙账户的i值:6
线程甲账户的i值:7
线程乙账户的i值:7
线程甲账户的i值:8
线程乙账户的i值:8
线程甲账户的i值:9
线程乙账户的i值:9

实际上账户名有三个副本,主线程一个,另外启动的两个线程各一个,它们的值互不干扰,每个线程都完全拥有自己的ThreadLocal变量。ThreadLocal将需要并发访问的资源复制多份,每个线程拥有一份资源,每个线程都拥有自己的线程副本,从而也就没有必要对该变量进行同步了。ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的整个变量封装进ThreadLocal,或者把该对象与线程相关的状态使用ThreadLocal保存。

5.ThreadLocal并不能代替同步机制,两者面向的问题领域不同,同步机制是为了同步多个线程对相同资源的并发访问,是多个线程之间进行通信的有效方式,而ThreadLocal是为了隔离多个线程的数据共享,从根本上避免多个线程之间对共享资源的竞争。

 

时间: 2024-10-23 10:26:45

线程通信的相关文章

Java多线程编程核心技术读书笔记(3)-线程通信

线程是操作系统中独立的个体,但是这些个体如果无法经过特殊的处理就不能成为一个整体.线程间通信可以实现线程间的信息互换.相互唤起等功能,是系统的交互性更加强大,大大提高CPU的利用率,同时还能让开发者对各个线程任务有清晰的把控和监督,最常用的线程通信方法就是--等待/通知机制. 一.等待/通知机制 1.wait() / notify() 等待/通知机制在生活中比比皆是,例如:厨师/服务员的菜品传递台.生产者/消费者模式,JDK中通过Object里面的两个方法 wait() / notify() 来

java 线程通信

java 线程通信使用wait notify 配合synchronized 当线程执行wait()时,会把当前的锁释放,然后让出CPU,进入等待状态.当执行notify/notifyAll方法时,会唤醒一个处于等待该 对象锁 的线程,然后继续往下执行,直到执行完退出对象锁锁住的区域(synchronized修饰的代码块)后再释放锁. 如下代码: public class ThreadTest { //声明一个线程可视化的list集合 public static List<String> lis

Thinking in Java---再谈线程通信

前面写过一篇关于线程通信的博客,但是只是简单的罗列了几种线程通信的形式及语法:这几天又把<>上对应的内容看了一遍,这一篇博客主要结合几个例子说明下几种线程通信方式的特点. 首先还是要明确线程通信要解决的问题是什么:考虑这么一个情况,我们现在要对一台车进行涂蜡和抛光,并且在进行抛光之前一定要保证已涂蜡,且在涂另一层蜡的时候,又要保证已经抛光:现在我们我们开启两个线程,一个负责涂蜡,另一个负责抛光:要想顺利的完成任务,那么显然这两个线程需要相互等待和协调.这就是线程通信的任务了–如果我们有一个任务

java多线程 - 线程通信

当线程在系统内运行时,程序通常无法准确控制线程的轮换执行,但是可以通过一些机制来保证线程协调运行. 由同步监视器对象协调线程 实现这种功能可以借助于Object类提供的wait().notify().notifyAll()三个方法(注意,这三个方法属于Object类,不属于Thread类).这三个方法必须由同步监视器来调用,可以分为两种情况: 对于同步方法,同步监视器默认是当前实例(this),所以可以在同步方法中直接调用这三个方法: 对于同步代码块,同步监视器是synchronized后括号里

ZeroMq实现跨线程通信

ZeroMq实现跨线程通信 之前在技术崇拜的技术经理指导下阅读了ZeroMq的基础代码,现在就将阅读的心得与成果记录一下,并重新模仿实现了一下经理的异步队列. 1.对外接口 //主要接口(1)void *ymq_attach (void *ctx_, int oid, void* sink_); (2)void *ymq_detach (void *ctx_, int oid); (3)void *ymq_register_timer (void *ctx_, int oid, int time

iOS-----线程同步与线程通信

线程同步与线程通信 多线程是有趣的事情,它很容易突然出现”错误情况”,这是由于系统的线程调度具有一定的随机性造成的.不过,即使程序偶然出现问题,那么是由于编程不当所引起的.当使用多个线程来访问同一个数据时,很容易”偶然”出现线程安全问题. 线程安全问题 关于线程安全问题,有一个经典的问题:银行取钱的问题.银行取钱的基本流程基本可以分为如下几个步骤. 用户输入账户.密码.系统判断用户的账户.密码是否匹配. 用户输入取款金额. 系统判断账户余额是否大于取款金额. 如果余额大于取款金额,则取款成功;如

JavaSE:多线程补充--线程通信

线程通信我认为是多线程中最难掌握的部分了,这里通过两个例子来说明一下. 第一个: 使用两个线程打印 1-100. 线程1, 线程2 交替打印 public class Print implements Runnable{ int i = 1; public void run(){ while(true){ synchronized(this){ if(i<100){ notify(); System.out.println(Thread.currentThread().getName() + &qu

Android线程通信

摘要 andriod提供了 Handler 和 Looper 来满足线程间的通信.例如一个子线程从网络上下载了一副图片,当它下载完成后会发送消息给主线程,这个消息是通过绑定在主线程的Handler来传递的. 正文 图解: 代码示例: /** * @author allin.dev * http://allin.cnblogs.com */ public class MainThread extends Activity { private static final String TAG = "M

JavaSE——线程通信

线程通信: 如果线程A和线程B持有同一个MyObject类的对象object,这两个线程会去调用不同的方法,但是它们是同步执行的,比如:线程B需要等待线程A执行完了methodA()方法之后,它才能执行methodB()方法.这样,线程A和线程B就实现了 通信. 线程通信中要用到的方法:wait()方法: 执行同步锁(obj对象)的该方法的线程进入堵塞状态,会释放对象的锁,java虚拟机把该线程放到该对象的等待池中,该线程如果要再次执行,则需要其他线程将它唤醒. 1 package tongxi