线程安全集合

>>返回《C# 并发编程》

  • 1. 简介
  • 2. 不可变栈和队列
  • 3. 不可变列表
  • 4. 不可变Set集合
  • 5. 不可变字典
  • 6. 线程安全字典
  • 7. 阻塞队列
  • 8. 阻塞栈和包
  • 9. 异步队列
  • 10. 异步栈和包
  • 11. 阻塞/异步队列

1. 简介

  • 不可变集合

    • 不可变集合之间通常共享了大部分存储空间,因此其实浪费并不大
    • 因为是无法修改的,所以是线程安全
  • 线程安全集合
    • 可同时被多个线程修改的可变集合

      • 线程安全集合混合使用了细粒度锁定无锁技术,以确保线程被阻塞的时间最短

        • 通常情况下是根本不阻塞
    • 对很多线程安全集合进行枚举操作时,内部创建了该集合的一个快照(snapshot),并对这个快照进行枚举操作。
    • 线程安全集合的主要优点是多个线程可以安全地对其进行访问,而代码只会被阻塞很短的时间,或根本阻塞。

下面对常用的属于不可变集合线程安全集合类型的,特定数据结构的集合进行说明。

2. 不可变栈和队列

不可变集合采用的模式是返回一个修改过的集合,原始的集合引用是不变化的。

  • 这意味着,如果引用了特定的不可变集合的实例,它是不会变化的。
var stack = ImmutableStack<int>.Empty;
stack = stack.Push(13);
var biggerStack = stack.Push(7);
// 先显示“7”,接着显示“13”。
foreach (var item in biggerStack)
    Console.WriteLine($"biggerStack {item}");

// 只显示“13”。
foreach (var item in stack)
    Console.WriteLine($"stack {item}");

输出:

biggerStack 7
biggerStack 13
stack 13

两个栈实际上在内部共享了存储项目 13 的内存。

  • 这种实现方式的效率很高,并且可以很方便地创建当前状态的快照
  • 每个不可变集合的实例都是绝对线程安全

ImmutableQueue 使用方法类似。

  • 不可变集合的一个实例是永远不改变的。
  • 因为不会改变,所以是绝对线程安全的。
  • 对不可变集合使用修改方法时,返回修改后的集合。
  • 不可变集合非常适用于共享状态,但不适合用来做交换数据的通道。

3. 不可变列表

不可变列表的内部是用二叉树组织数据的。这么做是为了让不可变列表的实例之间共享的内存最大化。

  • 这导致 ImmutableList<T>List<T> 在常用操作上有性能上的差别(参见下表)。
操 作 List<T> ImmutableList<T>
Add 平摊 O(1) O(log N)
Insert O(N) O(log N)
RemoveAt O(N) O(log N)
Item[index] O(1) O(log N)

不可变列表确实可以使用index获取数据项,但需要注意性能问题。不能简单地用它来替代 List<T>

  • 这意味着应该尽量使用 foreach 而不是用 for

4. 不可变Set集合

  • ImmutableHashSet<T>

    • 是一个不含重复元素的集合
  • ImmutableSortedSet<T>
    • 是一个已排序的不含重复元素的集合
  • 都有相似的接口
//ImmutableHashSet
var hashSet = ImmutableHashSet<int>.Empty;
hashSet = hashSet.Add(13);
hashSet = hashSet.Add(7);
// 显示“7”和“13”,次序不确定。
foreach (var item in hashSet)
    Console.Write(item + " ");

System.Console.WriteLine();
hashSet = hashSet.Remove(7);

//ImmutableSortedSet
var sortedSet = ImmutableSortedSet<int>.Empty;
sortedSet = sortedSet.Add(13);
sortedSet = sortedSet.Add(7);
// 先显示“7”,接着显示“13”。
foreach (var item in sortedSet)
    Console.Write(item + " ");

var smallestItem = sortedSet[0];
// smallestItem == 7
sortedSet = sortedSet.Remove(7);

输出:

7 13
7 13 
操 作 ImmutableHashSet<T> ImmutableSortedSet<T>
Add O(log N) O(log N)
Remove O(log N) O(log N)
Item[index] 不可用 O(log N)

