Java并发编程(四):并发容器(转)

  解决并发情况下的容器线程安全问题的。给多线程环境准备一个线程安全的容器对象。
  线程安全的容器对象: Vector, Hashtable。线程安全容器对象,都是使用 synchronized 方法实现的。
  concurrent 包中的同步容器,大多数是使用系统底层技术实现的线程安全。类似 native。 Java8 中使用 CAS。

1、Map/Set

  1.1 ConcurrentHashMap/ConcurrentHashSet

    底层哈希实现的同步 Map(Set)。效率高,线程安全。使用系统底层技术实现线程安全。 量级较 synchronized 低。key 和 value 不能为 null。

  1.2 ConcurrentSkipListMap/ConcurrentSkipListSet

    底层跳表(SkipList)实现的同步 Map(Set)。有序,效率比 ConcurrentHashMap 稍低。

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class Test_01_ConcurrentMap {

    public static void main(String[] args) {
//        final Map<String, String> map = new Hashtable<>();
         final Map<String, String> map = new ConcurrentHashMap<>();
         //ConcurrentSkipListMap跳表实现的,是排序的,最慢
//         final Map<String, String> map = new ConcurrentSkipListMap<>();
        final Random r = new Random();
        Thread[] array = new Thread[100];
        final CountDownLatch latch = new CountDownLatch(array.length);

        long begin = System.currentTimeMillis();
        for (int i = 0; i < array.length; i++) {
            array[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100000; j++) {
                        map.put("key" + r.nextInt(100000), "value" + r.nextInt(100000));
                    }
                    latch.countDown();
                }
            });
        }
        for (Thread t : array) {
            t.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        System.out.println("执行时间为 : " + (end - begin) + "毫秒!");
    }

}

2、List

  2.1 CopyOnWriteArrayList

    写时复制集合。写入效率低,读取效率高。每次写入数据,都会创建一个新的底层数组。

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Test_02_CopyOnWriteList {

    public static void main(String[] args) {
         final List<String> list = new ArrayList<>();
//         final List<String> list = new Vector<>();
//        final List<String> list = new CopyOnWriteArrayList<>();
        final Random r = new Random();
        Thread[] array = new Thread[100];
        final CountDownLatch latch = new CountDownLatch(array.length);

        long begin = System.currentTimeMillis();
        for (int i = 0; i < array.length; i++) {
            array[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        list.add("value" + r.nextInt(100000));
                    }
                    latch.countDown();
                }
            });
        }
        for (Thread t : array) {
            t.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        System.out.println("执行时间为 : " + (end - begin) + "毫秒!");
        System.out.println("List.size() : " + list.size());
    }

}

3、Queue

  3.1 ConcurrentLinkedQueue

    基础链表同步队列。

/**
 * 并发容器 - ConcurrentLinkedQueue
 * 队列 - 链表实现的。
 */
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Test_03_ConcurrentLinkedQueue {

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 10; i++) {
            queue.offer("value" + i);
        }

        System.out.println(queue);
        System.out.println(queue.size());

        // peek() -> 查看queue中的首数据
        System.out.println(queue.peek());
        System.out.println(queue.size());

        // poll() -> 获取queue中的首数据
        System.out.println(queue.poll());
        System.out.println(queue.size());
    }

}

  3.2 LinkedBlockingQueue

    阻塞队列,队列容量不足自动阻塞,队列容量为 0 自动阻塞。

