.NET 异步多线程,Thread,ThreadPool,Task,Parallel

今天记录一下异步多线程的进阶历史,以及简单的使用方法

主要还是以Task,Parallel为主,毕竟用的比较多的现在就是这些了,再往前去的,除非是老项目,不然真的应该是挺少了,大概有个概念,就当了解一下进化史了

1:委托异步多线程,所有的异步都是基于委托来实现的

#region 委托异步多线程
{
  //委托异步多线程
  Stopwatch watch = new Stopwatch();
  watch.Start();
  Console.WriteLine($"开始执行了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")},,,,{Thread.CurrentThread.ManagedThreadId}");
  Action<string> act = DoSomethingLong;
  for (int i = 0; i < 5; i++)
  {
    //int ThreadId = Thread.CurrentThread.ManagedThreadId;//获取当前线程ID
    string name = $"Async {i}";
    act.BeginInvoke(name, null, null);
  }
  watch.Stop();
  Console.WriteLine($"结束执行了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")},,,,{watch.ElapsedMilliseconds}");
}
#endregion

2:多线程的最老版本:Thread(好像是2.0的时候出的?记不得了)

//Thread
//Thread默认属于前台线程,启动后必须完成
//Thread有很多Api,但大多数都已经不用了
Console.WriteLine("Thread多线程开始了");
Stopwatch watch = new Stopwatch();
watch.Start();
//线程容器
List<Thread> list = new List<Thread>();
for (int i = 0; i < 5; i++)
{
       //int ThreadId = Thread.CurrentThread.ManagedThreadId;//获取当前线程ID
       string name = $"Async {i}";
       ThreadStart start1 = () =>
       {
          DoSomethingLong(name);
       };
       Thread thread = new Thread(start1);
       thread.IsBackground = true;//设置为后台线程,关闭后立即退出
       thread.Start();
       list.Add(thread);
       //thread.Suspend();//暂停,已经不用了
       //thread.Resume();//恢复,已经不用了
       //thread.Abort();//销毁线程
       //停止线程靠的不是外部力量,而是线程自身,外部修改信号量,线程检测信号量
}
//判断当前线程状态,来做线程等待
while (list.Count(t => t.ThreadState != System.Threading.ThreadState.Stopped) > 0)
{
  Console.WriteLine("等待中.....");   Thread.Sleep(500);
}
//统计正确全部耗时,通过join来做线程等待
foreach (var item in list)
{
  item.Join();//线程等待,表示把thread线程任务join到当前线程,也就是当前线程等着thread任务完成
  //等待完成后统计时间
}
watch.Stop();

Thread的无返回值回调:

封装一个方法

/// <summary>
/// 回调封装,无返回值
/// </summary>
/// <param name="start"></param>
/// <param name="callback"></param>
private static void ThreadWithCallback(ThreadStart start, Action callback)
{
  ThreadStart nweStart = () =>
  {
    start.Invoke();
    callback.Invoke();
  };
  Thread thread = new Thread(nweStart);
  thread.Start();
}

//回调的委托
Action act = () =>
{
    Console.WriteLine("回调函数");
};
//要执行的异步操作
ThreadStart start = () =>
{
    Console.WriteLine("正常执行。。");
};
ThreadWithCallback(start, act);

有返回值的回调:

/// <summary>
/// 回调封装,有返回值
/// 想要获取返回值,必须要有一个等待的过程
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="func"></param>
/// <returns></returns>
private static Func<T> ThreadWithReturn<T>(Func<T> func)
{
  T t = default(T);
  //ThreadStart本身也是一个委托
  ThreadStart start = () =>
    {
      t = func.Invoke();
    };
  //开启一个子线程
  Thread thread = new Thread(start);
  thread.Start();
  //返回一个委托,因为委托本身是不执行的,所以这里返回出去的是还没有执行的委托
  //等在获取结果的地方,调用该委托
  return () =>
  {
    //只是判断状态的方法
    while (thread.ThreadState != System.Threading.ThreadState.Stopped)
    {
      Thread.Sleep(500);
    }
    //使用线程等待
    //thread.Join();
    //以上两种都可以
    return t;
  };
}

