【这里是的实现,指的是针对各个数据访问框架的一个基础实现】
目标
- 定义仓储/QueryEntry的基本功能
- 实现仓储的基本功能,以利于复用
- 实现一些常用的功能
- 提供一些便利的功能
目标框架
博主使用的ORM框架是EF6.x,使用MAP来配置模型和数据库之间的映射(因为模型是定义在领域层[CQRS]的),所以不打算使用声明式的Attribute。使用code first来生成数据库。
仓储基本功能
使用一个泛型接口定义了一个仓储需要实现的功能:
public interface IBasicReponsitory<T> { void Insert(T item); void Delete(T item); void Delete(Guid aggregateId); void Update(T category); T Fetch(Guid aggregateId); T TryFetch(Guid aggregateId); bool Exists(Expression<Func<T, bool>> predict); /*以下是额外的一些接口方法,待商榷*/ IQueryable<T> Query(); Task<T> FetchAsync(Guid id); Task<T> TryFetchAsync(Guid id); Task<IEnumerable<T>> RetriveAsync(Expression<Func<T, bool>> predict); }
以及一个QueryEntry需要实现的一些基本功能:
public interface IQueryEntry<T> where T : IHasPrimaryKey { T TryFetch(Guid id); Task<T> TryFetchAsync(Guid id); bool Exsits(Guid id); bool Exsits(Expression<Func<T, bool>> selector); }
随着时间的推移,这个接口会发生变更,添加一些更多的功能。同时,并不要求所有的仓储或者QueryEntry继承接口,基本接口的定义和实现仅仅是为了提供便利。
为了方便QueryEntry的实现,提供了一个抽象类:
public abstract class ReponsitoryBasedQueryEntry<T> : IQueryEntry<T> where T : IHasPrimaryKey { public abstract IBasicReponsitory<T> BasicReponsitory { get; } public T TryFetch(Guid id) { return BasicReponsitory.TryFetch(id); } public Task<T> TryFetchAsync(Guid id) { return BasicReponsitory.TryFetchAsync(id); } public bool Exsits(Guid id) { return BasicReponsitory.Query().Any(i => i.Id == id); } public bool Exsits(System.Linq.Expressions.Expression<Func<T, bool>> selector) { return BasicReponsitory.Query().Any(selector); } }
基本实现
public class BasicEntityFrameworkReponsitory<T> : IBasicReponsitory<T> where T : class, IHasPrimaryKey { public BasicEntityFrameworkReponsitory() { Table = StorageConfiguration.DbContext.Set<T>(); } public DbSet<T> Table { get; private set; } public virtual void Insert(T item) { Table.Add(item); } public virtual void Delete(T item) { Table.Remove(item); } public virtual void Delete(Guid aggregateId) { var item = TryFetch(aggregateId); Delete(item); } public virtual void Update(T category) { //do nothing... } public T Fetch(Guid aggregateId) { var item = TryFetch(aggregateId); if (item == null) { throw new AggregateRootNotFoundException(aggregateId); } return item; } public T TryFetch(Guid aggregateId) { var item = Query().FirstOrDefault(i => i.Id == aggregateId); return item; } public virtual IQueryable<T> Query() { return Table; } public async Task<T> FetchAsync(Guid id) { return await Table.FirstAsync(i => i.Id == id); } public async Task<T> TryFetchAsync(Guid id) { return await Table.FirstOrDefaultAsync(i => i.Id == id); } public bool Exists(Expression<Func<T, bool>> predict) { return Table.Any(predict); } public async Task<IEnumerable<T>> RetriveAsync(Expression<Func<T, bool>> predict) { return await Table.Where(predict).ToArrayAsync(); } }
这部分代码表达了个人的几个想法:
1.DbContext的生命周期是由Storage自行管理的。当然,可以通过一定的方式指定。
2.提供了基础的Query()方法,并设置为虚方法。个人并不抵制使用IQueryable对象进行查询。我觉得可以把使用IQueryable对象进行查询的代码片段看作匿名方法。
常用的功能:软删除
这里是继承基本实现的一个实现:
public class SoftDeleteEntityFrameworkReponsitory<T> : BasicEntityFrameworkReponsitory<T> where T : class, IHasPrimaryKey, ISoftDelete { public override IQueryable<T> Query() { return base.Query().Where(i => !i.IsDeleted); } public override void Delete(T item) { item.IsDeleted = true; Update(item); } }
这里要求仓储对应的模型实现接口ISoftDelete,为软删除提供支持:
public interface ISoftDelete { bool IsDeleted { get; set; } }
同时override了Query()方法,过滤了已删除的内容。
常用的功能:操作跟踪
好吧,这应该是事件溯源干的事,然而事件溯源目前太难了。原理和软删除差不多:
/// <summary> /// 既然开启了跟踪,那么这条数据必然是不能硬删除的 /// </summary> /// <typeparam name="T"></typeparam> public class TraceEnabledEntityFrameworkReponsitory<T> : SoftDeleteEntityFrameworkReponsitory<T> where T : class, ISoftDelete, ITrackLastModifying, IHasPrimaryKey { /// <summary> /// 开启跟踪时,不允许匿名操作 /// </summary> [Dependency] public IDpfbSession Session { get; set; } public override void Update(T item) { if (!Session.UserId.HasValue) throw new Exception(); //todo 提供一个明确的异常 item.LastModifiedBy = Session.UserId.Value; item.LastModifiedTime = DateTime.Now; } public override void Insert(T item) { if (!Session.UserId.HasValue) throw new Exception(); //todo 提供一个明确的异常 item.LastModifiedBy = Session.UserId.Value; item.LastModifiedTime = DateTime.Now; base.Insert(item); } }
不过这个功能的侵入性很强,Storage应该无法感知“用户”这种概念才对。
便利的功能:动态仓储(DynamicReponsitory)
前一篇文章中说过,引入QueryEntry是为了将查询和提交分来,同时为查询操作提供更大的优化空间。在面对数据库的查询中,多表联查是非常普遍的。所以打算针对多表联查提供一个遍历的组件。同时,直接提交语句查询是和数据库相关的,所以要针对不同的数据库提供不同的DynamicReponsitory。
这个组件解决的问题是:直接提交数据库多表联查,查询结果自动转换模型,提供分页支持。
模型转换
先来解决这个比较有趣的问题:将一个DataReader转换为一个值或者一个可枚举的集合。直接上实现代码:
public class DataReaderTransfer<T> : CacheBlock<string, Func<IDataReader, T>> where T : new() { protected DataReaderTransfer() { } /// <summary> /// /// </summary> /// <param name="filedsNameArray"></param> /// <param name="key">编译缓存所使用的key,建议使用查询字符串的hash</param> /// <returns></returns> public Func<IDataReader, T> Compile(string[] filedsNameArray, string key) { var outType = typeof (T); var func = ConcurrentDic.GetOrAdd(key, k => { var expressions = new List<Expression>(); //public T xxx(IDataReader reader){ var param = Expression.Parameter(typeof (IDataReader)); //var instance = new T(); var newExp = Expression.New(outType); var varExp = Expression.Variable(outType, "instance"); var varAssExp = Expression.Assign(varExp, newExp); expressions.Add(varAssExp); var indexProp = typeof (IDataRecord).GetProperties().Last(p => p.Name == "Item"); //表示 reader[""] foreach (var fieldName in filedsNameArray) { //if(xxx)xxx.xxx=null;else xxx.xxx = (xxx)value; var prop = outType.GetProperty(fieldName); if (prop == null) continue; var propExp = Expression.PropertyOrField(varExp, fieldName); Expression value = Expression.MakeIndex(param, indexProp, new Expression[] {Expression.Constant(fieldName)}); //处理空值 var defaultExp = Expression.Default(prop.PropertyType); var isDbNullExp = Expression.TypeIs(value, typeof (DBNull)); //处理枚举以及可空枚举 if (prop.PropertyType.IsEnum || prop.PropertyType.IsGenericType && prop.PropertyType.GetGenericArguments()[0].IsEnum) { value = Expression.Convert(value, typeof (int)); } var convertedExp = Expression.Convert(value, prop.PropertyType); //读取到dbnull的时候,使用一个默认值 var condExp = Expression.IfThenElse(isDbNullExp, Expression.Assign(propExp, defaultExp), Expression.Assign(propExp, convertedExp)); expressions.Add(condExp); } //return instance; var retarget = Expression.Label(outType); var returnExp = Expression.Return(retarget, varExp); expressions.Add(returnExp); //} var relabel = Expression.Label(retarget, Expression.Default(outType)); expressions.Add(relabel); var blockExp = Expression.Block(new[] {varExp}, expressions); var expression = Expression.Lambda<Func<IDataReader, T>>(blockExp, param); return expression.Compile(); }); return func; } public Func<IDataReader, T> Compile(IDataReader reader, string key) { var length = reader.FieldCount; var names = Enumerable.Range(1, length).Select(i => reader.GetName(i - 1)).ToArray(); return Compile(names, key); } public static DataReaderTransfer<T> Instance = new DataReaderTransfer<T>(); //基于反射的映射.... //private static T DynamicMap<T>(IDataReader reader) where T : new() //{ // var instance = new T(); // var count = reader.FieldCount; // while (count-- > 1) // { // object value = reader[count - 1]; // var name = reader.GetName(count - 1); // var prop = typeof (T).GetProperty(name); // if (prop == null) // { // continue; // } // if (value is DBNull) // { // value = null; // } // prop.SetValue(instance, value); // } // return instance; //} }
主要的思想是:在运行期间对一个特定的模型分析一次,分析构造这个模型需要如何访问DataReader,并将访问操作编译为Func<>,通过一个静态字典缓存。下一次构造的时候,直接访问静态字典的Func<>,将DataReader的行转换为模型。这个耗时,大概是硬编码转换的2倍,可以获得比反射好的性能受益。
链式调用以及延时查询
先来看一段调用代码:
[TestClass] public class DynamicReponsitorySamples { static DynamicReponsitorySamples() { //DbContext 配置 StorageConfiguration.Config.Use<DbContext, ObjectManageContext>(new ContainerControlledLifetimeManager()); //无法使用.UseDbContext<ObjectManageContext>(),因为无法提供基于HTTP生命周期的管理对象 DynamicReponsitory = new DynamicReponsitory(); } public static DynamicReponsitory DynamicReponsitory { get; set; } [TestMethod] public void Query() { //直接提交一个SQL查询,并映射到实体 var queryText = "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"; var query = DynamicReponsitory.Query<AdminListItem>(queryText); //QueryResult对象遵循延时查询的规则,直到执行枚举才会执行查询操作 query.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); } [TestMethod] public void Count() { //可以直接执行一个COUNT(*)语句 var countQueryText = "SELECT COUNT(*) FROM [ADMIN]"; var countQuery = DynamicReponsitory.Count(countQueryText); Trace.WriteLine("Count:" + countQuery.Value); //可以提供一个SELECT * 语句 countQueryText = "SELECT * FROM [ADMIN]"; //但是需要将重载的第二个参数置为true countQuery = DynamicReponsitory.Count(countQueryText, true); Trace.WriteLine("Count:" + countQuery.Value); //可以对一个query对象执行CmountAmount()扩展方法,但是这个query对象代表的查询必须很普通 var query = DynamicReponsitory.Query<AdminListItem>( "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"); countQuery = query.CountAmount(); //Value的值同样遵循延时查询的规则,但是重复访问会导致访问内存中缓存的数据 Trace.WriteLine("Count:" + countQuery.Value); Trace.WriteLine("Count:" + countQuery.Value); //如果需要重新查询,可以调用Result.ReQuery()方法 var reQuery = countQuery.ReQuery(); Trace.WriteLine("Count:" + reQuery.Value); } /// <summary> /// 分页调用,支持分页信息和分页列表信息的无序访问 /// </summary> [TestMethod] public void Page() { //可以对所有的query对象执行Page()扩展方法,从而进行分页 //必须执行要求OrderBy参数的重载,否则会进行内存分页(加载所有行) var query = DynamicReponsitory.Query<AdminListItem>( "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"); var paged = query.Page("ORDER BY DepartmentName", 1, 2); /* * 以下表示支持分页信息和分页列表信息的无序访问 * 如果使用一条sql同时返回这些信息,必须先枚举集合才能继续访问分页信息 */ Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount)); paged.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); //重复访问会导致访问内存中缓存的数据 var resultArray = paged.Take(1); Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount)); resultArray.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); } }
链式调用是指,我调用了DynamicReponsitory.Query()方法之后,可以紧接着调用Page()或者Count()方法。那么,显而易见,如果查询不是延时的,很容易导致这个问题:我把服务器上1W条数据全down下来了,然后在内存里面数数或者分页。
为了实现延时查询的目标,引入了这几个类型:
public class SqlQueryExpression : ICloneable { public SqlQueryExpression() { Parameters = new List<object>(); } public SqlQueryExpression(string expressionText) : this() { ExpressionText = expressionText; } public string ExpressionText { get; set; } public IList<object> Parameters { get; private set; } public IDataReader Read(DbConnection connection) { var parameters = Parameters.ToArray(); if (connection.State != ConnectionState.Open) connection.Open(); //查询,开启最低级别的事务隔离,防止默认事务产生争用锁 var trans = connection.BeginTransaction(IsolationLevel.ReadUncommitted); var command = connection.CreateCommand(); command.CommandType = CommandType.Text; command.CommandText = ExpressionText; command.Parameters.AddRange(parameters); command.Transaction = trans; return command.ExecuteReader(CommandBehavior.CloseConnection); } public virtual object Clone() { //实现拷贝接口 var cloned = new SqlQueryExpression(ExpressionText); Parameters.Foreach(i => { var parameter = (SqlParameter) i; var clonedParameter = new SqlParameter(parameter.ParameterName, parameter.Value); clonedParameter.Direction = parameter.Direction; cloned.Parameters.Add(clonedParameter); }); return cloned; } }
public class SqlQueryResult { public SqlQueryExpression SqlQueryExpression { get; private set; } public DbConnection DbConnection { get; private set; } public virtual bool Enumerated { get; protected set; } protected IDataReader DataReader; protected void Query() { DataReader = DataReader ?? SqlQueryExpression.Read(DbConnection); } public SqlQueryResult(SqlQueryExpression expression, DbConnection connection) { SqlQueryExpression = expression; DbConnection = connection; } } /// <summary> /// 代表DynamicReponsitory的查询结果 /// </summary> /// <typeparam name="T">代表需要构造的类型</typeparam> public class SqlQueryResult<T> : SqlQueryResult, IEnumerable<T> where T : new() { public SqlQueryResult(SqlQueryExpression expression, DbConnection connection) : base(expression, connection) { } public IEnumerator<T> GetEnumerator() { //对于一个Query对象,在第一次访问的时候,要求加载所有数据,防止Skip与Take导致数据丢失 if (!Enumerated) { Query(); using (DataReader) { Enumerated = true; var uniqueKey = typeof (T).FullName + SqlQueryExpression.ExpressionText; var func = DataReaderTransfer<T>.Instance.Compile(DataReader, uniqueKey); while (DataReader.Read()) { var item = func(DataReader); ResultSet.Add(item); //yield return item; } } } return ResultSet.GetEnumerator(); //return ((IEnumerable<T>) ResultSet).GetEnumerator(); } protected List<T> ResultSet = new List<T>(); IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public SqlQueryResult<T> ReQuery() { var exp = SqlQueryExpression.Clone() as SqlQueryExpression; return new SqlQueryResult<T>(exp, DbConnection); } }
SqlQueryExpression存储了将要执行的查询,而SqlQueryResult则存储了查询返回的结果。同时,SqlQueryExpression实现了拷贝,以支持ReQuery()。
关于具体的分页支持,实际上是使用了一个开窗函数,通过注入子查询的方式,从而支持了各种查询的分页(不奇葩的查询)。
为了防止查询被锁住,默认开启了最低的事务隔离级别。
...
【想到什么再补充】