C#高级编程五十八天----并行集合

并行集合

对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行訪问.常常要做的就是对一些队列进行加锁-解锁,然后运行类似插入,删除等等相互排斥操作. .NET4提供了一些封装好的支持并行操作数据容器,能够降低并行编程的复杂程度.

并行集合的命名空间:System.Collections.Concurrent

并行容器:

ConcurrentQueue

ConcurrentStack

ConcurrentBag: 一个无序的数据结构集,当不考虑顺序时很实用.

BlockingCollection:与经典的堵塞队列数据结构类似

ConcurrentDictoinary

以上这些集合在某种程度上使用了无锁技术(CAS和内存屏蔽),与加相互排斥锁相比获得了性能的提升.可是在串行程序中,最好不要使用这些集合,他们必定会影响性能.

ConcurrentQueue使用方法与实例

其全然无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作.

Enqueue:在队尾插入元素

TryDequeue:尝试删除对头元素,并通过out參数返回

TryPeek:尝试将对头元素通过out參数返回,但不删除元素

案例:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Collections.Concurrent;

namespace 并行结合Queue

{

class Program

{

internal static ConcurrentQueue<int> _TestQueue;

class ThreadWork1 //生产者

{

public ThreadWork1()

{ }

public void run()

{

Console.WriteLine("ThreadWork1 run { ");

for (int i = 0; i < 100; i++)

{

Console.WriteLine("ThreadWork1 producer: "+i);

_TestQueue.Enqueue(i);

}

Console.WriteLine("ThreadWork1 run } ");

}

}

class ThreadWork2 //consumer

{

public ThreadWork2()

{ }

public void run()

{

int i = 0;

bool IsDequeue = false;

Console.WriteLine("ThreadWork2 run { ");

for (; ; )

{

IsDequeue = _TestQueue.TryDequeue(out i);

if (IsDequeue)

{

System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====");

}

if (i==99)

{

break;

}

}

Console.WriteLine("ThreadWork2 run } ");

}

}

static void StartT1()

{

ThreadWork1 work1 = new ThreadWork1();

work1.run();

}

static void StartT2()

{

ThreadWork2 work2 = new ThreadWork2();

work2.run();

}

static void Main(string[] args)

{

Task t1 = new Task(() => StartT1());

Task t2 = new Task(() => StartT2());

_TestQueue = new ConcurrentQueue<int>();

Console.WriteLine("Sample 3-1 Main {");

Console.WriteLine("Main t1 t2 started {");

t1.Start();

t2.Start();

Console.WriteLine("Main t1 t2 started }");

Console.WriteLine("Main wait t1 t2 end {");

Task.WaitAll(t1, t2);

Console.WriteLine("Main wait t1 t2 end }");

Console.WriteLine("Sample 3-1 Main }");

Console.ReadKey();

}

}

}

ConcurrentStact

其全然无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作.

Push:向栈顶插入元素

TryPop:从栈顶弹出元素,而且通过out參数返回

TryPeek:返回栈顶元素,但不弹出

案例:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Collections.Concurrent;

namespace 并行结合Queue

{

class Program

{

internal static ConcurrentStack<int> _TestStack;

class ThreadWork1 //生产者

{

public ThreadWork1()

{ }

public void run()

{

Console.WriteLine("ThreadWork1 run { ");

for (int i = 0; i < 100; i++)

{

Console.WriteLine("ThreadWork1 producer: "+i);

_TestStack.Push(i);

}

Console.WriteLine("ThreadWork1 run } ");

}

}

class ThreadWork2 //consumer

{

public ThreadWork2()

{ }

public void run()

{

int i = 0;

bool IsDequeue = false;

Console.WriteLine("ThreadWork2 run { ");

for (; ; )

{

IsDequeue = _TestStack.TryPop(out i);

if (IsDequeue)

{

System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);

}

if (i==99)

{

break;

}

}

Console.WriteLine("ThreadWork2 run } ");

}

}

static void StartT1()

{

ThreadWork1 work1 = new ThreadWork1();

work1.run();

}

static void StartT2()

{

ThreadWork2 work2 = new ThreadWork2();

work2.run();

}

static void Main(string[] args)

{

Task t1 = new Task(() => StartT1());

Task t2 = new Task(() => StartT2());

_TestStack = new ConcurrentStack<int>();

Console.WriteLine("Sample 4-1 Main {");

Console.WriteLine("Main t1 t2 started {");

t1.Start();

t2.Start();

Console.WriteLine("Main t1 t2 started }");

Console.WriteLine("Main wait t1 t2 end {");

Task.WaitAll(t1, t2);

Console.WriteLine("Main wait t1 t2 end }");

Console.WriteLine("Sample 4-1 Main }");

Console.ReadKey();

}

}

}

