Java笔记:并发工具

一、基础知识

并发工具定义了一些核心特征,用于以其他方式实现同步和线程间通信。

  • 同步器:提供了同步多线程间交互的高级方法。
  • 执行器:管理线程的执行。
  • 并发集合:提供了由集合框架定义的相关类的并发替代版本。
  • Fork/Join框架:支持并行编程。

二、同步对象使用

Semaphore实现了经典的信号量,信号量通过计数器控制对共享资源的访问。如果计数器大于0则允许访问,如果计数器为0则拒绝访问。希望获得共享资源的线程尝试获得许可证,若允许访问则线程可得到许可证,若不允许访问则线程阻塞直至得到许可证为止。

import java.util.concurrent.Semaphore;

class Solution {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        Integer i = 0;
        new IncThread(semaphore);
        new DecThread(semaphore);
    }
}

class Shared {
    static int integer = 0;
}

class IncThread implements Runnable {
    private Semaphore semaphore;

    IncThread(Semaphore semaphore) {
        this.semaphore = semaphore;
        new Thread(this).start();
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();//阻塞直至得到许可
            for (int i = 0; i < 5; i++) {
                Shared.integer++;
                System.out.println(Shared.integer);
                Thread.sleep(500);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc.getMessage());
        }
        semaphore.release();//释放许可
    }
}

class DecThread implements Runnable {
    private Semaphore semaphore;

    DecThread(Semaphore semaphore) {
        this.semaphore = semaphore;
        new Thread(this).start();
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            for (int i = 0; i < 5; i++) {
                Shared.integer--;
                System.out.println(Shared.integer);
                Thread.sleep(500);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc.getMessage());
        }
        semaphore.release();
    }
}

如果希望线程进行等待,直到发生一个或多个事件为止。CountDownLatch可用于处理这类情况,计数器必须从指定事件数量归零,锁存器才会被释放。

import java.util.concurrent.CountDownLatch;

class Solution {
    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(5);

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println(i);
                cdl.countDown();
            }
        }).start();

        try {
            cdl.await();//暂停主线程直至计数器递减5次
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }

        System.out.println("Hello");
    }
}

如果由多个线程组成的线程组必须在某处进行等待,直到线程组中所有线程都到达执行点。CyclicBarrier可用于处理这类情况,同步对象会被挂起直至指定数量的线程都到达预定位置为止。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Solution {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(10, () -> System.out.println("Hello"));
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    int t = (int) (Math.random() * 5000);
                    Thread.sleep(t);
                    System.out.println(t);
                    barrier.await();//指定数量的线程调用await后释放被挂起的线程
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println(e.getMessage());
                }
            }).start();
        }
    }
}

Exchanger用于简化两个线程之间的数据交换。简单的进行等待,直到两个独立的线程均调用exchange方法为止,之后交换线程所提供的数据。

import java.util.Arrays;
import java.util.concurrent.Exchanger;

class Solution {
    public static void main(String[] args) {
        Exchanger<int[]> arrayExchanger = new Exchanger<>();
        Runnable consumer = () -> {
            try {
                for (int i = 0; i < 3; i++) {
                    int[] arr = arrayExchanger.exchange(new int[5]);
                    System.out.println(Arrays.toString(arr));
                }
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        };

        Runnable producer = () -> {
            int[] arr = new int[5];
            try {
                for (int i = 0; i < 15; i++) {
                    arr[i % 5] = i;
                    if ((i + 1) % 5 == 0)
                        arr = arrayExchanger.exchange(arr);
                }
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        };

        new Thread(consumer).start();
        new Thread(producer).start();
    }
}

Phaser允许多个线程进行同步。Phaser对象会等待所有party(等价于线程)完成当前阶段后才会进入下阶段,通过调用arrive方法或其变体通知当前阶段完成。

import java.util.concurrent.Phaser;

class MyPhaser extends Phaser {
    private int total;

    MyPhaser(int parties, int total) {
        super(parties);
        this.total = total - 1;
    }

    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("Phase " + phase + " completed");
        return phase == total || registeredParties == 0;//返回true则结束Phaser
    }
}

class Solution {
    public static void main(String[] args) {
        Phaser phaser = new MyPhaser(0, 3);
        Runnable runnable = () -> {
            phaser.register();
            while (!phaser.isTerminated()) {
                System.out.println("Phase " + phaser.getPhase() + " started");
                phaser.arriveAndAwaitAdvance();
            }
        };

        for (int i = 0; i < 3; i++)
            new Thread(runnable).start();
    }
}

三、执行器

执行器用于启动并控制线程的执行,因此执行器为通过Thread管理线程提供了一种代替方案。执行器的核心是Executor接口,ExecutorService接口扩展了Executor接口。

线程池提供了用于执行各种任务的一组线程,每个任务不是使用独立的线程而是线程池中的线程。线程池减轻了创建许多独立线程所带来的负担,通过少量线程执行大量任务。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Solution {
    public static void main(String[] args) {
        CountDownLatch[] cdls = new CountDownLatch[5];
        for (int i = 0; i < cdls.length; i++)
            cdls[i] = new CountDownLatch(5);
        ExecutorService pool = Executors.newFixedThreadPool(3);//创建线程池

        for (CountDownLatch cdl : cdls)
            pool.execute(() -> {
                for (int j = 0; j < 5; j++) {
                    System.out.println(j);
                    cdl.countDown();
                }
            });

        try {
            for (CountDownLatch cdl : cdls)
                cdl.await();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
        pool.shutdown();//关闭线程池
    }
}

泛型接口Callable表示返回值的线程。应用程序可以使用Callable对象计算结果后,将结果返回给调用线程。call方法定义希望执行的任务,在任务完成后返回结果。Callable任务通过调用ExecutorService对象的submit方法执行。

泛型接口Future表示将由Callable对象返回的值。

import java.util.concurrent.*;

class Add implements Callable<Integer> {
    private int[] arr;

    Add(int... arr) {
        this.arr = arr;
    }

    @Override
    public Integer call() {
        int sum = 0;
        for (int i : arr)
            sum += i;
        return sum;
    }
}

class Solution {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Integer> future = pool.submit(new Add(1, 2, 3));
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println(e.getMessage());
        }
        pool.shutdown();
    }
}

