一、Actor简介
actor模型是一种并行计算的数学模型。 响应于收到的消息,演员可以:做出决定,创建更多Actor,发送更多消息,并确定如何响应接收到的下一条消息。 演员可以修改自己的状态,但只能通过消息相互影响(避免需要任何锁)。
actor是一个计算实体,当其收到消息时,可以并发执行如下操作:
1. 发送有限数量的消息给其他actor
2. 创建有限数量的新actor
3. 指定收到下一消息时的行为
在Orleans中使用的是虚拟Actor方式,详细:http://dotnet.github.io/orleans/Documentation/Introduction.html
详细参见: https://en.wikipedia.org/wiki/Actor_model
二、Orleans框架
Orleans是一个框架,可以直接构建分布式大规模计算应用程序,而无需学习和应用复杂的并发或其他缩放模式。 它是由Microsoft Research创建的,旨在用于云端。
Orleans已被Microsoft Azure广泛应用于微软的几个产品集团,其中最着名的是343个行业,作为所有Halo 4和Halo 5云服务的平台,以及越来越多的其他公司。
特性:
1、默认可扩展
奥尔良处理构建分布式系统的复杂性,使您的应用程序能够扩展到数百台服务器。
2、低延迟
奥尔良允许您保持内存所需的状态,因此您的应用程序可以快速响应传入的请求。
3、简化并发
Orleans允许您编写简单的单线程C#代码,通过actor之间的异步消息传递来处理并发。
三、生成流水号项目实战
1、场景
现在系统基于分布式服务开发,数据在客户端处理后提交到服务端入库,但是由于多个系统间的并发而流水号全部在一张表,每次都是先select在update 高并发容易直接死锁。
如图:
2、基于Orleans的actor
将每条数据改造成一个Actor,由个Actor之间的状态来保证流水号的递增,这样即使单个流水号访问量大只要扩展Orleans的soli即可。
四、关键代码:
1、利用初始化SerialNumberStorgeProvider初始化管理Grain的状态
public class SerialNumberStorgeProvider : IStorageProvider { public Logger Log { get; set; } public string Name { get; set; } public Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) { return TaskDone.Done; } public Task Close() { return TaskDone.Done; } public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config) { this.Name = nameof(SerialNumberStorgeProvider); this.Log = providerRuntime.GetLogger(this.Name); return TaskDone.Done; } public Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) { Console.WriteLine("获取种子信息"); var SerialNumber = grainReference.GetPrimaryKeyString(); using (var db = new DBContext()) { var query = db.SerialNumbers.AsNoTracking().FirstOrDefault(o => o.Name.Equals(SerialNumber)); if (query != null) grainState.State = query; else { db.SerialNumbers.Add(new SerialNumberInfo { Name = grainReference.GetPrimaryKeyString(), Number = 1 }); db.SaveChanges(); grainState.State = new SerialNumberInfo { Name = grainReference.GetPrimaryKeyString(), Number = 1 }; } } return TaskDone.Done; } public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) { var model = grainState.State as SerialNumberInfo; using (var db = new DBContext()) { var query = db.SerialNumbers.FirstOrDefault(o => o.Name.Equals(model.Name)); query.Number = model.Number; await db.SaveChangesAsync(); } } }
2、Grain获取流水号的实现
[StorageProvider(ProviderName = "SerialNumberStorgeProvider")] public class SerialNumberGrain : Grain<SerialNumberInfo>, ISerialNumberGrain { /// <summary> /// 获取多个流水号 /// </summary> /// <param name="number"></param> /// <returns></returns> public Task<List<long>> GetMutilSerialNumber(int number) { if (number == 0) { number = 1; } List<long> list = new List<long>(); for (int i = 1; i <= number; i++) { this.State.Number += 1; list.Add(this.State.Number); } this.WriteStateAsync(); return Task.FromResult(list); } /// <summary> /// 获取单个流水号 /// </summary> /// <returns></returns> public Task<long> GetSerialNumber() { this.WriteStateAsync(); return Task.FromResult(this.State.Number); } }
最后,最多说一句,在测试的时候发现如果不适用如下这种方式,并发时会发生Task调度异常
源码地址:https://github.com/liyang-live/MakeSerialNumber
参考资料:http://dotnet.github.io/orleans/index.html
http://www.cnblogs.com/joab/p/5657851.html
https://github.com/dotnet/orleans