ConcurrentBag

一个无序的集合,程序能够向当中插入元素,或删除元素.

在同一个线程中向集合插入,删除元素效率非常高.

Add:向集合中插入元素

TryTake:从集合中取出元素并删除

TryPeek:从集合中取出元素,但不删除元素

案例:

using System;

using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace 并行集合ConcurrentBag

{

class Program

{

internal static ConcurrentBag<int> _TestBag;

class ThreadWork1 //producer

{

public ThreadWork1()

{ }

public void run()

{

Console.WriteLine("Threadwork1 run { ");

for (int i = 0; i < 100; i++)

{

Console.WriteLine("ThreadWork1 producer: "+i);

_TestBag.Add(i);

}

Console.WriteLine("ThreadWork1 run } ");

}

}

class ThreadWork2//consumer

{

public ThreadWork2()

{ }

public void run()

{

bool IsDequeue = false;

Console.WriteLine("ThreadWork2 run { ");

for (int i = 0; i < 100; i++)

{

IsDequeue = _TestBag.TryTake(out i);

if (IsDequeue)

{

Console.WriteLine("ThreadWork2 consumer: " + i * i + "======" + i);

}

}

Console.WriteLine("ThreadWork2 run } ");

}

}

static void Start1()

{

ThreadWork1 work1 = new ThreadWork1();

work1.run();

}

static void Start2()

{

ThreadWork2 work2 = new ThreadWork2();

work2.run();

}

static void Main(string[] args)

{

Task t1 = new Task(() => Start1());

Task t2 = new Task(() => Start2());

_TestBag = new ConcurrentBag<int>();

t1.Start();

t2.Start();

Console.WriteLine("Main t1 t2 started }");

Console.WriteLine("Main wait t1 t2 end {");

Task.WaitAll(t1, t2);

Console.WriteLine("Main wait t1 t2 end }");

Console.WriteLine("Sample 4-3 Main }");

Console.ReadKey();

}

}

}

BlockingCollection

一个支持界限和堵塞的容器

Add:向容器中插入元素

TryTake:从容器中取出元素并删除

TryPeek:从容器中取出元素,但不删除

CompleteAdding:告诉容器,加入元素完毕.此时假设还想继续加入会发生异常.

IsCompleted:告诉消费者线程,产生者线程还在继续执行中,任务还未完毕.

案例:

程序中,消费者线程全然使用While(!__testCollection.IsCompleted)作为退出执行的推断条件.在Work1中,有两条语句被凝视了,当i为50时设置为CompleteAdding,但当继续向当中插入元素时,系统抛出异常,提示无法再继续插入.

案例:

using System;

using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace 并行结合Collection

{

class Program

{

internal static BlockingCollection<int> _testBCollection;

class ThreadWork1 //producer

{

public ThreadWork1()

{ }

public void run()

{

Console.WriteLine("ThreadWork1 run { ");

for (int i = 0; i < 100; i++)

{

Console.WriteLine("ThreadWork1 producer: "+i);

_testBCollection.Add(i);

//当i为50时设置为CompleteAdding,但当继续向当中插入元素时,系统抛出异常,提示无法再继续插入.

//if (i==50)

//{

//    _testBCollection.CompleteAdding();

//}

}

_testBCollection.CompleteAdding();

Console.WriteLine("ThreadWork1 run } ");

}

}

class ThreadWork2//consumer

{

public ThreadWork2()

{ }

public void run()

{

int i = 0;

bool IsDequeue = false;

Console.WriteLine("ThreadWork2 run { ");

while (!_testBCollection.IsCompleted)

{

//不明确这里为啥i会自己主动的++

//Console.WriteLine("i=================="+i);

IsDequeue = _testBCollection.TryTake(out i);

if (IsDequeue)

{

Console.WriteLine("ThreadWork2 consumer: "+i*i+"====="+i);

//i++;这句话不加还是会出现自己主动++的操作

}

}

}

}

static void StartT1()

{

ThreadWork1 work1 = new ThreadWork1();

work1.run();

}

static void StartT2()

{

ThreadWork2 work2 = new ThreadWork2();

work2.run();

}

static void Main(string[] args)

{

Task t1 = new Task(() => StartT1());

Task t2 = new Task(() => StartT2());

_testBCollection = new BlockingCollection<int>();

Console.WriteLine("Sample 4-4 Main {");

Console.WriteLine("Main t1 t2 started {");

t1.Start();

t2.Start();

Console.WriteLine("Main t1 t2 started }");

Console.WriteLine("Main wait t1 t2 end {");

Task.WaitAll(t1, t2);

Console.WriteLine("Main wait t1 t2 end }");

Console.WriteLine("Sample 4-4 Main }");

Console.ReadKey();

}

}

}