Func<int> func = () =>
{
  Console.WriteLine("正在执行。。。");
  Thread.Sleep(10000);
  Console.WriteLine("执行结束。。。");
  return DateTime.Now.Year;
};
Func<int> newfunc = ThreadWithReturn(func);
//这里为了方便测试,只管感受回调的执行原理
Console.WriteLine("Else.....");
Thread.Sleep(100);
Console.WriteLine("Else.....");
Thread.Sleep(100);
Console.WriteLine("Else.....");
//执行回调函数里return的委托获取结果
//newfunc.Invoke();
Console.WriteLine($"有参数回调函数的回调结果:{newfunc.Invoke()}");

关于有返回值的回调,因为委托是在执行Invoke的时候才会去调用委托的方法,所以在调用newfunc.Invoke()的时候,才会去委托里面抓取值,这里会有一个等待的过程,等待线程执行完成

3:ThreadPool:线程池

线程池是在Thread后出的,算是一种很给力的进化

在Thread的年代,线程是直接从计算机里抓取的,而线程池的出现,就是给开发人员提供一个容器,可以从容器中抓取线程,也就是线程池

好处就在于可以避免频繁的创建和销毁线程,直接从线程池拿线程,用完在还回去,当不够的时候,线程池在从计算机中给你分配,岂不是很爽?

线程池的数量是可以控制的,最小线程数量:8

ThreadPool.QueueUserWorkItem(p =>
{
  //这里的p是没有值的
  Console.WriteLine(p);
  Thread.Sleep(2000);
  Console.WriteLine($"线程池线程,{Thread.CurrentThread.ManagedThreadId}");
});
ThreadPool.QueueUserWorkItem(p =>
{
  //这里的p就是传进来的值
  Console.WriteLine(p);
  Thread.Sleep(2000);
  Console.WriteLine($"线程池线程,带参数,{Thread.CurrentThread.ManagedThreadId}");
}, "这里是参数");

线程池用起来还是很方便的,也可以控制线程数量

线程池里有两种线程:普通线程,IO线程,我比较喜欢在操作IO的时候使用ThreadPool

int workerThreads = 0;
int completionPortThreads = 0;
//设置线程池的最大线程数量(普通线程,IO线程)
ThreadPool.SetMaxThreads(80, 80);
//设置线程池的最小线程数量(普通线程,IO线程)
ThreadPool.SetMinThreads(8, 8);
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
Console.WriteLine($"当前可用最大普通线程:{workerThreads},IO:{completionPortThreads}");
ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
Console.WriteLine($"当前可用最小普通线程:{workerThreads},IO:{completionPortThreads}");

ThreadPool并没有Thread的Join等待接口,那么想让ThreadPool等待要这么做呢?

ManualResetEvent:通知一个或多个正在等待的线程已发生的事件,相当于发送了一个信号

mre.WaitOne:卡住当前主线程,一直等到信号修改为true的时候,才会接着往下跑

//用来控制线程等待,false默认为关闭状态
ManualResetEvent mre = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(p =>
{
  DoSomethingLong("控制线程等待");
  Console.WriteLine($"线程池线程,带参数,{Thread.CurrentThread.ManagedThreadId}");
  //通知线程,修改信号为true
  mre.Set();
});
//阻止当前线程,直到等到信号为true在继续执行
mre.WaitOne();
//关闭线程,相当于设置成false
mre.Reset();
Console.WriteLine("信号被关闭了");
ThreadPool.QueueUserWorkItem(p =>
{
  Console.WriteLine("再次等待");
  mre.Set();
});
mre.WaitOne();

4:Task,这也是现在用的最多的了,毕竟是比较新的玩意

关于Task就介绍几个最常用的接口,基本上就够用了(一招鲜吃遍天)

Task.Factory.StartNew:创建一个新的线程,Task的线程也是从线程池中拿的(ThreadPool)

Task.WaitAny:等待一群线程中的其中一个完成,这里是卡主线程,一直等到一群线程中的最快的一个完成,才能继续往下执行(20年前我也差点被后面的给追上),打个简单的比方:从三个地方获取配置信息(数据库,config,IO),同时开启三个线程来访问,谁快我用谁。

Task.WaitAll:等待所有线程完成,这里也是卡主线程,一直等待所有子线程完成任务,才能继续往下执行。

Task.ContinueWhenAny:回调形式的,任意一个线程完成后执行的后续动作,这个就跟WaitAny差不多,只不是是回调形式的。

Task.ContinueWhenAll:回调形式的,所有线程完成后执行的后续动作,理解同上

