TPL异步并行编程之任务超时

此处参考自阿涛的博文:http://www.cnblogs.com/HelloMyWorld/p/5526914.html

基本的思路:

net中异步操作由于是交给线程来实现,因此不可能真正想js那样将一个单线程上的任务移除:如

var id=setTimeout(fun,200);

if(id>0){

  clearTimeout(id);//将一个任务从单线程的任务栈中移除,自然就做到了真正的移除任务

}

但是在net中一个任务交给线程执行后,具体什么时候执行完成我们并不确定,就算是我们把线程终止掉,如果任务执行完了,且执行完后与之关联的处理函数关系任然建立,那么其处理函数一样会执行

那么对于net这样的现状,我们只好斩断与之关联的处理函数的关系,来达到取消一个任务的或者认为他超时

同样的原理,比如我们有一个搜索框,或者地图缩放来做一些事情,其实我们在输入框中输入一段文字是非常快的,假如我们以文本发生变化就去发起网络搜索,那么其实会发起n个字符的搜索请求;

但是实际上我们想要只是最后一次发起的请求而已,那么问题就来了,这些回来的数据谁先谁后都是随机的,也就自然的出现了结果乱七八糟,那么怎么办呢?

1 同步搜索

我们可以锁定ui界面让用户不再能够发起请求:当然是一个传统并非很友好的解决

如:点击搜索按钮,调用搜索程序发起搜索,锁住ui,等待结果返回,再解锁ui将操作交给用户;

2 延时处理

比如连续的触发任务执行,那么我们就让他在小于一段时间内的触发不发起真正的请求,这种办法也只是减少无用请求而已

如:我一直点搜索按钮一直点,是的一直点,但是当我到达比如200ms的才会发起请求,之前点的都没用;

3 取消无用的处理

比如我连续发起了10次请求但是,只要最后一次的,那么我把之前的就取消掉,请求其实已经发出,只是对于结果我们丢弃掉了

如:我一直点一直点一直点,发起了10次请求,但是我每次发起请求前就把上一次的丢弃掉,注意这里并不是真正让这个网络请求取消了,本质上是没有办法取消掉的,

我只是让回来的结果丢弃掉而已不做任何处理了,那么就算是我点了n次其实我也只取了我最后一次的结果,所以这样看起来合情合理,但是对于网络流量要求的app来说就很不科学了

4 将2和3结合起来

首先我们做一个延时处理比如200ms,当达到延时处理后再发起请求,但是有个特殊地方比如我在下一个200ms内又发起请求,此时结果并没有回来,那么我们将之前的任务取消掉就好了,这样也就相对友好的

处理了这些矛盾,这里兼顾了2和3的优缺点

以下是我改进的代码,让任务可以手动取消

封装的超时Task类

  1 public class TimeoutTask
  2 {
  3     #region 字段
  4     private Action _action;
  5     private CancellationToken _token;
  6     private event AsyncCompletedEventHandler _asyncCompletedEvent;
  7     private TaskCompletionSource<AsyncCompletedEventArgs> _tcs;
  8     #endregion
  9
 10     #region 静态方法
 11     public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, CancellationToken token)
 12     {
 13         return await TimeoutTask.StartNewTask(action, token, Timeout.Infinite);
 14     }
 15
 16     public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, int timeout)
 17     {
 18         return await TimeoutTask.StartNewTask(action, CancellationToken.None, timeout);
 19     }
 20
 21     public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, CancellationToken token,
 22         int timeout = Timeout.Infinite)
 23     {
 24         var task = new TimeoutTask(action, token, timeout);
 25
 26         return await task.Run();
 27     }
 28     #endregion
 29
 30     #region 构造
 31
 32     public TimeoutTask(Action action, int timeout) : this(action, CancellationToken.None, timeout)
 33     {
 34
 35     }
 36
 37     public TimeoutTask(Action action, CancellationToken token) : this(action, token, Timeout.Infinite)
 38     {
 39
 40     }
 41
 42     public TimeoutTask(Action action, CancellationToken token, int timeout = Timeout.Infinite)
 43     {
 44         _action = action;
 45
 46         _tcs = new TaskCompletionSource<AsyncCompletedEventArgs>();
 47
 48         if (timeout != Timeout.Infinite)
 49         {
 50             var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
 51             cts.CancelAfter(timeout);
 52             _token = cts.Token;
 53         }
 54         else
 55         {
 56             _token = token;
 57         }
 58     }
 59     #endregion
 60
 61     #region 公用方法
 62
 63     /// <summary>
 64     /// 运行
 65     /// </summary>
 66     /// <returns></returns>
 67     public async Task<AsyncCompletedEventArgs> Run()
 68     {
 69         _asyncCompletedEvent += AsyncCompletedEventHandler;
 70
 71         try
 72         {
 73             using (_token.Register(() => _tcs.TrySetCanceled()))
 74             {
 75                 ExecuteAction();
 76                 return await _tcs.Task.ConfigureAwait(false);
 77             }
 78
 79         }
 80         finally
 81         {
 82             _asyncCompletedEvent -= AsyncCompletedEventHandler;
 83         }
 84
 85     }
 86
 87     public void Cancel()
 88     {
 89         if (!_token.CanBeCanceled)
 90         {
 91             _tcs.TrySetCanceled();
 92         }
 93     }
 94     #endregion
 95
 96     #region 私有方法
 97
 98     /// <summary>
 99     /// 执行Action
