马士兵老师高并发编程同步容器

手写固定同步容器

写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。

使用wait与notify

思路:使用一个集合来当做生产或者消费的中转站,然后每当生产或者消费的时刻都判断集合的容量,如果不满足条件那么就对这种操作进行阻塞也就是wait同时notify其它的所有线程。当其它线程启动之后也会遇到“不合格的线程”这时候也会阻塞,直到合格的线程进行执行。

核心代码:


  1. public class MyContainer1<T> {
  2. final private LinkedList<T> lists = new LinkedList<>();
  3. final private int MAX = 10; //最多10个元素
  4. private int count = 0;
  5. public synchronized void put(T t) {
  6. while(lists.size() == MAX) { //想想为什么用while而不是用if?
  7. try {
  8. this.wait(); //effective java
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. lists.add(t);
  14. ++count;
  15. this.notifyAll(); //通知消费者线程进行消费
  16. }
  17. public synchronized T get() {
  18. T t = null;
  19. while(lists.size() == 0) {
  20. try {
  21. this.wait();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. t = lists.removeFirst();
  27. count --;
  28. this.notifyAll(); //通知生产者进行生产
  29. return t;
  30. }
  31. public static void main(String[] args) {
  32. MyContainer1<String> c = new MyContainer1<>();
  33. //启动消费者线程
  34. for(int i=0; i<10; i++) {
  35. new Thread(()->{
  36. for(int j=0; j<5; j++) System.out.println(c.get());
  37. }, "c" + i).start();
  38. }
  39. try {
  40. TimeUnit.SECONDS.sleep(2);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. //启动生产者线程
  45. for(int i=0; i<2; i++) {
  46. new Thread(()->{
  47. for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
  48. }, "p" + i).start();
  49. }
  50. }
  51. }

注意事项:

为什么是while?

有下面的一个场景,当消费者消费的阈值不满足条件那么它将会被wait阻塞。此时还有其的9个消费者也处于阻塞状态。当notifyAll的时候我们希望的是让生产者来生产元素,但是这时候被唤醒的消费者会继续去消费,代码会从wait()处直接向下执行。如果是一个if判断,再加上这时的集合中没有元素那么此时一定会出异常。但是如果使用的是while的话那么被唤醒的消费者就会循环检测发现不满足条件就继续阻塞。整个程序顺利进行。

使用signalAll唤醒对应条件的线程

signalAll与ReentrantLock共用达到只唤醒对应条件的线程。比如说当生产者生产的元素超过阈值的时候他就会调用signalAll这时所有的消费者被唤醒而所有的生产者则不受影响。这样就可以避免唤醒不必要的线程节省资源。

此处注意要给每一种线程都定义一个Condition,在上锁的时候就只用这个Condition的锁去锁定对应的线程。具体代码如下:


  1. public class MyContainer2<T> {
  2. final private LinkedList<T> lists = new LinkedList<>();
  3. final private int MAX = 10; //最多10个元素
  4. private int count = 0;
  5. private Lock lock = new ReentrantLock();
  6. private Condition producer = lock.newCondition();
  7. private Condition consumer = lock.newCondition();
  8. public void put(T t) {
  9. try {
  10. lock.lock();
  11. while(lists.size() == MAX) {
  12. producer.await();
  13. }
  14. lists.add(t);
  15. ++count;
  16. consumer.signalAll(); //通知消费者线程进行消费
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. } finally {
  20. lock.unlock();
  21. }
  22. }
  23. public T get() {
  24. T t = null;
  25. try {
  26. lock.lock();
  27. while(lists.size() == 0) {
  28. consumer.await();
  29. }
  30. t = lists.removeFirst();
  31. count --;
  32. producer.signalAll(); //通知生产者进行生产
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. } finally {
  36. lock.unlock();
  37. }
  38. return t;
  39. }
  40. public static void main(String[] args) {
  41. MyContainer2<String> c = new MyContainer2<>();
  42. //启动消费者线程
  43. for(int i=0; i<10; i++) {
  44. new Thread(()->{
  45. for(int j=0; j<5; j++) {
  46. System.out.println(c.get());
  47. // System.out.println("c");
  48. }
  49. }, "c" + i).start();
  50. }
  51. try {
  52. TimeUnit.SECONDS.sleep(2);
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. }
  56. //启动生产者线程
  57. for(int i=0; i<2; i++) {
  58. new Thread(()->{
  59. for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
  60. }, "p" + i).start();
  61. }
  62. }
  63. }

售票员案例

* 场景

* 有N张火车票,每张票都有一个编号

* 同时有10个窗口对外售票

* 请写一个模拟程序

*

* 分析下面的程序可能会产生哪些问题?

* 重复销售?超量销售?

Solution1:使用线程不安全的集合而且不上锁


  1. public class TicketSeller1 {
  2. static List<String> tickets = new ArrayList<>();
  3. static {
  4. for(int i=0; i<1000; i++) tickets.add("票编号:" + i);
  5. }
  6. public static void main(String[] args) {
  7. for(int i=0; i<10; i++) {
  8. new Thread(()->{
  9. while(tickets.size() > 0) {
  10. try {
  11. Thread.sleep(10);
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. System.out.println("销售了--" + tickets.remove(0));
  16. }
  17. }).start();
  18. }
  19. }
  20. }

为了使得问题呈现的效果明显我们加上了睡眠时间。此程序会出现的问题:

总的来说会有两个点出现问题:

程序逻辑的线程不安全,有可能有多个线程涌入while循环中这是造成并发问题的根本。这个原因会导致在size为1的时候涌入很多的线程进而执行多次删除操作下标越界。

集合的线程不安全,remove的方法本来就不是线程安全的。为了说明问题我,我们把remove方法放大:

可以看出如果两个线程同时执行remove方法的话,由于index一样所以他们的remove的返回值就会得到同一个oldValue。也就是重复卖出。

Solution2:使用集合Vector但是代码逻辑不加锁

代码上的逻辑与solution1是一样的,但是Vector集合是线程安全的,所以它只会出现程序逻辑不安全带来的并发问题。也就是会出现有可能有多个线程涌入while循环中这是造成并发问题的根本。这个原因会导致在size为1的时候涌入很多的线程进而执行多次删除操作下标越界。但是绝对不会出现卖出同一张票的情况。我们把remove的代码放大:

这是一个同步的方法,每一个线程过来如果得不到锁得话都会陷入等待。虽然都是remove(0)但是当下一个线程来到的时候0位置已经是一个全新的元素。

Solution3:给代码逻辑上锁使用线程不安全的集合

不多说了无论如何都可以防止线程安全问题,因为在Solution1中已经提到过了代码的并发问题是一切问题的原因。直接上代码:


  1. public class TicketSeller3 {
  2. static List<String> tickets = new LinkedList<>();
  3. static {
  4. for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
  5. }
  6. public static void main(String[] args) {
  7. for(int i=0; i<10; i++) {
  8. new Thread(()->{
  9. while(true) {
  10. synchronized(tickets) {
  11. if(tickets.size() <= 0) break;
  12. try {
  13. TimeUnit.MILLISECONDS.sleep(10);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println("销售了--" + tickets.remove(0));
  18. }
  19. }
  20. }).start();
  21. }
  22. }
  23. }

Solution4使用并发集合不加锁

首先说并发的集合是线程安全的,而且效率较高因为使用了局部锁。这样的话只在取值的时候加了锁,而且如果是以下标来取值的话还可以同时取走多个地方的值这样的话效率大大提高。而且这里使用一种取值然后再判断的逻辑巧妙的避免了下表越界的错误,而前面的案例中都是先判断再取值这样就造成了线程不安全:


  1. public class TicketSeller4 {
  2. static Queue<String> tickets = new ConcurrentLinkedQueue<>();
  3. static {
  4. for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
  5. }
  6. public static void main(String[] args) {
  7. for(int i=0; i<10; i++) {
  8. new Thread(()->{
  9. while(true) {
  10. String s = tickets.poll();
  11. if(s == null) break;
  12. else System.out.println("销售了--" + s);
  13. }
  14. }).start();
  15. }
  16. }
  17. }

高并发集合简介

ConcurrectHashMap:使用了局部锁,也就是细粒度的锁来提高并发效果。

ConcurrectSkipListMap:使用了局部锁,但是结果也排序的map集合。对比TreeMap一个元素排序的map集合。

CopyOnWriteArrayList:读取时不加锁,但是写的时候回拷贝原有的数据然后对拷贝的数据进行操作最后将指针指向修改过的集合。这个集合适用于读操作远远大于写操作的情况。

BlockingQueue:阻塞队列,当队列中没有元素的时候就会对取元素产生阻塞,当队列中满元素的时候就会对添加元素产生阻塞。而且不允许添加null的值而且在取值与添加值的情况下都会加锁,换句话说它是一个线程安全的集合。以下为部分源码:

DelayQueue:执行定时任务,他的内部会装有很多的task接受的task都实现了Delay接口,因此task内部也就维护了一个conpareTo的方法,如果按照时间排序的话那么就能够实现任务的定时执行。


  1. public class T07_DelayQueue {
  2. static BlockingQueue<MyTask> tasks = new DelayQueue<>();
  3. static Random r = new Random();
  4. static class MyTask implements Delayed {
  5. long runningTime;
  6. MyTask(long rt) {
  7. this.runningTime = rt;
  8. }
  9. @Override
  10. public int compareTo(Delayed o) {
  11. if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
  12. return -1;
  13. else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
  14. return 1;
  15. else
  16. return 0;
  17. }
  18. @Override
  19. public long getDelay(TimeUnit unit) {
  20. return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  21. }
  22. @Override
  23. public String toString() {
  24. return "" + runningTime;
  25. }
  26. }
  27. public static void main(String[] args) throws InterruptedException {
  28. long now = System.currentTimeMillis();
  29. MyTask t1 = new MyTask(now + 1000);
  30. MyTask t2 = new MyTask(now + 2000);
  31. MyTask t3 = new MyTask(now + 1500);
  32. MyTask t4 = new MyTask(now + 2500);
  33. MyTask t5 = new MyTask(now + 500);
  34. tasks.put(t1);
  35. tasks.put(t2);
  36. tasks.put(t3);
  37. tasks.put(t4);
  38. tasks.put(t5);
  39. System.out.println(tasks);
  40. for(int i=0; i<5; i++) {
  41. System.out.println(tasks.take());
  42. }
  43. }
  44. }

TransferQueue:当有消费者的话那么就直接将生产出来的元素交给消费者,但是如果没有消费者的话就会将生产的元素放到队列中。当队列中的元素消耗完的时候消费者就会阻塞。

原文地址:https://blog.csdn.net/qq_34993631/article/details/82685082

原文地址:https://www.cnblogs.com/jpfss/p/9913344.html

时间: 2024-10-11 22:18:39

马士兵老师高并发编程同步容器的相关文章

马士兵老师高并发编程基础入门

锁是指谁? Object o = new Object(); Synchronized(o); 我们一般认为Synchronized锁定的是这段代码块但事实上,Synchronized锁定的是锁这个对象.不仅如此Synchronized锁定的是heap内存中的这个对象而不是这个引用. 一个例子 /** * 锁定某对象o,如果o的属性发生改变,不影响锁的使用 * 但是如果o变成另外一个对象,则锁定的对象发生改变 * 应该避免将锁定对象的引用变成另外的对象 * @author mashibing *

马士兵java高并发编程三

1.使用静态内部类实现线程安全的单例模式 package com.weiyuan.test; /** * 采用内部类的形式实现单例模式 * 是同步安全的,并且实现了懒加载 * */ public class Sigleton { private Sigleton(){ } private static class Inner{ private static Sigleton s = new Sigleton(); } public static Sigleton getInstance(){ r

Java高并发编程(一)

1.原子量级操作(读.++操作.写分为最小的操作量单位,在多线程中进行原子量级编程保证程序可见性(有序性人为规定)) 由于某些问题在多线程条件下:产生了竞争的问题,(例如:在多线程中一个简单的计数器增加)如果在程序中不采用同步的机制,那么在程序的运行结果中,多个线程在访问此资源时候,产生Racing.解决这个问题,采用某种方式阻止其他线程在该线程使用该变量的时候使用该变量 采用原子级操作:1.采用加锁的机制(最好的操作)2.Java.concurrent.atomic包包含一些原子量操作:Ato

高并发编程必备基础 -- 转载自 并发编程网

文章转载自 并发编程网  本文链接地址:高并发编程必备基础 一. 前言 借用Java并发编程实践中的话"编写正确的程序并不容易,而编写正常的并发程序就更难了",相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作的顺序是不可预期的,本文算是对多线程情况下同步策略的一个简单介绍. 二. 什么是线程安全问题 线程安全问题是指当多个线程同时读写一个状态变量,并且没有任何同步措施时候,导致脏数据或者其他不可预见的结果的问题.Java中首

Java 面试知识点解析(二)——高并发编程篇

前言: 在遨游了一番 Java Web 的世界之后,发现了自己的一些缺失,所以就着一篇深度好文:知名互联网公司校招 Java 开发岗面试知识点解析 ,来好好的对 Java 知识点进行复习和学习一番,大部分内容参照自这一篇文章,有一些自己补充的,也算是重新学习一下 Java 吧. 前序文章链接: Java 面试知识点解析(一)--基础知识篇 (一)高并发编程基础知识 这里涉及到一些基础的概念,我重新捧起了一下<实战 Java 高并发程序设计>这一本书,感觉到心潮澎湃,这或许就是笔者叙述功底扎实的

Java并发-从同步容器到并发容器

引言 容器是Java基础类库中使用频率最高的一部分,Java集合包中提供了大量的容器类来帮组我们简化开发,我前面的文章中对Java集合包中的关键容器进行过一个系列的分析,但这些集合类都是非线程安全的,即在多线程的环境下,都需要其他额外的手段来保证数据的正确性,最简单的就是通过synchronized关键字将所有使用到非线程安全的容器代码全部同步执行.这种方式虽然可以达到线程安全的目的,但存在几个明显的问题:首先编码上存在一定的复杂性,相关的代码段都需要添加锁.其次这种一刀切的做法在高并发情况下性

高并发编程专题说明

大家好,并发编程是一个提升程序员level的关键专题,本专题会从理论结合实践逐步深入,尽量用通俗的语言和跑的通的程序来给大家讲解,重点每个地方都会形成一个闭环,让大家真正掌握高并发编程的核心要点,让我们一起来学习,感受技术的乐趣. 对该专题感兴趣的,欢迎点赞!!! 最后给大家一个经验之谈,要提升自己的技术逼格,一句话,"不断走出自己的舒适区" 技术说明:本专题会以java作为编程语言,要求大家具备javase的基础知识,如果不具备就需要先掌握这部分知识再来看这个专题了. 原文地址:ht

高并发编程之线程安全与内存模型

微信公众号:Java修炼指南关注可与各位开发者共同探讨学习经验,以及进阶经验.如果有什么问题或建议,请在公众号留言.博客:https://home.cnblogs.com/u/wuyx/ 前几期简单介绍了一些线程方面的基础知识,以及一些线程的一些基础用法(想看往期文章的小伙伴可以直接拉到文章最下方飞速前往).本文通过java内存模型来介绍线程之间不可见的原因. 本期精彩原子性有序性指令重排序可见性Happen-Before规则 原子性 原子性对于我们开发者来说应该算是比较熟悉的了,通俗点说就是执

高并发编程之无锁

前几期简单介绍了一些线程方面的基础知识,以及一些线程的一些基础用法以及通过jvm内存模型的方式去介绍了一些并发中常见的问题(想看往期文章的小伙伴可以直接拉到文章最下方飞速前往).本文重点介绍一个概念“无锁” 本期精彩什么是无锁无锁类的原理AtomicIntegerUnsafeAtomicReferenceAtomicStampedReference 什么是无锁 在高并发编程中最重要的就是获取临界区资源,保证其中操作的原子性.一般来说使用synchronized关键字进行加锁,但是这种操作方式其实