//线程容器 List<Task> taskList = new List<Task>();
Stopwatch watch = new Stopwatch();
watch.Start();
Console.WriteLine("************Task Begin**************");
//启动5个线程
for (int i = 0; i < 5; i++)
{
  string name = $"Task:{i}";
  Task task = Task.Factory.StartNew(() =>
  {
    DoSomethingLong(name);
  });
  taskList.Add(task);
}
//回调形式的,任意一个完成后执行的后续动作
Task any = Task.Factory.ContinueWhenAny(taskList.ToArray(), task =>
{
  Console.WriteLine("ContinueWhenAny");
});
//回调形式的,全部任务完成后执行的后续动作
Task all = Task.Factory.ContinueWhenAll(taskList.ToArray(), tasks =>
{
  Console.WriteLine($"ContinueWhenAll{tasks.Length}");
});
//把两个回调也放到容器里面,包含回调一起等待
taskList.Add(any);
taskList.Add(all);
//WaitAny:线程等待,当前线程等待其中一个线程完成
Task.WaitAny(taskList.ToArray());
Console.WriteLine("WaitAny");

//WaitAll:线程等待,当前线程等待所有线程的完成
Task.WaitAll(taskList.ToArray());
Console.WriteLine("WaitAll");
Console.WriteLine($"************Task End**************{watch.ElapsedMilliseconds},{Thread.CurrentThread.ManagedThreadId}");

关于Task其实只要熟练掌握这几个接口,基本上就够了,完全够用

5:Parallel(并行编程):其实就是在Task基础上又进行了一次封装,当然,Parallel也有很酷炫功能

问:首先是为什么叫做并行编程,跟Task有什么区别?

答:使用Task开启子线程的时候,主线程是属于空闲状态,并不参与执行(我是领导,有5件事情需要处理,我安排了5个手下去做,而我本身就是观望状态 PS:到底是领导。),Parallel开启子线程的时候,主线程也会参与计算(我是领导,我有5件事情需要处理,我身为领导,但是我很勤劳,所以我做了一件事情,另外四件事情安排4个手下去完成),很明显,减少了开销。

你以为Parallel就只有这个炫酷的功能?大错特错,更炫酷的还有;

Parallel可以控制线程的最大并发数量,啥意思?就是不管你是脑溢血,还是心脏病,还是动脉大出血,我的手术室只有2个,同时也只能给两个人做手术,做完一个在进来一个,同时做完两个,就同时在进来两个,多了不行。

当前,你想使用Task,或者ThreadPool来实现这样的效果也是可以的,不过这就需要你动动脑筋了,当然,有微软给封装好的接口直接使用,岂不美哉?

 //并行编程
Console.WriteLine($"*************Parallel start********{Thread.CurrentThread.ManagedThreadId}****");
//一次性执行1个或多个线程,效果类似:Task WaitAll,只不过Parallel的主线程也参与了计算
Parallel.Invoke(
  () => { DoSomethingLong("Parallel`1"); },
  () => { DoSomethingLong("Parallel`2"); },
  () => { DoSomethingLong("Parallel`3"); },
  () => { DoSomethingLong("Parallel`4"); },
  () => { DoSomethingLong("Parallel`5"); });
//定义要执行的线程数量
Parallel.For(0, 5, t =>
{
  int a = t;
  DoSomethingLong($"Parallel`{a}");
});
{
  ParallelOptions options = new ParallelOptions()
  {
    MaxDegreeOfParallelism = 3//执行线程的最大并发数量,执行完成一个,就接着开启一个
  };
  //遍历集合,根据集合数量执行线程数量
  Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, t =>
  {
    DoSomethingLong($"Parallel`{t}");
  });
}
{
  ParallelOptions options = new ParallelOptions()
  {
    MaxDegreeOfParallelism = 3//执行线程的最大并发数量,执行完成一个,就接着开启一个
  };
  //遍历集合,根据集合数量执行线程数量
  Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, (t, status) =>
  {
    //status.Break();//这一次结束。
    //status.Stop();//整个Parallel结束掉,Break和Stop不可以共存
    DoSomethingLong($"Parallel`{t}");
  });
}
Console.WriteLine("*************Parallel end************");

可以多了解一下Parallel的接口,里面的用法有很多,我这里也只是列出了比较常用的几个,像ParallelOptions类可以控制并发数量,当然,不可以也可以,Parallel的重载方法很多,可以自己看看

6:线程取消,异常处理

关于线程取消这块呢,要记住一点,线程只能自身终结自身,也就是除非我自杀,否则你干不掉我,不要妄想通过主线程来控制计算中的子线程。

