C#多线程总结

线程的创建

Thread

 1             var thread = new Thread(() =>
 2             {
 3                 Console.WriteLine("thread start:" + Thread.CurrentThread.ManagedThreadId);  //ManagedThreadId为线程的id
 4                 Thread.Sleep(10000);
 5                 Console.WriteLine("thread end:" + Thread.CurrentThread.ManagedThreadId);
 6             });
 7             //设置是否为后台线程:
 8             //  前台线程:所有前台线程执行结束后,该进程才会关闭退出(主线程和通过Thread类创建的线程默认是前台线程)
 9             //  后台线程:所有前台结束后,后台线程就会立即结束(不管是否执行完成都会结束)
10             thread.IsBackground = true;
11             thread.Start();//开启线程,不传递参数
12
13             //传递参数的
14             var thread1 = new Thread(param =>
15             {
16                 Thread.Sleep(3000);
17                 Console.WriteLine(param);
18             });
19             thread1.Start("val");
20             thread1.Join(); //等待线程执行完成(使当前调用Join的线程阻塞)
21             //暂停和恢复线程都标志为已过时了,不建议使用
22             //thread1.Suspend();
23             //thread1.Resume();
24             //设置线程的优先级,注意:在NT内核的Windows平台上建议不使用优先级来影响线程优先调度的行为,因为根本没法预期一个高优先级的线程必然会先于一个低优先级的线程执行,所以也就失去了控制线程调度的价值
25             //thread1.Priority = ThreadPriority.Highest;
26             //thread1.Abort();    //暴力的终止线程,一般不建议使用

Sleep/ SpinWait

Sleep与SpinWait的区别:

使用Thread.Sleep()会导致等待过于进行切换,等待时间不准确,而且会由用户模式切换到内核模式;使用SpinWait(一个轻量同步类型(结构体))来进行等待的处理,等待过程中会使用自旋等待,从而避免线程频繁的用户模式和内核模式切换,一般用于短时的等待操作:

1             //参数一为Func<bool>,就是自旋时的循环体,直到返回true或者过时为止
2             SpinWait.SpinUntil(() =>
3             {
4                 Console.WriteLine("Spin Waiting");
5                 return false;
6             }, 1000);
7             SpinWait.SpinUntil(() => false, 1000); //返回false会进入等待状态,类似于Thread.Sleep()等待,但是会盘旋CPU周期,在短期内等待事件准确度都高于Sleep
8             SpinWait.SpinUntil(() => true, 1000);  //返回true会自动跳出等待状态,不再休眠,继续执行下面的代码

使用SpinWait做一些多线程的流程控制

 1             int i = 0;
 2             Task.Run(() =>
 3             {
 4                 Thread.Sleep(1000); //模拟一些操作
 5                 Interlocked.Increment(ref i);
 6             });
 7             Task.Run(() =>
 8             {
 9                 Thread.Sleep(1000); //模拟一些操作
10                 SpinWait.SpinUntil(() => i == 1);    //等待1完成
11                 Thread.Sleep(1000); //模拟一些操作
12                 Interlocked.Increment(ref i);
13             });
14             SpinWait.SpinUntil(() => i == 2);    //等待所有流程完成
15             Console.WriteLine("Completed!");

ThreadPool

通过线程池创建线程,池中的线程都是后台线程

使用线程更应该使用线程池来创建:比如一个服务器需要处理成千上万个客户端链接,并处理不同的请求时,这种情况下如果简单通过Thread来创建线程处理,那么就是需要创建成千上万个线程了,那么多线程会频繁的调度切换,资源浪费严重、性能十分低下,因此需要线程池来维护多线程(会动态调整线程数量)

1             ThreadPool.QueueUserWorkItem(param =>
2             {
3                 Console.WriteLine(param);   //val,param为传递过来的参数
4             }, "val");

Task

通过Task来创建线程(线程也是由线程池维护,也是后台线程),比ThreadPool更加灵活方便

 1             var tasks = new List<Task>();
 2             tasks.Add(Task.Factory.StartNew(param =>
 3             {
 4                 Thread.Sleep(5000);
 5                 Console.WriteLine(param);
 6             }, "val"));
 7             tasks.Add(Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId)));
 8             Task.WaitAny(tasks.ToArray());  //等待(阻塞)只要有一个Task执行完毕就不再等待了
 9             Task.WaitAll(tasks.ToArray());  //等待(阻塞)所有Task执行结束
10
11             //带返回值的
12             var task = Task.Run<string>(() =>
13             {
14                 Thread.Sleep(3000);
15                 return "rtn Val";
16             });
17             //task.Wait();  //等待执行结束
18             Console.WriteLine(task.Result); //获取返回的结果,调用Result就会等待Task执行结束返回结果,因此也会造成阻塞

ConfigureAwait

1             Task.Run(() =>
2             {
3                 Thread.Sleep(1000);
4                 Console.WriteLine("Async");
5
6                 //ConfigureAwait为false发生异常的时候不会回取捕捉原始Context(上下文),
7                 //这样子就是在线程池中运行,而不是在ASP.NET/UI的Context的上下文线程中运
8                 //行了,这样子性能上提高了
9             }).ConfigureAwait(false);   

