优雅的处理Redis访问超时

很长一段时间以来,一直在项目中使用Redis作为辅助存储,确切来说是利用Redis的内存存储,而不是将其作为缓存。比如常见的利用Set集合来判断某个数值是否存在,或者将来自不同请求的数据放在Redis中进行拼接然后一起写入MySQL等数据库。
这种存储目的的使用要求对Redis的访问不能失败(如果作为缓存使用,是接受失败的),所以作为存储目的使用代码中要对请求Redis的代码进行异常处理以及重试等。
在最初的代码中采用了最常见的方法如try ... catch ...处理异常,递归进行重试,类似:

//伪代码
public void Process(int retry)
{
    if(retry>3)
    {
        //记录错误
        return;
    }
    try
    {
        //业务代码
    }
    catch(Exception ex)
    {
        //重试
        ++retry;
        Process(retry);
    }
}

后来有一天看到了园友Jeffcky推荐的Polly库,瞬间眼前一亮,这才是我们处理异常和重试所需要的东西。
关于Polly的使用,可以参考Jeffcky的博文或者Polly项目的GitHub主页(文档很详细)。
大致的代码结构如:

var tsArr = new TimeSpan[]
{
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(1)
};
// 构造一种重试测试(其它可选的包括熔断等)
var policy = Policy
    .Handle<Exception>()
    .WaitAndRetryAsync(tsArr);

// 需要有Polly调用的业务代码,以异步方法为例
async Task SomeToInvoke()
{
       // 一些异步调用
}

// 使用Polly执行业务代码(如不需要捕获异常可选用其它重载)
var pollyRet = await policy.ExecuteAndCaptureAsync(SomeToInvoke);
// 处理返回值判断调用是否成功,或发生了什么异常

下面一步步来看博主的实现过程。

先放上一些测试所用的代码,首先是创建Redis连接的接口和类,它们是从NopCommerce项目一个早起版本借(chao)鉴(xi)来的(文件名都没改,为了测试方便代码略有改动),一直用着没啥大问题就这样用了。

public interface IRedisConnectionWrapper : IDisposable
{
    IDatabase Database(int? db = null);
    IServer Server(EndPoint endPoint);
    EndPoint[] GetEndpoints();
    void FlushDb(int? db = null);
}
public class RedisConnectionWrapper : IRedisConnectionWrapper
{
    private readonly Lazy<string> _connectionString;
    private readonly Lazy<string> _auth;

    private volatile ConnectionMultiplexer _connection;
    private readonly object _lock = new object();

    public RedisConnectionWrapper(string server, string pswd)
    {
        this._connectionString = new Lazy<string>(() => server);
        this._auth = new Lazy<string>(() => pswd);
    }

    private ConnectionMultiplexer GetConnection()
    {
        if (_connection != null && _connection.IsConnected) return _connection;

        lock (_lock)
        {
            if (_connection != null && _connection.IsConnected) return _connection;

            if (_connection != null)
            {
                _connection.Dispose();
            }

            var options = new ConfigurationOptions();
            options.EndPoints.Add(_connectionString.Value);
            if (!string.IsNullOrEmpty(_auth.Value))
                options.Password = _auth.Value;

            _connection = ConnectionMultiplexer.Connect(options);
        }

        return _connection;
    }

    public IDatabase Database(int? db = null)
    {
        return GetConnection().GetDatabase(db ?? -1);
    }

    public IServer Server(EndPoint endPoint)
    {
        return GetConnection().GetServer(endPoint);
    }

    public EndPoint[] GetEndpoints()
    {
        return GetConnection().GetEndPoints();
    }

    public void FlushDb(int? db = null)
    {
        var endPoints = GetEndpoints();

        foreach (var endPoint in endPoints)
        {
            Server(endPoint).FlushDatabase(db ?? -1);
        }
    }

    public void Dispose()
    {
        if (_connection != null)
        {
            _connection.Dispose();
        }
    }
}

对于StackExchange.Redis来说是比较标准的连接创建方式,顺便看了下新版的NopCommerce代码中,代码有了些小改进,增加了一个双重锁。有需要的园友可以自行去下载新的。

接着开始考虑重试问题,为了代码看起来更简洁,决定尝试通过动态代理将捕捉异常并重试的操作作为切面注入。说到动态代理,第一个想到肯定是Castle.Core(前身为CastleDynamicProxy)。动态代理可以选择接口或者是类,如果是类的话需要方法是虚方法。看了下StackExchange.Redis的代码,几个实现类都是internal,方法也都是非virtual。所以只能只能自己写一个类包一下。
这个类就是一个壳,为了我们切面注入。下面的代码只保留的一个方法,其它的省略。另外Castle.Core的动态代理是不支持异步方法的,所以先用Redis的同步接口做下尝试。