分析://_testBCollection.CompleteAdding();//这句话凝视掉work2陷入循环,无法退出

ConcurrentDictionary

对于读操作是全然无锁的,当非常多线程要改动数据时,它会使用细粒度的锁.

AddOrUpdate:假设键不存在,方法会在容器中加入新的键和值,假设存在,则更新现有的键和值

GetOrAdd:假设键不存在,方法会向容器中加入新的键和值,假设存在则返回现有的值,并不加入新值.

TryAdd:尝试在容器中加入新的键和值

TryGetValue:尝试依据指定的键获得值

TryUpdate:有条件的更新当前键所相应的值

TryRemove:尝试删除指定的键.

Getenumerator:返回一个可以遍历整个容器的枚举器.

案例:

using System;

using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace 并行集合Dictionary

{

class Program

{

internal static ConcurrentDictionary<int, int> _TestDictionary;

class ThreadWork1//producer

{

public ThreadWork1()

{ }

public void run()

{

System.Console.WriteLine("ThreadWork1 run { ");

for (int i = 0; i < 100; i++)

{

System.Console.WriteLine("ThreadWork1 producer: " + i);

_TestDictionary.TryAdd(i, i);

}

System.Console.WriteLine("ThreadWork1 run } ");

}

}

class ThreadWork2//consumer

{

public ThreadWork2()

{ }

public void run()

{

bool IsOk = false;

Console.WriteLine("ThreadWork2 run { ");

int nCnt = 0,i=0,nValue=0;

while (nCnt<100)

{

IsOk = _TestDictionary.TryGetValue(i,out nValue);

if (IsOk)

{

Console.WriteLine("ThreadWork2 consumer: "+i*i+"====="+i);

nValue *= nValue;

nCnt++;

i++;

}

}

Console.WriteLine("ThreadWork2 run } ");

}

}

static void StartT1()

{

ThreadWork1 work1 = new ThreadWork1();

work1.run();

}

static void StartT2()

{

ThreadWork2 work2 = new ThreadWork2();

work2.run();

}

static void Main(string[] args)

{

Task t1 = new Task(() => StartT1());

Task t2 = new Task(() => StartT2());

bool bIsNext = true;

int nValue = 0;

_TestDictionary = new ConcurrentDictionary<int, int>();

Console.WriteLine("Sample 4-5 Main {");

Console.WriteLine("Main t1 t2 started {");

t1.Start();

t2.Start();

Console.WriteLine("Main t1 t2 started }");

Console.WriteLine("Main wait t1 t2 end {");

Task.WaitAll(t1, t2);

Console.WriteLine("Main wait t1 t2 end }");

foreach (var pair in _TestDictionary)

{

Console.WriteLine(pair.Key + " : " + pair.Value);

}

IEnumerator<KeyValuePair<int, int>> enumer = _TestDictionary.GetEnumerator();

while (bIsNext)

{

bIsNext = enumer.MoveNext();

Console.WriteLine("Key: " + enumer.Current.Key +"  Value: " + enumer.Current.Value);

_TestDictionary.TryRemove(enumer.Current.Key, out nValue);

}

Console.WriteLine("\n\nDictionary Count: " + _TestDictionary.Count);

Console.WriteLine("Sample 4-5 Main }");

Console.ReadKey();

}

}

}

总结说明: .NET4包括的新命名空间System.Collection,Concurrent有几个线程安全的集合类.线程安全的集合可防止多个线程以相互冲突的方式訪问集合.

为了对集合进行线程安全的訪问,定义了IPriducerConsumerCollection<T>接口.这个接口中最重要的方法是TryAdd()和TryTake().TryAdd()方法尝试给集合加入一项,但假设集合禁止加入项,这个操作可能失败.为了给出相关信息,TryAdd()方法返回一个布尔值,以说明操作是成功还是失败.TryTake()方法也以这样的方式工作,以通过调用者操作是成功还是失败,并在操作成功时返回集合中的项.

时间: 2024-08-09 03:45:23

C#高级编程五十八天----并行集合的相关文章

C#高级编程五十九天----集合的性能

