Java并发编程-移相器

移相器(Phaser)内有2个重要状态,分别是phase和party。
phase就是阶段,初值为0,当所有的线程执行完本轮任务,同时开始下一轮任务时,意味着当前阶段已结束,进入到下一阶段,phase的值自动加1。
party就是线程,party=4就意味着Phaser对象当前管理着4个线程。
Phaser还有一个重要的方法经常需要被重载,那就是boolean onAdvance(int phase, int registeredParties)方法。此方法有2个作用:
1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。
例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
Phaser把多个线程执行的任务划分成多个阶段(phase),编程时要明确各个阶段的任务,每个阶段都可以有任意个参与者,线程可以随时注册并参与到某个阶段,当一个阶段中所有线程都成功完成之后,Phaser的onAdvance()被调用,可以通过覆盖添加自定义处理逻辑(类似循环屏障(关卡)使用的Runnable接口),然后Phaser类会自动进入下个阶段。如此循环,直到Phaser不再包含任何参与者。
register(),bulkRegister(),动态添加一个或多个参与者。
arrive(),某个参与者完成任务后调用
arriveAndDeregister(),任务完成,取消自己的注册。
arriveAndAwaitAdvance(),自己完成等待其他参与者完成:进入阻塞,直到Phaser成功进入下个阶段。
awaitAdvance()、awaitAdvanceInterruptibly(),等待phaser进入下个阶段,参数为当前阶段的编号,后者可以设置超时和处理中断请求。
另外,Phaser的一个重要特征是多个Phaser可以组成树形结构,Phaser提供了构造方法来指定当前对象的父对象;当一个子对象参与者>0,会自动注册到父对象中;当=0,自动解除注册。

[java]
package org.suxuan;

import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class PhaserDemo {
    public static void main(String[] args) {
        final int workers = 2;
        final int workLength = 10;

        final Phaser phaser = new Phaser(workers + 1);
        final AtomicReferenceArray<String> lane1 = new AtomicReferenceArray<String>(new String[workLength]);
        final AtomicReferenceArray<String> lane2 = new AtomicReferenceArray<String>(new String[workLength]);

        new Thread("Producer 1") {
            @Override
            public void run() {
                for (int i = 0; i < workLength; i++) {
                    $sleep(20);

                    lane1.set(i, "lane1-answer-" + i);

                    System.out.printf("[%-17s] working in lane1 finished phase [%d]%n",
                            Thread.currentThread().getName(), phaser.getPhase());

                    phaser.arriveAndAwaitAdvance();
                }
            }
        }.start();

        new Thread("Slower producer 2") {
            @Override
            public void run() {
                for (int i = 0; i < workLength; i++) {
                    $sleep(40);

                    lane2.set(i, "lane2-answer-" + i);

                    System.out.printf("[%-17s] working in lane2 finished phase [%d]%n",
                            Thread.currentThread().getName(), phaser.getPhase());

                    phaser.arriveAndAwaitAdvance();
                }
            }
        }.start();

        new Thread("Slow consumer") {
            @Override
            public void run() {
                for (int start = 0; start < workLength; ) {
                    System.out.printf("[%-17s] about to wait for phase [%d] completion%n",
                            Thread.currentThread().getName(), start);

                    int phaseInProgress = phaser.awaitAdvance(start);

                    //Read all the way up to the most recent completed phases.
                    for (int i = start; i < phaseInProgress; i++) {
                        System.out.printf("[%-17s] read [%s] & [%s] from phase [%d]%n",
                                Thread.currentThread().getName(), lane1.get(i), lane2.get(i), i);
                    }

                    start = phaseInProgress;

                    $sleep(80);
                }
            }
        }.start();

        phaser.arriveAndDeregister();
    }

    private static void $sleep(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
        }
    }
}

[/java]

Phaser初始化为3,之后customer线程取消自己的注册(此时只有两个生产者线程间进行同步),awaitAdvance不会阻塞,它直接返回。
两个生产者线程,依次生成结果放置到AtomicReferenceArray中。消费者线程每睡眠80Ms后,从结果集中,把当前已经完成的结果打印出来。
某次运行结果如下:

[java]
[Slow consumer    ] about to wait for phase [0] completion
[Producer 1       ] working in lane1 finished phase [0]
[Slower producer 2] working in lane2 finished phase [0]
[Slow consumer    ] read [lane1-answer-0] & [lane2-answer-0] from phase [0]
[Producer 1       ] working in lane1 finished phase [1]
[Slower producer 2] working in lane2 finished phase [1]
[Producer 1       ] working in lane1 finished phase [2]
[Slow consumer    ] about to wait for phase [1] completion
[Slow consumer    ] read [lane1-answer-1] & [lane2-answer-1] from phase [1]
[Slower producer 2] working in lane2 finished phase [2]
[Producer 1       ] working in lane1 finished phase [3]
[Slower producer 2] working in lane2 finished phase [3]
[Producer 1       ] working in lane1 finished phase [4]
[Slower producer 2] working in lane2 finished phase [4]
[Producer 1       ] working in lane1 finished phase [5]
[Slow consumer    ] about to wait for phase [2] completion
[Slow consumer    ] read [lane1-answer-2] & [lane2-answer-2] from phase [2]
[Slow consumer    ] read [lane1-answer-3] & [lane2-answer-3] from phase [3]
[Slow consumer    ] read [lane1-answer-4] & [lane2-answer-4] from phase [4]
[Slower producer 2] working in lane2 finished phase [5]
[Producer 1       ] working in lane1 finished phase [6]
[Slower producer 2] working in lane2 finished phase [6]
[Slow consumer    ] about to wait for phase [5] completion
[Slow consumer    ] read [lane1-answer-5] & [lane2-answer-5] from phase [5]
[Slow consumer    ] read [lane1-answer-6] & [lane2-answer-6] from phase [6]
[Slower producer 2] working in lane2 finished phase [7]
[Producer 1       ] working in lane1 finished phase [7]
[Producer 1       ] working in lane1 finished phase [8]
[Slower producer 2] working in lane2 finished phase [8]
[Producer 1       ] working in lane1 finished phase [9]
[Slow consumer    ] about to wait for phase [7] completion
[Slow consumer    ] read [lane1-answer-7] & [lane2-answer-7] from phase [7]
[Slow consumer    ] read [lane1-answer-8] & [lane2-answer-8] from phase [8]
[Slower producer 2] working in lane2 finished phase [9]
[Slow consumer    ] about to wait for phase [9] completion
[Slow consumer    ] read [lane1-answer-9] & [lane2-answer-9] from phase [9]
[/java]
时间: 2024-10-07 23:25:29

