Java并发编程从入门到精通 - 第5章:多线程之间的交互:线程阀

详述:
 线程阀是一种线程与线程之间相互制约和交互的机制;
 作用:http://wsmajunfeng.iteye.com/blog/1629354
阻塞队列BlockingQueue;
数组阻塞队列ArrayBlockingQueue;
链表阻塞队列LinkedBlockingQueue;
优先级阻塞队列PriorityBlockingQueue;
延时队列DelayQueue;
同步队列SynchronousQueue;
链表双向阻塞队列LinkedBlockingDeque;
链表传输队列LinkedTransferQueue;
同步计数器CountDownLatch;
抽象队列化同步器AbstractQueuedSynchroizer;
同步计数器Semaphore;
同步计数器CyclicBarrier;

 1 /**
 2  * ArrayBlockingQueue的简单用法
 3  */
 4 package thread04;
 5
 6 import java.util.concurrent.ArrayBlockingQueue;
 7 import java.util.concurrent.BlockingQueue;
 8
 9 public class ArrayBlockingQueueTest01
10 {
11     public static void main(String[] args) throws InterruptedException
12     {
13         // 新建一个等待队列
14         final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(16);
15
16         for(int i=0;i<16;i++)
17         {
18             String log = (i+1) + " --> ";
19             bq.put(log);
20         }
21
22         // 新建四个线程
23         for(int i=0;i<4;i++)
24         {
25             new Thread(new Runnable()
26             {
27                 @Override
28                 public void run()
29                 {
30                     while(true)
31                     {
32                         try
33                         {
34                             String log = bq.take();
35                             parseLog(log);
36                         }
37                         catch (InterruptedException e)
38                         {
39                             e.printStackTrace();
40                         }
41                     }
42                 }
43             }).start();
44         }
45     }
46
47     public static void parseLog(String log)
48     {
49         System.out.println(log + System.currentTimeMillis());
50
51         try
52         {
53             Thread.sleep(1000);
54         }
55         catch (InterruptedException e)
56         {
57             e.printStackTrace();
58         }
59     }
60 }

ArrayBlockingQueue的简单用法

 1 /**
 2  * LinkedBlockingQueue的简单用法
 3  */
 4 package thread04;
 5
 6 import java.util.concurrent.BlockingQueue;
 7 import java.util.concurrent.LinkedBlockingQueue;
 8
 9 public class LinkedBlockingQueueTest01
10 {
11     public static void main(String[] args)
12     {
13         final BlockingQueue<String> bq = new LinkedBlockingQueue<String>(16);
14
15         for(int i=0;i<16;i++)
16         {
17             String log = (i+1) + " --> ";
18             try
19             {
20                 bq.put(log);
21             }
22             catch (InterruptedException e)
23             {
24                 e.printStackTrace();
25             }
26         }
27
28         for(int i=0;i<4;i++)
29         {
30             new Thread(new Runnable()
31             {
32                 @Override
33                 public void run()
34                 {
35                     while(true)
36                     {
37                         try
38                         {
39                             String log = bq.take();
40                             parseLog(log);
41                         }
42                         catch (InterruptedException e)
43                         {
44                             e.printStackTrace();
45                         }
46                     }
47                 }
48
49             }).start();
50         }
51     }
52
53     public static void parseLog(String log)
54     {
55         System.out.println(log + System.currentTimeMillis());
56
57         try
58         {
59             Thread.sleep(1000);
60         }
61         catch (InterruptedException e)
62         {
63             e.printStackTrace();
64         }
65     }
66 }

LinkedBlockingQueue的简单用法

 1 /**
 2  * DelayQueue的简单用法
 3  */
 4 package thread04;
 5
 6 import java.util.concurrent.DelayQueue;
 7 import java.util.concurrent.Delayed;
 8 import java.util.concurrent.TimeUnit;
 9