100     /// </summary>
101     private void ExecuteAction()
102     {
103         Task.Factory.StartNew(() =>
104         {
105             _action.Invoke();
106
107             OnAsyncCompleteEvent(null);
108         });
109     }
110
111     /// <summary>
112     /// 异步完成事件处理
113     /// </summary>
114     /// <param name="sender"></param>
115     /// <param name="e"></param>
116     private void AsyncCompletedEventHandler(object sender, AsyncCompletedEventArgs e)
117     {
118         if (e.Cancelled)
119         {
120             _tcs.TrySetCanceled();
121         }
122         else if (e.Error != null)
123         {
124             _tcs.TrySetException(e.Error);
125         }
126         else
127         {
128             _tcs.TrySetResult(e);
129         }
130     }
131
132     /// <summary>
133     /// 触发异步完成事件
134     /// </summary>
135     /// <param name="userState"></param>
136     private void OnAsyncCompleteEvent(object userState)
137     {
138         if (_asyncCompletedEvent != null)
139         {
140             _asyncCompletedEvent(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: userState));
141         }
142     }
143     #endregion
144 }
145
146 /// <summary>
147 /// 有返回值,可超时,可取消的Task
148 /// </summary>
149 /// <typeparam name="T"></typeparam>
150 public class TimeoutTask<T>
151 {
152     #region 字段
153     private Func<T> _func;
154     private CancellationToken _token;
155     private event AsyncCompletedEventHandler _asyncCompletedEvent;
156     private TaskCompletionSource<AsyncCompletedEventArgs> _tcs;
157     #endregion
158
159     #region 静态方法
160     public static async Task<T> StartNewTask(Func<T> func, CancellationToken token,
161         int timeout = Timeout.Infinite)
162     {
163         var task = new TimeoutTask<T>(func, token, timeout);
164
165         return await task.Run();
166     }
167
168     public static async Task<T> StartNewTask(Func<T> func, int timeout)
169     {
170         return await TimeoutTask<T>.StartNewTask(func, CancellationToken.None, timeout);
171     }
172
173     public static async Task<T> StartNewTask(Func<T> func, CancellationToken token)
174     {
175         return await TimeoutTask<T>.StartNewTask(func, token, Timeout.Infinite);
176     }
177
178
179
180     #endregion
181
182     #region 公用方法
183     /// <summary>
184     /// 运行Task
185     /// </summary>
186     /// <returns></returns>
187     public async Task<T> Run()
188     {
189         _asyncCompletedEvent += AsyncCompletedEventHandler;
190
191         try
192         {
193             using (_token.Register(() => _tcs.TrySetCanceled()))
194             {
195                 ExecuteFunc();
196                 var args = await _tcs.Task.ConfigureAwait(false);
197                 return (T)args.UserState;
198             }
199
200         }
201         finally
202         {
203             _asyncCompletedEvent -= AsyncCompletedEventHandler;
204         }
205
206     }
207
208
209     public bool CanBeCanceled
210     {
211         get { return _token.CanBeCanceled; }
212     }
213
214     public void Cancel()
215     {
216         if (!_token.CanBeCanceled)
217         {
218             _tcs.SetCanceled();
219         }
220     }
221     #endregion
222
223     #region 构造
224     public TimeoutTask(Func<T> func, CancellationToken token) : this(func, token, Timeout.Infinite)
225     {
226
227     }
228
229     public TimeoutTask(Func<T> func, int timeout = Timeout.Infinite) : this(func, CancellationToken.None, timeout)
230     {
231
232     }
233
234     public TimeoutTask(Func<T> func, CancellationToken token, int timeout = Timeout.Infinite)
235     {
236         _func = func;
237
238         _tcs = new TaskCompletionSource<AsyncCompletedEventArgs>();
239
240         if (timeout != Timeout.Infinite)
241         {
242             var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
243
244             cts.CancelAfter(timeout);
245             _token = cts.Token;
246         }
247         else
248         {
249             _token = token;
250         }
251     }
252     #endregion
253
254     #region 私有方法
255
256
257     /// <summary>
258     /// 执行
259     /// </summary>
260     private void ExecuteFunc()
261     {
262         ThreadPool.QueueUserWorkItem(s =>
263         {
264             var result = _func.Invoke();
265
266             OnAsyncCompleteEvent(result);
267         });
268     }
269
270     /// <summary>
271     /// 异步完成事件处理
272     /// </summary>
273     /// <param name="sender"></param>
274     /// <param name="e"></param>
275     private void AsyncCompletedEventHandler(object sender, AsyncCompletedEventArgs e)
276     {
277         if (e.Cancelled)
278         {
279             _tcs.TrySetCanceled();
280         }
281         else if (e.Error != null)
282         {
283             _tcs.TrySetException(e.Error);
284         }
285         else
286         {
287             _tcs.TrySetResult(e);
288         }
289     }
290
291     /// <summary>
292     /// 触发异步完成事件
293     /// </summary>
294     /// <param name="userState"></param>
295     private void OnAsyncCompleteEvent(object userState)
296     {
297         if (_asyncCompletedEvent != null)
298         {
299             _asyncCompletedEvent(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: userState));
300         }
301     }
302     #endregion
303 }