各种集合的性能 许多集合类提供了相同的功能,例如,SortedList类与SortedDictionary类的功能几乎完全相同.但是,其性能常常有很大的区别.SortedList集合使用的内存少,SortedDictionary集合的元素检索速度快. 在MSDN文档中,集合的方法常常有性能提示,给出了以大O(字母)表示的操作时间: O(1) O(log n) O(n) O(1)表示无论集合中有多少数据项,这个操作需要的时间都不变.例如ArrayList类的Add()方法就具有O(1)行为.无论列

C#高级编程五十六天----可观察的集合ObservableCollection

可观察的集合 如果需要集合中的元素核实删除或添加的信息,就可以使用ObservableCollection<T>类. ObservableCollection<T>类表示一个动态数据集合,在添加项,移除项或刷新整个列表时,刺激和将提供通知. 命名空间:System.Collections.ObjectModle 语法:public class ObservableCollection<T>:Collection<T>,INotifyCollectionCha

C#高级编程六十八天---LINQ小结

LINQ小结 一.LINQ是什么 LINQ也就是Language Interrated Query的缩写,怎么一个缩写法我也不明白,即语言集成查询,是微软在.NET3.5中提出的一项新技术,LINQ主要包含四个组件,下面看一下LINQ的一个架构图: 简单的介绍一些四个组件: 1.Linq to SQL 组件----可以查询基于关于数据的数据(微软本身只是实现了对SQL Server的查询,可以对数据库中的数据进行查询,修改,插入删除,排序等操作) 2.LINQ to Dataset组件----可

C#高级编程四十八天----列表

C#中的List C#中deList怎么样?List<T>类是ArrayList类的泛型等效类,该类使用大小可按需动态增长的数组实现List<T>泛型接口. 泛型的好处:它为使用C#语言编写面向对象程序增加了极大的效力和灵活性,不会强行对值类型进行装箱和拆箱,或对引用类型进行向下强制类型转化,所以性能得到提高. 性能注意事项:再决定使用List<T>还是使用ArrayList类(两者具有类似的功能)时,记住IList<T>类在大多数情况下执行得更好并且是类型

C#高级编程五十四天----Lookup类和有序字典

Lookup类 Dictionary<Tkey,TValue>只为每个键支持一个值.新类Lookup<Tkey,TValue>是.NET3.5中新增的,它类似与Dictionary<Tkey,TElement>,但把键映射带一个值集上.这个类在程序及System.Core中实现,用System,Linq命名空间定义. Lookup<Tkey,TElement>的方法和属性如下表: 属性名或者方法名 说明 Count 属性Count返回集合中的元素个数 Ite

C#高级编程五十五天----HashSet和SortedSet

集 饱含不重复元素的集合称为"集(set)". .NET4包含两个集(HashSet<T>和SortedSet<T>),他们都实现ISet<T>接口.HashSet<T>即包含不重复元素的无序列表,SortedSet<T>集包含不重复元素的有序列表. ISet<T>接口提供的方法可以创建合集,交集,或者给出一个集合时另一个集的超集或子集的信息. 案例: //使用HashSet:重复的元素自动被移除,但是不排序 va

C#高级编程五十三天----字典Dictionary&lt;TKey,TValue&gt;

字典 关键字:Dicitionary 说明: 必须包含命名空间System.Collection.Generic Dictionary里面的每一个元素都是一个键值对(由两个元组组成:键和值). 键必须是唯一的,而值不需要唯一的. 键和值都可以是任意类型(例如:string,int,自定义类型,等等) 通过一个键读取一个值的事件是接近O(1) 键值对之间的偏序可以不定义 使用案例: using System; using System.Collections.Generic; using Syst

C#高级编程五十二天----有序列表

有序列表 如果需要基于对所有集合排序,就可以使用SortedList<TKey,TValue>类.这个类按照键给元素排序.这个集合中的值和键都可以使用任意类型. 下面的例子创建了一个有序列表,其中键和值类型都是string.默认的构造函数创建了一个空列表,再用Add()方法添加书.使用重载的构造函数.可以定义列表的容量,传递实现了IComparer<TKey>接口的对象,该接口用于给列表中的元素排序. 使用Add(Tkey,Tvalue)方法,第一个参数是键,第二个参数是值.除了使

C#高级编程四十七天----集合接口和类型

集合接口和类型 前面介绍了数组和Array类实现的接口.数组的大小是固定的.如果元素个数是动态的,就应私用集合类. List<T>是与数组相当的集合类.还有其他类型的集合:队列,栈,链表和字典. 大多数集合类都可在System.Collections和System.Collections.Generic名称空间中找到.泛型集合类位于System.Collections.Generic名称空间中;专用于特定类型的集合类位于System.Collections.Specialized名称空间中.线