10 public class DelayQueueTest01
11 {
12     public static void main(String[] args)
13     {
14         // 新建一个等待队列
15         final DelayQueue<Student> dq = new DelayQueue<Student>();
16
17         for(int i=0;i<5;i++)
18         {
19             Student student = new Student("学生"+i, Math.round((Math.random()*10+i)));
20             dq.put(student);  // 将数据存到队列里面
21         }
22
23         // 获取但不移除此队列的头部;如果此队列为空,则返回null
24         System.out.println("dq.peek():" + dq.peek().getName());
25
26         // 获取并移除此队列的头部,在可从此队列获得到期延迟的元素,获得到达指定的等待时间之前一直等待(如有必要)
27         // poll(long timeout, TimeUnit unit)
28     }
29 }
30
31 class Student implements Delayed  // 必须实现Delayed接口
32 {
33     private String name;
34     private long submitTime;  // 交卷时间
35     private long workTime;  // 考试时间
36
37     public String getName()
38     {
39         return this.name + "交卷,用时" + workTime;
40     }
41
42     public Student(String name, long submitTime)
43     {
44         this.name = name;
45         this.workTime = submitTime;
46         this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
47         System.out.println(this.name + "交卷,用时" + workTime);
48     }
49
50     @Override
51     // 必须实现compareTo()方法
52     public int compareTo(Delayed o)
53     {
54         // 比较的方法
55         Student that = (Student) o;
56         return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
57     }
58
59     @Override
60     // 必须实现getDelay()方法
61     public long getDelay(TimeUnit unit)
62     {
63         // 返回一个延时时间
64         return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
65     }
66
67 }
68
69 /*
70 每次运行结果都不一样,我们获得永远是队列里面的第一个元素
71 */

DelayQueue的简单用法

 1 /**
 2  * SynchronousQueue的简单用法
 3  */
 4 package thread04;
 5
 6 import java.util.concurrent.Semaphore;
 7 import java.util.concurrent.SynchronousQueue;
 8
 9 public class SynchronousQueueTest01
10 {
11     public static void main(String[] args)
12     {
13         System.out.println("begin:" + System.currentTimeMillis() / 1000);
14
15         final SynchronousQueue<String> sq = new SynchronousQueue<String>();
16         final Semaphore sem = new Semaphore(1);
17
18         /*
19         // 放在不同的地方,结果是不一样的
20         for(int i=0;i<10;i++)
21         {
22             String input = i + "";
23             try
24             {
25                 sq.put(input);
26             }
27             catch (InterruptedException e)
28             {
29                 e.printStackTrace();
30             }
31         }
32         */
33
34         for(int i=0;i<10;i++)
35         {
36             new Thread(new Runnable()
37             {
38                 @Override
39                 public void run()
40                 {
41                     try
42                     {
43                         sem.acquire();
44
45                         String input = sq.take();
46                         String output = TestDo.doSome(input);
47                         System.out.println(Thread.currentThread().getName() + ":" + output);
48
49                         sem.release();
50                     }
51                     catch (InterruptedException e)
52                     {
53                         e.printStackTrace();
54                     }
55
56                 }
57             }).start();
58         }
59
60         /*for(int i=0;i<10;i++)
61         {
62             String input = i + "";
63             try
64             {
65                 sq.put(input);
66             }
67             catch (InterruptedException e)
68             {
69                 e.printStackTrace();
70             }
71         }*/
72
73     }
74
75 }
76
77 class TestDo
78 {
79     public static String doSome(String input)
80     {
81         try
82         {
83             Thread.sleep(1000);
84         }
85         catch (InterruptedException e)
86         {
87             e.printStackTrace();
88         }
89
90         String output = input + ":" + System.currentTimeMillis();
91
92         return output;
93     }
94 }

SynchronousQueue的简单用法

 1 /**
 2  * LinkedTransferQueue的简单使用
 3  */
 4 package thread04;
 5
 6 import java.util.Random;
 7 import java.util.concurrent.LinkedTransferQueue;
 8 import java.util.concurrent.TimeUnit;
 9
10 public class LinkedTransferQueueTest01
11 {
12     public static void main(String[] args)
13     {
14         LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();
15
16         Producer p = new Producer(queue);
17         Thread producer = new Thread(p);
18         // 设为守护进程,使得线程执行结束后程序自动结束
19         producer.setDaemon(true);
20         producer.start();
21
22         for(int i=0;i<10;i++)
23         {
24             Consumer c = new Consumer(queue);
25             Thread consumer = new Thread(c);
26             consumer.setDaemon(true);
27             consumer.start();
28
29             try
30             {
31                 // 消费者进程休眠1秒,以便生产者获得CPU,从而生产产品
32                 Thread.sleep(1000);
33             }
34             catch (InterruptedException e)
35             {
36                 e.printStackTrace();
37             }
38         }
39     }
40 }
41
42 class Consumer implements Runnable
43 {
44     private LinkedTransferQueue<String> queue;
45
46     public Consumer(LinkedTransferQueue<String> queue)
47     {
48         this.queue = queue;
49     }
50
51     @Override
52     public void run()
53     {
54         try
55         {
56             System.out.println(" Consumer " + Thread.currentThread().getName() + " " + queue.take());
57         }
58         catch (InterruptedException e)
59         {
60             e.printStackTrace();
61         }
62     }
63 }
64
65 class Producer implements Runnable
66 {
67     private LinkedTransferQueue<String> queue;
68
69     public Producer(LinkedTransferQueue<String> queue)
70     {
71         this.queue = queue;
72     }
73
74     private String produce()
75     {
76         return " your lucky number " + (new Random().nextInt(1000));
77     }
78
79     @Override
80     public void run()
81     {
82         try
83         {
84             while(true)
85             {
86                 if(queue.hasWaitingConsumer())
87                 {
88                     queue.transfer(produce());
89                 }
90
91                 TimeUnit.SECONDS.sleep(1);
92             }
93         }
94         catch (Exception e)
95         {
96             e.printStackTrace();
97         }
98     }
99 }

