juc下的并发工具类和线程池

工具类

CountDownLatch

利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

package com.yjc.juc;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("主线程启动---->等待子线程执行完毕");
        //代表等待两个线程执行完主线程才继续执行
        CountDownLatch countDownLatch=new CountDownLatch(2);
        new Thread(() ->
        {
            System.out.println("第一个子线程" + Thread.currentThread().getName() + "正在执行");
            countDownLatch.countDown();
            System.out.println("第一个子线程" + Thread.currentThread().getName() + "执行完毕");
        }).start();
        new Thread(() ->
        {
            System.out.println("第二个子线程" + Thread.currentThread().getName() + "正在执行");
            countDownLatch.countDown();
            System.out.println("第二个子线程" + Thread.currentThread().getName() + "执行完毕");
        }).start();
        //使主线程进入等待状态
        countDownLatch.await();
        System.out.println("子线程执行完毕,主线程开始执行");

    }
}

执行结果

CyclicBarrier

CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。

CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。

CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

package com.yjc.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo  {
   static  CyclicBarrier cyclicBarrier=new CyclicBarrier(7, ()-> {
       System.out.println("召唤神龙!");
   });

    public static void main(String[] args){

        for (int i = 1; i <=7 ; i++) {
            final int count=i;
            new Thread(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+":获得第"+count+"龙珠");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName()+"开始召唤神龙");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }

    }
}

执行结果

Semaphore

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。

package com.yjc.juc;

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        //代表现在一共只有三个资源
        Semaphore semaphore=new Semaphore(3);
        //创建十条线程进行资源抢夺
        for ( int i = 0; i <10 ; i++) {
            final int count =i;
            new Thread(()->{
                try {
                    semaphore.acquire();//请求资源
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第"+(count+1)+"条线程抢到了资源");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第"+(count+1)+"条线程释放资源");
                //释放资源
                semaphore.release();
            }).start();
        }
    }
}

执行结果

Exchanger

用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据

package com.yjc.juc;

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger exchanger = new Exchanger();
        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    Thread.sleep(800);
                    int data=i;
                    System.out.println(i+"\t"+Thread.currentThread().getName()+"交换前:"+data);
                    data = (int)exchanger.exchange(data);
                    System.out.println(i+"\t"+Thread.currentThread().getName()+"交换后:"+data);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(() -> {
            int count=0;
            while (true) {
                ++count;
                try {
                    Thread.sleep(800);
                    int data = 0;
                    System.out.println(count+"\t"+Thread.currentThread().getName() + "交换前:" + data);
                    data = (int) exchanger.exchange(data);
                    System.out.println(count+"\t"+Thread.currentThread().getName() + "交换后:" + data);

                } catch (Exception e) {
                e.printStackTrace();
                }
            }
        }).start();
    }
}

执行结果

线程交换的线程数需要是二的倍数,要不然会出现线程阻塞的状态,也可以通过设置响应时间,一旦在规定时间内没有完成数据的交互,那么就抛出异常

线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序
都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,
还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

ThreadPoolExecutor

Executor框架的最顶层实现是ThreadPoolExecutor类,Executors工厂类中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其实也只是ThreadPoolExecutor的构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor线程池的运行过程。

  • corePoolSize: 核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
  • maximumPoolSize: 线程池最大线程数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。
  • unit: 参数keepAliveTime的时间单位,有7种取值

newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

package com.yjc.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolDemo {
    public static void main(String[] args) {
        //创建一个可缓存线程如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
        //线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程
        ExecutorService  executorService= Executors.newCachedThreadPool();
        for (int i = 0; i <100 ; i++) {
            final int count=i;
            executorService.execute(()->{
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"--->i:"+count);
            });

        }
    }
}

newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
package com.yjc.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolDemo {
    public static void main(String[] args) {
        //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
        ExecutorService executorService= Executors.newFixedThreadPool(5);
        for (int i = 0; i <100 ; i++) {
            final int count=i;
            executorService.execute(()->{
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"--->i:"+count);
            });

        }
    }
}

newScheduledThreadPool

package com.yjc.juc;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolDemo {
    public static void main(String[] args) {
        //创建一个定长线程池,支持定时及周期性任务执行
        ScheduledExecutorService executorService= Executors.newScheduledThreadPool(5);
        for (int i = 0; i <100 ; i++) {
            final int count=i;
            executorService.schedule(()->{
                System.out.println(Thread.currentThread().getName()+"--->i:"+count);
                //延迟三秒开始执行
            },3, TimeUnit.SECONDS);

        }
    }
}

延迟三秒才开始执行

newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

package com.yjc.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorDemo {
    public static void main(String[] args) {
        //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
        ExecutorService executorService= Executors.newSingleThreadExecutor();
        for (int i = 0; i <100 ; i++) {
            final int count=i;
            executorService.execute(()->{
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();

                }
                System.out.println(Thread.currentThread().getName()+"--->i:"+count);
            });

        }
    }}

线程池原理剖析

提交一个任务到线程池中,线程池的处理流程如下:

1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。

2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

合理配置线程池

CPU密集型时,任务可以少配置线程数,大概和机器的cpu核数相当,这样可以使得每个线程都在执行任务

IO密集型时,大部分线程都阻塞,故需要多配置线程数,2*cpu核数

原文地址:https://www.cnblogs.com/yjc1605961523/p/12524362.html

时间: 2024-10-09 22:07:09

juc下的并发工具类和线程池的相关文章

Java并发工具类之线程间数据交换工具Exchanger

Exchanger是一个用于线程间协做的工具类,主要用于线程间的数据交换.它提供了一个同步点,在这个同步点,两个线程可以彼此交换数据.两个线程通过exchange方法交换数据,如果一个线程执行exchange方法,它就会等待另一个线程执行exchange方法,当两个线程都到达了同步点,这两个线程就可以交换数据.将本线程产生的数据传送给对方. Exchanger可用于工作的互相校对,比如我们要把线下产生的交易数据通过人工录入的方式添加到系统中,为了避免错误,我们采用AB两人同时录入的方式,当录入完

Java线程与并发编程实践----并发工具类与Executor框架

java5之前,我们使用诸如synchronized,wait(),notify()方法对线程的操作属于对 底层线程的操作,这样会出现很多的问题: 低级的并发原语,比如synchronized,wait(),notify()经常难以正确使用.误用会导致 竞态条件,线程饿死,死锁等风险. 泰国依赖synchronized会影响程序性能以及程序的可扩展性 开发者经常需要高级线程结构,如线程池,信号量.java对底层线程的操作不包含这些结. 为解决这些问题,java5引入并发工具类,该工具类主要有下面

四、线程的并发工具类

线程的并发工具类 一.CountDownLatch [1]CountDownLatch是什么? CountDownLatch,英文翻译为倒计时锁存器,是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. 闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行: 确保某个计算在其需要的所有资源都被初始化之后才继续执行; 确保某个服务在其依赖的所有其他服务都已经启动之后才启动; 等待直到某个操作所有参与者都准备就绪再继续执行

Java并发编程系列-(2) 线程的并发工具类

2.线程的并发工具类 2.1 Fork-Join JDK 7中引入了fork-join框架,专门来解决计算密集型的任务.可以将一个大任务,拆分成若干个小任务,如下图所示: Fork-Join框架利用了分而治之的思想:什么是分而治之?规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解. 具体使用中,需要向ForkJoinPool线程池提交一个ForkJoinTask任务.ForkJoinTask任务有两个重要

并发工具类(四)线程间的交换数据 Exchanger

前言 ??JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch.CyclicBarrier.Semphore.Exchanger.Phaser: ??CountDownLatch.CyclicBarrier.Semphore.Phaser 这四个工具类提供一种并发流程的控制手段:而Exchanger工具类则提供了在线程之间交换数据的一种手段. 简介 ?? Exchanger的功能是使2个线程之间交换数据(有不少文章的说法是"传输数

Java并发编程-线程的并发工具类

Fork-Join 什么是分而治之?规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解动态规范工作密取workStealing Fork/Join使用的标准范式 常用的并发工具类CountDownLatch作用:是一组线程等待其他的线程完成工作以后在执行,加强版joinawait用来等待,countDown负责计数器的减一CyclicBarrier让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏

并发编程(2)--线程的并发工具类

1.线程的并发工具类 Fork-Join 什么是分而治之? 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解 动态规范 工作密取 workStealing Fork/Join使用的标准范式 下面演示第一种用法:由于上下文切换的原因,所以性能上有可能不如单线程效果好. package com.xiangxue.ch2.forkjoin.sum; import java.util.Random; /** *

并发工具类:CountDownLatch、CyclicBarrier、Semaphore

在多线程的场景下,有些并发流程需要人为来控制,在JDK的并发包里提供了几个并发工具类:CountDownLatch.CyclicBarrier.Semaphore. 一.CountDownLatch 1 import java.util.concurrent.CountDownLatch; 2 3 4 public class CountDownLatchTest 5 { //设置N为2 6 static CountDownLatch c = new CountDownLatch(2); 7 p

?集合工具类使用线程

集合工具类使用线程 1. hashmap源码解析与并发可能遇见的问题 1.HashMap中的几个重要变量 static final int DEFAULT_INITIAL_CAPACITY = 16;     //默认初始容量,必须是2的n次方 static final int MAXIMUM_CAPACITY = 1 << 30;     //最大容量,当通过构造方法传入的容量比它还大时,就用这个最大容量,必须是2的n次方 static final float DEFAULT_LOAD_FA