Thread.Sleep()与Task.Delay()

 1             //    Thread.Sleep是同步延迟, Task.Delay异步延迟;
 2             //    Thread.Sleep不能取消,Task.Delay可以。
 3             Task.Run(async () =>
 4             {
 5                 //将任务延迟1000毫秒后运行,如果无限等待那么指定为-1
 6                 await Task.Delay(1000);
 7                 Console.WriteLine("Task Start");
 8                 //CancellationToken设置为true就是标志Task任务取消,为false和 await Task.Delay(1000)一样将任务延迟3000毫秒后运行
 9                 await Task.Delay(1000, new CancellationToken(true));
10                 Console.WriteLine("这里不会被执行,因为任务取消了~");
11             });

Task与async/await

 1         public class TaskTest
 2         {
 3             public Task DoAsync(string param)
 4             {
 5                 return Task.Run(() =>
 6                 {
 7                     //调用Result会阻塞直到获取到返回值
 8                     NextDo(LongTimeDoAsync(param).Result);
 9                 });
10             }
11
12             public async Task Do1Async(string param)
13             {
14                 //对比上面的DoAsync方法,执行结果一样,但是使用async/await配合Task使用,节省了代码量,而且也方便外部的调用和等待处理等等
15                 NextDo(await LongTimeDoAsync(param));
16             }
17
18             async Task<object> LongTimeDoAsync(string param)
19             {
20                 return await Task.Run<object>(() =>
21                 {
22                     //执行一些耗时的操作
23                     Thread.Sleep(10000);
24                     return param + " ok";
25                 });
26             }
27
28             void NextDo(object result)
29             {
30                 Console.WriteLine(result);
31             }
32         }

调用:

1             var test = new TaskTest();
2             test.DoAsync("DoAsync");
3             test.Do1Async("Do1Async");

并发集合

在System.Collections.Concurrent下有集合,都是写多线程安全集合,而ConcurrentXXX为并发集合,有不少方法带有Try前缀,这些方法在多线程下执行过程中可能会失败返回false,因此不要相信这些操作会完成任务,需要判断返回的结果;还有BlockingCollection<T>是阻塞集合,就是添加/获取元素的时候会阻塞线程直到操作完成。

ConcurrentDictionary

 1             ConcurrentDictionary<string, string> dict = new ConcurrentDictionary<string, string>();
 2             dict.TryAdd("key1", "val1");
 3             string val;
 4             dict.TryGetValue("key1", out val);
 5             dict.TryUpdate("key1", "val2", val);//最后参数为比较的值,值不同才会更新
 6             dict.TryRemove("key1", out val);
 7             Console.WriteLine(val); //val2
 8
 9             val = dict.GetOrAdd("key1", "val3");
10             val = dict.GetOrAdd("key1", "val4");
11             Console.WriteLine(val); //val3
12
13             dict["key1"] = null;
14             //对于AddOrUpdate方法,如果指定的key已经存在,那么调用第三个参数进行UpdateValue
15             //如果不存在,那么调用第二个参数进行AddValue
16             val = dict.AddOrUpdate("key1", "val5", (key, oldVal) =>
17             {
18                 Console.WriteLine(oldVal);  //null
19                 return "val6";
20             });
21             Console.WriteLine(val); //val6
22
23             val = dict.AddOrUpdate("key2", key =>
24             {
25                 return "val7";
26             }, (key, oldVal) =>
27             {
28                 Console.WriteLine(oldVal);
29                 return "val8";
30             });
31             Console.WriteLine(val); //val7

ConcurrentQueue

1             ConcurrentQueue<string> q = new ConcurrentQueue<string>();
2             q.Enqueue("val1");
3             q.Enqueue("val2");
4             string val;
5             q.TryPeek(out val);
6             Console.WriteLine(val); //val1
7             q.TryDequeue(out val);
8             Console.WriteLine(val); //val1

ConcurrentStack

1             ConcurrentStack<string> s = new ConcurrentStack<string>();
2             s.Push("val1");
3             s.Push("val2");
4             string val;
5             s.TryPeek(out val);
6             Console.WriteLine(val); //val2
7             s.TryPop(out val);
8             Console.WriteLine(val); //val2

ConcurrentBag

 1             //ConcurrentBag:无序的并发集合(相同元素可重复添加)
 2             ConcurrentBag<object> bag = new ConcurrentBag<object>();
 3             var obj = new object();
 4             bag.Add(obj);
 5             bag.Add(obj);
 6             Console.WriteLine(bag.Count);   //2
 7             while (!bag.IsEmpty)    //判断集合是否为空
 8             {
 9                 bag.TryTake(out obj);   //获取
10             }

并行计算

Parallel

For

