Java多线程编程之同步器

同步器

为每种特定的同步问题提供了解决方案

Semaphore

Semaphore【信号标;旗语】,通过计数器控制对共享资源的访问。

测试类:

    package concurrent;

    import concurrent.thread.SemaphoreThread;

    import java.util.concurrent.Semaphore;

    /**
    * 拿客
    * www.coderknock.com
    * QQ群:213732117
    * 创建时间:2016年08月08日
    * 描述:
    */
    public class SemaphoreTest {

       public static void main(String[] args) {
           //在Thread里声明并不是同一个对象
           Semaphore semaphore = new Semaphore(3);
           SemaphoreThread testA = new SemaphoreThread("A", semaphore);
           SemaphoreThread testB = new SemaphoreThread("B", semaphore);
           SemaphoreThread testC = new SemaphoreThread("C", semaphore);
           SemaphoreThread testD = new SemaphoreThread("D", semaphore);
           SemaphoreThread testE = new SemaphoreThread("E", semaphore);
           SemaphoreThread testF = new SemaphoreThread("F", semaphore);
           SemaphoreThread testG = new SemaphoreThread("G", semaphore);
           testA.start();
           testB.start();
           testC.start();
           testD.start();
           testE.start();
           testF.start();
           testG.start();
       }
   }

线程写法:

   package concurrent.thread;

   import org.apache.logging.log4j.LogManager;
   import org.apache.logging.log4j.Logger;

   import java.util.concurrent.Semaphore;

   /**
    * 拿客
    * www.coderknock.com
    * QQ群:213732117
    * 创建时间:2016年08月08日
    * 描述:
    */
   public class SemaphoreThread extends Thread {
       private static final Logger logger = LogManager.getLogger(SemaphoreThread.class);
       //创建有3个信号量的信号量计数器
       public Semaphore semaphore;

       public SemaphoreThread(String name, Semaphore semaphore) {
           setName(name);
           this.semaphore = semaphore;
       }

       @Override
       public void run() {
           try {
               logger.debug(getName() + " 取号等待... " + System.currentTimeMillis());
               //取出一个信号
               semaphore.acquire();
               logger.debug(getName() + " 提供服务... " + System.currentTimeMillis());
               sleep(1000);
               logger.debug(getName() + " 完成服务... " + System.currentTimeMillis());

           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           logger.debug(getName() + " 释放... " + System.currentTimeMillis());
           //释放一个信号
           semaphore.release();
       }
   }

执行结果【以下所有输出结果中[]中为线程名称- 后为输出的内容】:

    [C] - C 取号等待... 1470642024037
    [F] - F 取号等待... 1470642024036
    [E] - E 取号等待... 1470642024036
    [B] - B 取号等待... 1470642024037
    [D] - D 取号等待... 1470642024037
    [A] - A 取号等待... 1470642023965
    [D] - D 提供服务... 1470642024039
    [C] - C 提供服务... 1470642024039
    [G] - G 取号等待... 1470642024036
    [F] - F 提供服务... 1470642024040
    [D] - D 完成服务... 1470642025039
    [C] - C 完成服务... 1470642025039
    [D] - D 释放... 1470642025040
    [F] - F 完成服务... 1470642025040
    [C] - C 释放... 1470642025041
    [B] - B 提供服务... 1470642025042
    [A] - A 提供服务... 1470642025042
    [F] - F 释放... 1470642025043
    [E] - E 提供服务... 1470642025043
    [A] - A 完成服务... 1470642026043
    [B] - B 完成服务... 1470642026043
    [B] - B 释放... 1470642026043
    [A] - A 释放... 1470642026043
    [G] - G 提供服务... 1470642026044
    [E] - E 完成服务... 1470642026045
    [E] - E 释放... 1470642026045
    [G] - G 完成服务... 1470642027045
    [G] - G 释放... 1470642027046

可以看到,当3个信号量被领取完之后,之后的线程会阻塞在领取信号的位置,当有信号量释放之后才会继续执行。

CountDownLatch

CountDownLatch【倒计时锁】,线程中调用countDownLatch.await()使进程进入阻塞状态,当达成指定次数后(通过countDownLatch.countDown())继续执行每个线程中剩余的内容。

测试类:

package concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CountDownLatch;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class package concurrent;

import concurrent.thread.CountDownLatchThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CountDownLatchTest {

    private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class);

    public static void main(String[] args) throws InterruptedException {
        //设定当达成三个计数时触发
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new CountDownLatchThread("A", countDownLatch).start();
        new CountDownLatchThread("B", countDownLatch).start();
        new CountDownLatchThread("C", countDownLatch).start();
        new CountDownLatchThread("D", countDownLatch).start();
        new CountDownLatchThread("E", countDownLatch).start();
        for (int i = 3; i > 0; i--) {
            Thread.sleep(1000);
            logger.debug(i);
            countDownLatch.countDown();
        }
    }
}