public class RedisDatabaseWrapper:IDatabase
{
    private IDatabase _redisDb;

    public RedisDatabaseWrapper(IRedisConnectionWrapper redisConnectionWrapper)
    {
        _redisDb = redisConnectionWrapper.Database();
    }

    public virtual bool SetContains(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
    {
        return _redisDb.SetContains(key, value, flags);
    }

    // 省略其它所有方法...
}

安装Castle.Core,并开始实现动态代理类。

public class RetryByPollyInterceptor : IInterceptor
{
    public async void Intercept(IInvocation invocation)
    {
        var isAsync = IsAsyncMethod(invocation.Method);
        if (isAsync)
            InterceptAsync(invocation);
        else
            InterceptSync(invocation);
    }

    private void InterceptSync(IInvocation invocation)
    {
        var tsArr = new TimeSpan[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(1)
        };

        Action<Exception, TimeSpan, int, Context> action = (ex, ts, idx, ctx) =>
        {
            Console.WriteLine($"Polly Exp:{ex.GetType()} {ex.Message} Try:{idx} ");

            var invca = (IInvocation)ctx["inv"];
            if (idx == 2)
            {
                var type = invca.Method.ReturnType;
                if (type == typeof(void)) return;
                var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
                invca.ReturnValue = ret;
            }
        };

        var policy = Policy
            .Handle<TimeoutException>()
            .Or<RedisConnectionException>()
            .Or<Exception>()
            .WaitAndRetry(tsArr, action);

        void OrignalInvoke()
        {
            invocation.Proceed();
        }

        var pollyRet = policy.ExecuteAndCapture(OrignalInvoke, new Dictionary<string, object>() { ["inv"] = invocation });
        if (pollyRet.Outcome != OutcomeType.Successful)
        {
            Console.WriteLine($"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");
        }
    }

    private void InterceptAsync(IInvocation invocation)
    {
            // 异步方法代理,下文会讨论
    }

    private static bool IsAsyncMethod(MethodInfo method)
    {
        return (
            method.ReturnType == typeof(Task) ||
            (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
        );
    }
}

注意
这个方法也是经过多次尝试才最终完成,可以看到这里预留了处理异步代理的方法,后文会详细说。对于同步方法这段代码可以完美的捕获异常并重试。不用在外侧代码进行catch。当然内部发生异常并多次重试仍失败后会返回非期望的结果,还是需要根据业务的需要对返回值进行判断。
这段代码最值得注意的是这几行:

Action<Exception, TimeSpan, int, Context> action = (ex, ts, idx, ctx) =>
{
    Console.WriteLine($"Polly Exp:{ex.GetType()} {ex.Message} Try:{idx} ");

    var invca = (IInvocation)ctx["inv"];
    if (idx == 2)
    {
        var type = invca.Method.ReturnType;
        if (type == typeof(void)) return;
        var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
        invca.ReturnValue = ret;
    }
};

由于我们设置重试两次,当第二次发生异常时,我们强制给方法返回值赋一个返回值,这样可以让外部调用方法正常执行下去而不会由于无法获取代理方法的返回值而报空引用异常。

接着看看其它组成部分。在博主目前大部分项目中都使用Autofac作为容器,我们需要注册一下用到的类。并且通过Autofac的Castle.Core插件,可以注册动态代理,这样就不用通过给类添加Attribute的方式来添加代理,这是个人比较喜欢的风格。

var builder = new ContainerBuilder();

builder.Register(c => new RetryByPollyInterceptor()); //动态代理类

builder.RegisterType<RedisDatabaseWrapper>().As<IDatabase>().EnableInterfaceInterceptors().InterceptedBy(typeof(RetryByPollyInterceptor)).SingleInstance(); //添加动态代理

builder.RegisterType<RedisConnectionWrapper>().As<IRedisConnectionWrapper>()
    .WithParameters(new[]
    {
        new NamedParameter("server", "127.0.0.1"),
        new NamedParameter("pswd",""),
    }).SingleInstance();

Container = builder.Build();

可以用下面的代码来测试一下上面这些方法。

public void ReadTest(long start, long end)
{
    for (var i = start; i <= end; i++)
    {
        var exists = _redisDb.SetContains(RedisKey, i);
    }
}

可以使用Windows版的Redis,直接运行redis-server.exe来启动服务。然后直接关闭redis-server程序来模拟服务端失败,或者直接禁用网卡来模拟网络失败。
可以看到Polly会进行重试并且捕获异常,也就说在ReadTest中感知不到异常。

搞定了同步方法,开始尝试动态代理异步方法。添加Redis异步接口的实现并注册:

public class RedisDatabaseAsyncWrapper:IDatabaseAsync
{
    private IDatabase _redisDb;

    public RedisDatabaseAsyncWrapper(IRedisConnectionWrapper redisConnectionWrapper)
    {
        _redisDb = redisConnectionWrapper.Database();
    }