关于线程异常处理这块呢,想要捕获子线程的异常,最好在子线程本身抓捕,也可以使用Task的WaitAll,不过这种方法不推荐,如果用了,别忘了一点,catch里面放的是AggregateException,不是Exception,不然捕捉不到别怪我

TaskFactory taskFactory = new TaskFactory();
//通知线程是否取消
CancellationTokenSource cts = new CancellationTokenSource();
List<Task> taskList = new List<Task>();
//想要主线程抓捕到子线程异常,必须使用Task.WaitAll,这种方法不推荐
//想要捕获子线程的异常,最好在子线程本身抓捕
//完全搞不懂啊,看懵逼了都
try
{
  for (int i = 0; i < 40; i++)
  {
    int name = i;
    //在子线程中抓捕异常
    Action<object> a = t =>
    {
      try
      {
        Thread.Sleep(2000);
        Console.WriteLine(cts.IsCancellationRequested);
        if (cts.IsCancellationRequested)
        {
          Console.WriteLine($"放弃执行{name}");
        }
        else
        {
          if (name == 1)
          {
            throw new Exception($"取消了线程{name}{t}");
          }
          if (name == 5)
          {
            throw new Exception($"取消了线程{name}{t}");
          }
          Console.WriteLine($"执行成功:{name}");
        }
      }
      catch (Exception ex)
      {
        //通知线程取消,后面的都不执行
        cts.Cancel();
        Console.WriteLine(ex.Message);
      }
    };
  taskList.Add(taskFactory.StartNew(a, name, cts.Token));
  }  
  Task.WaitAll(taskList.ToArray());
}
catch (AggregateException ex)
{
  foreach (var item in ex.InnerExceptions)
  {
    Console.WriteLine(item.Message);
  }
}
 

7:给线程上个锁,防止并发的时候数据丢失,覆盖等

//先准备三个公共变量
private static int iIndex;
private static object obj = new object();
private static List<int> indexList = new List<int>();

List<Task> taskList = new List<Task>();
//开启1000个线程
for (int i = 0; i < 10000; i++)
{
  int newI = i;
  Task task = Task.Factory.StartNew(() =>
  {
  iIndex += 1;
  indexList.Add(newI);
  });
  taskList.Add(task);
}
//等待所有线程完成
Task.WhenAll(taskList.ToArray());
//打印结果
Console.WriteLine(iIndex);
Console.WriteLine(indexList.Count);

给你们看一下我这里运行三次打印出的结果

第一次:

第二次:

第三次:

看的出来,还是比较稳定的只丢失那么几个数据的对吧?

为啥会这样呢?因为当两个线程同时操作一个数据的时候,你觉得会以谁的操作为标准来保存?就好像我们操作IO的时候,不允许多多个线程同时操作一个IO,因为计算机不知道以谁的标准来保存修改

下面给它加个锁,稍微修改一下代码:

List<Task> taskList = new List<Task>();
//开启1000个线程
for (int i = 0; i < 10000; i++)
{
    int newI = i;
    Task task = Task.Factory.StartNew(() =>
    {
        //加个锁
        lock (objLock)
        {
            iIndex += 1;
            indexList.Add(newI);
        }
    });
    taskList.Add(task);
}
//等待所有线程完成
Task.WhenAll(taskList.ToArray());
//打印结果
Console.WriteLine(iIndex);
Console.WriteLine(indexList.Count);

在看一下运行结果:

线程锁会破坏线程,增加耗时,降低效率,不要看效果很爽就到处加锁,万一你钥匙丢了呢(死锁);

共有变量:都能访问的局部变量/全局变量/数据库的值/硬盘文件,这些都有可能是数据不安全的,如果有需求,还是得加个锁

如果确实是要用到锁,推荐大家就使用一个:私有的,静态的,object类型的变量就可以了;

漏掉了一个方法,我给补上:

/// <summary>
/// 一个比较耗时的方法,循环1000W次
/// </summary>
/// <param name="name"></param>
public static void DoSomethingLong(string name)
{
    int iResult = 0;
    for (int i = 0; i < 1000000000; i++)
    {
        iResult += i;
    }
    Console.WriteLine($"********************{name}*******{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}****{Thread.CurrentThread.ManagedThreadId}****");
}

原文地址:https://www.cnblogs.com/MuNet/p/8545139.html

时间: 2024-11-07 03:24:41

.NET 异步多线程,Thread,ThreadPool,Task,Parallel的相关文章