/**
 * 并发容器 - LinkedBlockingQueue
 * 阻塞容器。
 * put & take - 自动阻塞。
 * put自动阻塞, 队列容量满后,自动阻塞
 * take自动阻塞方法, 队列容量为0后,自动阻塞。
 */

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test_04_LinkedBlockingQueue {

    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final Random r = new Random();

    public static void main(String[] args) {
        final Test_04_LinkedBlockingQueue t = new Test_04_LinkedBlockingQueue();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        t.queue.put("value" + t.r.nextInt(1000));
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "producer").start();

        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            System.out.println(Thread.currentThread().getName() +
                                    " - " + t.queue.take());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "consumer" + i).start();
        }
    }

}

  3.3 ArrayBlockingQueue

    底层数组实现的有界队列。自动阻塞。根据调用 API(add/put/offer)不同,有不同特 性。
    当容量不足的时候,有阻塞能力。
    add 方法在容量不足的时候,抛出异常put 方法在容量不足的时候,阻塞等待
    offer 方法:
    单参数 offer 方法,不阻塞。容量不足的时候,返回 false。当前新增数据操作放弃。 三参数 offer 方法(offer(value,times,timeunit)),容量不足的时候,阻塞 times 时长(单
位为 timeunit),如果在阻塞时长内,有容量空闲,新增数据返回 true。如果阻塞时长范围 内,无容量空闲,放弃新增数据,返回 false。

