分享一个异步任务在遇到IO异常时支持递归回调的辅助方法

public void TryAsyncActionRecursively<TAsyncResult>(
    string asyncActionName,
    Func<Task<TAsyncResult>> asyncAction,
    Action<int> mainAction,
    Action<TAsyncResult> successAction,
    Func<string> getContextInfoFunc,
    Action<Exception> failedAction,
    int retryTimes) where TAsyncResult : AsyncOperationResult
{
    var retryAction = new Action<int>(currentRetryTimes =>
    {
        if (currentRetryTimes > _immediatelyRetryTimes)
        {
            Task.Factory.StartDelayedTask(_retryIntervalForIOException, () => mainAction(currentRetryTimes + 1));
        }
        else
        {
            mainAction(currentRetryTimes + 1);
        }
    });
    var executeFailedAction = new Action<Exception>(ex =>
    {
        try
        {
            if (failedAction != null)
            {
                failedAction(ex);
            }
        }
        catch (Exception unknownEx)
        {
            _logger.Error(string.Format("Failed to execute the failedCallbackAction of asyncAction:{0}, contextInfo:{1}",
                asyncActionName, getContextInfoFunc()), unknownEx);
        }
    });
    var processTaskException = new Action<Exception, int>((ex, currentRetryTimes) =>
    {
        if (ex is IOException)
        {
            _logger.Error(string.Format("Async task ‘{0}‘ has io exception, contextInfo:{1}, current retryTimes:{2}, try to run the async task again.",
                asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
            retryAction(retryTimes);
        }
        else
        {
            _logger.Error(string.Format("Async task ‘{0}‘ has unknown exception, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);            executeFailedAction(ex);
        }
    });
    var completeAction = new Action<Task<TAsyncResult>>(t =>
    {
        if (t.Exception != null)
        {
            processTaskException(t.Exception.InnerException, retryTimes);
            return;
        }
        var result = t.Result;
        if (result.Status == AsyncOperationResultStatus.IOException)
        {
            _logger.ErrorFormat("Async task ‘{0}‘ result status is io exception, contextInfo:{1}, current retryTimes:{2}, errorMsg:{3}, try to run the async task again.",
                asyncActionName, getContextInfoFunc(), retryTimes, result.ErrorMessage);
            retryAction(retryTimes);
            return;
        }
        if (successAction != null)
        {
            successAction(result);
        }
    });

    try
    {
        asyncAction().ContinueWith(completeAction);
    }
    catch (IOException ex)
    {
        _logger.Error(string.Format("IOException raised when executing async action ‘{0}‘, contextInfo:{1}, current retryTimes:{2}, try to run the async task again.",
            asyncActionName, getContextInfoFunc(), retryTimes), ex);
        retryAction(retryTimes);
    }
    catch (Exception ex)
    {
        _logger.Error(string.Format("Unknown exception raised when executing async action ‘{0}‘, contextInfo:{1}, current retryTimes:{2}",
            asyncActionName, getContextInfoFunc(), retryTimes), ex);
        executeFailedAction(ex);
    }
}

该函数的功能是:执行一个异步任务(返回Task的方法),如果执行出现IO异常,则重试当前主函数(mainAction);用户的mainAction中会再次调用TryAsyncActionRecursively方法。

从而实现当遇到IO异常时,能做到不断重试。另外,重试只立即重试指定的次数,超过指定次数,则不立即重试,而是暂停一定间隔后再次执行。

该函数还提供当acyncAction执行成功或失败后的回调函数,以及允许传入当前上下文的一些说明信息,以便记录有意义的错误日志信息。

下面是使用示例:

private void PublishEventAsync(ProcessingCommand processingCommand, EventStream eventStream, int retryTimes)
{
    TryAsyncActionRecursively<AsyncOperationResult>("PublishEventAsync",
    () => _eventPublisher.PublishAsync(eventStream),
    currentRetryTimes => PublishEventAsync(processingCommand, eventStream, currentRetryTimes),
    result =>
    {
        _logger.DebugFormat("Publish events success, {0}", eventStream);
        processingCommand.Complete(new CommandResult(CommandStatus.Success, processingCommand.Command.Id, null, null, null));
    },
    () => string.Format("[eventStream:{0}]", eventStream),
    ex => processingCommand.Complete(new CommandResult(CommandStatus.Failed, processingCommand.Command.Id, null, null, "Publish events failed.")),
    retryTimes);
}
PublishEventAsync(processingCommand, eventStream, 0);
时间: 2024-11-05 18:46:41

分享一个异步任务在遇到IO异常时支持递归回调的辅助方法的相关文章

使用C++ boost从零构建一个异步文件IO系统

前言 因为本科毕业设计中要做一个分布式文件系统,其中一个模块需要实现文件IO.为了验证我对异步IO的理解,决定造一个异步文件IO的轮子.操作系统已经给出各种异步操作的API,如重叠IO, IOCP,kqueue,select,poll,epoll等机制,而且C++也有很多跨平台的异步IO库,如libevent,boost::asio等.我参考已有的实现来完善这个小系统的功能. 渣技术,渣代码,出现问题请各位指出. 概述 同步与异步: 同步:假如我想对一个文件(socket也同理)进行处理,那么一

深入理解Tornado——一个异步web服务器

本人的第一次翻译,转载请注明出处:http://www.cnblogs.com/yiwenshengmei/archive/2011/06/08/understanding_tornado.html原文地址:http://golubenco.org/?p=16 这篇文章的目的在于对Tornado这个异步服务器软件的底层进行一番探索.我采用自底向上的方式进行介绍,从轮巡开始,向上一直到应用层,指出我认为有趣的部分.所以,如果你有打算要阅读Tornado这个web框架的源码,又或者是你对一个异步we

分享一个记录日志的类,可多线程使用。

好久没写博客了,今天分享一个自己用的日志类,非原创,借鉴了前辈的一个想法,然后修改来的. 日志我们是必须的,现在程序都是多线程并发了,记日志就有可能出现问题了,lock?影响性能.log4net太重量级了,本日志是一个轻量级的小工具. 废话不多说,看源码: 1 using System; 2 using System.Collections.Generic; 3 using System.IO; 4 using System.Text; 5 6 namespace GEDU.CourseOnli

[Unity3D入门]分享一个自制的入门级游戏项目&quot;坦克狙击手&quot;

[Unity3D入门]分享一个自制的入门级游戏项目"坦克狙击手" 我在学Unity3D,TankSniper(坦克狙击手)这个项目是用来练手的.游戏玩法来自这里(http://www.4399.com/flash/127672_3.htm),虽然抄袭了人家的创意,不过我只用来练习(目前还很不成熟,离人家的境界相差很大),坦克.导弹.建筑模型来自网络,应该不会有版权问题吧. 由于模型和代码总共10M以上了,需要源代码和发布的Windows版.网页版程序的同学麻烦点个赞并留下你的邮箱~ 到

Could not create pool connection. The DBMS driver exception was: Io 异常: Broken pipe

现场同事反馈:中间件weblogic连不上数据库Oracle,发回日志可以看到: Caused by: weblogic.common.ResourceException: weblogic.common.ResourceException: Could not create pool connection. The DBMS driver exception was: Io 异常: Broken pipe at weblogic.jdbc.common.internal.ConnectionE

分享一个最近在使用的响应式css框架 Pure

响应式,就是根据浏览器或不同设备屏幕大小自适应内容. 第一次查响应式呢,了解到是要在head里写判断浏览器宽度之类的东西,也要设置对IE低版本兼容. 如: <meta name="viewport" content="width=device-width, user-scalable=no, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;" /><link rel="style

【开源.NET】 分享一个前后端分离的轻量级内容管理框架

开发框架要考虑的面太多了:安全.稳定.性能.效率.扩展.整洁,还要经得起实践的考验,从零开发一个可用的框架,是很耗时费神的工作.网上很多开源的框架,为何还要自己开发?我是基于以下两点: 没找到合适的:安全.稳定.简单.易用.高效.免费: 想成为架构师: 于是就自己动手,参考网上开源的项目和借鉴网友的设计思路(特别是萧秦系列博文),结合自己的实践,开发了一个简单.易用.高效的的框架,虽然不完善,但也能解决现实中的问题.不过随着见识增广,发现没负责过千万级别的项目难以成为架构师,也不可能开发出一个完

分享一个强大的采集类,还可以模拟php多进程

做采集的时候,可以使用file_get_contents()去获取网页源代码,但是使用file_get_contents采集,速度慢,而且超时时间,不好控制.如果采集的页面不存在,需要等待的时间很长.一般来说,curl的速度最快,其次是socket,最后是file_get_contents.现在跟大家分享一个很强大的采集类,会根据你的服务器当前的配置,自动选择最快的方式.已经封装了curl和socket,file_get_contents 用法很简单:1,采用get方法请求Http::doGet

分享一个Android和java调用RESTful Web服务的利器Resting

分享一个Android和java调用RESTful Web服务的利器Resting 当我们调用Web服务,往往是最终目标是取HTTP响应,将其转化为将在应用中呈现的值对象.Resting可以用来实现这一功能.Resting,在Java的一个轻量级的REST框架,可用于调用一个RESTful Web服务,并转换成响应来自客户端应用程序定制的Java对象.由于它的简单,resting是适合Android等手持设备. resting目标?暴露简单的get(),post(),put()和delete()