1             //并行计算,调用的线程会等待直到并行执行完毕
2             Parallel.For(2, 10, i =>
3             {
4                 //i的值为[2, 10)(不包括10),就是执行次数为8次
5                 Console.WriteLine(i);
6             });
1             //MaxDegreeOfParallelism为指定并行计算的最大线程数
2             Parallel.For(1, 10, new ParallelOptions { MaxDegreeOfParallelism = 3 }, i =>
3             {
4                 Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
5             });
 1             int result = 0;
 2             Parallel.For(0, 100, new ParallelOptions { MaxDegreeOfParallelism = 4 },
 3                //初始化localState
 4                () => 0,
 5                //并行循环体(i为[0, 100),也就是会执行100次)
 6                (i, loop, localState) =>
 7                {
 8                    //localState从0开始,不断累加i的值
 9                    return localState + i;   //循环体中返回的结果会在下面的回调中进行值的合并(结果的合并必须在下面进行)
10                },
11                //合并计算的结果
12                localState => Interlocked.Add(ref result, localState)
13             );
14             Console.WriteLine("真实结果: {0}. 预期结果:4950.", result);

ForEach

 1             int aCount = 0;
 2             //并行计算,会等待(阻塞)直到执行完成
 3             Parallel.ForEach("aaaabbbbbcccc",
 4                 //设置并行计算的最大线程数
 5                 new ParallelOptions { MaxDegreeOfParallelism = 4 },
 6                 c =>
 7                 {
 8                     //计算‘a‘的个数
 9                     if (c == ‘a‘)
10                     {
11                         Interlocked.Increment(ref aCount);
12                     }
13                 });
14             Console.WriteLine(aCount); //4
 1             //Partitioner为设置策略分区:例如值范围为[0, 100],每个区域的大小为4
 2             Parallel.ForEach(Partitioner.Create(0, 10, 4),
 3                 val =>
 4                 {
 5                     Console.WriteLine(val); //val是一个Tuple<int, int>,分成的区间值有:(0, 4),(4, 8),(8, 10)
 6                 });
 7
 8             int result = 0;
 9             Parallel.ForEach(Partitioner.Create(1, 101, 10),
10                 val =>
11                 {
12                     for (int i = val.Item1; i < val.Item2; i++)
13                     {
14                         Interlocked.Add(ref result, i);
15                     }
16                 });
17             Console.WriteLine(result);  //输出:5050
 1             int[] vals = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
 2             int sum = 0;
 3             Parallel.ForEach(
 4                     vals,
 5                     //localSum的初始值
 6                     () => 0,
 7                     //并行执行的循环体
 8                     (val, loopState, localSum) =>
 9                     {
10                         //val为集合vals中的值
11                         //这里的操作是并行计算集合中值的总和
12                         localSum += val;
13                         return localSum;    //循环体中返回的结果会在下面的回调中进行值的合并(结果的合并必须在下面进行)
14                     },
15                     //合并计算的结果
16                     (localSum) => Interlocked.Add(ref sum, localSum)
17                 );
18             Console.WriteLine(sum);    //55

Invoke

1             int i = 0;
2             Action action = () => Interlocked.Increment(ref i);
3             Action action1 = () => Interlocked.Add(ref i, 2);
4             Action action2 = () => Interlocked.Add(ref i, 3);
5             //并行调用Action,调用的线程会等待直到并行执行完毕
6             Parallel.Invoke(action, action1, action2);
7             //Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = 3 }, action, action1, action2);
8             Console.WriteLine(i);   //输出:6

PLINQ

 1             var list = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
 2             //PLINQ,进行并行计算,但是PLINQ不能限定并行计算时的最大线程数
 3             list.AsParallel().ForAll(l =>
 4             {
 5                 Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
 6             });
 7
 8             Console.WriteLine(list.AsParallel().Where(l => l > 5).Sum());   //40
 9             Console.WriteLine(list.AsParallel().Aggregate((sum, val) =>
10             {
11                 return val + sum + 1;
12             }));    //64
13
14             var list1 = new int[] { 1, 1, 1, 2, 2, 2, 3 };
15             Console.WriteLine(list1.AsParallel().GroupBy(l => l).Count());  //3

线程同步

lock(Monitor) / SpinLock

lock

lock使用起来很简单,为Monitor封装的语法糖:

1 lock (obj)
2 {
3     //同步操作.
4 }

锁的对象不要为类型Type,因为性能上会损失大:lock(typeof(Class))

Monitor

1             Monitor.Enter(obj);
2             try
3             {
4                 //同步操作
5             }
6             finally
7             {
8                 Monitor.Exit(obj);
9             }

使用Monitor的主要优点就是可能设置等待的超时值:

 1                 bool lockTaken = false;
 2                 Monitor.TryEnter(obj, 1000, ref lockTaken);
 3                 if (lockTaken)
 4                 {
 5                     try
 6                     {
 7                         //同步操作
 8                     }
 9                     finally
10                     {
11                         Monitor.Exit(obj);
12                     }
13                 }
14                 else
15                 {
16                     Console.WriteLine("超时了");
17                 }

SpinLock自旋锁/细粒度锁

自旋锁:就是在等待的过程中会做自旋等待,避免线程频繁的用户模式和内核模式切换

msdn中的说明:

自旋锁可用于叶级锁定,此时在大小方面或由于垃圾回收压力,使用Monitor(lock)所隐含的对象分配消耗过多。自旋锁非常有助于避免阻塞,但是如果预期有大量阻塞,由于旋转过多,您可能不应该使用自旋锁。当锁是细粒度的并且数量巨大(例如链接的列表中每个节点一个锁)时以及锁保持时间总是非常短时,旋转可能非常有帮助。通常,在保持一个旋锁时,应避免任何这些操作:

  • 阻塞,
  • 调用本身可能阻塞的任何内容,
  • 一次保持多个自旋锁,
  • 进行动态调度的调用(接口和虚方法)
  • 在某一方不拥有的任何代码中进行动态调度的调用,或
  • 分配内存。