LinkedTransferQueue的简单使用

同步计数器CountDownLatch:
 详解:
  在完成一组正在其他线程中执行的操作之前,允许一个或多个线程一直等待;
  用给定的计数初始化CountDownLatch;由于调用了countDown()方法,所以在当前计数到达0之前,awiait()方法会一直受阻塞(哪个线程中调用了await()方法,哪个线程就被阻塞,一直等待);之后(当前计数到达0),会释放所有等待的线程,await的所有后续调用(线程调用await()方法后续的一些操作)都将立即返回;这种现象只出现一次,因为计数无法被重置;
 主要方法:
  CountDownLatch(int count) :构造一个用给定计数初始化的CountDownLatch实例对象;
  void await():使当前线程在锁存器倒计数至0之前一直等待,除非线程被中断;
  boolean await(long timeout, TimeUnit unit) :使当前线程在锁存器倒计数至0之前一直等待,除非线程被中断或超出了指定的等待时间;
  void countDown():递减锁存器的计数,如果计数到达0,则释放所有等待的线程;
  long getCount():返回当前计数;
 应用场景:
  在一些应用场合中,需要等待某个条件到达要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作;

 1 /**
 2  * CountDownLatch的简单使用
 3  */
 4 package thread04;
 5
 6 import java.util.concurrent.CountDownLatch;
 7
 8 public class CountDownLatchTest01
 9 {
10     private static CountDownLatch latch = new CountDownLatch(3);
11
12     public static void main(String[] args)
13     {
14         Worker worker1 = new Worker("张三 程序员1", latch);
15         Worker worker2 = new Worker("李四  程序员2", latch);
16         Worker worker3 = new Worker("王五  程序员3", latch);
17
18         Thread t1 = new Thread(worker1);
19         Thread t2 = new Thread(worker2);
20         Thread t3 = new Thread(worker3);
21
22         t1.start();
23         t2.start();
24         t3.start();
25
26         try
27         {
28             latch.await();
29         } catch (InterruptedException e)
30         {
31             e.printStackTrace();
32         }
33
34         System.out.println("Main is end!");
35     }
36 }
37
38 class Worker implements Runnable
39 {
40     private String workerName;
41     private CountDownLatch latch;
42
43     public Worker(String workerName, CountDownLatch latch)
44     {
45         this.workerName = workerName;
46         this.latch = latch;
47     }
48
49     @Override
50     public void run()
51     {
52         try
53         {
54             System.out.println("Worker " + workerName + " is begin!");
55             Thread.sleep(1000L);
56             System.out.println("Worker " + workerName + " is end!");
57
58         } catch (InterruptedException e)
59         {
60             e.printStackTrace();
61         }
62         latch.countDown();
63     }
64 }

CountDownLatch的简单使用

信号量Semaphore:
 详解:
  通过对信号量的不同操作,可以分别实现进程间的互斥与同步;
  Semaphore可以控制某个资源被同时访问的任务数(维护了一个许可集合),它通过acquire()获取一个许可,release()释放一个许可;如果被同时访问的任务数已满,则其他acquire的任务进入等待状态,直到有一个任务被release掉,它才能得到许可;
 使用场景:
  排队场景,资源有限的房间,资源有限的群等等,常见的实际应用场景包括线程池、连接池等;

 1 /**
 2  * Semaphore的简单使用
 3  */
 4 package thread04;
 5
 6 import java.util.concurrent.ExecutorService;
 7 import java.util.concurrent.Executors;
 8 import java.util.concurrent.Semaphore;
 9 import java.util.concurrent.locks.ReentrantLock;