ImmutableSortedSet 索引操作的时间复杂度是 O(log N),而不是 O(1),这跟 上节中 ImmutableList<T> 的情况类似。

  • 这意味着它们适用同样的警告:使用 ImmutableSortedSet<T>时,应该尽量用 foreach 而不是用 for 。

可以先快速地以可变方式构建,然后转换成不可变集合

5. 不可变字典

  • ImmutableDictionary<TKey,TValue>
  • ImmutableSortedDictionar y<TKey,TValue>
//ImmutableDictionary
var dictionary = ImmutableDictionary<int, string>.Empty;
dictionary = dictionary.Add(10, "Ten");
dictionary = dictionary.Add(21, "Twenty-One");
dictionary = dictionary.SetItem(10, "Diez");
// 显示“10Diez”和“21Twenty-One”,次序不确定。
foreach (var item in dictionary)
    Console.WriteLine(item.Key + ":" + item.Value);

var ten = dictionary[10]; // ten == "Diez"
dictionary = dictionary.Remove(21);

//ImmutableSortedDictionary
var sortedDictionary = ImmutableSortedDictionary<int, string>.Empty; sortedDictionary = sortedDictionary.Add(10, "Ten");
sortedDictionary = sortedDictionary.Add(21, "Twenty-One");
sortedDictionary = sortedDictionary.SetItem(10, "Diez");
// 先显示“10Diez”,接着显示“21Twenty-One”。
foreach (var item in sortedDictionary)
    Console.WriteLine(item.Key + ":" + item.Value);

ten = sortedDictionary[10];
// ten == "Diez"
sortedDictionary = sortedDictionary.Remove(21);

输出:

10:Diez
21:Twenty-One
10:Diez
21:Twenty-One
操 作 I
操 作 ImmutableDictionary<TK,TV> ImmutableSortedDictionary<TK,TV>
Add O(log N) O(log N)
SetItem O(log N) O(log N)
Item[key] O(log N) O(log N)
Remove O(log N) O(log N)

6. 线程安全字典

var dictionary = new ConcurrentDictionary<int, string>();
var newValue = dictionary.AddOrUpdate(0,
key => "Zero",
(key, oldValue) => "Zero");

AddOrUpdate 方法有些复杂,这是因为这个方法必须执行多个步骤,具体步骤取决于并发字典的当前内容。

  • 方法的第一个参数是
  • 第二个参数是一个委托,它把键(本例中为 0)转换成添加到字典的值(本例中为“Zero”)
    • 只有当字典中没有这个键时,这个委托才会运行。
  • 第三个参数也是一个委托,它把(0)和原来的值转换成字典中修改后的值
    (“Zero”)。

    • 只有当字典中已经存在这个键时,这个委托才会运行。
  • AddOrUpdate return 这个键对应的新值(与其中一个委托返回的值相同)。

AddOrUpdate 可能要多次调用其中一个(或两个)委托。这种情况很少,但确实会发生。

  • 因此这些委托必须简单、快速,并且不能有副作用
  • 这些委托只能创建新的值,不能修改程序中其他变量
  • 这个原则适用于所有 ConcurrentDictionary<TKey,TValue> 的方法所使用的委托
// 使用与前面一样的“字典”。
string currentValue;
bool keyExists = dictionary.TryGetValue(0, out currentValue);

// 使用与前面一样的“字典”。
string removedValue;
bool keyExisted = dictionary.TryRemove(0, out removedValue);
  • 如果多个线程读写一个共享集合, 使用 ConcurrentDictrionary<TKey,TValue> 是最合适的
  • 如果不会频繁修改(很少修改), 那更适合使用 ImmutableDictionary<TKey, TValue>
  • 如果一些线程只添加元素,另一些线程只移除元素,那最好使用生产者/消费者集合

7. 阻塞队列

  • GetConsumingEnumerable 会阻塞线程
  • CommpleteAdding 方法执行后所有被 GetConsumingEnumerable 阻塞的线程开始执行
  • 每个元素只会被消费一次
