高级java必会系列一:多线程的简单使用

众所周知,开启线程2种方法:第一是实现Runable接口,第二继承Thread类。(当然内部类也算...)常用的,这里就不再赘述。本章主要分析总结线程池和常用调度类。

一、线程池

1.newCachedThreadPool

(1)缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse,如果没有,就建立一个新的线程加入池中;

(2)缓存型池子,通常用于执行一些生存周期很短的异步型任务;因此一些面向连接的daemon型server中用得不多;

(3)能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。

(4)注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止

2.newFixedThreadPool--本人常用

(1)newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程

(2)其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子

(3)和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器

(4)从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:

    fixed池线程数固定,并且是0秒IDLE(无IDLE)

    cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE

3.ScheduledThreadPool

(1)调度型线程池

(2)这个池子里的线程可以按schedule依次delay执行,或周期执行

4.SingleThreadExecutor

(1)单例线程,任意时间池中只能有一个线程

(2)用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

二、常用线程调度类

1.wait、notify、notifyAll-----不建议新手直接使用

顾名思义,wait是等待,notify是通知一个等待线程、notifyAll唤醒所有等待线程。

2.CountDownLatch----很适合用来将一个任务分为n个独立的部分,等这些部分都完成后继续接下来的任务

隶属于java.util.concurrent包。CountDownLatch类是一个同步计数器,构造时传入int参数,该参数就是计数器的初始值,每调用一次countDown()方法,计数器减1,计数器大于0 时,await()方法会阻塞程序继续执行.当多个线程达到预期时(latch.countDown()),唤醒多个其他等待中的线程,即执行latch.await()后面的代码。样例是,张三、李四合作完成任务,张三5秒,李四8秒,当张三李四都完成后,总任务结束。代码如下:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch=new CountDownLatch(2);//两个工人的协作
        Worker worker1=new Worker("张三", 5000, latch);
        Worker worker2=new Worker("李四", 8000, latch);
        worker1.start();
        worker2.start();
        latch.await();//阻塞!等待所有工人完成工作
        System.out.println("all work done at "+sdf.format(new Date()));
    }  

    static class Worker extends Thread{
        String workerName;
        int workTime;
        CountDownLatch latch;
        public Worker(String workerName ,int workTime ,CountDownLatch latch){
             this.workerName=workerName;
             this.workTime=workTime;
             this.latch=latch;
        } 

        public void run(){
            System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));
            doWork();//工作了
            System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));
            latch.countDown();//工人完成工作,计数器减一  

        }  

        private void doWork(){
            try {
                Thread.sleep(workTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

-----------------------------------------------Worker 李四 do work begin at 2016-11-02 18:25:28Worker 张三 do work begin at 2016-11-02 18:25:28Worker 张三 do work complete at 2016-11-02 18:25:33Worker 李四 do work complete at 2016-11-02 18:25:36all work done at 2016-11-02 18:25:36

测试可见,张三李四共同协作完成。

3.CyclicBarrier----适合多线程循环到达屏障后再执行

字面意思循环屏障,可理解为栅栏,协同多个线程都执行到barrier.await时,如果构造CyclicBarrier barrier=new CyclicBarrier(2, Runnable)时,第一个参数代码线程数,如果有第二参Runnable,那么所有线程都await时,先执行Runnable,再各自执行await后续的代码。

CyclicBarrier和CountDownLatch区别:

1.CountDownLatch在多个线程都执行完毕latch.countDown后唤醒await线程,多个countDown子线程在执行完countDown后可继续执行后续代码。

2.CyclicBarrier可循环使用,CountDownLatch只1次。见代码示例:

3.CountDownLatch需要latch.countDown和latch.await()配合使用。CyclicBarrier就一个barrier.await。

下面举例:鸟、鱼2个线程同时运行问题。

  1 package study.thread;
  2
  3 import java.util.concurrent.BrokenBarrierException;
  4 import java.util.concurrent.CyclicBarrier;
  5
  6 /**
  7  * 循环栅栏(屏障)
  8  * 问题:一个池塘,有很多鸟和很多鱼,鸟每分钟产生一个后代,鱼每30秒钟产生2个后代。
  9  * 鸟每10秒钟要吃掉一条鱼。建一个池塘,初始化一些鱼和鸟,看看什么时候鸟把鱼吃光。
 10  *
 11  */
 12 public class CyclicBarrierDemo {
 13
 14     long time ;
 15     long birdNum ;
 16     long fishNum ;
 17     Object lock = new Object() ;
 18     CyclicBarrier barrier  ;
 19
 20     public CyclicBarrierDemo(long birdNum , long fishNum){
 21         this.birdNum = birdNum ;
 22         this.fishNum = fishNum ;
 23     }
 24
 25     /**
 26      * 入口
 27      * @param args
 28      */
 29     public static void main(String[] args) {
 30         //构造demo,初始化5只秒,20条鱼
 31         CyclicBarrierDemo bf = new CyclicBarrierDemo(5 , 20) ;
 32         //生态圈开启
 33         bf.start();
 34     }
 35
 36     //生态圈开启
 37     public void start(){
 38         //构造鱼,鸟,时间线
 39         FishThread fish = new FishThread() ;
 40         BirdThread bird = new BirdThread() ;
 41         TimeLine tl = new TimeLine() ;
 42
 43         //初始化环形屏障,当barrier对象的await方法被调用两次之后,将会执行tl线程
 44         barrier = new CyclicBarrier(2, tl) ;//这里要注意第一个参数,如果大于调用await的线程数,会死锁。
 45
 46         //鱼、鸟动起来
 47         fish.start();
 48         bird.start();
 49
 50     }
 51
 52     public void printInfo(String source){
 53         System.out.printf(source+"time[%d]:birdNum[%d] ,fishNum[%d]\n" ,time , birdNum , fishNum);
 54     }
 55
 56     private class TimeLine implements Runnable {
 57         @Override
 58         public void run() { //所有子任务都调用了await方法后,将会执行该方法, 然后所有子线程继续执行
 59             System.out.println("TimeLine start!");
 60             //如果鱼数量<=0,结束程序
 61             if(fishNum <= 0){
 62                 System.exit(-1);
 63             }
 64             //时间加10秒
 65             time += 10 ;
 66             System.out.println("TimeLine end,时间加10秒!");
 67         }
 68     }
 69
 70     private class FishThread extends Thread {
 71         @Override
 72         public void run() {
 73             //循环
 74             while(true){
 75                 try {
 76                     System.out.println("鱼已经就位!到达await!");
 77                     barrier.await() ;   //进入睡眠, 等待所有子任务都进入睡眠  然后再继续
 78                 } catch (InterruptedException | BrokenBarrierException e) {
 79                     e.printStackTrace();
 80                 }
 81                 synchronized (lock) {
 82                     //鱼每30秒钟产生2个后代
 83                     if(time % 30 == 0){
 84                         fishNum += fishNum * 2;
 85                         printInfo("鱼动作执行!");
 86                     }
 87                 }
 88             }
 89         }
 90     }
 91
 92     private class BirdThread extends Thread{
 93         @Override
 94         public void run() {
 95             //循环
 96             while(true){
 97                 try {
 98                     System.out.println("鸟已经就位!到达await!");
 99                     barrier.await() ;  //进入睡眠, 等待所有子任务都进入睡眠  然后再继续
100                 } catch (InterruptedException | BrokenBarrierException e) {
101                     e.printStackTrace();
102                 }
103                 synchronized (lock) {
104                     //鸟每10秒钟要吃掉一条鱼
105                     if(time % 10 == 0){
106                         fishNum = fishNum >= birdNum ? fishNum - birdNum : 0 ;
107                         //鸟每分钟产生一个后代
108                         if(time % 60 == 0){
109                             birdNum += birdNum ;
110                         }
111                         printInfo("鸟动作执行!");
112                     }
113                 }
114
115             }
116
117         }
118
119     }
120
121 }  

4.Semaphore---通过控制操作系统的信号量数目来控制并发,比控制线程并发数粒度更细。

管理固定数值的信号量,用以控制并发的数量。把需要并发的代码放在acquire、release之间即可。acquire获取信号,release释放信号。如果Semaphore管理一个信号量,就是互斥锁。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {

     public static void main(String[] args) {
        // 线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        // 只能5个线程同时访问
        final Semaphore semp = new Semaphore(5);
        // 模拟20个客户端访问
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        //获取许可
                        semp.acquire();
                        System.out.println("Accessing: " + NO);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally{
                        //释放
                        semp.release();              System.out.println("-----------------"+semp.availablePermits());
                    }
                }
            };
            exec.execute(run);
        }
        // 退出线程池
        exec.shutdown();
    }
}

