利用AOP写2PC框架(二)

AOP的底层已经封装好了以后,我们就要开始针对应用层写具体的业务逻辑了。

也就是说我们需要有个类继承于AopProxyBase,并且重写其After,Bofore以达到我们的拦截记录的功能。代码如下:

public class TransactionProxy : AopProxyBase
    {
        public TransactionProxy(MarshalByRefObject obj, Type type)
            : base(obj, type)
        { }

        public override void Before(System.Runtime.Remoting.Messaging.IMessage requestMsg, AopMethodAttribute[] attrs)
        {

        }

        public override void After(System.Runtime.Remoting.Messaging.IMessage requestMsg, System.Runtime.Remoting.Messaging.IMessage Respond, AopMethodAttribute[] attrs)
        {
            foreach (var attr in attrs)
            {
                if (attr is LogicRollBackTransAttribute)
                {
                    return;
                }
            }

            var args = requestMsg.Properties["__Args"] as object[];
            string methodName = requestMsg.Properties["__MethodName"] as string;
            CustomTransaction customTrans = null;
            List<object> list = new List<object>();

            customTrans = CallContext.GetData(TransKey.CustomTransKey) as CustomTransaction;
            if (customTrans != null)
            {
                list.AddRange(args);
                TransactionUnit unit = AppTransactionManage.Instance.GetRollBackInfo(methodName);
                if (unit != null)
                {
                    unit.Argments = list;
                }
                customTrans.Compensation.Add(unit);
                CallContext.SetData(TransKey.CustomTransKey, customTrans);

                var outArgs = Respond.Properties["__OutArgs"] as object[];
                IDbTransaction dbTrans;
                foreach (var attr in attrs)
                {
                    if (attr is DbTransAttribute || attr is LogicTransAttribute)
                    {
                        if (outArgs != null)
                        {
                            foreach (var arg in outArgs)
                            {
                                if (arg is IDbTransaction)
                                {
                                    dbTrans = arg as IDbTransaction;
                                    if (customTrans != null)
                                    {
                                        customTrans.AddDbTransaction(dbTrans);
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

在After的地方,我们可以看到,我们做了一次LogicRollBackTransAttribute的判定,避免在回调的时候,又再走一次拦截和记录的流程。

同时做了DbTransAttribute和LogicTransAttribute的判定。因为我把事务分为两类,一类是db本身自己控制的,可以直接rollback的,一类是logic的,需要我们去手动通过逻辑回滚的。代码如下:

[AttributeUsage(AttributeTargets.Method)]
    public class LogicTransAttribute : AopMethodAttribute
    {
        public string MethodName { get; set; }

        public LogicTransAttribute()
        {

        }

        public LogicTransAttribute(string name)
        {
            this.MethodName = name;
        }
    }

[AttributeUsage(AttributeTargets.Method)]
    public class DbTransAttribute : AopMethodAttribute
    {

    }

同时可以看到,我把每一个函数的调用作为一个单元,用TransactionUnit类来保存,代码如下:

public class TransactionUnit
    {
        public object InstanceObject;
        /// <summary>
        /// 执行的方法
        /// </summary>
        public MethodInfo Forward;
        /// <summary>
        /// 失败回滚的方法
        /// </summary>
        public MethodInfo Rollback;
        /// <summary>
        /// 参数
        /// </summary>
        public IList<object> Argments;
    }

因为,一个事务里面,可能包含了多次操作redis,或者多次操作db,为了保证线程安全,同时又需要避开锁,我用了CallContext将一个线程里面的一段事务,保存在其线程上下文中。在保存一个完整的TransactionUnit的时候,不可能每一次都去通过反射去取MethodInfo,所以又增加了一段初始化和字典来保存其MethodInfo。代码如下:

public class AppTransactionManage
    {
        private Dictionary<string, TransactionUnit> _transMaps;

        static AppTransactionManage() { }
        private AppTransactionManage()
        {
            if (this._transMaps == null)
            {
                this._transMaps = new Dictionary<string, TransactionUnit>();
            }
        }

        private static AppTransactionManage _instance;
        public static AppTransactionManage Instance
        {
            get
            {
                if (_instance == null)
                {
                    _instance = new AppTransactionManage();
                }
                return _instance;
            }
        }

        public TransactionUnit GetRollBackInfo(string methodName)
        {
            if (this._transMaps == null) throw new ArgumentNullException("not init");
            if (this._transMaps.ContainsKey(methodName))
            {
                return this._transMaps[methodName];
            }
            return null;
        }

        public void Init(params string[] assembly)
        {
            if (assembly != null)
            {
                foreach (string s in assembly)
                {
                    var ass = Assembly.Load(s);
                    if (ass != null)
                    {
                        var types = ass.GetTypes();
                        foreach (var type in types)
                        {
                            var transAttr = type.GetCustomAttribute(typeof(TransactionAttribute), false) as TransactionAttribute;
                            if (transAttr != null)
                            {
                                var methods = type.GetMethods();
                                foreach (var method in methods)
                                {
                                    var forwardTrans = method.GetCustomAttribute(typeof(LogicTransAttribute), false) as LogicTransAttribute;
                                    var rollbackTrans = method.GetCustomAttribute(typeof(LogicRollBackTransAttribute), false) as LogicRollBackTransAttribute;

                                    TransactionUnit unit;
                                    if (forwardTrans != null)
                                    {
                                        if (!this._transMaps.TryGetValue(forwardTrans.MethodName, out unit))
                                        {
                                            unit = new TransactionUnit();
                                        }
                                        unit.Forward = method;
                                        unit.InstanceObject = Activator.CreateInstance(type);
                                        this._transMaps[forwardTrans.MethodName] = unit;
                                    }

                                    if (rollbackTrans != null)
                                    {
                                        if (!this._transMaps.TryGetValue(rollbackTrans.MethodName, out unit))
                                        {
                                            unit = new TransactionUnit();
                                        }
                                        unit.Rollback = method;
                                        unit.InstanceObject = Activator.CreateInstance(type);
                                        this._transMaps[rollbackTrans.MethodName] = unit;
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

为了友好开发者的调用,可以让其像使用SqlTransaction一样来使用,我又对外公开了一个CustomTranstion,将调用方式封装在这个类里面,代码如下:

public class CustomTransaction : IDisposable
    {
        private List<IDbTransaction> _dbTransactions;

        private bool _isRollBack = true;

        /// <summary>
        /// 补偿机制
        /// </summary>
        public List<TransactionUnit> Compensation;

        public void Commit()
        {
            if (this._dbTransactions != null)
            {
                this._dbTransactions.ForEach((t) => t.Commit());
            }
            this._isRollBack = false;
        }

        public void RollBack()
        {
            if (this.Compensation != null)
            {
                this.Compensation.ForEach((t) =>
                {
                    object[] paramsArray = t.Argments == null ? null : t.Argments.ToArray();
                    t.Rollback.Invoke(t.InstanceObject, paramsArray);
                });
            }
            if (this._dbTransactions != null)
            {
                this._dbTransactions.ForEach((t) => t.Rollback());
            }
        }

        private bool _isRetry = true;

        public CustomTransaction(bool isRetry = true)
        {
            this._isRetry = isRetry;
            if (this._dbTransactions == null)
            {
                this._dbTransactions = new List<IDbTransaction>();
            }
            if (this.Compensation == null)
            {
                this.Compensation = new List<TransactionUnit>();
            }
            CallContext.SetData(TransKey.CustomTransKey, this);
        }

        public void AddDbTransaction(IDbTransaction transaction)
        {
            this._dbTransactions.Add(transaction);
        }

        public void Dispose()
        {
            if (this._isRollBack)
            {
                this.RollBack();
            }
            CallContext.FreeNamedDataSlot(TransKey.CustomTransKey);
        }
    }

这个时候,你就可以像是用SqlTransaction一样去Using(var trans = new CustomTranstion()){}然后在using里面去写trans.Commit();来提交所有的事务操作,如果不做Commit操作的话,在CustomTranstion里面,会自动去调用其rollback()操作。

但是这并没有完,所有的只是记录下来了,但是并没有保存到DB去做持久化。这个时候就需要增加一个队列,来不断的去将TransactionUnit来保存到db,同时又需要把队列去做持久化,避免一些意外原因,导致队列数据丢失,而缺失了这部分的日志记录(虽然我个人认为这一部分可以省略)。代码如下:

[Serializable]
    public class TransQueue : IDisposable
    {
        public Queue<Action> _transQueue;
        private Thread _thread;
        private bool _isDispose;

        public delegate void PersistenceHandler(Action[] actions);

        PersistenceHandler persistenceHandler;
        private readonly object _syncObject = new object();
        public TransQueue()
        {
            if (_transQueue == null)
            {
                _transQueue = new Queue<Action>();
            }
            if (persistenceHandler == null)
            {
                persistenceHandler = PersistenceToDisk;
            }

            if (_thread == null)
            {
                _thread = new Thread(Thread_Work)
                {
                    IsBackground = true
                };
            }
            _thread.Start();
        }

        public void Push(Action action)
        {
            if (_transQueue == null) throw new ArgumentNullException("transQueue is not init");

            lock (_syncObject)
            {
                _transQueue.Enqueue(action);
            }
        }

        public void Thread_Work()
        {
            while (!_isDispose)
            {
                Action[] items = null;
                if (_transQueue != null && _transQueue.Count > 0)
                {
                    lock (_syncObject)
                    {
                        items = new Action[_transQueue.Count];
                        _transQueue.CopyTo(items, 0);
                    }
                }

                if (items != null && items.Length > 0)
                {
                    persistenceHandler.BeginInvoke(items, PersistenceHandlerCallBack, persistenceHandler);
                    foreach (var item in items)
                    {
                        item.Invoke();
                    }
                }
            }
        }

        public void PersistenceHandlerCallBack(IAsyncResult result)
        {
            try
            {
                (result.AsyncState as PersistenceHandler).EndInvoke(result);
            }
            catch (Exception e)
            {
            }
        }

        public void PersistenceToDisk(Action[] items)
        {
            BinaryHelper.SaveToFile(items);
        }

        public void Dispose()
        {
            _isDispose = true;
            _thread.Join();
        }

    }

 public class TransQueueManage
    {
        private int _threadNumber = 2;
        private TransQueue[] _transQueue;
        Random random = new Random();
        private TransQueueManage()
        {
            if(_transQueue == null)
            {
                _transQueue = new TransQueue[_threadNumber];
                for (var i = 0; i < _threadNumber; i++)
                {
                    _transQueue[i] = new TransQueue();
                }
            }
        }

        static TransQueueManage()
        {

        }

        private static readonly object _syncObject = new object();
        private static TransQueueManage _instance;
        public static TransQueueManage Instance
        {
            get
            {
                if (_instance == null)
                {
                    lock (_syncObject)
                    {
                        if (_instance == null)
                        {
                            _instance = new TransQueueManage();
                        }
                    }
                }
                return _instance;
            }
        }

        public void Push(Action action)
        {
            var index = GetRandomThreadIndex();
            _transQueue[index].Push(action);
        }

        public int GetRandomThreadIndex()
        {
            return random.Next(0, _threadNumber);
        }
    }

时间: 2024-08-10 07:53:31

利用AOP写2PC框架(二)的相关文章

利用AOP实现空模式的无缝使用 创世纪代码应用之一:一行代码让接口框架RUN起来

这是我开播第一篇,朋友们多多支持.捧场,谢谢. 引子 地是空虚混沌.渊面黑暗. 神的灵运行在水面上.  神说.要有光.就有了光.  神看光是好的.就把光暗分开了.  神称光为昼.称暗为夜.有晚上.有早晨.这是头一日. ——引至<圣经.神创造天地> 关键词:null,AOP,Spring.Net框架,空模式,面向接口编程,单元测试,方法拦截器 摘要:在我们编程的时候很难离开null,它给我们带来了很多麻烦.本文从新的视角利用AOP无缝使用空模式部分解决了这个问题,最重要的是可以使得我们的程序尽早

(二)springMvc原理和手写springMvc框架

我们从两个方面了解springmvc执行原理,首先我们去熟悉springmvc执行的过程,然后知道原理后通过手写springmvc去深入了解代码中执行过程. (一)SpringMVC流程图 (二)SpringMVC流程 1.  用户发送请求至前端控制器DispatcherServlet. 2.  DispatcherServlet收到请求调用HandlerMapping处理器映射器. 3.  处理器映射器找到具体的处理器(可以根据xml配置.注解进行查找),生成处理器对象及处理器拦截器(如果有则

Android开发之手把手教你写ButterKnife框架(二)

欢迎转载,转载请标明出处: http://blog.csdn.net/johnny901114/article/details/52664112 本文出自:[余志强的博客] 上一篇博客Android开发之手把手教你写ButterKnife框架(一)我们讲了ButterKnife是什么.ButterKnife的作用和功能介绍以及ButterKnife的实现原理. 本篇博客主要讲在android studio中如何使用apt. 一.新建个项目, 然后创建一个module名叫processor 新建m

关于Quartz.NET作业调度框架的一点小小的封装,实现伪AOP写LOG功能

Quartz.NET是一个非常强大的作业调度框架,适用于各种定时执行的业务处理等,类似于WINDOWS自带的任务计划程序,其中运用Cron表达式来实现各种定时触发条件是我认为最为惊喜的地方. Quartz.NET主要用到下面几个类: IScheduler --调度器 IJobDetail --作业任务 ITrigger --触发器 如果我们自己采用Timer来写类似的定时执行任务程序的话,相应的我们应该有:(以下均为设想,目的是让大家搞清楚Quartz.NET上面三个接口的关系) Schedul

利用TraceSource写日志

利用TraceSource写日志 从微软推出第一个版本的.NET Framework的时候,就在“System.Diagnostics”命名空间中提供了Debug和Trace两个类帮助我们完成针对调试和跟踪信息的日志记录.在.NET Framework 2.0中,微软引入了TraceSource并对跟踪日志系统进行了优化,优化后的跟踪日志系统在.NET Core中又经过了相应的简化..NET Core的日志模型借助TraceSourceLoggerProvider实现对TraceSource的整

利用多写Redis实现分布式锁原理与实现分析

在我写这篇文章的时候,其实我还是挺纠结的,因为我这个方案本身也是雕虫小技拿出来显眼肯定会被贻笑大方,但是我最终还是拿出来与大家分享,我本着学习的态度和精神,希望大家能够给与我指导和改进方案. 一.关于分布式锁 关于分布式锁,可能绝大部分人都会或多或少涉及到. 我举二个例子: 场景一:从前端界面发起一笔支付请求,如果前端没有做防重处理,那么可能在某一个时刻会有二笔一样的单子同时到达系统后台. 场景二:在App中下订单的时候,点击确认之后,没反应,就又点击了几次.在这种情况下,如果无法保证该接口的幂

Android开发之手把手教你写ButterKnife框架(三)

欢迎转载,转载请标明出处: http://blog.csdn.net/johnny901114/article/details/52672188 本文出自:[余志强的博客] 一.概述 上一篇博客讲了,如何在android studio使用apt < Android开发之手把手教你写ButterKnife框架(二)> 然后在Processor里生成自己的代码,把要输出的类,通过StringBuilder拼接字符串,然后输出. try { // write the file JavaFileObj

写一个框架的详细步骤

定位 所谓定位就是回答几个问题,我出于什么目的要写一个框架,我的这个框架是干什么的,有什么特性适用于什么场景,我的这个框架的用户对象是谁,他们会怎么使用,框架由谁维护将来怎么发展等等. 如果你打算写框架,那么肯定心里已经有一个初步的定位,比如它是一个缓存框架.Web MVC框架.IOC框架.ORM/数据访问框架.RPC框架或是一个用于Web开发的全栈式框架. 是 否要重复造轮子?除非是练手项目,一般我们是有了解决不了问题的时候才会考虑不使用既有的成熟的框架而重复造轮子的,这个时候需要列出新框架主

如何写一个框架

定位 所谓定位就是回答几个问题,我出于什么目的要写一个框架,我的这个框架是干什么的,有什么特性适用于什么场景,我的这个框架的用户对象是谁,他们会怎么使用,框架由谁维护将来怎么发展等等. 如果你打算写框架,那么肯定心里已经有一个初步的定位,比如它是一个缓存框架.Web MVC框架.IOC框架.ORM/数据访问框架.RPC框架或是一个用于Web开发的全栈式框架. 是否要重复造轮子?除非是练手项目,一般我们是有了解决不了问题的时候才会考虑不使用既有的成熟的框架而重复造轮子的,这个时候需要列出新框架主要