17.并发类容器设计

并发类容器设计

1.ConcurrentHashMap:代替散列普通的hashTable,添加了复合操作支持。

  1. private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
  2. for (Map.Entry<String, Object> m : resultMap.entrySet()) {
  3. count += (Long) m.getValue();
  4. }

2.CopyOnWriteArrayList:代替了voctor,并发的CopyOnWriteArraySet,以及并发的queue。

3.ConcurrentLikedQueue:(高性能队列)、无阻塞形式、基于链表的无界队列,该队列不允许为空、先进先出原则。

  1. package demo6;
  2. import java.util.concurrent.ConcurrentLinkedQueue;
  3. /**高性能队列、无阻塞
  4. */
  5. public class DemoConcurrentLinkedQueue {
  6. /**
  7. * 说明:list与Queue的区别
  8. * 1.队列先进先出
  9. * 2.list集合
  10. */
  11. /*
  12. 使用take()函数,如果队列中没有数据,则线程wait释放CPU,而poll()则不会等待,直接返回null;同样,空间耗尽时offer()函数不会等待,直接返回false,
  13. 而put()则会wait,因此如果你使用while(true)来获得队列元素,千万别用poll(),CPU会100%的.
  14. */
  15. public static void main(String[] args){
  16. ConcurrentLinkedQueue<String> mq = new ConcurrentLinkedQueue<String>();
  17. mq.add("小明");
  18. mq.add("王老吉");
  19. mq.add("大宝");
  20. mq.add("哈士奇");
  21. System.err.println(mq.poll());//1.取出该元素并删除
  22. System.err.println(mq.size());
  23. System.err.println(mq.peek());//1.取出不删除
  24. System.err.println(mq.size());
  25. }
  26. }
  27. 输出:
  28. 小明 3 王老吉 3

4.LinkedBlockingQueue:(阻塞形式队列)、连链表构成、采用分离锁、读写分离、无界队列、可并行生产、消费。

  1. BlockingQueue<String> mq2 = new LinkedBlockingQueue<String>(5);//有界、无界
  2. mq2.add("1+");
  3. mq2.add("2+");
  4. mq2.add("3+");
  5. mq2.add("4+");
  6. mq2.add("5+");
  7. for (Iterator iterator = mq2.iterator();iterator.hasNext();){
  8. String str = (String) iterator.next();
  9. System.err.println(str);
  10. }
  11. List<String> list = new ArrayList<>();
  12. System.err.println("批量获取:"+mq2.drainTo(list,3));
  13. for (String s:list){

5.ArrayBlockingQueue:基于数组的实现阻塞形式队列、自定义长度、生产、消费不能并行执行,可先进先出、先进后出。

  1. BlockingQueue<String> mq1 = new ArrayBlockingQueue<String>(3);
  2. mq1.add("a");
  3. mq1.put("b");
  4. mq1.add("e");

6.PriorityBlockingQueue:基于优先级的阻塞形式,使用公平分离锁实现,优先级有comparable对象决定,实现此接口,没取一个队列排列一次队列

  1. package demo6;
  2. public class Task implements Comparable<Task> {
  3. private int id;
  4. private String name;
  5. public int getId() {
  6. return id;
  7. }
  8. public void setId(int id) {
  9. this.id = id;
  10. }
  11. public String getName() {
  12. return name;
  13. }
  14. public void setName(String name) {
  15. this.name = name;
  16. }
  17. @Override
  18. public int compareTo(Task o) {
  19. return this.id > o.id ? 1 : (this.id < o.id ? -1 : 0);
  20. }
  21. @Override
  22. public String toString() {
  23. return "Task{" +
  24. "id=" + id +
  25. ", name=‘" + name + ‘\‘‘ +
  26. ‘}‘;
  27. }
  28. }
  1. package demo6;
  2. import java.util.concurrent.PriorityBlockingQueue;
  3. public class DemoPriorityBlockingQueue {
  4. public static void main(String[] args) throws InterruptedException {
  5. PriorityBlockingQueue<Task> priorityBlockingQueue = new PriorityBlockingQueue<Task>();
  6. /**
  7. * 每取一个队列,就排一次序
  8. */
  9. Task t1 = new Task();
  10. t1.setId(3);
  11. t1.setName(" 任务1");
  12. Task t2 = new Task();
  13. t2.setId(6);
  14. t2.setName(" 任务2");
  15. Task t3 = new Task();
  16. t3.setId(1);
  17. t3.setName(" 任务3");
  18. priorityBlockingQueue.add(t1);
  19. priorityBlockingQueue.add(t2);
  20. priorityBlockingQueue.add(t3);
  21. /* for (Task q:priorityBlockingQueue){
  22. System.err.println(q.getId()+"\t"+q.getName());
  23. }*/
  24. System.err.println("容器:"+priorityBlockingQueue);
  25. System.err.println(priorityBlockingQueue.poll().getId());
  26. System.err.println("容器:"+priorityBlockingQueue);
  27. System.err.println(priorityBlockingQueue.poll().getId());
  28. System.err.println("容器:"+priorityBlockingQueue);
  29. System.err.println(priorityBlockingQueue.poll().getId());
  30. }
  31. }
  32. 输出
  33. 容器:[Task{id=1, name=‘ 任务3‘}, Task{id=6, name=‘ 任务2‘}, Task{id=3, name=‘ 任务1‘}] 1 容器:[Task{id=3, name=‘ 任务1‘}, Task{id=6, name=‘ 任务2‘}] 3 容器:[Task{id=6, name=‘ 任务2‘}] 6

