并发编程—2并发工具类

目录

  • 2.线程的工具类

    • 2.1 fork/join框架
    • 2.2 CountDownLatch
      • 一般用法
    • 2.3 CycliBarrier
    • 2.4 Semaphore
    • 2.5 Exchange
      • 使用举例
    • 2.6 Callable Future and FutureTask

2.线程的工具类

2.1 fork/join框架

### 什么是分而治之
    简单地说把一个大的问题,拆分成若干个子问题,每个问题相互独立,且和原来问题形式相同。最后将每个子问题的解合并得到原问题的解答。
### 什么是工作密取
### 举例
 带参数继承RecursiveTask<V> 
/**
 * fork/join 使用 情况1:带返回值,这时候要继承RecursiveTask<V>
 * 情况2:不带返回值,这时候,要继承RecursiveAction
 *
 * @author 45027056
 *
 */
public class SumArray2 extends RecursiveTask<Integer> {
    private int beginIndex;
    private int endIndex;
    private int[] src;
    //将这个计数任务拆分成10个子任务
    private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        int[] makeArray = MakeArray.makeArray();
        SumArray2 task = new SumArray2(0, makeArray.length - 1, makeArray);
        long start = System.currentTimeMillis();
        pool.invoke(task);// 同步调用
        System.out.println("Task is Running.....");
        System.out.println("The count is " + task.join() + " spend time:" + (System.currentTimeMillis() - start) + "ms");

    }

    //构造方法指定参数
    public SumArray2(int begin, int end, int[] makeArray) {
        this.beginIndex = begin;
        this.endIndex = end;
        this.src = makeArray;
    }

    @Override
    protected Integer compute() {
        if (endIndex - beginIndex <= THRESHOLD) {
            int sum = 0;
            for (int i : src) {
                sum = sum + i;
            }
            return sum;
        } else {
            int mid = (endIndex + beginIndex) / 2;
            // 创建任务1
            SumArray2 task1 = new SumArray2(beginIndex, mid, src);
            // 创建任务2
            SumArray2 task2 = new SumArray2(mid + 1, endIndex, src);
            // 调用2个子任务
            invokeAll(task1, task2);
            // 合并结果返回给主任务
            return task1.join() + task2.join();
        }
    }

}
### 举例2  不带参数继承RecursiveAction
    

2.2 CountDownLatch

> 一组线程等待另外一组线程执行完后再执行。通过CountDownLatch的构造函数指定条件,coutDownLatch.wait()阻塞一个或者一组线程,当coutDownLatch.countDown()减至0时,被阻塞的线程才可以运行。

一般用法

    /**
 *
 * @author 45027056
 * 演示countDownLatch用法
 * 类说明:演示CountDownLatch,有5个初始化的线程,5个扣除点,
 * 扣除完毕以后,主线程和业务线程才能继续自己的工作
 */