四、时间

TimeUnit枚举用于指定时间单位。

import java.util.concurrent.TimeUnit;

class Solution {
    public static void main(String[] args) {
        try {
            TimeUnit.SECONDS.sleep(3);//暂停3秒
            System.out.println(TimeUnit.SECONDS.toMillis(1));//单位转换
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }
}

五、锁

锁为使用synchronized控制共享资源的访问提供了替代技术。在访问共享资源自谦,申请用于保护资源的锁,当资源访问完成后释放锁。当某线程正在使用锁时,其他尝试申请锁的线程会被挂起。

所有锁都实现了Lock接口,申请锁时调用lock方法,如果不可得就会进行等待。如果不希望进行等待就使用tryLock方法。使用newCondition方法获取与锁关联的Condition对象之后可通过await或signal等方法控制锁,类似于wait和notify方法。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class Shared {
    static int data = 0;
}

class Solution {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Runnable runnable = () -> {
            lock.lock();//申请锁
            Shared.data++;
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException exc) {
                System.out.println(exc.getMessage());
            } finally {
                System.out.println(Shared.data);
                lock.unlock();//释放锁
            }
        };

        for (int i = 0; i < 10; i++)
            new Thread(runnable).start();
    }
}

六、原子操作

当读写某些类型的变量时,原子操作提供了一种不可中断的方案。这意味着不再需要锁以及其他同步机制。

import java.util.concurrent.atomic.AtomicInteger;

class Shared {
    static AtomicInteger integer = new AtomicInteger(0);
}

class Solution {
    public static void main(String[] args) {
        Runnable runnable = () -> {
            for (int i = 0; i < 3; i++)
                System.out.println(Shared.integer.getAndAdd(1));
        };

        for (int i = 0; i < 5; i++)
            new Thread(runnable).start();
    }
}

七、并行编程

Fork/Join框架通过简化多线程的创建使用和自动使用多处理器,增强了多线程编程。

ForkJoinTask用于定义能够被ForkJoinPool管理的任务,泛型参数指定了任务结果的类型。fork方法为调用任务的异步执行提交任务,join方法等待调用该方法的任务中止,invoke和invokeAll方法则将fork和join合并到单个调用中。

RecursiveAction和RecursiveTask均为ForkJoinTask的子类。前者用于封装不返回结果的任务,后者用于封装返回结果的任务。

ForkJoinPool用于管理ForkJoinTask。若调用需等待的任务则使用线程池的invoke方法(同步),若不需要等待任务完成则调用execute方法(异步)。

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class Sqrt extends RecursiveAction {//分治策略
    double[] arr;
    int front, rear;

    Sqrt(double[] arr, int front, int rear) {
        this.arr = arr;
        this.front = front;
        this.rear = rear;
    }

    @Override
    protected void compute() {
        if (rear - front < 100) {
            for (int i = front; i <= rear; i++)
                arr[i] = Math.sqrt(arr[i]);
        } else {
            int middle = (front + rear) / 2;
            invokeAll(new Sqrt(arr, front, middle), new Sqrt(arr, middle + 1, rear));
        }
    }
}

class Solution {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        double[] arr = new double[10000];
        for (int i = 0; i < arr.length; i++)
            arr[i] = (double) i;
        System.out.println(Arrays.toString(arr));

        Sqrt sqrt = new Sqrt(arr, 0, arr.length - 1);
        pool.invoke(sqrt);
        System.out.println(Arrays.toString(arr));
    }
}