5.Exchanger

用于两个线程之间进行数据交换,先执行exchanger.exchange()的线程等待后来的线程到达,然后交换数据,最后再继续向下执行。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

/**
 *
 * @ClassName: ExchangerDemo
 * @Description: 用于两个线程之间进行数据交换,先执行exchanger.exchange()的线程等待后来的线程到达,然后交换数据,最后再继续向下执行。
 * @author denny.zhang
 * @date 2016年11月4日 下午1:27:29
 *
 */
public class ExchangerDemo {
    public static void main(String[] args) {
        final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();

        new Thread(){
            public void run(){
                List<Integer> list = new ArrayList<Integer>();
                list.add(1);
                list.add(2);
                try {
                    list = exchanger.exchange(list);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread1"+list);
            }
        }.start();

        new Thread(){
            public void run(){
                List<Integer> list = new ArrayList<Integer>();
                list.add(3);
                list.add(4);
                try {
                    list = exchanger.exchange(list);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread2"+list);
            }
        }.start();
    }
}

6.Future和FutrueTask---常用!

Future是接口,FutrueTask是接口实现类。场景:多线程并发执行,返回结果放进list.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 *
 * @ClassName: FutureDemo
 * @Description: Future
 * @author denny.zhang
 * @date 2016年11月4日 下午1:50:32
 *
 */
public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //结果集
        List<Integer> list = new ArrayList<Integer>();
        //开启多线程
        ExecutorService exs = Executors.newFixedThreadPool(3);
        List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
        //启动线程池,固定线程数为3
        for(int i=0;i<3;i++){
            //提交任务,添加返回
            futureList.add(exs.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return 1;
                }
            }));
        }
        //结果归集
        for (Future<Integer> future : futureList) {
            while (true) {
                if (future.isDone()&& !future.isCancelled()) {
                    Integer i = future.get();
                    list.add(i);
                    break;
                } else {
                    Thread.sleep(100);
                }
            }
        }
        System.out.println("list="+list);
    }
}