demo

 1 class Program
 2     {
 3         private static TimeoutTask<string> result;
 4
 5         static void Main(string[] args)
 6         {
 7
 8
 9             ThreadMethod();
10
11
12             Console.WriteLine("启动完成");
13             Console.ReadLine();
14         }
15
16         private async static void ThreadMethod()
17         {
18 //            await TimeoutTask.StartNewTask(LongTimeWork, 6000);
19 //            await TimeoutTask<string>.StartNewTask(LongTimeWork2, 2000);
20             try
21             {
22                 for (int i = 0; i < 5; i++)
23                 {
24
25                     //我手动取消掉上一次的
26                     if (result!=null)
27                     {
28                         try
29                         {
30                             //取消掉
31                             result.Cancel();
32                         }
33                         catch (Exception er)
34                         {
35                         }
36                     }
37
38                     result = new TimeoutTask<string>(LongTimeWork2);
39
40
41
42                     try
43                     {
44                         result.Run();
45                     }
46                     catch (Exception ee)
47                     {
48
49                     }
50                 }
51
52
53                 Console.WriteLine(result);
54             }
55             catch (Exception ex)
56             {
57
58             }
59         }
60
61         private static void LongTimeWork()
62         {
63             Thread.Sleep(5000);
64         }
65
66         private static string LongTimeWork2()
67         {
68             Thread.Sleep(5000);
69             return "XXD";
70         }
71
72
73     }

时间: 2024-07-30 02:27:38

TPL异步并行编程之任务超时的相关文章

TPL异步并行编程之取消任务