若没有显示声明ForkJoinPool,则会自动使用公共池,使用commonPool可获取公共池的引用。

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class Sum extends RecursiveTask<Integer> {
    int[] arr;
    int front, rear;

    Sum(int[] arr, int front, int rear) {
        this.arr = arr;
        this.front = front;
        this.rear = rear;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (rear - front < 100) {
            for (int i = front; i <= rear; i++)
                sum += arr[i];
        } else {
            int middle = (front + rear) / 2;
            Sum taskA = new Sum(arr, front, middle);
            Sum taskB = new Sum(arr, middle + 1, rear);
            taskA.fork();
            taskB.fork();
            sum = taskA.join() + taskB.join();
        }
        return sum;
    }
}

class Solution {
    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        int[] arr = new int[10000];
        for (int i = 0; i < arr.length; i++)
            arr[i] = i;
        System.out.println(pool.invoke(new Sum(arr, 0, arr.length - 1)));
    }
}

原文地址:https://www.cnblogs.com/arseneyao/p/8521675.html

时间: 2024-10-07 19:14:21

Java笔记:并发工具的相关文章

Java学习笔记--并发工具Semaphore,CountDownLatch,CyclicBarrier,Exchanger

Semaphore 实现典型的信号量 CountDownLatch 在指定数量的事件发生前一直等待 CyclicBarrier 使一组线程在一个预定义的执行点等待 Exchanger 交换两个线程的数据 1. Semaphore 信号量(Semaphore),是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源 在java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能

java同步并发工具类CountDownLatch、CyclicBarrier和Semaphore

闭锁CountDownLatch 闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态.闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过.当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态.闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如: 确保某个计算在其需要的所有资源都被初始化之后才继续执行.二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而

Java学习笔记—多线程(并发工具类,java.util.concurrent.atomic包)

在JDK的并发包里提供了几个非常有用的并发工具类.CountDownLatch.CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段.本章会配合一些应用场景来介绍如何使用这些工具类. CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作.假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,

《Java虚拟机并发编程》学习笔记

对<Java虚拟机并发编程>这本书真的是相见恨晚.以前对并发编程只是懂个皮毛,这本书让我对并发编程有了一个全新的认识.所以把书上的知识点做下笔记,以便以后复习使用. 并发与并行 仔细说来,并发和并行是两个不同的概念.但随着多核处理器的普及,并发程序的不同的线程往往会被编译器分配到不同处理器核心上,所以说并发编程与并行对于上层程序员来说是一样的. 并发的风险 饥饿 当一个线程等待某个需要运行时间很长或者永远无法完成的时间发发生,那么这个线程就会陷入饥饿状态.通常饥饿也会被叫做活锁. 解决饥饿的方

Effective Java 阅读笔记——并发

66:同步访问共享的可变数据 synchronized:1互斥,阻止线程看到的对象处于不一致的状态:2保证线程在进入同步区时能看到变量的被各个线程的所有修改 Java中,除了long或者double,“读”或者“写”一个变量是原子的.注意:是读或者写单个动作是源自的,而不是读写这两个动作整体是原子的. 由于虚拟机会对代码进行优化,所以可能会导致一些错误:可能你想的是在另一线程中改变done的值来终止while循环,但是优化之后却无法做到这样.要避免这样的优化错误,就必须对done同步. //优化

Java并发工具类 - CountDownLatch

Java并发工具类 - CountDownLatch 1.简介 CountDownLatch是Java1.5之后引入的Java并发工具类,放在java.util.concurrent包下面 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html 官方API. CountDownLatch能够使一个或多个线程等待其他线程完成各自的工作后再执行:CountDownLatch是JDK 5+里面

Java并发工具类CyclicBarrier

CyclicBarrier同步屏障 java并发工具类中有一个叫做CyclicBarrier的类,与CountDownLatch类似,都可以实现线程间的同步,但是差别是CyclicBarrier是可重置的同步屏障. 想象一个场景,有N个人不同时间走到一扇门,因为门需要N个人合力才能推开,所以人不足N个时,只能阻塞在此,等到N个人都到了之后,可以推开门,继续进行之前的工作.CyclicBarrier就是这扇门. 看看下面的代码,定义了一个线程数为2的,CyclicBarrier,并在主线程和另外一

11.9-全栈Java笔记: 线程并发协作(生产者/消费者模式)

多线程环境下,我们经常需要多个线程的并发和协作.这个时候,就需要了解一个重要的多线程并发协作模型"生产者消费者模式". 什么是生产者? 生产者指的是负责生产数据的模块(这里模块可能是:方法.对象.线程.进程). 什么是消费者? 消费者指的是负责处理数据的模块(这里模块可能是:方法.对象.线程.进程). 什么是缓冲区? 消费者不能直接使用生产者的数据,它们之间有个"缓冲区".生产者将生产好的数据放入"缓冲区",消费者从"缓冲区"

9.9-全栈Java笔记:遍历集合的N种方式总结&Collections工具类

遍历集合的N种方式总结 [示例1]遍历List方法1,使用普通for循环 for(int i=0;i<list.size();i++){         //list为集合的对象名 String temp = (String)list.get(i); System.out.println(temp); } [示例2]遍历List方法2,使用增强for循环(使用泛型定义类型!) for (String   temp : list) { System.out.println(temp); } [示例