返回:list=[1, 1, 1]

参考:

《大型网站系统与java中间件实践》

时间: 2024-11-08 18:15:52

高级java必会系列一:多线程的简单使用的相关文章

高级java必会系列一:zookeeper分布式锁

方案1: 算法思路:利用名称唯一性,加锁操作时,只需要所有客户端一起创建/test/Lock节点,只有一个创建成功,成功者获得锁.解锁时,只需删除/test/Lock节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁.特点:这种方案的正确性和可靠性是ZooKeeper机制保证的,实现简单.缺点是会产生"惊群"效应,假如许多客户端在等待一把锁,当锁释放时候所有客户端都被唤醒,仅仅有一个客户端得到锁.方案2:算法思路:临时顺序节点实现共享锁        客户端调用create(

直通大厂:Java必考系列——JVM经典面试题目(含答案)

Q1:类的加载机制是什么?答:类加载到内存中主要有5个阶段,分别为①加载:将Class文件读取到运行时数据区的方法区内,在堆中创建Class对象,并封装类在方法区的数据结构的过程.②验证:主要用于确保Class文件符合当前虚拟机的要求,保障虚拟机自身的安全,只有通过验证的Class文件才能被JVM加载.③准备:主要工作是在方法区中为类变量分配内存空间并设置类中变量的初始值.④解析:将常量池中的符号引用替换为直接引用.⑤初始化:主要通过执行类构造器的<client>方法为类进行初始化,该方法是在

Java反射机制大神必学系列之 ,高级与低级的差别在哪里?

Java反射机制大神必学系列之 ,高级与低级的差别在哪里?java学习爱好者 2019-05-20 19:08前言今天介绍下Java的反射机制,以前我们获取一个类的实例都是使用new一个实例出来.那样太low了,今天跟我一起来学习学习一种更加高大上的方式来实现. 正文Java反射机制定义 Java反射机制是指在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法:对于任意一个对象,都能够调用它的任意一个方法和属性:这种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制.

Java总结篇系列:Java多线程(三)

一.一个典型的Java线程安全例子 1 public class ThreadTest { 2 3 public static void main(String[] args) { 4 Account account = new Account("123456", 1000); 5 DrawMoneyRunnable drawMoneyRunnable = new DrawMoneyRunnable(account, 700); 6 Thread myThread1 = new Thr

JAVA思维导图系列:多线程初级

感觉自己JAVA基础太差了,重新看一遍,已思维导图的方式记录下来 多线程初级 进程 独立性 拥有独立资源 独立的地址 无授权其他进程无法访问 动态性 与程序的区别是:进程是动态的指令集合,而程序是静态的指令集合 加入时间概念 有自己的生命周期和不同的状态 并发性 多个进程可以在单核处理器并发执行 多个进程互不影响 和并行的区别:并行是同一时刻多个进程在多个处理器上同时执行 而并发是指在同一时刻只能执行一条指令,但互相切换迅速,宏观上看是执行多个指令 线程 线程相对于进程如同进程相对于操作系统 多

Java基础复习笔记系列 八 多线程编程

Java基础复习笔记系列之 多线程编程 1. 2.

JAVA思维导图系列:多线程中级

多线程中级,包含控制线程的几种方法.线程的同步.线程组,有返回值的线程.线程之前的通信.线程池和线程的几个相关类 线程同步 当多个线程访问同一资源时,加synchronized对资源进行加锁 synchronized可以修饰代码块,修饰方法 只对可以改变竞争资源的方法加锁 只有多线程才会出现线程安全问题 释放锁的条件 同步方法.代码块结束 出现未处理的异常 调用wait,将本线程置为就绪状态 sleep或者yield.suspend不会释放同步锁 同步锁(Lock),显示加锁.释放锁 读写锁(R

【小白的java成长系列】——多线程初识(多人买票问题)

本来这节内容是要到后面来说的,因为最近在弄并发的问题,推荐一本书<java并发编程实战>,深入的讲解了多线程问题的.本人最近也刚好在看这本书,还不错的~ 多线程的相关概念,就不用说了的,自己可以去网上查找,有一大堆关于它的讲解~ 先来看看买票的程序: package me.javen.thread.one; public class TicketDemo { public static void main(String[] args) { // 使用Thread类的方式 // TicketTh

JAVA思维导图系列:多线程0基础

感觉自己JAVA基础太差了,又一次看一遍,已思维导图的方式记录下来 多线程0基础 进程 独立性 拥有独立资源 独立的地址 无授权其它进程无法訪问 动态性 与程序的差别是:进程是动态的指令集合,而程序是静态的指令集合 增加时间概念 有自己的生命周期和不同的状态 并发性 多个进程能够在单核处理器并发运行 多个进程互不影响 和并行的差别:并行是同一时刻多个进程在多个处理器上同一时候运行 而并发是指在同一时刻仅仅能运行一条指令,但互相切换迅速,宏观上看是运行多个指令 线程 线程相对于进程如同进程相对于操