简单封装:

 1         public class SpinLockEx
 2         {
 3             SpinLock _slock = new SpinLock();
 4             public void Lock(Action action)
 5             {
 6                 bool lockTaken = false;
 7                 try
 8                 {
 9                     _slock.Enter(ref lockTaken);
10                     action();
11                 }
12                 finally
13                 {
14                     if(lockTaken) _slock.Exit();
15                 }
16             }
17         }

使用:

 1             int ival1 = 0, ival2 = 0;
 2             List<Task> list = new List<Task>();
 3             var slock = new SpinLockEx();
 4             for (int i = 0; i < 10000; i++)
 5             {
 6                 list.Add(Task.Run(() =>
 7                 {
 8                     slock.Lock(() =>
 9                     {
10                         ival1++;    //注意:这里只是模拟多线程操作共享资源,对于数值操作应该使用Interlocked
11                     });
12                 }));
13                 list.Add(Task.Run(() =>
14                 {
15                     ival2++;
16                 }));
17             }
18             Task.WaitAll(list.ToArray());
19             Console.WriteLine(ival1);   //值计算准确:10000
20             Console.WriteLine(ival2);   //值计算可能会不准确,因为没有做多线程安全

Mutex

Mutex互斥锁(互斥对象)的使用作用和Monitor(lock)差不多,但是Mutex是内核对象,可以跨进程共享的,不过性能方面Monitor比较高,因为Mutex控制需要从用户模式到内核模式,而Monitor是用户模式下控制的。

 1             bool isNew;
 2             //参数一:主调线程是否初始拥有互斥对象
 3             //参数二:定义互斥对象的名称(命名互斥对象跨进程共享)
 4             //参数三:该命名的互斥对象是否为新创建的
 5             var m = new Mutex(false, "Tom123", out isNew);
 6             if (m.WaitOne())    //等待互斥对象拥有权(一个线程拥有了,另一个线程等待拥有权,直到拥有的线程调用ReleaseMutex释放)
 7             {
 8                 try
 9                 {
10                     //同步操作
11                     Thread.Sleep(3000);
12                     Console.WriteLine("do something");
13                 }
14                 finally
15                 {
16                     m.ReleaseMutex();    //释放拥有权
17                 }
18             }
19             else
20             {
21                 //等待失败,如果WaitOne的时候有指定超时值,否则会一直等待
22             }
23
24             bool isNew;
25             //因为命名的互斥对象是跨进程的,因此通过第三个参数判断互斥对象是否已经存在,
26             //可做一些检测程序是否已经运行的操作
27             m = new Mutex(false, "Tom123", out isNew);
28             if (!isNew)
29             {
30                 Console.WriteLine("该程序已经运行!");
31             }
32             m.Dispose();//记住需要释放资源

Event

事件对象也是内核对象,事件对象分为 人工重置 和 自动重置:

AutoResetEvent(自动重置)

 1             AutoResetEvent e = new AutoResetEvent(true);    //参数为是否初始化为有信号状态
 2             if (e.WaitOne()) //等待事件对象,直到有信号状态(如果没有指定超时值)
 3             {
 4                 //对象自动重置的事件对象来说:等待成功,那么就会自动设置为无信号状态(因此并不需要调用e.Reset()),因此性质和互斥对象差不多
 5                 try
 6                 {
 7                     //同步操作
 8                     Thread.Sleep(3000);
 9                     Console.WriteLine("do something");
10                 }
11                 finally
12                 {
13                     e.Set();//设置为有信号状态
14                 }
15             }
16             else
17             {
18                 //等待失败,如果WaitOne的时候指定超时值,否则会一直等待
19             }
20             e.Dispose();    //使用完成需要释放对象资源,因为是内核对象

ManualResetEvent/Slim (人工重置)

 1             //人工重置的事件对象需要手动设置为无信号状态,因此人工重置的事件对象不适合做多线程同步锁,
 2             //可用于做一些程序启动时的初始化操作,例如:加载某些大文件,加载完成后通知加载完成,而这些过程通过人工重置事件对象控制
 3             //带Slim的事件对象,性能上会更好(运用了细粒度锁),就是参数二中可以指定等待时自旋的次数,目的防止等待的过程中频繁的切换线程
 4             var e = new ManualResetEventSlim(false, 100);   //参数一为是否初始化为有信号状态
 5             Task.Run(() =>
 6             {
 7                 Console.WriteLine("加载大文件开始");
 8                 Thread.Sleep(10000);
 9                 Console.WriteLine("加载完成");
10                 e.Set();//设置为有信号状态
11             });
12             Task.Run(() =>
13             {
14                 e.Wait();   //等待文件加载完成
15                 Console.WriteLine("加载完成后,do something...");
16             });

ReaderWriterLock/Slim