10
11 public class SemaphoreTest01
12 {
13     public static void main(String[] args)
14     {
15         ExecutorService es = Executors.newCachedThreadPool();
16
17         final Semaphore sh = new Semaphore(5);
18
19         ReentrantLock rl = new ReentrantLock();
20
21         for(int i=0;i<10;i++)
22         {
23             final int num = i;
24
25             Runnable run = new Runnable()
26             {
27                 @Override
28                 public void run()
29                 {
30                     // rl.lock();
31                     try
32                     {
33                         sh.acquire();
34                         System.out.println("线程 " + Thread.currentThread().getName() + " 获得许可:" + num);
35                         for(int i=0;i<999999;i++);
36                         sh.release();
37                         System.out.println("线程 " + Thread.currentThread().getName() + "释放许可:" + num);
38                         System.out.println("当前允许进入的任务个数: " + sh.availablePermits());
39                     }
40                     catch (InterruptedException e)
41                     {
42                         e.printStackTrace();
43                     }
44                     finally
45                     {
46                         // rl.unlock();
47                     }
48                 }
49             };
50             es.execute(run);
51         }
52
53         es.shutdown();
54     }
55 }

Semaphore的简单使用

障碍器CyclicBarrier:
 详解:
  又叫同步计数器;
 使用场景:
  你希望创建一组任务,它们并发地执行工作,另外的一个任务在这一组任务并发执行结束前一直阻塞等待,直到该组任务全部执行结束,这个任务才得以执行;

 1 /**
 2  * CyclicBarrier的简单使用
 3  */
 4 package thread04;
 5
 6 import java.util.concurrent.BrokenBarrierException;
 7 import java.util.concurrent.CyclicBarrier;
 8
 9 public class CyclicBarrierTest01
10 {
11     public static void main(String[] args)
12     {
13         // 创建CyclicBarrier对象,并设置执行完一组5个线程的并发任务后,再执行MainTask任务
14         CyclicBarrier cb = new CyclicBarrier(5, new MainTask());
15
16         SubTask sb1 = new SubTask("A", cb);
17         SubTask sb2 = new SubTask("B", cb);
18         SubTask sb3 = new SubTask("C", cb);
19         SubTask sb4 = new SubTask("D", cb);
20         SubTask sb5 = new SubTask("E", cb);
21
22         new Thread(sb1).start();
23         new Thread(sb2).start();
24         new Thread(sb3).start();
25         new Thread(sb4).start();
26         new Thread(sb5).start();
27
28     }
29 }
30
31 /**
32  * 最后要执行的任务
33  * @author Administrator
34  *
35  */
36 class MainTask implements Runnable
37 {
38     @Override
39     public void run()
40     {
41         System.out.println("前面的并发任务全部执行完毕后,开始执行最后任务...");
42     }
43 }
44
45 /**
46  * 一组并发任务
47  * @author Administrator
48  *
49  */
50 class SubTask implements Runnable
51 {
52     private String name;
53     private CyclicBarrier cb;
54
55     public SubTask(String name, CyclicBarrier cb)
56     {
57         this.name = name;
58         this.cb = cb;
59     }
60
61     @Override
62     public void run()
63     {
64         System.out.println("并发任务 " + name + " 开始执行...");
65         for(int i=0;i<999999;i++);
66         System.out.println("并发任务 " + name + " 执行完毕,通知障碍器...");
67
68         try
69         {
70             // 每执行完一项任务就通知障碍器
71             cb.await();
72         }
73         catch (InterruptedException | BrokenBarrierException e)
74         {
75             e.printStackTrace();
76         }
77     }
78
79     public String getName()
80     {
81         return name;
82     }
83     public void setName(String name)
84     {
85         this.name = name;
86     }
87     public CyclicBarrier getCb()
88     {
89         return cb;
90     }
91     public void setCb(CyclicBarrier cb)
92     {
93         this.cb = cb;
94     }
95 }

CyclicBarrier的简单使用

原文地址:https://www.cnblogs.com/kehuaihan/p/8460253.html

时间: 2024-10-06 02:58:13

Java并发编程从入门到精通 - 第5章:多线程之间的交互:线程阀的相关文章

Java并发编程从入门到精通 - 第6章:线程池

1.什么是线程池(为什么使用线程池):2.Executor框架介绍:  Java 5中引入的,其内部使用了线程池机制,在java.util.cocurrent 包下,通过该框架来控制线程的启动.执行和关闭(使用该框架来创建线程池),可以简化并发编程的操作:  Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等:  ExecutorService接口继承自Executor接口,

Java并发编程从入门到精通 - 第2章:认识Thread