线程类:

package concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CountDownLatch;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CountDownLatchThread extends Thread {
    private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class);
    //计数器
    private CountDownLatch countDownLatch;

    public CountDownLatchThread(String name, CountDownLatch countDownLatch) {
        setName(name);
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        logger.debug("执行操作...");
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.debug("等待计数器达到标准...");
        try {
            //让线程进入阻塞状态,等待计数达成后释放
            countDownLatch.await();
            logger.debug("计数达成,继续执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果:

 [E] - 执行操作...
 [B] - 执行操作...
 [A] - 执行操作...
 [C] - 执行操作...
 [D] - 执行操作...
 [main] DEBUG concurrent.CountDownLatchTest - 3
 [B] - 等待计数器达到标准...
 [E] - 等待计数器达到标准...
 [C] - 等待计数器达到标准...
 [D] - 等待计数器达到标准...
 [A] - 等待计数器达到标准...
 [main] DEBUG concurrent.CountDownLatchTest - 2
 [main] DEBUG concurrent.CountDownLatchTest - 1
 [E] - 计数达成,继续执行...
 [C] - 计数达成,继续执行...
 [B] - 计数达成,继续执行...
 [D] - 计数达成,继续执行...
 [A] - 计数达成,继续执行...
CyclicBarrier

CyclicBarrier【Cyclic周期,循环的 Barrier屏障,障碍】循环的等待阻塞的线程个数到达指定数量后使参与计数的线程继续执行并可执行特定线程(使用不同构造函数可以不设定到达后执行),其他线程仍处于阻塞等待再一次达成指定个数。

测试类:

package concurrent;

import concurrent.thread.CyclicBarrierThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CyclicBarrier;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CyclicBarrierTest {

    private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class);

    public static void main(String[] args) {
          //可以使用CyclicBarrier(int parties)不设定到达后执行的内容
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            logger.debug("---计数到达后执行的内容----");
        });
        new CyclicBarrierThread("A", cyclicBarrier).start();
        new CyclicBarrierThread("B", cyclicBarrier).start();
        new CyclicBarrierThread("C", cyclicBarrier).start();
        new CyclicBarrierThread("D", cyclicBarrier).start();
        new CyclicBarrierThread("E", cyclicBarrier).start();
        new CyclicBarrierThread("A2", cyclicBarrier).start();
        new CyclicBarrierThread("B2", cyclicBarrier).start();
        new CyclicBarrierThread("C2", cyclicBarrier).start();
        new CyclicBarrierThread("D2", cyclicBarrier).start();
        new CyclicBarrierThread("E2", cyclicBarrier).start();
        //需要注意的是,如果线程数不是上面设置的等待数量的整数倍,比如这个程序中又加了个线程,
        // 那么当达到5个数量时,只会执行达到时的五个线程的内容,
        // 剩余一个线程会出于阻塞状态导致主线程无法退出,程序无法结束
        // new CyclicBarrierThread("F", cyclicBarrier).start();//将这行注释去掉程序无法自动结束
    }
}

线程类:

package concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CyclicBarrierThread extends Thread {

    private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class);

    private CyclicBarrier cyclicBarrier;

    public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) {
        super(name);
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        logger.debug("执行操作...");
        try {
            int time = new Random().nextInt(10) * 1000;
            logger.debug("休眠" + time/1000 + "秒");
            sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.debug("等待计数器达到标准...");
        try {
            //让线程进入阻塞状态,等待计数达成后释放
            cyclicBarrier.await();
            logger.debug("计数达成,继续执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

执行结果:

 [A] - 执行操作...
 [A] - 休眠0秒
 [E2] - 执行操作...
 [E2] - 休眠5秒
 [D2] - 执行操作...
 [D2] - 休眠4秒
 [C2] - 执行操作...
 [C2] - 休眠4秒
 [B2] - 执行操作...
 [B2] - 休眠6秒
 [A2] - 执行操作...
 [A2] - 休眠8秒
 [E] - 执行操作...
 [E] - 休眠5秒
 [D] - 执行操作...
 [D] - 休眠0秒
 [C] - 执行操作...
 [C] - 休眠3秒
 [B] - 执行操作...
 [B] - 休眠7秒
 [A] - 等待计数器达到标准...
 [D] - 等待计数器达到标准...
 [C] - 等待计数器达到标准...
 [D2] - 等待计数器达到标准...
 [C2] - 等待计数器达到标准...
 [C2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容----
 [C2] - 计数达成,继续执行...
 [A] - 计数达成,继续执行...
 [C] - 计数达成,继续执行...
 [D2] - 计数达成,继续执行...
 [D] - 计数达成,继续执行...
 [E2] - 等待计数器达到标准...
 [E] - 等待计数器达到标准...
 [B2] - 等待计数器达到标准...
 [B] - 等待计数器达到标准...
 [A2] - 等待计数器达到标准...
 [A2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容----
 [E] - 计数达成,继续执行...
 [B2] - 计数达成,继续执行...
 [E2] - 计数达成,继续执行...
 [B] - 计数达成,继续执行...
 [A2] - 计数达成,继续执行...

可以想象成以前不正规的长途汽车站的模式:

不正规的长途汽车站会等待座位坐满之后才发车,到达目的地之后继续等待然后循环进行。每个人都是一个Thread,上车后触发cyclicBarrier.await();,当坐满时就是达到指定达成数的时候,车辆发车就是达成后统一执行的内容,发车后车上的人们就可以聊天之类的操作了【我们暂且理解为上车后人们就都不能动了O(∩_∩)O~】。

CountDownLatch与CyclicBarrier区别:

CountDownLatch是一个或多个线程等待计数达成后继续执行,await()调用并没有参与计数。

CyclicBarrier则是N个线程等待彼此执行到零界点之后再继续执行,await()调用的同时参与了计数,并且CyclicBarrier支持条件达成后执行某个动作,而且这个过程是循环性的。

Exchanger

Exchanger<T> 用于线程间进行数据交换

测试类:

package concurrent;

import concurrent.pojo.ExchangerPojo;
import concurrent.thread.ExchangerThread;

import java.util.HashMap;
import java.util.concurrent.Exchanger;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerTest {

    public static void main(String[] args) {
        Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>();
        new ExchangerThread("A", exchanger).start();
        new ExchangerThread("B", exchanger).start();
    }
}

实体类:

package concurrent.pojo;

import com.alibaba.fastjson.JSON;

import java.util.Date;
import java.util.List;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerPojo {
    private int intVal;
    private String strVal;
    private List<String> strList;
    private Date date;

    public ExchangerPojo(int intVal, String strVal, List<String> strList, Date date) {
        this.intVal = intVal;
        this.strVal = strVal;
        this.strList = strList;
        this.date = date;
    }

    public int getIntVal() {
        return intVal;
    }

    public void setIntVal(int intVal) {
        this.intVal = intVal;
    }

    public String getStrVal() {
        return strVal;
    }

    public void setStrVal(String strVal) {
        this.strVal = strVal;
    }

    public List<String> getStrList() {
        return strList;
    }

    public void setStrList(List<String> strList) {
        this.strList = strList;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

线程类:

package concurrent.thread;

import concurrent.pojo.ExchangerPojo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;
import java.util.concurrent.Exchanger;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerThread extends Thread {
    private Exchanger<HashMap<String, ExchangerPojo>> exchanger;

    private static final Logger logger = LogManager.getLogger(ExchangerThread.class);

    public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) {
        super(name);
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        HashMap<String, ExchangerPojo> map = new HashMap<>();
        logger.debug(getName() + "提供者提供数据...");
        Random random = new Random();
        for (int i = 0; i < 3; i++) {
            int index = random.nextInt(10);
            List<String> list = new ArrayList<>();
            for (int j = 0; j < index; j++) {
                list.add("list ---> " + j);
            }
            ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的数据", list, new Date());
            map.put("第" + i + "个数据", pojo);
        }
        try {
            int time = random.nextInt(10);
            logger.debug(getName() + "等待" + time + "秒....");
            for (int i = time; i > 0; i--) {
                sleep(1000);
                logger.debug(getName() + "---->" + i);
            }
              //等待exchange是会进入阻塞状态,可以在一个线程中与另一线程多次交互,此处就不写多次了
            HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map);
            time = random.nextInt(10);
            logger.debug(getName() + "接受到数据等待" + time + "秒....");
            for (int i = time; i > 0; i--) {
                sleep(1000);
                logger.debug(getName() + "---->" + i);
            }
            getMap.forEach((x, y) -> {
                logger.debug(x + " -----> " + y.toString());
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果:

 [B] - B提供者提供数据...
 [A] - A提供者提供数据...
 [A] - A等待2秒....
 [B] - B等待0秒....
 [A] - A---->2
 [A] - A---->1
 [B] - B接受到数据等待1秒....
 [A] - A接受到数据等待4秒....
 [B] - B---->1
 [A] - A---->4
 [B] - 第0个数据 -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的数据"}
 [B] - 第1个数据 -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的数据"}
 [B] - 第2个数据 -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的数据"}
 [A] - A---->3
 [A] - A---->2
 [A] - A---->1
 [A] - 第0个数据 -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的数据"}
 [A] - 第1个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的数据"}
 [A] - 第2个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的数据"}

Phaser

Phaser个人感觉兼具了CountDownLatch与CyclicBarrier的功能,并提供了分阶段的能力。

实现分阶段的CyclicBarrier的功能

测试代码:

package concurrent;

import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.Phaser;

/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:25:30。
 */
public class PhaserTest {

    private static final Logger logger = LogManager.getLogger(PhaserTest.class);

    public static void main(String[] args) {
        Phaser phaser = new Phaser() {
            /**此方法有2个作用:
             * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
             * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
             * */
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                logger.debug("阶段--->" + phase);
                logger.debug("注册的线程数量--->" + registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };

        for (int i = 3; i > 0; i--) {
            new PhaserThread("第" + i + "个", phaser).start();
        }
    }
}

线程代码:

package concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Random;
import java.util.concurrent.Phaser;

/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:16:55。
 */
public class PhaserThread extends Thread {

    private Phaser phaser;

    private static final Logger logger = LogManager.getLogger(PhaserThread.class);

    public PhaserThread(String name, Phaser phaser) {
        super(name);
        this.phaser = phaser;
        //把当前线程注册到Phaser
        this.phaser.register();
        logger.debug("name为" + name + "的线程注册了" + this.phaser.getRegisteredParties() + "个线程");
    }

    @Override
    public void run() {
        logger.debug("进入...");
        phaser.arrive();
        for (int i = 6; i > 0; i--) {
            int time = new Random().nextInt(5);
            try {
                logger.debug("睡眠" + time + "秒");
                sleep(time * 1000);
                if (i == 1) {
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                    logger.debug("最后一次触发,并注销自身");
                    phaser.arriveAndDeregister();
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                } else {
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                    logger.debug(i + "--->触发并阻塞...");
                    phaser.arriveAndAwaitAdvance();//相当于CyclicBarrier.await();
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.debug("注销完成之后注册的线程数量--->" + phaser.getRegisteredParties());
    }
}

执行结果:

 [main] - name为第3个的线程注册了1个线程
 [main] - name为第2个的线程注册了2个线程
 [main] - name为第1个的线程注册了3个线程
 [第3个] - 进入...
 [第2个] - 进入...
 [第3个] - 睡眠2秒
 [第2个] - 睡眠1秒
 [第1个] - 进入...
 [第1个] - 阶段--->0
 [第1个] - 注册的线程数量--->3
 [第1个] - 睡眠4秒
 [第2个] - 未完成的线程数量:3
 [第2个] - 6--->触发并阻塞...
 [第3个] - 未完成的线程数量:2
 [第3个] - 6--->触发并阻塞...
 [第1个] - 未完成的线程数量:1
 [第1个] - 6--->触发并阻塞...
 [第1个] - 阶段--->1
 [第1个] - 注册的线程数量--->3
 [第1个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠1秒
 [第3个] - 睡眠0秒
 [第2个] - 睡眠4秒
 [第3个] - 未完成的线程数量:3
 [第3个] - 5--->触发并阻塞...
 [第1个] - 未完成的线程数量:2
 [第1个] - 5--->触发并阻塞...
 [第2个] - 未完成的线程数量:1
 [第2个] - 5--->触发并阻塞...
 [第2个] - 阶段--->2
 [第2个] - 注册的线程数量--->3
 [第2个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 睡眠0秒
 [第3个] - 睡眠2秒
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠2秒
 [第2个] - 4--->触发并阻塞...
 [第3个] - 未完成的线程数量:2
 [第1个] - 未完成的线程数量:2
 [第3个] - 4--->触发并阻塞...
 [第1个] - 4--->触发并阻塞...
 [第1个] - 阶段--->3
 [第1个] - 注册的线程数量--->3
 [第1个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠2秒
 [第3个] - 睡眠1秒
 [第2个] - 睡眠4秒
 [第3个] - 未完成的线程数量:3
 [第3个] - 3--->触发并阻塞...
 [第1个] - 未完成的线程数量:2
 [第1个] - 3--->触发并阻塞...
 [第2个] - 未完成的线程数量:1
 [第2个] - 3--->触发并阻塞...
 [第2个] - 阶段--->4
 [第2个] - 注册的线程数量--->3
 [第2个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 睡眠2秒
 [第1个] - 睡眠2秒
 [第3个] - 睡眠4秒
 [第2个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 2--->触发并阻塞...
 [第1个] - 2--->触发并阻塞...
 [第3个] - 未完成的线程数量:1
 [第3个] - 2--->触发并阻塞...
 [第3个] - 阶段--->5
 [第3个] - 注册的线程数量--->3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第3个] - 睡眠2秒
 [第1个] - 睡眠3秒
 [第2个] - 睡眠0秒
 [第2个] - 未完成的线程数量:3
 [第2个] - 最后一次触发,并注销自身
 [第2个] - 未完成的线程数量:2
 [第2个] - 注销完成之后注册的线程数量--->2
 [第3个] - 未完成的线程数量:2
 [第3个] - 最后一次触发,并注销自身
 [第3个] - 未完成的线程数量:1
 [第3个] - 注销完成之后注册的线程数量--->1
 [第1个] - 未完成的线程数量:1
 [第1个] - 最后一次触发,并注销自身
 [第1个] - 阶段--->6
 [第1个] - 注册的线程数量--->0
 [第1个] - 未完成的线程数量:0
 [第1个] - 注销完成之后注册的线程数量--->0

上面代码中,当所有线程进行到arriveAndAwaitAdvance()时会触发计数并且将线程阻塞,等计数数量等于注册线程数量【即所有线程都执行到了约定的地方时,会放行,是所有线程得以继续执行,并触发onAction事件】。我们可以在onAction中根据不同阶段执行不同内容的操作。

实现分阶段的CountDownLatch的功能

只需将上面的测试类更改如下:

package concurrent;

import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.Phaser;

import static jodd.util.ThreadUtil.sleep;

/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:25:30。
 */
public class PhaserTest {

    private static final Logger logger = LogManager.getLogger(PhaserTest.class);

    public static void main(String[] args) {
        //这里其实相当于已经注册了3个线程,但是并没有实际的线程
        int coutNum=3;
        Phaser phaser = new Phaser(coutNum) {
            /**此方法有2个作用:
             * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
             * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
             * */
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                logger.debug("阶段--->" + phase);
                logger.debug("注册的线程数量--->" + registeredParties);
                return registeredParties==coutNum;//当后只剩下coutNum个线程时说明所有真实的注册的线程已经运行完成,测试可以终止Phaser
            }
        };

        for (int i = 3; i > 0; i--) {
            new PhaserThread("第" + i + "个", phaser).start();
        }

        //当phaser未终止时循环注册这块儿可以使用实际的业务处理
        while (!phaser.isTerminated()) {
            sleep(1000);
            logger.debug("触发一次");
            phaser.arrive(); //相当于countDownLatch.countDown();
        }
    }
}

时间: 2024-10-12 22:22:31

Java多线程编程之同步器的相关文章

《Java多线程编程核心技术》推荐

写这篇博客主要是给猿友们推荐一本书<Java多线程编程核心技术>. 之所以要推荐它,主要因为这本书写得十分通俗易懂,以实例贯穿整本书,使得原本抽象的概念,理解起来不再抽象. 只要你有一点点Java基础,你就可以尝试去阅读它,相信定会收获甚大! 博主之前网上找了很久都没完整pdf电子版的,只有不全的试读版,这里博主提供免费.清晰.完整版供各位猿友下载: http://download.csdn.net/detail/u013142781/9452683 刚刚已经提到,<Java多线程编程核

Java多线程编程模式实战指南(二):Immutable Object模式--转载

本文由本人首次发布在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-immutable-object.转载请注明作者: 黄文海 出处:http://viscent.iteye.com. 多线程共享变量的情况下,为了保证数据一致性,往往需要对这些变量的访问进行加锁.而锁本身又会带来一些问题和开销.Immutable Object模式使得我们可以在不使用锁的情况下,既保证共享变量访问的线程安

java多线程编程

一.多线程的优缺点 多线程的优点: 1)资源利用率更好2)程序设计在某些情况下更简单3)程序响应更快 多线程的代价: 1)设计更复杂虽然有一些多线程应用程序比单线程的应用程序要简单,但其他的一般都更复杂.在多线程访问共享数据的时候,这部分代码需要特别的注意.线程之间的交互往往非常复杂.不正确的线程同步产生的错误非常难以被发现,并且重现以修复. 2)上下文切换的开销当CPU从执行一个线程切换到执行另外一个线程的时候,它需要先存储当前线程的本地的数据,程序指针等,然后载入另一个线程的本地数据,程序指

Java 多线程编程两个简单的例子

/** * @author gao */ package gao.org; public class RunnableDemo implements Runnable{ @Override public void run() { // TODO Auto-generated method stub for(int i=0;i<10;i++){ System.out.println("新线程输出:"+i); } } public static void main(String []

Java多线程编程基础之线程对象

在进入java平台的线程对象之前,基于基础篇(一)的一些问题,我先插入两个基本概念. [线程的并发与并行] 在单CPU系统中,系统调度在某一时刻只能让一个线程运行,虽然这种调试机制有多种形式(大多数是时间片轮巡为主),但无论如何,要通过不断切换需要运行的线程让其运行的方式就叫并发(concurrent).而在多CPU系统中,可以让两个以上的线程同时运行,这种可以同时让两个以上线程同时运行的方式叫做并行(parallel). 在上面包括以后的所有论述中,请各位朋友谅解,我无法用最准确的词语来定义储

拨开云雾见天日 —— Java多线程编程概念剖析

说到Java多线程编程,大多数人都会想到继承Thread或实现Runnable编程,new 一个Thread实例,调用start()方法,由OS调用即可.具体过程如下: public class MyThread extends Thread {     @Override     public void run() {         System.out.println("MyThread");     }     public static void main(String[] 

java多线程编程中实现Runnable接口方法相对于继承Thread方法的优势

 java多线程创建方法http://blog.csdn.net/cjc211322/article/details/24999163  java创建多线程方法之间的区别http://blog.csdn.net/cjc211322/article/details/25000449 java多线程编程中实现Runnable接口方法相对于继承Thread方法的优势

Java多线程编程(学习笔记)

一.说明 周末抽空重新学习了下多线程,为了方便以后查阅,写下学习笔记. 有效利用多线程的关键是理解程序是并发执行而不是串行执行的.例如:程序中有两个子系统需要并发执行,这时候需要利用多线程编程. 通过多线程的使用,可以编写出非常高效的程序.但如果创建了太多的线程,程序执行的效率反而会降低. 同时上下文的切换开销也很重要,如果创建太多的线程,CPU花费在上下文的切换时间将对于执行程序的时间. 二.Java多线程编程 概念 在学习多线程时,我们应该首先明白另外一个概念. 进程:是计算机中的程序关于某

Java多线程编程详解

线程的同步 由于同一进程的多个线程共享同一片存储空间,在带来方便的同时,也带来了访问冲突这个严重的问题.Java语言提供了专门机制以解决这种冲突,有效避免了同一个数据对象被多个线程同时访问. 由于我们可以通过 private 关键字来保证数据对象只能被方法访问,所以我们只需针对方法提出一套机制,这套机制就是 synchronized 关键字,它包括两种用法:synchronized 方法和 synchronized 块. 1. synchronized 方法:通过在方法声明中加入 synch