TPL异步并行编程之简单使用 在上篇随笔里面说明了Task的使用,仅仅是简单使用,有时候把一个任务交给Task去执行,但是呢还是要管理下,比如说:我要叫这个任务停止了,不做了,任务取消了,或者超时了 在传统的While里面我们可以这样做,1 通过标识 2 通过一个方法抛异常,3 其他办法 举个例子:while(true){ if(isNotCancel){ //每次都判断下,取消没有,当然isNotCancel需要加上lock的 } } 难道在Task里面有什么新奇吗?其实也没啥新奇的,那Tas

TPL异步并行编程之简单使用

并行编程一直是一个老生常谈的话题 在这里记录一下TPL编程,这在net4.0 微软就已经提供了多核时代下的并行库,其中最核心的最常用的也就是Task 一 Task是什么 Task可以简单的理解为一个线程的封装,向外部暴露几种接口,如常见的Task.Run,Task.Factory.StartNew:当一个Task启动后应用程序将会等待这个Task的执行,但是不会去阻塞UI线程,换句话说,我丢了一个任务跟一个线程并让他去执行,然后我马上回到UI线程,这个线程运行完了就告诉UI线程我完事了,然后继续

TPL异步并行编程之回调

Task是基于ThreadPool线程池基础上的高度异步多线程编程,如果有一天我希望一个需要长时间运行的Task,在被某些异常终止后还能回调一些代码就可以知道Task终止的原因了吧 是的,且看代码 public static Task AsyncRun(this ILoggerProvider logProvider, Action task, TaskCreationOptions taskOption, Action<Exception> exceptionHandler) { retur

网络编程中的超时检测

我们在网络编程中常见的一种做法是:创建好套接字后以阻塞的方式读写,如果没有数据可读的话,程序会一直等待.事实上,网络状况一直不断变化,很有可能在通讯过程中出现网络连接断开.我们在程序中有必要对这种情况进行检测,从而及时做出响应.下面介绍几种常用的超时检测方法(假设我们要求通过套接字等待数据的最大时间为8秒): 一. 设置套接字接收超时 setsockopt可以设置套接字的属性,其中包括接收超时时间.参考代码如下        struct timeval tv; // 描述时间的结构体变量   

linux网络编程中的超时设置

1 下面是在网上找到的资料,先非常的感谢. 用setsockopt()来控制recv()与send()的超时 在send(),recv()过程中有时由于网络状况等原因,收发不能预期进行,而设置收发超时控制: 在Linux下需要注意的是时间的控制结构是struct timeval而并不是某一整型数,int nNetTimeout=1000;//1秒, //设置发送超时 setsockopt(socket,SOL_SOCKET,SO_SNDTIMEO,(char *)&nNetTimeout,siz

connect socket的超时设置

最近项目中,有个需求是检测某ip地址是否是通的,使用了socket的connect函数.但是,当ip地址写错的话,connect就会一直阻塞在那里,大概2.3分钟才能返回连接失败.这对于用户来说是不可接受的.下面的文章介绍了两种方法实现这种超时设置: 转自http://blog.csdn.net/ast_224/article/details/2957294  connect超时: 目前各平台通用的设置socket connect超时的办法是通过select(),具体方法如下: 1.建立sock

Java Socket编程详细解说

Java Socket编程 JavaSocketServerSocket乱码超时 Java Socket编程 对于Java Socket编程而言,有两个概念,一个是ServerSocket,一个是Socket.服务端和客户端之间通过Socket建立连接,之后它们就可以进行通信了.首先ServerSocket将在服务端监听某个端口,当发现客户端有Socket来试图连接它时,它会accept该Socket的连接请求,同时在服务端建立一个对应的Socket与之进行通信.这样就有两个Socket了,客户

缓存系列文章–无底洞问题

一.背景 1. 什么是缓存无底洞问题: Facebook的工作人员反应2010年已达到3000个memcached节点,储存数千G的缓存.他们发现一个问题-memcached的连接效率下降了,于是添加memcached节点,添加完之后,并没有好转.称为“无底洞”现象 2. 缓存无底洞产生的原因: 键值数据库或者缓存系统,由于通常采用hash函数将key映射到对应的实例,造成key的分布与业务无关,但是由于数据量.访问量的需求,需要使用分布式后(无论是客户端一致性哈性.redis-cluster.

分分钟学会GCD

2014 什么是GCD Grand Central Dispatch (GCD)是异步执行任务的技术之一.一般将应用程序中记述的线程管理用的代码在系统级中实现.由于线程管理是作为系统的一部分来实现的,因此可统一管理,也可执行任务,这样就比以前的线程更有效率. 也就是说,GCD用我们难以置信的非常简洁的记述方法,实现了极为复杂的多线程编程. dispatch_async(queue, ^{ // 长时间处理 // 例如数据库访问 // 例如图像识别 // 长时间处理结束,主线程使用该处理结果 di