private static readonly BlockingCollection<int> _blockingQueue = new BlockingCollection<int>();
public static async Task BlockingCollectionSP()
{
    Action consumerAction = () =>
     {
        Console.WriteLine($"started print({Thread.CurrentThread.ManagedThreadId}).");
        // 先显示“7”,后显示“13”。
        foreach (var item in _blockingQueue.GetConsumingEnumerable())
        {
             Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {item}");
        }
        Console.WriteLine($"ended print({Thread.CurrentThread.ManagedThreadId}).");
     };
    Task task1 = Task.Run(consumerAction);
    Task task2 = Task.Run(consumerAction);
    Task task3 = Task.Run(consumerAction);

    _blockingQueue.Add(7);
    System.Console.WriteLine($"added 7.");
    _blockingQueue.Add(13);
    System.Console.WriteLine($"added 13.");
    _blockingQueue.CompleteAdding();
    System.Console.WriteLine("CompleteAdding.");

    try
    {
        _blockingQueue.Add(15);
    }
    catch (Exception ex)
    {
        System.Console.WriteLine($"{ex.GetType().Name}:{ex.Message}");
    }

    await Task.WhenAll(task1, task2, task3);
}

输出:

started print(4).
started print(3).
started print(6).
added 7.
added 13.
CompleteAdding.
ended print(6).
InvalidOperationException:The collection has been marked as complete with regards to additions.
print(4) 7
ended print(4).
print(3) 13
ended print(3).

8. 阻塞栈和包

  • 在默认情况下,.NET 中的 BlockingCollection<T> 用作阻塞队列,但它也可以作为任何类型的生产者/消费者集合
  • BlockingCollection<T> 实际上是对线程安全集合进行了封装, 实现了 IProducerConsumerCollection<T> 接口。
    • 因此可以在创建 BlockingCollection<T> 实例时指明规则
BlockingCollection<int> _blockingStack = new BlockingCollection<int>( new ConcurrentStack<int>());
BlockingCollection<int> _blockingBag = new BlockingCollection<int>( new ConcurrentBag<int>());

替换到阻塞队列示例代码中试试。

9. 异步队列

public static async Task BufferBlockPS()
{
    BufferBlock<int> _asyncQueue = new BufferBlock<int>();
    Func<Task> concurrentConsumerAction = async () =>
     {
         while (true)
         {
             int item;
             try
             {
                 item = await _asyncQueue.ReceiveAsync();
             }
             catch (InvalidOperationException)
             {
                 System.Console.WriteLine($"exit({Thread.CurrentThread.ManagedThreadId}).");
                 break;
             }
             Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {item}");
         }
     };
    Func<Task> consumerAction = async () =>
    {
        try
        {
            // 先显示“7”,后显示“13”。 单线程可用
            while (await _asyncQueue.OutputAvailableAsync())
            {
                Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {await _asyncQueue.ReceiveAsync()}");
            }
        }
        catch (Exception ex)
        {
            System.Console.WriteLine($"{ex.GetType().Name}({Thread.CurrentThread.ManagedThreadId}):{ex.Message}");
        }

    };

    Task t1 = consumerAction();
    Task t2 = consumerAction();

    // Task t1 = concurrentConsumerAction();
    // Task t2 = concurrentConsumerAction();

    // 生产者代码
    await _asyncQueue.SendAsync(7);
    await _asyncQueue.SendAsync(13);
    await _asyncQueue.SendAsync(15);
    System.Console.WriteLine("Added 7 13 15.");
    _asyncQueue.Complete();

    await Task.WhenAll(t1, t2);
}

输出:

Added 7 13 15.
print(4) 7
print(6) 13
print(4) 15
InvalidOperationException(3):The source completed without providing data to receive.

10. 异步栈和包

Nito.AsyncEx 库

AsyncCollection<int> _asyncStack = new AsyncCollection<int>( new ConcurrentStack<int>());
AsyncCollection<int> _asyncBag = new AsyncCollection<int>( new ConcurrentBag<int>());

11. 阻塞/异步队列

在阻塞队列中已经介绍了BufferBlock<T>

这里介绍 ActionBlock<int>