public class UsCountDownLatch2 {
    //1。CountDownLatch构造函数必须指定一个整数N作为入参,且N必须大于0。
    //每次latch.countDown()方法,会扣减1,直到N为0,其他latch.await()的线程才会继续运行。
    private static CountDownLatch latch = new CountDownLatch(5);
    public static void main(String[] args) {
        UsCountDownLatch2 us = new UsCountDownLatch2();
        us.startInit();
        us.startBusiThread();
        try {
            //一直阻塞直到CountDownLatch(0)
            latch.await();
            System.out.println("Main Thread is doing itis working ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //初始化线程
    public void startInit(){
        for(int i=0; i < 5; i++){
            new Thread(){
                @Override
                public void run() {
                    System.out.println("init thread working");
                    latch.countDown();
                }
            }.start();

        }
    }

    //业务线程
    public void startBusiThread(){
        new Thread(){
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("startBusiThread is doing its working");

            }
        }.start();
    }
}
    

2.3 CycliBarrier

> 和CountDown不同,CycliBarrier是一个同步工具,用于控制一组线程,当且进当所有这组线程本身到达了栅栏点,await()的线程才可以继续运行下去(A synchronization aid that allows a set of threads to all wait for
> 常用方法: await()/getNumberWaiting()
### 使用例子
        /**
 * 演示如何使用UseCyclicBarrier
 *      1.CyclicBarrier可以指定当线程都到到达栅栏点的时候,执行自定义的线程。通过CyclicBarrier构造函数的第二个入参指定。
 *      2。注意:先执行CyclicBarrier指定的BarrierActionThread,再执行其他await()线程
 * @author 45027056
 *
 */
public class UseCyclicBarrier2 {
    CyclicBarrier barrier = new CyclicBarrier(5,new BarrierActionThread());
    private ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<String,Long>();
    public static void main(String[] args) {
        UseCyclicBarrier2 useBarrier = new UseCyclicBarrier2();
        for(int i=0; i < 6; i++){
            useBarrier.new SubThread().start();
        }
        //主线程sleep 2秒
        SleepTools.second(2);
        System.out.println("Main thread is end..");
    }
    //栅栏点都到达后执行的线程
    class BarrierActionThread implements Runnable{
        @Override
        public void run() {
            System.out.println("BarrierActionThread is running ...");
        }
    }

    //栅栏子线程
    class SubThread extends Thread{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "is await...");
            try {
                barrier.await();
                resultMap.put(Thread.currentThread().getName(), Thread.currentThread().getId());
                System.out.println(Thread.currentThread().getName() + "is free...and doing its wrok...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }
}

2.4 Semaphore

> 一个一般用来限流的并发工具类
### 使用用例
    /**
 * 演示Semaphore使用实现链接池
 * 可以理解Semaphore是用来限流的工具。控制某种资源(连接数)保持在某个范围内。
 * @author 45027056
 *
 */
public class DBPoolSemaphore2 {
    LinkedList<Connection> pools = new LinkedList<Connection>();
    Semaphore idle = new Semaphore(10);//空闲信号量
    Semaphore inuse = new Semaphore(0);//在用信号量
    public DBPoolSemaphore2(int poolSize){
        if(poolSize > 0){
            for(int i=0; i < poolSize; i++){
                pools.addFirst(SqlConnectImpl.fetchConnection());
            }
        } else {
            System.out.println("poolSize must greater than 0");
        }
    }

    public void returnConnect(Connection connection) throws InterruptedException {
        if(null != connection){
            //阻塞直到从inuse中获取到信号量。
            inuse.acquire();
            synchronized (pools) {
                pools.addFirst(connection);
                pools.notifyAll();
            }
            //释放一个信号量返回idle
            idle.release();
        }
    }

    public Connection takeConnect() throws InterruptedException {
        //阻塞直到从idle中获取到信号量。
        idle.acquire();
        Connection conn;
        synchronized (pools) {
            conn = pools.removeFirst();
        }
        //释放一个信号量返回inuse
        inuse.release();
        return conn;
    }
}
    

2.5 Exchange

> 用于2个线程交换数据.注意只局限2个线程之间交换数据。

使用举例

    public class UseExchange {
    private static final Exchanger<Set<String>> exchange
        = new Exchanger<Set<String>>();
    public static void main(String[] args) {
        //第一个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setA = new HashSet<String>();//存放数据的容器
                try {
                    /*添加数据
                     * set.add(.....)
                     * */
                    setA = exchange.exchange(setA);//交换set
                    /*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

      //第二个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setB = new HashSet<String>();//存放数据的容器
                try {
                    /*添加数据
                     * set.add(.....)
                     * set.add(.....)
                     * */
                    setB = exchange.exchange(setB);//交换set
                    /*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

    }
}

2.6 Callable Future and FutureTask

/**
 * Callable FutureTask的用法
 * 1.Callable和Runnable接口的区别
 *      + Callable有返回值,Runnable没有返回值
 *      + Callable 可以抛出异常,Runnable不可以。
 * @author 45027056
 *
 */
public class UserFuture2 implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("user Callable begin");
        int sum = 0;
        for(int i=0;i < 5000;i++){
            sum = sum + i;
        }
        try {
            Thread.currentThread().sleep(2000);
        } catch (InterruptedException e) {
            System.out.println("have InterruptedException..");
        }

        System.out.println("user Callable end");
        return sum;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        UserFuture2 ueseFuture = new UserFuture2();
        //FutureTask 本质是一个Runnable,用来包装Callable,然后投放到线程中去执行。
        FutureTask<Integer> futureTask = new FutureTask(ueseFuture);
        new Thread(futureTask).start();
        SleepTools.second(1);
        Random random = new Random();
        //随机获得结果和中断。
        if(random.nextBoolean()){
            int sum = futureTask.get();
            System.out.println("the callable result sum is" + sum);
        } else {
            //如果这里调用能够cancel会中断call方法里面的Thread.currentThread.sleep(),这里会抛出一个InterruptedException
            futureTask.cancel(true);
        }
    }
}

原文地址:https://www.cnblogs.com/codetree/p/10884158.html

时间: 2024-10-05 23:25:18

并发编程—2并发工具类的相关文章

JAVA 并发编程-线程同步工具类(十二)

本文主要介绍一些java线程同步工具类,并不进行具体讲解,当有需要时,可以再去结合实例学习. 信号灯(Semaphore) 应用场景举例: 例如公司的打卡系统,如果有一个打卡机,那么一次就只能有一个人打卡,其余的人就被阻塞住,打卡完以后就可由下一个人打卡.如果有3个打卡机,那么一次就允许3个人或者少于三个人打卡,其余的人就得等待打卡机空闲下来才能继续打卡. 结果: 已进入1个线程,还可进入2个 已进入2个线程,还可进入1个 已进入3个线程,还可进入0个 空余出1个 已进入4个线程,还可进入0个

java并发编程之五、工具类

java在线程同步和互斥方面在语言和工具方面都提供了相应的支撑,与此同时,java还提供了一系列的并发容器和原子类,来使得并发编程更容易. 一.并发容器 (一).同步容器 同步容器指的是容器本身使用synchronized关键字来同步访问,包括我们都知道的HashTable,也包括Vector和Stack.另外,也可以通过工具类Collections.synchronizedList(List<T> list)这个方法将线程不安全的ArrayList转成线程安全的包装类,其他的set,map等

Java并发编程:并发容器ConcurrentHashMap

Java并发编程:并发容器之ConcurrentHashMap(转载) 下面这部分内容转载自: http://www.haogongju.net/art/2350374 JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都 串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低.因此Java5.0开 始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入

Java并发编程:并发容器之ConcurrentHashMap(转)

本文转自:http://www.cnblogs.com/dolphin0520/p/3932905.html Java并发编程:并发容器之ConcurrentHashMap(转载) 下面这部分内容转载自: http://www.haogongju.net/art/2350374 JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都 串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程

Java并发编程:并发容器之CopyOnWriteArrayList(转)

本文转自:http://www.cnblogs.com/dolphin0520/p/3938914.html Java并发编程:并发容器之CopyOnWriteArrayList(转载) 原文链接: http://ifeve.com/java-copy-on-write/ Copy-On-Write简称COW,是一种用于程序设计中的优化策略.其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略.从J

【转】Java并发编程:并发容器之ConcurrentHashMap

JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低.因此Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包.与Vector和Hashtable.Collections.synchronizedXxx()同步容器等相比,util.conc

【Java并发编程】并发编程大合集-值得收藏

http://blog.csdn.net/ns_code/article/details/17539599这个博主的关于java并发编程系列很不错,值得收藏. 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Ja

【Java并发编程】并发编程大合集

转载自:http://blog.csdn.net/ns_code/article/details/17539599 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Java并发编程]Volatile关键字(上)

ubuntu16.04编程软件之工具类

zsh 安装zsh + zsh 兼容bash + zsh 官网:Zsh + 查看CentOS已安装的shell cat /etc/shells 正常结果应该是这样的: /bin/sh /bin/bash /sbin/nologin /bin/dash /bin/tcsh /bin/csh + 查看当前的shell echo $SHELL 更新软件源 sudo apt-get install update 安装zsh sudo apt-get install zsh 安装oh-my-zsh 保证已

Java线程与并发编程实践----并发工具类与Executor框架

java5之前,我们使用诸如synchronized,wait(),notify()方法对线程的操作属于对 底层线程的操作,这样会出现很多的问题: 低级的并发原语,比如synchronized,wait(),notify()经常难以正确使用.误用会导致 竞态条件,线程饿死,死锁等风险. 泰国依赖synchronized会影响程序性能以及程序的可扩展性 开发者经常需要高级线程结构,如线程池,信号量.java对底层线程的操作不包含这些结. 为解决这些问题,java5引入并发工具类,该工具类主要有下面