7.synchronizeQueue:无缓冲的队列,生产消费的数据直接被消费者offer,不可直接使用add方法,必须先take方法在前,调用处于阻塞状态,等待add()方法的通知。

  1. package demo6;
  2. import java.util.concurrent.SynchronousQueue;
  3. public class DemoSynchronousQueue {
  4. public static void main(String[] args) {
  5. final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<String>();
  6. /**
  7. * 不可直接使用add()方法,必须先有take方法在前调用(处于阻塞状态,等待add的通知激活),add()方法在后,
  8. */
  9. Thread t1 = new Thread(new Runnable() {
  10. @Override
  11. public void run() {
  12. try {
  13. System.err.println(synchronousQueue.take());
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }, "T1");
  19. t1.start();
  20. Thread t2 = new Thread(new Runnable() {
  21. @Override
  22. public void run() {
  23. synchronousQueue.add("哈哈哈哈");
  24. }
  25. }, "T2");
  26. t2.start();
  27. }
  28. }

8.delayQueue:带有延迟时间的队列。

  1. package demo6;
  2. import java.util.concurrent.Delayed;
  3. import java.util.concurrent.TimeUnit;
  4. public class WangMin implements Delayed {
  5. private String name;
  6. private String id;
  7. /*结束时间*/
  8. private long endTime;
  9. public WangMin(String name, String id, long endTime) {
  10. this.name = name;
  11. this.id = id;
  12. this.endTime = endTime;
  13. }
  14. public String getName() {
  15. return name;
  16. }
  17. public void setName(String name) {
  18. this.name = name;
  19. }
  20. public String getId() {
  21. return id;
  22. }
  23. public void setId(String id) {
  24. this.id = id;
  25. }
  26. public long getEndTime() {
  27. return endTime;
  28. }
  29. public void setEndTime(long endTime) {
  30. this.endTime = endTime;
  31. }
  32. private TimeUnit timeUnit = TimeUnit.SECONDS;
  33. /*用来判断是否时间已经到期*/
  34. @Override
  35. public long getDelay(TimeUnit unit) {
  36. return this.endTime - System.currentTimeMillis();
  37. }
  38. /*相互排序比较时间*/
  39. @Override
  40. public int compareTo(Delayed o) {
  41. WangMin wangMin = (WangMin) o;
  42. return this.getDelay(this.timeUnit) - wangMin.getDelay(this.timeUnit) > 0 ? 1 : 0;
  43. }
  44. }
  1. package demo6;
  2. import java.util.concurrent.DelayQueue;
  3. public class WangBa implements Runnable{
  4. private DelayQueue<WangMin> queue = new DelayQueue<WangMin>();
  5. public boolean yinye = true;
  6. public void startShangWang(String name, String id, int money) {
  7. WangMin wangMin = new WangMin(name,id,1000*money+System.currentTimeMillis());
  8. System.err.println("网络机器:"+wangMin.getName()+"\t身份证:"+wangMin.getId()+"\t金额:"+money+"\t开始上网。。。");
  9. queue.add(wangMin);
  10. }
  11. public void endShangWang(WangMin wangMin) {
  12. System.err.println("网络机器:"+wangMin.getName()+"\t身份证:"+wangMin.getId()+"\t下机了....");
  13. }
  14. @Override
  15. public void run() {
  16. while (yinye){
  17. try {
  18. WangMin min = queue.take();
  19. endShangWang(min);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
  25. public static void main(String[] args){
  26. System.err.println("网吧开始...");
  27. WangBa wangBa = new WangBa();
  28. Thread sw = new Thread(wangBa);
  29. sw.start();
  30. wangBa.startShangWang("亚瑟1","xdwdasdasda12324323",5);
  31. wangBa.startShangWang("亚瑟2","xdwdasdasda12324323",20);
  32. wangBa.startShangWang("亚瑟3","xdwdasdasda12324323",15);
  33. }
  34. }
  35. 输出:
  36. 网吧开始... 网络机器:亚瑟1 身份证:xdwdasdasda12324323 金额:5 开始上网。。。 网络机器:亚瑟2 身份证:xdwdasdasda12324323 金额:20 开始上网。。。 网络机器:亚瑟3 身份证:xdwdasdasda12324323 金额:15 开始上网。。。 网络机器:亚瑟1 身份证:xdwdasdasda12324323 下机了.... 网络机器:亚瑟3 身份证:xdwdasdasda12324323 下机了.... 网络机器:亚瑟2 身份证:xdwdasdasda12324323 下机了....

常用方法

1.add() 添加一个元素。

2.offer(队列,时间秒,时间范围单位)

3.poll()取出元素并删除,否返回null。

4.peck()取出,否返回false。

5.tace()取出,如果队列无数据直接释放cpu。

并发类:

1. ConcurrentLikedQueue         高性能、无阻塞、无界

2.BlockingQueue    阻塞形式队列

null

时间: 2024-10-09 15:49:27

17.并发类容器设计的相关文章

架构师养成--7.同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

并发编程(9):同步类容器与并发类容器

1.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作. 复合操作,如: 迭代(反复访问元素,遍历完容器中所有的元素) 跳转(根据指定的顺序找到当前元素的下一个元素) 条件运算 这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的就是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector.HashTble.这

线程学习--(七)单例和多线程、同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

并发类容器-第二讲

一.首先我们来熟悉一下跳表(数据结构和算法) 参考资料: https://www.cnblogs.com/seniusen/p/9870398.html https://blog.csdn.net/u010425776/article/details/54890215 跳表的由来:作用:存储有序序列,并且实现高效的查找与插入删除.存储有序序列最简单的办法就是使用数组,从而查找可以采用二分搜索,但插入删除需要移动元素较为低效.因此出现了二叉搜索树,用来解决插入删除移动元素的问题.但二叉搜索树在最坏

18.并发类容器MQ

package demo7.MQ; public class QueueData { private int id; private String name; private String taskCode; public QueueData() { } public QueueData(int id, String name, String taskCode) { this.id = id; this.name = name; this.taskCode = taskCode; } publi

并发类容器-第一讲

一.基础知识夯实 1.首先是我们需要懂得几点是在java中的集中运算 直接看代码吧 public class IntToBinary { public static void main(String[] args) throws UnsupportedEncodingException { int data = 4; System.out.println("the 4 is "+Integer.toBinaryString(data)); //位与 &(1&1=1 1&

Java线程同步类容器和并发容器(四)

同步类容器都是线程安全的,在某些场景下,需要枷锁保护符合操作,最经典ConcurrentModifiicationException,原因是当容器迭代的过程中,被并发的修改了内容. for (Iterator iterator = tickets.iterator(); iterator.hasNext();) { String string = (String) iterator.next(); tickets.remove(20); } //多线程使用Vector或者HashTable的示例

深入浅出 Java Concurrency (17): 并发容器 part 2 ConcurrentMap (2)

本来想比较全面和深入的谈谈ConcurrentHashMap的,发现网上有很多对HashMap和ConcurrentHashMap分析的文章,因此本小节尽可能的分析其中的细节,少一点理论的东西,多谈谈内部设计的原理和思想. 要谈ConcurrentHashMap的构造,就不得不谈HashMap的构造,因此先从HashMap开始简单介绍. HashMap原理 我们从头开始设想.要将对象存放在一起,如何设计这个容器.目前只有两条路可以走,一种是采用分格技术,每一个对象存放于一个格子中,这样通过对格子