Orleans框架------基于Actor模型生成分布式Id

一、Actor简介

actor模型是一种并行计算的数学模型。 响应于收到的消息,演员可以:做出决定,创建更多Actor,发送更多消息,并确定如何响应接收到的下一条消息。 演员可以修改自己的状态,但只能通过消息相互影响(避免需要任何锁)。

actor是一个计算实体,当其收到消息时,可以并发执行如下操作:

1. 发送有限数量的消息给其他actor

2. 创建有限数量的新actor

3. 指定收到下一消息时的行为

在Orleans中使用的是虚拟Actor方式,详细:http://dotnet.github.io/orleans/Documentation/Introduction.html

详细参见: https://en.wikipedia.org/wiki/Actor_model

二、Orleans框架

Orleans是一个框架,可以直接构建分布式大规模计算应用程序,而无需学习和应用复杂的并发或其他缩放模式。 它是由Microsoft Research创建的,旨在用于云端。

Orleans已被Microsoft Azure广泛应用于微软的几个产品集团,其中最着名的是343个行业,作为所有Halo 4和Halo 5云服务的平台,以及越来越多的其他公司。

特性:

1、默认可扩展
           奥尔良处理构建分布式系统的复杂性,使您的应用程序能够扩展到数百台服务器。
    2、低延迟
           奥尔良允许您保持内存所需的状态,因此您的应用程序可以快速响应传入的请求。

3、简化并发

Orleans允许您编写简单的单线程C#代码,通过actor之间的异步消息传递来处理并发。

三、生成流水号项目实战

1、场景

现在系统基于分布式服务开发,数据在客户端处理后提交到服务端入库,但是由于多个系统间的并发而流水号全部在一张表,每次都是先select在update 高并发容易直接死锁。

如图:

2、基于Orleans的actor

将每条数据改造成一个Actor,由个Actor之间的状态来保证流水号的递增,这样即使单个流水号访问量大只要扩展Orleans的soli即可。

四、关键代码:

1、利用初始化SerialNumberStorgeProvider初始化管理Grain的状态

public class SerialNumberStorgeProvider : IStorageProvider
    {
        public Logger Log { get; set; }

        public string Name { get; set; }

        public Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            return TaskDone.Done;
        }

        public Task Close()
        {
            return TaskDone.Done;
        }

        public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config)
        {
            this.Name = nameof(SerialNumberStorgeProvider);
            this.Log = providerRuntime.GetLogger(this.Name);

            return TaskDone.Done;
        }

        public Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            Console.WriteLine("获取种子信息");
            var SerialNumber = grainReference.GetPrimaryKeyString();
            using (var db = new DBContext())
            {
                var query = db.SerialNumbers.AsNoTracking().FirstOrDefault(o => o.Name.Equals(SerialNumber));
                if (query != null)
                    grainState.State = query;
                else
                {
                    db.SerialNumbers.Add(new SerialNumberInfo
                    {
                        Name = grainReference.GetPrimaryKeyString(),
                        Number = 1
                    });
                    db.SaveChanges();
                    grainState.State = new SerialNumberInfo
                    {
                        Name = grainReference.GetPrimaryKeyString(),
                        Number = 1
                    };
                }
            }
            return TaskDone.Done;
        }

        public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            var model = grainState.State as SerialNumberInfo;
            using (var db = new DBContext())
            {
                var query = db.SerialNumbers.FirstOrDefault(o => o.Name.Equals(model.Name));
                query.Number = model.Number;
                await db.SaveChangesAsync();
            }
        }
    }
2、Grain获取流水号的实现  
[StorageProvider(ProviderName = "SerialNumberStorgeProvider")]
    public class SerialNumberGrain : Grain<SerialNumberInfo>, ISerialNumberGrain
    {
        /// <summary>
        /// 获取多个流水号
        /// </summary>
        /// <param name="number"></param>
        /// <returns></returns>
        public Task<List<long>> GetMutilSerialNumber(int number)
        {
            if (number == 0) { number = 1; }

            List<long> list = new List<long>();
            for (int i = 1; i <= number; i++)
            {
                this.State.Number += 1;
                list.Add(this.State.Number);
            }
            this.WriteStateAsync();
            return Task.FromResult(list);
        }

        /// <summary>
        /// 获取单个流水号
        /// </summary>
        /// <returns></returns>
        public Task<long> GetSerialNumber()
        {
            this.WriteStateAsync();
            return Task.FromResult(this.State.Number);
        }
    }

最后,最多说一句,在测试的时候发现如果不适用如下这种方式,并发时会发生Task调度异常

源码地址:https://github.com/liyang-live/MakeSerialNumber

参考资料:http://dotnet.github.io/orleans/index.html

http://www.cnblogs.com/joab/p/5657851.html

https://github.com/dotnet/orleans

时间: 2024-11-10 11:39:56

Orleans框架------基于Actor模型生成分布式Id的相关文章