public static async Task ActionBlockPS()
{
    ActionBlock<int> queue = new ActionBlock<int>(u => Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {u}"));

    // 异步的生产者代码
    await queue.SendAsync(7);
    await queue.SendAsync(13);
    System.Console.WriteLine("Added async.");
    // 同步的生产者代码
    queue.Post(15);
    queue.Post(17);
    System.Console.WriteLine("Added sync.");
    queue.Complete();
    System.Console.WriteLine($"Completed({Thread.CurrentThread.ManagedThreadId}).");
}

输出:

Added async.
Added sync.
Completed(1).
print(3) 7
print(3) 13
print(3) 15
print(3) 17

原文地址:https://www.cnblogs.com/BigBrotherStone/p/12247969.html

时间: 2024-07-31 10:33:52

线程安全集合的相关文章

Net线程安全集合

在看Supersocket源码的时候发现很多地方都用到了我们不是很常用的线程安全集合,这些都是由net优化后的线程安全集合因此 应该比我们常规lock来效率好一些 比如说: 1 CurrentStack 线程安全栈 2 ConcurrentDictionary 线程安全字典,

线程安全 集合

线程安全指的是该对象的add,remove,get等方法是线程安全的,即同一对象(同一个list),同一时间只有一个线程能在这几个方法上运行,其实针对的是这个集合list 这几种方法内容而言的,如list的add方法,这个add方法里面具体实现肯定有好多条代码,假如一个线程调用add方法,其实是在执行这个add方法里的具体实现的代码,再不停的执行代码过程中,完全有可能有另外一个线程也来执行这个list的add方法,也就是也会进这个list的add方法的具体实现代码中,那这个时候由于对象都是同一个

Java创建多线程和线程安全集合Vector

public class Test { public static Vector<String> data = new Vector<String>(); public static void main(String[] args) { for (int i = 0; i < 100; i++) { data.add("data" + i); } for (int i = 0; i < 3; i++) { Thread t = new Thread(

JDK5新特性之线程同步集合(五)

一. 传统集合: 传统方式下的Collection在迭代集合时, 不同意对集合进行改动: public class CollectionModifyExceptionTest { public static void main(String[] args) { Collection<String> list = new ArrayList<String>(); list.add("aaa"); list.add("bbb"); list.ad

Java面试题(五)线程及集合补充

1,一个进程就是一个应用程序.一个线程就是一个进程中运行的最小单元.  一个进程可以包括多线程. 餐馆(进程). 主线程. 服务员(线程) 服务员(线程) 服务员(线程) 服务员(线程) 加塞,让步,守护,睡觉,打断,设置优先级.. 2,线程的生命周期?新建,就绪,运行,阻塞,死亡 3,线程创建有几种方式?答:三种.extends Thread,implements Runnable,线程池. 继承是否要满足一种关系?is-a关系?子类 is a 父类. 4,不同线程创建对应的start方法. 

011-多线程-JUC集合-Queue-PriorityBlockingQueue和DelayQueue

一.PriorityBlockingQueue简介 PriorityBlockingQueue是一个支持优先级的无界阻塞队列.默认情况下元素采用自然顺序升序排列.也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序.但需要注意的是不能保证同优先级元素的顺序. 二.DelayQueue DelayQueue是一个支持延时获取元素的无界阻塞队列.队列使用PriorityQueue来实现

012-多线程-JUC集合-Queue-SynchronousQueue和LinkedTransferQueue

一.SynchronousQueue概述 SynchronousQueue是一个不存储元素的队列.每一个put操作必须等待一个take操作,否则不能继续添加元素. 它支持公平访问队列.默认情况下线程采用非公平性策略访问队列.SynchronousQueue类只有两个构造方法: public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new Tran

Concurrent下的线程安全集合淄滋爪篆装状

http://www.ebay.com/cln/zhenbe5/2015-01-28/165564250015 http://www.ebay.com/cln/ainxiacen/2015-01-28/165288136012 http://www.ebay.com/cln/chj3768/2015-01-28/165455877011 http://www.ebay.com/cln/kon-j97/2015-01-28/165564254015 http://www.ebay.com/cln/

Java集合的有序无序问题和线程安全与否问题

首先,清楚有序和无序是什么意思: 集合的有序.无序是指插入元素时,保持插入的顺序性,也就是先插入的元素优先放入集合的前面部分. 而排序是指插入元素后,集合中的元素是否自动排序.(例如升序排序) 1.有序集合:集合里的元素可以根据key或index访问.无序集合:集合里的元素只能遍历.有序集合在属性的增加,删除及修改中拥有较好的性能表现. Set集合一般是无序的.实现hash算法的集合一般是无序的,例如hashMap,hashTable List集合一般是有序的. 底层是Tree的一般是有序的,例