ReaderWriterLock就是用于允许多个读取器(读取的时候不能写入),而只能有一个写入器(写入时锁定)来管理,对于ReaderWriterLockSlim的性能上会更好(运用了细粒度锁),可避免潜在的死锁的很多情况。

 1         public class TestRW : IDisposable
 2         {
 3             ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
 4             StringBuilder _sb = new StringBuilder();
 5             public void Append(string val)
 6             {
 7                 try
 8                 {
 9                     _rwLock.EnterWriteLock();  //获取读取器
10                     _sb.Append(val);
11                 }
12                 finally
13                 {
14                     _rwLock.ExitWriteLock();//释放读取器
15                 }
16             }
17
18             public override string ToString()
19             {
20                 try
21                 {
22                     _rwLock.EnterReadLock(); //获取写入器
23                     return _sb.ToString();
24                 }
25                 finally
26                 {
27                     _rwLock.ExitReadLock();//释放写入器
28                 }
29             }
30
31             public void Dispose()
32             {
33                 _rwLock.Dispose();  //需要释放资源
34             }
35         }

使用

 1             List<Task> tasks = new List<Task>();
 2             TestRW rw = new TestRW();
 3             StringBuilder sb = new StringBuilder();
 4             for (int i = 0; i < 1000; i++)
 5             {
 6                 tasks.Add(Task.Run(() =>
 7                 {
 8                     sb.Append("1");
 9                 }));
10                 tasks.Add(Task.Run(() =>
11                 {
12                     rw.Append("1");
13                 }));
14             }
15             Task.WaitAll(tasks.ToArray());
16             Console.WriteLine(sb.ToString().Length);    //StringBuilder不是多线程安全的,因此结果会失真
17             Console.WriteLine(rw.ToString().Length);    //使用了读写锁,因此多线程安全
18             rw.Dispose();   //记住需要释放资源

Semaphore/Slim

信号量是一种计数互斥锁,计数为0时阻塞线程,可用于多线程的限流操作,SemaphoreSlim运用了细粒度锁,性能更优:

 1             var tasks = new List<Task>();
 2             int ival = 0;
 3             //参数一:初始化信号的数量
 4             //参数二:信号的最大数量
 5             var smp = new SemaphoreSlim(10, 10);
 6             for (int i = 0; i < 100; i++)
 7             {
 8                 tasks.Add(Task.Run(() =>
 9                 {
10                     if (smp.Wait(100))  //等待成功,会减少一个信号量,直到信号量为0
11                     {
12                         Thread.Sleep(1000);  //模拟一些操作
13                         Console.WriteLine(Interlocked.Increment(ref ival));
14                         //smp.Release();    //释放一个信号量
15                     }
16                     else
17                     {
18                         //等待失败,如果Wait的时候有指定超时值,否则会一直等待
19                         Console.WriteLine("超时了~");
20                     }
21                 }));
22             }
23             Task.WaitAll(tasks.ToArray());
24             Console.WriteLine(smp.CurrentCount);    //当前的信号量数量
25             smp.Dispose();  //记住需要释放资源

CountdownEvent

CountdownEvent(倒计器)的性质和Semaphore相反,等待(阻塞)直到信号为0为止(当倒计数为0时,触发完成,停止等待),可以做fork-join(并行计算)的控制:

 1             var datas = new int[]{ 1,2,3,4,5,6,7,8,9,10 };
 2             //倒计器:当倒计数为0时,触发完成,停止等待
 3             using (var c = new CountdownEvent(1))   //参数为初始化倒计数(如果指定为0,那么初始状态为完成,需要调用Reset重置,不然使用会抛异常)
 4             {
 5                 Console.WriteLine(c.CurrentCount);  //当前的倒计数
 6                 // fork work:
 7                 for (int i = 0; i < datas.Length; i++)
 8                 {
 9                     c.AddCount();   //增加倒计数(可指定增加的数量)
10                     ThreadPool.QueueUserWorkItem(idx =>
11                     {
12                         Interlocked.Increment(ref datas[(int)idx]);
13                         c.Signal(); //注册一个(可指定数量)信号,并且CurrentCount倒计数减一(可指定数量)
14                     }, i);
15                 }
16                 c.Signal();
17                 // Join with work.
18                 c.Wait();   //等待(阻塞)直到计数为0为止,可以指定等待的时间
19                 Console.WriteLine("Completed!");
20                 //c.Reset(1);   //重置初始化的倒计数,如果等待完成之后想继续使用倒计器,那么就需要调用这个方法重置了,不然会抛异常
21             }
22             Console.WriteLine(JsonConvert.SerializeObject(datas));  //[2,3,4,5,6,7,8,9,10,11]

Barrier

Barrier(屏障/关卡)适用于并行操作分阶段执行,并且每一阶段中各任务需要进行同步,使用Barrier可以在并行操作中的所有任务都达到相应的关卡之前,阻止各个任务继续执行:

 1             int i = 0;
 2             //参数一:初始化参与者
 3             //参数二:关卡完成触发的事件
 4             var b = new Barrier(2, (bar) =>
 5             {
 6                 Console.WriteLine(i);
 7             });
 8             //增加参与者数量
 9             b.AddParticipant();