Java并发编程-移相器的相关文章

Java并发编程-总纲

Java 原生支持并发,基本的底层同步包括:synchronized,用来标示一个方法(普通,静态)或者一个块需要同步执行(某一时刻,只允许一个线程在执行代码块).volatile,用来标识一个变量是共享变量(线程不缓存),更新和读取是原子的.wait,线程等待某一个Object上的事件(notify事件,线程挂起,释放锁),需要在synchronized区中执行.notify,事件发生后,通知事件,通知一个挂起的线程,需要在synchronized区中执行.notifyAll,事件发生后,通知

Java并发编程:Concurrent锁机制解析

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

Java并发编程:Callable、Future和FutureTask(转)

Java并发编程:Callable.Future和FutureTask 在前面的文章中我们讲述了创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦. 而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果. 今天我们就来讨论一下Callabl

Java并发编程 Volatile关键字解析

volatile关键字的两层语义 一旦一个共享变量(类的成员变量.类的静态成员变量)被volatile修饰之后,那么就具备了两层语义: 1)保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的. 2)禁止进行指令重排序. 根据volatile的语义,我们可以看到,volatile主要针对的是并发三要素(原子性,可见性和有序性)中的后两者有实际优化作用. 可见性: 线程本身并不直接与主内存进行数据的交互,而是通过线程的工作内存来完成相应的操作.

Java并发编程

synchronized是Java中的关键字,在并发编程中被称为内置锁或者监视器锁.当用它来修饰一个方法或者一个代码块的时候能够保证同一时刻最多只有一个线程执行该段代码. Java的内置锁相当于一种互斥锁,最多只有一个线程能够持有这种锁,故而由这个锁保护的同步代码块会以原子方式执行,多个线程在执行该代码时就不会相互干扰. 但由于被锁保护的同步块代码是以串行形式来访问的,即多个线程以独占的方式访问对象,而这也导致如果被锁保护的同步代码块的作用范围过大,会导致并发不良. 这里有必要简单讲一下内置锁的

6、Java并发编程:volatile关键字解析

Java并发编程:volatile关键字解析 volatile这个关键字可能很多朋友都听说过,或许也都用过.在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果.在Java 5之后,volatile关键字才得以重获生机. volatile关键字虽然从字面上理解起来比较简单,但是要用好不是一件容易的事情.由于volatile关键字是与Java的内存模型有关的,因此在讲述volatile关键之前,我们先来了解一下与内存模型相关的概念和知识,然后分析了volatil

7、Java并发编程:深入剖析ThreadLocal

Java并发编程:深入剖析ThreadLocal 想必很多朋友对ThreadLocal并不陌生,今天我们就来一起探讨下ThreadLocal的使用方法和实现原理.首先,本文先谈一下对ThreadLocal的理解,然后根据ThreadLocal类的源码分析了其实现原理和使用需要注意的地方,最后给出了两个应用场景. 以下是本文目录大纲: 一.对ThreadLocal的理解 二.深入解析ThreadLocal类 三.ThreadLocal的应用场景 若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者

java并发编程实战学习(3)--基础构建模块

转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法.如果队列已经满了,那么put方法将阻塞直到空间可用:如果队列为空,那么take方法将阻塞直到有元素可用.队列可以是有界的也可以是无界的. 如果生产者生成工作的速率比消费者处理工作的速率款,那么工作项会在队列中累计起来,最终好紧内存.同样,put方法的阻塞特性也极大地简化了生产者的编码.如果使用有界队列,当队列充满时,生产者将阻

Java 并发编程之任务取消(九)

Jvm关闭 jvm可正常关闭也可强行关闭,正常关闭有多种触发方式: 当最后一个正常(非守护,下面会讲到什么是守护线程)线程结束时 当调用system.exit时,或者通过其他特定于平台的方法关闭时(例如发送了SIGINT信号或键入Ctrl-c) 通过其他特定平台的方法关闭jvm,调用Runtime.halt或者在操作系统当中杀死JVM进程(例如发送sigkill)来强行关闭jvm. 关闭钩子 在正常关闭中,jvm首先调用所有已注册的关闭钩子,关闭钩子是指通过 Runtime.addShutdow