    public virtual async Task<bool> SetContainsAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
    {
        return await _redisDb.SetContainsAsync(key, value, flags);
    }

    // 省略其它实现..
}

//注册异步实现
builder.RegisterType<RedisDatabaseAsyncWrapper>().As<IDatabaseAsync>().EnableInterfaceInterceptors().InterceptedBy(typeof(RetryByPollyInterceptor)).SingleInstance();

//异步代理
private void InterceptAsync(IInvocation invocation)
{
    var tsArr = new TimeSpan[]
    {
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(1)
    };

    var policy = Policy
        .Handle<TimeoutException>()
        .Or<RedisConnectionException>()
        .Or<Exception>()
        .WaitAndRetry(tsArr);

    void OrignalInvoke()
    {
        try
        {
            invocation.Proceed();
        }
        catch (Exception e)
        {
            var geneType = invocation.Method.ReturnType.GenericTypeArguments[0];
            var ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
            invocation.ReturnValue = Task.FromResult(ret);

            Console.WriteLine(e);
        }
    }

    var pollyRet = policy.ExecuteAndCapture(OrignalInvoke,
        new Dictionary<string, object>() { ["inv"] = invocation });
    if (pollyRet.Outcome != OutcomeType.Successful)
    {
        Console.WriteLine(
            $"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");

        var invca = (IInvocation)pollyRet.Context["inv"];

        var type = invca.Method.ReturnType;
        if (type == typeof(void)) return;
        if (type.IsGenericType)
        {
            var geneType = invca.Method.ReturnType.GenericTypeArguments[0];
            var ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
            invca.ReturnValue = Task.FromResult(ret);
        }
        else
        {
            invca.ReturnValue = Task.FromResult(0);
        }
    }
}

这里直接告诉各位我的尝试结果是无论如何都无法通过Polly来捕获异常。即上面代码中,OrignalInvoke方法中try...catch...抓不到异常,异常直接被扔给了外部方法。具体原因由于本人比较菜也比较懒没有仔细研究,大概可能就是用一个同步环境去调异步环境的方法没有特殊处理所以出的问题。有知道的园友评论中指点下。
如果是把invocation.Proceed()放在Task中,到是异常不会抛到外侧,但会因为被代理的方法取不到返回值而报空引用错误。原因大概应该是Castle.Core没有取到这个异步构造中的返回值。

经过一番尝试后放弃。在查找解决方法的过程中还发现一个名为Castle.Core.AsyncInterceptor的库,给Castle.Core添加动态代理异步函数的功能,但此扩展的文档实在过长,而且粗略看了下还不支持针对Autofac等IoC容器的扩展,直接放弃。

后来机缘巧合看到了园友Lemon大神的介绍其AspectCore库的文章。留言问了下对异步方法支持的情况,Lemon大神立刻给了回复,还附送了一些使用的特别提示。于是立马安装尝试。

首先是最重要的代理方法,AspectCore原生对异步方法提供支持,代码写起来很简单:

public class RetryByPollyAspectCoreInterceptor : AbstractInterceptorAttribute
{
    public override async Task Invoke(AspectContext context, AspectDelegate next)
    {
        var tsArr = new TimeSpan[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(1)
        };

        var policy = Policy
            .Handle<AspectInvocationException>(ex=>ex.InnerException?.GetType()==typeof(TimeoutException))
            .Or<AspectInvocationException>(ex=>ex.InnerException?.GetType()==typeof(RedisConnectionException))
            .WaitAndRetryAsync(tsArr);

        async Task OrignalInvoke()
        {
            await context.Invoke(next);
        }

        var pollyRet = await policy.ExecuteAndCaptureAsync(OrignalInvoke,new Dictionary<string, object>() { ["ctx"] = context});
        if (pollyRet.Outcome != OutcomeType.Successful)
        {
            Console.WriteLine($"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");

            var ctx = (AspectContext)pollyRet.Context["ctx"];
            var type = ctx.ProxyMethod.ReturnType;
            if (type == typeof(void)) return;
            if (type.IsGenericType)
            {
                var geneType = type.GenericTypeArguments[0];
                dynamic ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
                ctx.ReturnValue = Task.FromResult(ret);
            }
            else
            {
                var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
                ctx.ReturnValue = Task.FromResult(ret);
            }
        }
    }
}

AspectCore也有Autofac的扩展,注册也是非常简单:

builder.RegisterDynamicProxy();

不过AspectCore还是需要给被代理的类添加Attribute:

[RetryByPollyAspectCoreInterceptor]
public class RedisDatabaseAsyncWrapper:IDatabaseAsync
{
    ...
}

希望大神可以扩展AspectCore的Autofac插件实现无需Attribute的代理设置。

最后可以使用下面的代码测试这个异步的重试实现:

public async Task ReadTestAsync(long start, long end)
{
    var total = end - start;
    for (var i = 0; i <= total; i++)
    {
        var item = i + start;
        var exists = await _redisDb.SetContainsAsync(RedisKey, item);
    }
}

可以看到代理方法完美的处理了异常。

文末,在这个异步方法越来越多的新时代再次强烈推荐AspectCore。
感谢各位大神提供了这么多好用的库。感谢各位园友阅读本文。

原文地址:https://www.cnblogs.com/lsxqw2004/p/8284365.html

时间: 2024-10-01 19:59:43

优雅的处理Redis访问超时的相关文章

redis连接超时问题

使用java代码测试redis时,报redis连接超时异常,而linux上的redis能正常访问: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeout 分析原因:有可能是linux的6379端口被防火墙拦截了(此外,redis.conf配置文件中bind后面需要将绑定的ip修改为实际要使用的ip地址) 打开端口:/sbin/iptables -I INPUT -p tcp --dport

Gateway 访问超时 返回504

问题梳理 [现象]Gateway访问超时 { "timestamp": "2019-06-29 11:45:13", "path": "/admin/user/info", "status": 504, "error": "Gateway Timeout", "message": "Response took longer than co

5.C#编写Redis访问类

那么通过前面几篇博文,服务端的安装和配置应该没什么问题了,接下来的问题是如何通过代码来访问Redis. 这里我们使用的库为: StackExchange.Redis GitHub:https://github.com/StackExchange/StackExchange.Redis NuGet:PM> Install-Package StackExchange.Redis Newtonsoft.Json(Json.NET) GitHub:https://github.com/JamesNK/N

nginx lua redis 访问频率限制(转)

1. 需求分析 Nginx来处理访问控制的方法有多种,实现的效果也有多种,访问IP段,访问内容限制,访问频率限制等. 用Nginx+Lua+Redis来做访问限制主要是考虑到高并发环境下快速访问控制的需求. Nginx处理请求的过程一共划分为11个阶段,分别是: post-read.server-rewrite.find-config.rewrite.post-rewrite. preaccess.access.post-access.try-files.content.log. 在openre

redis的超时删除策略

这个问题有三种可能的答案,它们分别代表了三种不同的删除策略: ·定时删除:在设置键的过期时间的同时,创建一个定时器(timer),让定时器在键的过期时间来临时,立即执行对键的删除操作. ·惰性删除:放任键过期不管,但是每次从键空间中获取键时,都检查取得的键是否过期,如果过期的话,就删除该键:如果没有过期,就返回该键. ·定期删除:每隔一段时间,程序就对数据库进行一次检查,删除里面的过期键.至于要删除多少过期键,以及要检查多少个数据库,则由算法决定. 在这三种策略中,第一种和第三种为主动删除策略,

使用kendynet构建异步redis访问服务

最近开始在kendynet上开发手游服务端,游戏类型是生存挑战类的,要存储的数据结构和类型都比较简单,于是选择了用redis做存储,数据类型使用string基本就足够了.于是在kendynet上写了一个简单的redis异步访问接口. 设计理念 1.项目时间紧迫,不打算提供一个大而全的访问接口,只提供一个request接口用以发出redis请求. 2.数据在redis中key和value都存储为string,由使用者负责将数据序列化成string,从string反序列化回数据. 3.服务支持本地访

kendynet构建异步redis访问服务

最近开始在kendynet上开发手游服务端,游戏类型是生存挑战类的,要存储的数据结构和类型都比较简单,于是选择了用redis做存储,数据类型使用string基本就足够了.于是在kendynet上写了一个简单的redis异步访问接口. 设计理念 1.项目时间紧迫,不打算提供一个大而全的访问接口,只提供一个request接口用以发出redis请求. 2.数据在redis中key和value都存储为string,由使用者负责将数据序列化成string,从string反序列化回数据. 3.服务支持本地访

用redis做超时判断 感觉写的很有意思

public Long getSessionIdByUserId(Long systemId, Long customerUserId) { if (customerUserId == null) { LoggerHelper.info(this.getClass(), "custom user id is null."); return DEFAULT_SESSION_ID; } Long sessionId = getSessionId(String.valueOf(custome

Unable to Connect: sPort: 0 C# ServiceStack.Redis 访问 redis

需求:  对数据库中的不断抓取的文章进行缓存,因此需要定时访问数据,写入缓存中 在捕获到的异常日志发现错误:Unable to Connect: sPort: 0 使用的访问方式是线程池的方式:PooledRedisClientManager 经过测试发现在并发访问redis服务的情况下出现该异常的概率比较高, 解决方法 第一点:要使用using(据说访问效率在高并发的时候会有影响,简单的测试过了确实是这样,不过现在的业务达不到高并发量,速度还是很快滴) using (IRedisClient