分布式高并发下Actor模型

写在开始 一般来说有两种策略用来在并发线程中进行通信:共享数据和消息传递.使用共享数据方式的并发编程面临的最大的一个问题就是数据条件竞争.处理各种锁的问题是让人十分头痛的一件事. 传统多数流行的语言并发是基于多线程之间的共享内存,使用同步方法防止写争夺,Actors使用消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则.从而巧妙避免了多线程写争夺.和共享数据方式相比,消息传递机制最大的优点就是不会产生数据竞争状态.实现消息传递有两种常见的类型:基于

在.NET中实现Actor模型的不同方式

上周,<实现领域驱动设计>(Implementing Domain-Driven Design)一书的作者Vaughn Vernon,发布了Dotsero,这是一个使用C#编写的.基于.NET的Actor模型工具包,它的实现参考了Akka API.Akka工具包是对Actor模型的一种实现,目前为止已经有对应Java和Scala版本的API. 今年早些时候,微软Research部门也发布了一个基于Actor模型的框架,Orleans框架的预览版.这个框架采用了云端编程模型,编写这个框架的目的在

Actor 模型中的通信模式

在 Actor 模型中所有的 Actor 之间有且只有一种通信模式,那就是 tell 的方式,也就是 fire and forget 的方式.但是在实际的开发过程中工程师们逐渐总结出了一些常用的通信模式.本文以 akka-typed(2.6.0-M8) 框架为例,介绍存在于 actor 模型中最基本的一些通信模式,讨论消息是如何在 actor 之间流动的. Fire and Forget 我们首先看到最基础的模式 fire and forget 即发送后遗忘.这种模式是直观地尝试向一个 acto

一种基于Orleans的分布式Id生成方案

基于Orleans的分布式Id生成方案,因Orleans的单实例.单线程模型,让这种实现变的简单,贴出一种实现,欢迎大家提出意见 public interface ISequenceNoGenerator : Orleans.IGrainWithIntegerKey { Task<Immutable<string>> GetNext(); } public class SequenceNoGenerator : Orleans.Grain, ISequenceNoGenerator

基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验

学习了基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验,应用actor模型,位置透明,做到高并发.可伸缩.容错.单机也可以用,水平扩展.垂直扩展.容错都有很好的表现,spark中的例子如下: private def initializeEventProcessActor(){ implicat val timeout=Timeout( 30 seconds) val initEventActorReply= dagSchedulerActorSupervisor ? Prop

Scala 深入浅出实战经典 第90讲:基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验

akka提供了可伸缩的实时事务处理功能. akka基于actor,并提供了位置透明. 1GB的heap可以有2500000个actor. 水平扩展,垂直扩展,容错3个方面的解决方式. 树形结构的actor,每个actor都有状态和行为. DT大数据梦工厂微信公众账号:DT_Spark. DT大数据梦工厂的微信公众号是DT_Spark,每天都会有大数据实战视频发布,请您持续学习. 王家林DT大数据梦工厂scala的所有视频.PPT和代码在百度云盘的链接:http://pan.baidu.com/s

.NET的Actor模型:Orleans

Orleans是微软推出的类似Scala Akka的Actor模型,Orleans是一个建立在.NET之上的,设计的目标是为了方便程序员开发需要大规模扩展的云服务, 可用于实现DDD+EventSourcing/CQRS系统. 传统的三层体系结构包括无状态的前端,无状态的中间层和存储层在可伸缩性方面是有限制的,由于存储层在延迟和吞吐量方面的限制,这对于每个用户请求都有影响.通常办法是在中间层和存储层之间添加缓存层来提高性能.然而,缓存会失去了大部分的并发性和底层存储层的语义保证.为了防止缓存和存

9种分布式ID生成之 美团(Leaf)实战

整理了一些Java方面的架构.面试资料(微服务.集群.分布式.中间件等),有需要的小伙伴可以关注公众号[程序员内点事],无套路自行领取 更多优选 一口气说出 9种 分布式ID生成方式,面试官有点懵了 面试总被问分库分表怎么办?你可以这样怼他 3万字总结,Mysql优化之精髓 为了不复制粘贴,我被逼着学会了JAVA爬虫 技术部突然宣布:JAVA开发人员全部要会接口自动化测试框架 Redis 5种数据结构及对应使用场景,全会面试要加分的 引言 前几天写过一篇<一口气说出 9种 分布式ID生成方式,面

初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,

Scala深入浅出实战中级--进阶经典(第66讲:Scala并发编程实战初体验及其在Spark源码中应用解析)内容介绍和视频链接 2015-07-24 DT大数据梦工厂 从明天起,做一个勤奋的人 看视频.下视频,分享视频 DT大数据梦工厂-Scala深入浅出实战中级--进阶经典:第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析 本期视频通过代码实战详解了Java语言基于加锁的并发编程模型的弊端以及Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Sc