异步和多线程,委托异步调用,Thread,ThreadPool,Task,Parallel,CancellationTokenSource

1 进程-线程-多线程,同步和异步2 异步使用和回调3 异步参数4 异步等待5 异步返回值 5 多线程的特点:不卡主线程.速度快.无序性7 thread:线程等待,回调,前台线程/后台线程, 8 threadpool:线程池使用,设置线程池,ManualResetEvent9 Task初步接触 10 task:waitall waitany continueWhenAny continueWhenAll  11并行运算Parallel 12 异常处理.线程取消.多线程的临时变量和lock13 A

.net 多线程 Thread ThreadPool Task

先准备一个耗时方法 /// <summary>/// 耗时方法/// </summary>/// <param name="name"></param>private void DoSomeThing(string name){                 Console.WriteLine($"开始执行{name}, {Thread.CurrentThread.ManagedThreadId.ToString("

[多线程]thread,threadpool,task及TPL知识点整理

简单理解 Thread:是一个指令序列,个体对象. Threadpool:在使用Thread的过程中,程序员要为每个希望并发的序列new一个线程,很麻烦,因此希望有一个统一管理线程的方法,程序员就不需要关注线程的申请管理问题,所以就对Thread进行一系列封装,有了ThreadPool.使用Threadpool,把需要并发的序列添加进线程池,线程池根据其线程列表中的线程的空闲情况,动态为并发序列申请线程. Task:再后来,程序员发现在使用Threadpool的过程当中还是存在很多不便,比如:(

C#中 Thread,Task,Async/Await,IAsyncResult 的那些事儿!

说起异步,Thread,Task,async/await,IAsyncResult 这些东西肯定是绕不开的,今天就来依次聊聊他们 1.线程(Thread) 多线程的意义在于一个应用程序中,有多个执行部分可以同时执行:对于比较耗时的操作(例如io,数据库操作),或者等待响应(如WCF通信)的操作,可以单独开启后台线程来执行,这样主线程就不会阻塞,可以继续往下执行:等到后台线程执行完毕,再通知主线程,然后做出对应操作! 在C#中开启新线程比较简单 static void Main(string[]

C#5.0之后推荐使用TPL(Task Parallel Libray 任务并行库) 和PLINQ(Parallel LINQ, 并行Linq). 其次是TAP(Task-based Asynchronous Pattern, 基于任务的异步模式)

学习书籍: <C#本质论> 1--C#5.0之后推荐使用TPL(Task Parallel Libray 任务并行库) 和PLINQ(Parallel LINQ, 并行Linq). 其次是TAP(Task-based Asynchronous Pattern, 基于任务的异步模式). --用AggregateException处理Task上的未处理异常. --取消任务. CancellationToken --async修饰方法, 返回Task. task.wait(100)可以阻塞现场. a

TPL(Task Parallel Library)多线程、并发功能

The Task Parallel Library (TPL) is a set of public types and APIs in the System.Threading and System.Threading.Tasks namespaces. The purpose of the TPL is to make developers more productive by simplifying the process of adding parallelism and concurr

Task Parallel Library01,基本用法

我们知道,每个应用程序就是一个进程,一个进程有多个线程.Task Parallel Library为我们的异步编程.多线程编程提供了强有力的支持,它允许一个主线程运行的同时,另外的一些线程或Task也同时运行.本篇体验基本用法. 基本用法 Taks的构造函数接收的类型是Action,也就是一个委托. static void Main(string[] args) { var t1 = new Task(() => { Console.WriteLine("任务1开始"); Thr

异步多线程

UI只能在主线程中完成更新,在子线程中更新UI报错如下 Only the original thread that created a view hierarchy can touch its views. 但是,在主线程中完成耗时操作容易引起使用体验不佳,进程卡顿问题,为了解决此问题引入异步多线程

异步多线程(一)

进程 计算机概念,程序在服务器运行时占据全部计算机资源总和,虚拟的.包含CPU.内存.网络.硬盘 MSDN: 当一个程序开始运行时,它就是一个进程,进程包括运行中的程序和程序所使用到的内存和系统资源. 而一个进程又是由多个线程所组成的. 线程 计算机概念,进程在响应操作时最小单位,也包含CPU.内存.网络.硬盘 MSDN: 线程是程序中的一个执行流,每个线程都有自己的专有寄存器(栈指针.程序计数器等),但代码区是共享的,即不同的线程可以执行同样的函数. 多线程 计算机概念,一个进程有多个线程同时