线程实现的三种方法:1.三种实现方式的简记: 继承Thread类,重写run()方法: 实现Runnable接口,重写run()方法,子类创建对象并作为Thread类的构造器参数: 实现Callable接口,重写call()方法,子类创建对象并作为FutureTask类的构造器参数,FutureTask类创建对象并作为Thread类的构造器参数:2.三种实现方法的比较: 继承Thread类:因为是单继承,所以扩展性不好: 实现Runnable接口:接口可以多重实现:并且还可以再继承一个类:扩展性

Java并发编程从入门到精通 - 第3章:Thread安全

Java内存模型与多线程: 线程不安全与线程安全: 线程安全问题阐述:  多条语句操作多个线程共享的资源时,一个线程只执行了部分语句,还没执行完,另一个线程又进来操作共享数据(执行语句),导致共享数据最终结果出现误差:所以就是看一个线程能否每次在没有其他线程进入的情况下操作完包含共享资源的语句块,如果能就没有安全问题,不能就有安全问题: 如何模拟多线程的安全问题:  用Thread.sleep()方法模拟: 放在哪:放在多线程操作共享数据的语句块之间(使正在运行的线程休息一会,让其他线程执行,就

Java并发编程从入门到精通 - 第7章:Fork/Join框架

1.综述:化繁为简,分而治之:递归的分解和合并,直到任务小到可以接受的程度:2.Future任务机制:  Future接口就是对于具体的Runnable或者Callable任务的执行结果进行取消.查询是否完成.获取结果:必要时可以通过get方法获取执行结果,该方法会阻塞直到任务会返回结果:也就是说Future接口提供三种功能:判断任务是否完成.能够中断任务.能够获取任务执行结果:  Future接口里面的常用方法:3.FutureTask:  FutureTask类是Future接口唯一的实现类

Java并发编程从入门到精通 张振华.Jack --我的书

[当当.京东.天猫.亚马逊.新华书店等均有销售] 目 录 第一部分:线程并发基础 第1章 概念部分   1 1.1 CPU核心数.线程数 (主流cpu,线程数的大体情况说一下) 1 1.2 CPU时间片轮转机制 2 1.3 什么是进程和什么是线程 4 1.4 进程和线程的比较 5 1.5 什么是并行运行 7 1.6 什么是多并发运行 8 1.7 什么是吞吐量 9 1.8  多并发编程的意义及其好处和注意事项 10 1.9  分布式与并发运算关系 11 1.10 Linux和Window多并发可以

Java并发编程从入门到精通 张振华.Jack --【吐血推荐、热销书籍】

[当当.京东.天猫.亚马逊.新华书店等均有销售]目 录 第一部分:线程并发基础 第1章 概念部分   1 1.1 CPU核心数.线程数 (主流cpu,线程数的大体情况说一下) 1 1.2 CPU时间片轮转机制 2 1.3 什么是进程和什么是线程 4 1.4 进程和线程的比较 5 1.5 什么是并行运行 7 1.6 什么是多并发运行 8 1.7 什么是吞吐量 9 1.8  多并发编程的意义及其好处和注意事项 10 1.9  分布式与并发运算关系 11 1.10 Linux和Window多并发可以采

Java并发编程从入门到精通-总纲

总纲: Thread; Thread安全; 线程安全的集合类; 多线程之间交互:线程阀; 线程池; Fork/Join; 第2章:认识Thread: 线程实现的三种方法; Thread里面的属性和方法; 线程的中断机制; 线程的生命周期; 守护线程; 线程组; 当前线程的副本:ThreadLocal; 线程异常的处理; 第3章:Thread安全: Java内存模型与多线程: 线程不安全和线程安全: 隐式锁synchronized: 显式锁Lock和ReentrantLock: 显式锁ReadWr

Java网络编程从入门到精通(4):DNS缓存

在通过DNS查找域名的过程中,可能会经过多台中间DNS服务器才能找到指定的域名,因此,在DNS服务器上查找域名是非常昂贵的操作.在Java中为了缓解这个问题,提供了DNS缓存.当InetAddress类第一次使用某个域名(如www.csdn.net)创建InetAddress对象后,JVM就会将这个域名和它从DNS上获得的信息(如IP地址)都保存在DNS缓存中.当下一次InetAddress类再使用这个域名时,就直接从DNS缓存里获得所需的信息,而无需再访问DNS服务器. DNS缓存在默认时将永

[Java 并发] Java并发编程实践 思维导图 - 第六章 任务执行

根据<Java并发编程实践>一书整理的思维导图.希望能够有所帮助. 第一部分: 第二部分: 第三部分: