目录
- 2.线程的工具类
- 2.1 fork/join框架
- 2.2 CountDownLatch
- 一般用法
- 2.3 CycliBarrier
- 2.4 Semaphore
- 2.5 Exchange
- 使用举例
- 2.6 Callable Future and FutureTask
2.线程的工具类
2.1 fork/join框架
### 什么是分而治之
简单地说把一个大的问题,拆分成若干个子问题,每个问题相互独立,且和原来问题形式相同。最后将每个子问题的解合并得到原问题的解答。
### 什么是工作密取
### 举例
带参数继承RecursiveTask<V>
/**
* fork/join 使用 情况1:带返回值,这时候要继承RecursiveTask<V>
* 情况2:不带返回值,这时候,要继承RecursiveAction
*
* @author 45027056
*
*/
public class SumArray2 extends RecursiveTask<Integer> {
private int beginIndex;
private int endIndex;
private int[] src;
//将这个计数任务拆分成10个子任务
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] makeArray = MakeArray.makeArray();
SumArray2 task = new SumArray2(0, makeArray.length - 1, makeArray);
long start = System.currentTimeMillis();
pool.invoke(task);// 同步调用
System.out.println("Task is Running.....");
System.out.println("The count is " + task.join() + " spend time:" + (System.currentTimeMillis() - start) + "ms");
}
//构造方法指定参数
public SumArray2(int begin, int end, int[] makeArray) {
this.beginIndex = begin;
this.endIndex = end;
this.src = makeArray;
}
@Override
protected Integer compute() {
if (endIndex - beginIndex <= THRESHOLD) {
int sum = 0;
for (int i : src) {
sum = sum + i;
}
return sum;
} else {
int mid = (endIndex + beginIndex) / 2;
// 创建任务1
SumArray2 task1 = new SumArray2(beginIndex, mid, src);
// 创建任务2
SumArray2 task2 = new SumArray2(mid + 1, endIndex, src);
// 调用2个子任务
invokeAll(task1, task2);
// 合并结果返回给主任务
return task1.join() + task2.join();
}
}
}
### 举例2 不带参数继承RecursiveAction
2.2 CountDownLatch
> 一组线程等待另外一组线程执行完后再执行。通过CountDownLatch的构造函数指定条件,coutDownLatch.wait()阻塞一个或者一组线程,当coutDownLatch.countDown()减至0时,被阻塞的线程才可以运行。
一般用法
/**
*
* @author 45027056
* 演示countDownLatch用法
* 类说明:演示CountDownLatch,有5个初始化的线程,5个扣除点,
* 扣除完毕以后,主线程和业务线程才能继续自己的工作
*/
public class UsCountDownLatch2 {
//1。CountDownLatch构造函数必须指定一个整数N作为入参,且N必须大于0。
//每次latch.countDown()方法,会扣减1,直到N为0,其他latch.await()的线程才会继续运行。
private static CountDownLatch latch = new CountDownLatch(5);
public static void main(String[] args) {
UsCountDownLatch2 us = new UsCountDownLatch2();
us.startInit();
us.startBusiThread();
try {
//一直阻塞直到CountDownLatch(0)
latch.await();
System.out.println("Main Thread is doing itis working ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//初始化线程
public void startInit(){
for(int i=0; i < 5; i++){
new Thread(){
@Override
public void run() {
System.out.println("init thread working");
latch.countDown();
}
}.start();
}
}
//业务线程
public void startBusiThread(){
new Thread(){
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("startBusiThread is doing its working");
}
}.start();
}
}
2.3 CycliBarrier
> 和CountDown不同,CycliBarrier是一个同步工具,用于控制一组线程,当且进当所有这组线程本身到达了栅栏点,await()的线程才可以继续运行下去(A synchronization aid that allows a set of threads to all wait for
> 常用方法: await()/getNumberWaiting()
### 使用例子
/**
* 演示如何使用UseCyclicBarrier
* 1.CyclicBarrier可以指定当线程都到到达栅栏点的时候,执行自定义的线程。通过CyclicBarrier构造函数的第二个入参指定。
* 2。注意:先执行CyclicBarrier指定的BarrierActionThread,再执行其他await()线程
* @author 45027056
*
*/
public class UseCyclicBarrier2 {
CyclicBarrier barrier = new CyclicBarrier(5,new BarrierActionThread());
private ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<String,Long>();
public static void main(String[] args) {
UseCyclicBarrier2 useBarrier = new UseCyclicBarrier2();
for(int i=0; i < 6; i++){
useBarrier.new SubThread().start();
}
//主线程sleep 2秒
SleepTools.second(2);
System.out.println("Main thread is end..");
}
//栅栏点都到达后执行的线程
class BarrierActionThread implements Runnable{
@Override
public void run() {
System.out.println("BarrierActionThread is running ...");
}
}
//栅栏子线程
class SubThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "is await...");
try {
barrier.await();
resultMap.put(Thread.currentThread().getName(), Thread.currentThread().getId());
System.out.println(Thread.currentThread().getName() + "is free...and doing its wrok...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
2.4 Semaphore
> 一个一般用来限流的并发工具类
### 使用用例
/**
* 演示Semaphore使用实现链接池
* 可以理解Semaphore是用来限流的工具。控制某种资源(连接数)保持在某个范围内。
* @author 45027056
*
*/
public class DBPoolSemaphore2 {
LinkedList<Connection> pools = new LinkedList<Connection>();
Semaphore idle = new Semaphore(10);//空闲信号量
Semaphore inuse = new Semaphore(0);//在用信号量
public DBPoolSemaphore2(int poolSize){
if(poolSize > 0){
for(int i=0; i < poolSize; i++){
pools.addFirst(SqlConnectImpl.fetchConnection());
}
} else {
System.out.println("poolSize must greater than 0");
}
}
public void returnConnect(Connection connection) throws InterruptedException {
if(null != connection){
//阻塞直到从inuse中获取到信号量。
inuse.acquire();
synchronized (pools) {
pools.addFirst(connection);
pools.notifyAll();
}
//释放一个信号量返回idle
idle.release();
}
}
public Connection takeConnect() throws InterruptedException {
//阻塞直到从idle中获取到信号量。
idle.acquire();
Connection conn;
synchronized (pools) {
conn = pools.removeFirst();
}
//释放一个信号量返回inuse
inuse.release();
return conn;
}
}
2.5 Exchange
> 用于2个线程交换数据.注意只局限2个线程之间交换数据。
使用举例
public class UseExchange {
private static final Exchanger<Set<String>> exchange
= new Exchanger<Set<String>>();
public static void main(String[] args) {
//第一个线程
new Thread(new Runnable() {
@Override
public void run() {
Set<String> setA = new HashSet<String>();//存放数据的容器
try {
/*添加数据
* set.add(.....)
* */
setA = exchange.exchange(setA);//交换set
/*处理交换后的数据*/
} catch (InterruptedException e) {
}
}
}).start();
//第二个线程
new Thread(new Runnable() {
@Override
public void run() {
Set<String> setB = new HashSet<String>();//存放数据的容器
try {
/*添加数据
* set.add(.....)
* set.add(.....)
* */
setB = exchange.exchange(setB);//交换set
/*处理交换后的数据*/
} catch (InterruptedException e) {
}
}
}).start();
}
}
2.6 Callable Future and FutureTask
/**
* Callable FutureTask的用法
* 1.Callable和Runnable接口的区别
* + Callable有返回值,Runnable没有返回值
* + Callable 可以抛出异常,Runnable不可以。
* @author 45027056
*
*/
public class UserFuture2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("user Callable begin");
int sum = 0;
for(int i=0;i < 5000;i++){
sum = sum + i;
}
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
System.out.println("have InterruptedException..");
}
System.out.println("user Callable end");
return sum;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
UserFuture2 ueseFuture = new UserFuture2();
//FutureTask 本质是一个Runnable,用来包装Callable,然后投放到线程中去执行。
FutureTask<Integer> futureTask = new FutureTask(ueseFuture);
new Thread(futureTask).start();
SleepTools.second(1);
Random random = new Random();
//随机获得结果和中断。
if(random.nextBoolean()){
int sum = futureTask.get();
System.out.println("the callable result sum is" + sum);
} else {
//如果这里调用能够cancel会中断call方法里面的Thread.currentThread.sleep(),这里会抛出一个InterruptedException
futureTask.cancel(true);
}
}
}
原文地址:https://www.cnblogs.com/codetree/p/10884158.html
时间: 2024-10-05 23:25:18