/**
 * 并发容器 - ArrayBlockingQueue
 * 有界容器。
 */

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test_05_ArrayBlockingQueue {

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

    public static void main(String[] args) {
        final Test_05_ArrayBlockingQueue t = new Test_05_ArrayBlockingQueue();

        for (int i = 0; i < 5; i++) {
            // System.out.println("add method : " + t.queue.add("value"+i));
            /*try {
                t.queue.put("put"+i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("put method : " + i);*/
            // System.out.println("offer method : " + t.queue.offer("value"+i));
            try {
                System.out.println("offer method : " +
                        t.queue.offer("value" + i, 1, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(t.queue);
    }

}

  3.4 DelayQueue

    延时队列。根据比较机制,实现自定义处理顺序的队列。常用于定时任务。
    如:定时关机。

/**
 * 并发容器 - DelayQueue
 * 无界容器。
 */

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Test_06_DelayQueue {

    static BlockingQueue<MyTask_06> queue = new DelayQueue<>();

    public static void main(String[] args) throws InterruptedException {
        long value = System.currentTimeMillis();
        MyTask_06 task1 = new MyTask_06(value + 2000);
        MyTask_06 task2 = new MyTask_06(value + 1000);
        MyTask_06 task3 = new MyTask_06(value + 3000);
        MyTask_06 task4 = new MyTask_06(value + 2500);
        MyTask_06 task5 = new MyTask_06(value + 1500);

        queue.put(task1);
        queue.put(task2);
        queue.put(task3);
        queue.put(task4);
        queue.put(task5);

        System.out.println(queue);
        System.out.println(value);
        for (int i = 0; i < 5; i++) {
            System.out.println(queue.take());
        }
    }

}

class MyTask_06 implements Delayed {

    private long compareValue;

    public MyTask_06(long compareValue) {
        this.compareValue = compareValue;
    }

    /**
     * 比较大小。自动实现升序
     * 建议和getDelay方法配合完成。
     * 如果在DelayQueue是需要按时间完成的计划任务,必须配合getDelay方法完成。
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    /**
     * 获取计划时长的方法。
     * 根据参数TimeUnit来决定,如何返回结果值。
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public String toString() {
        return "Task compare value is : " + this.compareValue;
    }

}

  3.5 LinkedTransferQueue

    转移队列,是一个容量为 0 的队列。使用 transfer 方法,实现数据的即时处理。没有消费者,就阻塞。

/**
 * 并发容器 - LinkedTransferQueue
 * 转移队列
 * add - 队列会保存数据,不做阻塞等待。
 * transfer - 是TransferQueue的特有方法。必须有消费者(take()方法的调用者)。
 * 如果没有任意线程消费数据,transfer方法阻塞。一般用于处理即时消息。
 */

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class Test_07_TransferQueue {

    TransferQueue<String> queue = new LinkedTransferQueue<>();

    public static void main(String[] args) {
        final Test_07_TransferQueue t = new Test_07_TransferQueue();

        /*new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " thread begin " );
                    System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "output thread").start();

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            t.queue.transfer("test string");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/

        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    t.queue.transfer("test string");
                    // t.queue.add("test string");
                    System.out.println("add ok");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " thread begin ");
                    System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "output thread").start();

    }

}

  3.6 SynchronusQueue

    同步队列,是一个容量为 0 的队列。是一个特殊的 TransferQueue。 必须现有消费线程等待,才能使用的队列。
    add 方法,无阻塞。若没有消费线程阻塞等待数据,则抛出异常。 put 方法,有阻塞。若没有消费线程阻塞等待数据,则阻塞。

/**
 * 并发容器 - SynchronousQueue
 */
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test_08_SynchronusQueue {

    BlockingQueue<String> queue = new SynchronousQueue<>();

    public static void main(String[] args) {
        final Test_08_SynchronusQueue t = new Test_08_SynchronusQueue();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " thread begin ");
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "output thread").start();

        /*try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
        // t.queue.add("test add");
        try {
            t.queue.put("test put");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + " queue size : " + t.queue.size());
    }

}

转载于:https://www.cnblogs.com/jing99/p/10733597.html

原文地址:https://www.cnblogs.com/ruiyeclub/p/12326531.html

时间: 2024-10-10 23:37:53

Java并发编程(四):并发容器(转)的相关文章

Java并发编程:并发容器ConcurrentHashMap

Java并发编程:并发容器之ConcurrentHashMap(转载) 下面这部分内容转载自: http://www.haogongju.net/art/2350374 JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都 串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低.因此Java5.0开 始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入

Java并发编程:并发容器之ConcurrentHashMap(转)

本文转自:http://www.cnblogs.com/dolphin0520/p/3932905.html Java并发编程:并发容器之ConcurrentHashMap(转载) 下面这部分内容转载自: http://www.haogongju.net/art/2350374 JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都 串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程

Java并发编程:并发容器之CopyOnWriteArrayList(转)

本文转自:http://www.cnblogs.com/dolphin0520/p/3938914.html Java并发编程:并发容器之CopyOnWriteArrayList(转载) 原文链接: http://ifeve.com/java-copy-on-write/ Copy-On-Write简称COW,是一种用于程序设计中的优化策略.其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略.从J

【转】Java并发编程:并发容器之ConcurrentHashMap

JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低.因此Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包.与Vector和Hashtable.Collections.synchronizedXxx()同步容器等相比,util.conc

【Java并发编程】并发编程大合集-值得收藏

http://blog.csdn.net/ns_code/article/details/17539599这个博主的关于java并发编程系列很不错,值得收藏. 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Ja

【Java并发编程】并发编程大合集

转载自:http://blog.csdn.net/ns_code/article/details/17539599 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Java并发编程]Volatile关键字(上)

Java并发编程:同步容器

为了方便编写出线程安全的程序,Java里面提供了一些线程安全类和并发工具,比如:同步容器.并发容器.阻塞队列.Synchronizer(比如CountDownLatch).今天我们就来讨论下同步容器. 以下是本文的目录大纲: 一.为什么会出现同步容器? 二.Java中的同步容器类 三.同步容器的缺陷 若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/dolphin0520/p/3933404.html 一.为什么会出

Java 并发编程(四):如何保证对象的线程安全性

本篇来谈谈 Java 并发编程:如何保证对象的线程安全性. 01.前言 先让我吐一句肺腑之言吧,不说出来会憋出内伤的.<Java 并发编程实战>这本书太特么枯燥了,尽管它被奉为并发编程当中的经典之作,但我还是忍不住.因为第四章"对象的组合"我整整啃了两周的时间,才啃出来点肉丝. 读者朋友们见谅啊.要怪只能怪我自己的学习能力有限,真读不了这种生硬无趣的技术书.但是为了学习,为了进步,为了将来(口号喊得有点大了),只能硬着头皮上. 请随我来,我尽量写得有趣点. 02.线程安全类

Java并发编程:并发容器之ConcurrentHashMap

下面这部分内容转载自: http://www.haogongju.net/art/2350374 JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都 串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低.因此Java5.0开 始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包.与Vector和Hasht