10             b.AddParticipants(2);
11             //移除参与者数量
12             b.RemoveParticipant();
13             //参与者数量
14             Console.WriteLine(b.ParticipantCount);  //4
15
16             Action action = () =>
17             {
18                 Interlocked.Increment(ref i);
19                 //设置关卡1:
20                 //用信号通知有参与者(线程)已达到关卡,而且到达的参与者(线程)会一直等待(可指定超时值),
21                 //直到所有参与者都达到关卡为止(就是通知的信号达到ParticipantCount的时候),并且触发Barrier构造函数指定的事件
22                 b.SignalAndWait();  //等待完成之后会触发关卡完成事件,输出:4
23
24                 Interlocked.Add(ref i, 2);
25                 //设置关卡2:
26                 b.SignalAndWait();  //输出:12
27
28                 Interlocked.Add(ref i, 4);
29                 //设置关卡3:
30                 b.SignalAndWait();  //输出:28
31             };
32
33             //参与者(线程)的数量不能大于ParticipantCount,否则会抛异常
34             Task.Run(action);   //设置一个参与者(线程)
35             Parallel.Invoke(action, action, action); //设置三个参与者(线程)

Timer

定时器,可以做一些定时的控制操作:

1             //参数一:定时触发的事件
2             //参数二:传递到事件的参数
3             //参数三:为第一次触发事件的定时时间
4             //参数四:为周期时间(第一次触发事件之后,再次触发事件的周期,指定了这个参数,那么就会按周期时间循环执行了,如果设为0就只会执行一次事件)
5             var timer = new Timer(val =>
6             {
7                 Console.WriteLine(val);
8             }, "val", 1000, 1000);  //这里的设置为每秒执行一次事件
9             //timer.Dispose();  //记住需要释放资源

Interlocked

使用原子操作保证值类型操作的原子性,以保证值类型线程间同步,性能方面比使用lock更优,因此保证值类型多线程安全更应该优先考虑使用Interlocked:

 1         public class IntEx
 2         {
 3             int _val;
 4             public int Val
 5             {
 6                 get { return _val; }
 7                 set
 8                 {
 9                     Interlocked.CompareExchange(ref _val, value, _val);
10                 }
11             }
12
13             public IntEx() { }
14             public IntEx(int ival)
15             {
16                 _val = ival;
17             }
18
19             public int Add(int ival)
20             {
21                 return Interlocked.Add(ref _val, ival);
22             }
23
24             public int Incre()
25             {
26                 return Interlocked.Increment(ref _val);
27             }
28
29             public int Decre()
30             {
31                 return Interlocked.Decrement(ref _val);
32             }
33         }

使用:

 1             var tasks = new List<Task>();
 2             var itl = new IntEx();
 3             int ival = 0;
 4             for (int i = 0; i < 1000; i++)
 5             {
 6                 tasks.Add(Task.Run(() =>
 7                 {
 8                     ival++;
 9                 }));
10                 tasks.Add(Task.Run(() =>
11                 {
12                     itl.Incre();
13                     //itl.Val++;    //也不要这样进行自增,就算使用了Interlocked.CompareExchange
14                 }));
15             }
16             Task.WaitAll(tasks.ToArray());
17             Console.WriteLine(ival);    //没有使用Interlocked以保证操作的原子性,因此数据会失真
18             Console.WriteLine(itl.Val); //数据正确   

使用Interlocked进行多线程的控制:

 1         public class OneDo
 2         {
 3             int _ival = 0;
 4             public event Action Action;
 5
 6             public OneDo(Action action)
 7             {
 8                 if (action == null) throw new ArgumentNullException("action");
 9                 Action = action;
10             }
11
12             public void Release()
13             {
14                 Interlocked.CompareExchange(ref _ival, 0, _ival);
15             }
16
17             public bool IsDoing
18             {
19                 get { return _ival > 0; }
20             }
21
22             bool CanDo()
23             {
24                 if (_ival <= 0)
25                 {
26                     if (Interlocked.Increment(ref _ival) <= 1)
27                     {
28                         return true;
29                     }
30                 }
31                 return false;
32             }
33
34             public bool Do()
35             {
36                 if (CanDo())
37                 {
38                     Action();
39                     return true;
40                 }
41                 return false;
42             }
43
44             public bool DoAndRelease()
45             {
46                 if (CanDo())
47                 {
48                     Action();
49                     Release();
50                     return true;
51                 }
52                 return false;
53             }
54
55             public bool DoAsync()
56             {
57                 if (CanDo())
58                 {
59                     Task.Run(() =>
60                     {
61                         Action();
62                     });
63                     return true;
64                 }
65                 return false;
66             }
67
68             public bool DoAndReleaseAsync()
69             {
70                 if (CanDo())
71                 {
72                     Task.Run(() =>
73                     {
74                         Action();
75                         Release();
76                     });
77                     return true;
78                 }
79                 return false;
80             }
81         }

使用:

 1             var onedo = new OneDo(() => Console.WriteLine("One do!"));
 2             for (int i = 0; i < 100; i++)
 3             {
 4                 Task.Run(() =>
 5                 {
 6                     onedo.Do(); //只会执行一次,除非调用Release,因此可用于做一些初始化操作
 7                 });
 8             }
 9
10
11             int val = 0;
12             var onedo1 = new OneDo(() => Console.WriteLine("One do! " + Interlocked.Increment(ref val)));
13             for (int i = 0; i < 100; i++)
14             {
15                 Task.Run(() =>
16                 {
17                     SpinWait.SpinUntil(() => onedo1.DoAndRelease()); //每次只被一个线程执行,使用SpinWait等待执行成功为止
18                 });
19             }

TPL Dataflow

TPL Dataflow是非常强大的多线程高并发数据流控制类库,

.NET中使用需要在NuGet中搜索Microsoft.Tpl.Dataflow加载,.net core可直接使用。

 1             //数据流Buffer
 2             var buf = new BufferBlock<string>();
 3             //数据流处理器(处理器是异步处理数据的)
 4             var action = new ActionBlock<string>(val =>
 5             {
 6                 //以FIFO形式处理数据
 7                 Console.WriteLine(val + ",tid=" + Thread.CurrentThread.ManagedThreadId);
 8             }, new ExecutionDataflowBlockOptions()
 9             {
10                 //设置并行处理的线程数,默认为1
11                 MaxDegreeOfParallelism = 1,
12             });
13             //连接处理器
14             buf.LinkTo(action);
15             //添加数据到流中,连接到了处理器,数据就会流向处理器中进行处理
16             for (int i = 0; i < 100; i++)
17             {
18                 buf.Post(i.ToString());
19             }
 1             var buf1 = new BufferBlock<string>();
 2             var action1 = new ActionBlock<string>(val =>
 3             {
 4                 Console.WriteLine(val);
 5             });
 6             //buf1.LinkTo(action1);
 7             Task.Run(() =>
 8             {
 9                 Thread.Sleep(3000);
10                 buf1.LinkTo(action1);
11             });
12             for (int i = 0; i < 10; i++)
13             {
14                 buf1.Post(i.ToString());
15             }
16
17             buf1.Complete(); //发送完成信号(不会阻塞),并且停止接受数据,就是后续的添加是不会被处理的
18             buf1.Post("asdf");  //这里添加不会被处理
19             //Completion属性只有在调用了Complete()后才会有效
20             buf1.Completion.Wait();  //这里会等待(阻塞),直到连接到处理器为止(就是:如果没有调用LinkTo(action),那么会一直等待,可设置超时值)
21
22             //对于Action的Complete(),需要Buffer调用Complete并且Completion.Wait才有效
23             action1.Complete();
24             //Completion属性只有在设置了Complete方法后才会有效
25             action1.Completion.Wait();   //这里会等待(阻塞),等待Action把所有数据处理完成为止(可设置超时值)
 1             var buf = new BufferBlock<string>();
 2             //负载均衡:通过设置多个处理器,并且设置处理器的BoundedCapacity(限容值),那么处理器就会按照BoundedCapacity均衡处理数据了
 3             //注意:每个处理器都应该设置BoundedCapacity,如果存在没有设置BoundedCapacity(或设为-1)的处理器,那么数据流可能都只到该处理器进行处理了
 4             buf.LinkTo(new ActionBlock<string>(val =>
 5             {
 6                 Console.WriteLine(val + ",Tom");
 7             }, new ExecutionDataflowBlockOptions()
 8             {
 9                 BoundedCapacity = 10,
10             }));
11             buf.LinkTo(new ActionBlock<string>(val =>
12             {
13                 Console.WriteLine(val + ",Jane");
14             }, new ExecutionDataflowBlockOptions()
15             {
16                 BoundedCapacity = 10,
17             }));
18             for (int i = 0; i < 100; i++)
19             {
20                 buf.Post(i.ToString());
21             }

多播

1             //多播,就是连接的ActionBlock都会收到数据
2             BroadcastBlock<int> broadcast = new BroadcastBlock<int>(val => val);
3             broadcast.LinkTo(new ActionBlock<int>(val => Console.WriteLine(val + ",A")));
4             broadcast.LinkTo(new ActionBlock<int>(val => Console.WriteLine(val + ",B")));
5             for (int i = 0; i < 10; i++)
6             {
7                 broadcast.Post(i);
8             }
1             BroadcastBlock<int> broadcast = new BroadcastBlock<int>(val => val);
2             broadcast.LinkTo(new ActionBlock<int>(val => Console.WriteLine(val + ",A")));
3             for (int i = 0; i < 10; i++)
4             {
5                 broadcast.Post(i);
6             }
7             Thread.Sleep(1000);
8             //连接ActionBlock应该在Post数据之前,不然在之后连接的ActionBlock就只接收到最后一个数据了
9             broadcast.LinkTo(new ActionBlock<int>(val => Console.WriteLine(val + ",B")));

TPL Dataflow更详细说明可参考这篇博文:

http://www.cnblogs.com/haoxinyue/archive/2013/03/01/2938959.htm

时间: 2024-10-14 04:47:40

C#多线程总结的相关文章

Java多线程学习(吐血超详细总结)

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 目录(?)[-] 一扩展javalangThread类 二实现javalangRunnable接口 三Thread和Runnable的区别 四线程状态转换 五线程调度 六常用函数说明 使用方式 为什么要用join方法 七常见线程名词解释 八线程同步 九线程数据传递 本文主要讲了java中多线程的使用方法.线程同步.线程数据传递.线程状态及相应的一些线程函数用法.概述等. 首先讲一下进程和线程

Spring多线程

Spring是通过TaskExecutor任务执行器来实现多线程和并发编程的.使用ThreadPoolTaskExecutor可实现一个基于线程池的TaskExecutor.而实际开发中任务一般是非阻碍的,即异步的,所以我们要在配置类中通过@EnableAsync开启对异步的支持,并通过在实际执行的Bean的方法中使用@Async注解来声明其是一个异步任务. 实例代码: (1)配置类 package com.lwh.highlight_spring4.ch3.taskexecutor; /**

python进阶学习(一)--多线程编程

1. 多线程 概念:简单地说操作系统可以同时执行多个不用程序.例如:一边用浏览器上网,一边在听音乐,一边在用笔记软件记笔记. 并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务"一起"执行(实际上总有一些任务不在执行,因为切换任务的熟度相当快,看上去一起执行而已) 并行:指的是任务数小于等于CPU核数,即任务真的是一起执行的. 2. 线程 概念:线程是进程的一个实体,是CPU调度和分派的基本单位. threading--单线程执行: 1 import ti

多线程的实现及其安全问题

一.进程和线程概述 1.进程:进程是一个具有独立功能的程序关于某个数据集合的一次运行活动,简单来说开启一个程序就开启了一个进程: 如果开启多个进程,它们之间是由于CPU的时间片在相互的切换: 2.线程:开启一个进程的一个任务,对于多线程:每一个线程都在争夺CPU的执行权(CPU的执行权具有随机性): 如果一个程序的执行路径有多条,那么该线程是多线程;反之,就单线程线程:线程是依赖于进程存在的! 3.Jvm是多线程 -- 至少开启了两条线程 main方法 主线程 gc() 垃圾回收线程 二.多线程

多线程和多进程的区别与联系

1.单进程单线程:一个人在一个桌子上吃菜.2.单进程多线程:多个人在同一个桌子上一起吃菜.3.多进程单线程:多个人每个人在自己的桌子上吃菜. 多线程的问题是多个人同时吃一道菜的时候容易发生争抢,例如两个人同时夹一个菜,一个人刚伸出筷子,结果伸到的时候已经被夹走菜了...此时就必须等一个人夹一口之后,在还给另外一个人夹菜,也就是说资源共享就会发生冲突争抢. 1.对于 Windows 系统来说,[开桌子]的开销很大,因此 Windows 鼓励大家在一个桌子上吃菜.因此 Windows 多线程学习重点

Python有了asyncio和aiohttp在爬虫这类型IO任务中多线程/多进程还有存在的必要吗?

最近正在学习Python中的异步编程,看了一些博客后做了一些小测验:对比asyncio+aiohttp的爬虫和asyncio+aiohttp+concurrent.futures(线程池/进程池)在效率中的差异,注释:在爬虫中我几乎没有使用任何计算性任务,为了探测异步的性能,全部都只是做了网络IO请求,就是说aiohttp把网页get完就程序就done了. 结果发现前者的效率比后者还要高.我询问了另外一位博主,(提供代码的博主没回我信息),他说使用concurrent.futures的话因为我全

多线程(一)

这边来谈谈java中,我对对多线程的理解 在了解多线程前,先说说进程. 进程就是正在运行的应用程序.  当你打开任务管理器的时候,你就会发现很多的进程. 而我们要说的线程,就是依赖于进程而存在的,一个进程可以开启多个线程. Thread类 说到线程,就必须来说说Thread类. Thread类是说有线程的父类.具体请参见api 线程的创建以及执行(图解如下) 继承Thread类,或者实现rennable接口. 当继承了父类后,需要重写父类的run方法,这个run方法里面就写你要执行的代码,当这个

多线程下的单例-double check

话不多说直接上代码: public sealed class Singleton { private static Singleton _instance = null; // Creates an syn object. private static readonly object SynObject = new object(); Singleton() { } public static Singleton Instance { get { // Double-Checked Lockin

笔记:多线程

多线程程序在较低的层次上扩展了多任务的概念:一个程序同时执行多个任务,通常每个任务称为一个线程(thread),他是线程控制的简称,可以同时运行一个以上线程的程序称为多线程程序(multithreaded):多线程和多进程有哪些区别呢,本质的区别在于每个进程拥有自己的一整套变量,而线程则是共享数据,Java中启动一个线程的代码如下: // 线程任务的具体实现接口 ????public interface Runnable { public abstract void run(); ????} /

多线程

1.线程的概念? 多线程,就类似与操作系统中的多进程.简单的讲,就是可 以同时并发执行多个任务,处理多件事情.这与我们经常所 谓的边唱边跳,边说边做事一个道理.? 线程是一个轻量级的进程,一个进程中可以分为多个线程. 比起进程,线程所耗费的系统资源更少,切换更加容易 /* * 进程是操作系统中的一个任务,一个程序启动运行,就会创建 * 一个(或多个)进程. * 线程是轻量级的进程.进程会有自己独立的内存空间与资源.一个进程 * 下会存在一个(或多个)线程.线程为进程的执行单元.线程本身不含有 *