akka源码分析

看akka源码的一些体会,没有列出源码来。akka代码主要包括两块:底层分发(akka.dispatch包)和上层模型(akka.actor包),从底层线程调度(dispatch)往上看起

函数式语言主要处理表达式求值,面向对象语言主要处理对象间消息发送消息。

1. 底层线程调度

Doug Lea: ForkJoinTask

ForkJoinTask是用少数线程执行海量独立任务的极好架构,这里的独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题。

ForkJoinTask的实现包括三个类:

ForkJoinPool: 实现了ExecutorService,提供execute、submit等线程池基本方法,池中的线程都是ForkJoinWorkerThread;

ForkJoinWorkerThread: 继承自Thread,包含了自己的ForkJoin任务队列,在处理完自己任务队列中任务的时候,可以从其他Worker的队列中

偷任务来执行;

ForkJoinTask: 实现了Future接口,可以直接作为ForkJoinPool.submit的返回值,提供的fork方法将自己放到当前Worker线程的任务队列中,

join方法让当前线程等待任务完成,或者通过偷过来等方式自己执行该任务

为了性能考虑,这三个类紧耦合,存在大量互相访问成员属性的情况,Doug Lea老先生说,这种比较ugly的实现,能让性能提高四倍,可以每秒

处理10亿级别的ForkJoin任务。

为了处理并发,大量使用了sun.misc.Unsafe类中提供的直接对内存的CAS(compare and swap)原子操作,为了解决可能的乱序执行

导致的问题,整个代码中都充斥着在if条件判断中对变量赋值的操作,感觉就是在看C代码。

ForkJoinTask是多核单进程版本的MapReduceJob。

2. 上层actor模型

Actor是用户态定义的类型,用户能够看到的Actor都是从这个类型来的。用户能看到的actor是trait akka.actor.Actor,这个只是actor对外的

一个门面,actor要访问actor系统内部的功能,基本上都要通过ActorContext来访问。

ActorCell是actor的内部表示,实现了ActorContext这个trait,所有的功能基本上都是在ActorCell提供的。ActorCell占用64字节。

ActorContext是从actor的角度看到的ActorCell的视图,提供了设置接收超时、自身引用、become/unbecome、获取sender引用、

获取children引用列表、获取MessageDispatcher、获取ActorSystem、获取parent引用、watch/unwatch一个actor的方法,

因ActorContext继承了ActorRefFactory,所以也有actorOf、actorFor等创建/获取actorRef的能力。

Actor/ActorCell和enipcore的Service/ServiceBase概念一模一样,都是一个是系统外面向用户的,一个是系统内进行调度的。

ActorRef是用户看到的对Actor的引用,任何对actor的访问,都是通过ActorRef来的。ActorRef提供了获取path、tell/forward消息的功能

实际上内部是使用一个InternalActorRef来表示ActorRef的,InternalActorRef继承自ActorRef,提供了Actor生命周期管理的接口。

LocalActorRef实现了InternalActorRef,是本节点中真正的actorRef实现,其中会创建并启动ActorCell。

ActorSystem在创建时,LocalActorRefProvider会创建rootGuardian(根actor),然后rootGuardian下会创建面向用户态actor的

guardian,这两个都是InternalActorRef,是通过直接new LocalActorRef创建出来的,这两个guardian的Actor类都是Guardian。

在actor内部创建子actor时,执行的是context.actorOf方法,context实际上就是ActorCell,ActorCell.actorOf调用了

LocalActorRefProvider.actorOf方法,直接new一个LocalActorRef出来,而新创建的LocalActorRef会创建ActorCell,并调用其

start方法,ActorCell.start方法中,将创建mailbox,并向mailbox中发送一个Create系统消息,然后让dispatcher开始调度mailbox

执行ActorSystem.actorOf方法创建actor时,实际上向guardian这个Actor发送CreateChild消息,让它创建一个actor。guardian在

收到CreateChild消息时,调用context.actorOf方法创建新actor,这个就与在actor内部创建子actor的做法一样了。

3. Actor模型和线程模型如何结合

MessageQueue实现了入队列enqueue(receiver:ActorRef, handle: Envelope),出队列dequeue():Envelope

SystemMessageQueue提供了systemEnqueue(receiver:ActorRef, message: SystemMessage),全部出队列systemDrain():SystemMessage方法。

其中,Envelope封装了message:Any和sender:ActorRef两个成员,而SystemMessage实际上是一个LinkedList,包含了所有的系统消息。

MailBox继承自系统消息队列SystemMessageQueue,实现了Runnable接口,同时包含了一个ActorCell成员,一个MessageQueue成员

MailBox代理了MessageQueue的所有方法。MessageQueue的具体类型,根据MailBoxType的不同而不同,比如UnboundedMailbox将创建ConcurrentLinkedQueue

Dispatchers根据ID生成Dispatcher,ActorSystem中有一个默认的dispatcher,dispatcher底层有executor,executor有两种ForkJoinExecutor和

ThreadPoolExecutor,默认是ForkJoinExecutor。

另外,scala中的val都是在对象初始化时就执行的

3.1 在创建ActorSystem时,初始化默认的dispatcher,使用默认的ForkJoinPool(ExecutorService)

3.2 在使用actorRef ! Message发送消息时,调用了actorRef对应的actorCell.tell方法,其中调用了dispatcher.dispatch方法

dispatch(akka/dispatch/Dispather.scala)中做了两件事:

一是将消息放到actorCell的消息队列中(mbox.enqueue(receiver.self, invocation))

二是调用dispather底层的线程池executor.execute(mbox)(registerForExecution(mbox, true, false))执行mbox.run()方法

而mbox.run()中,将先从SystemMessage链表中处理系统消息,然后从MessageQueue成员中处理用户消息。处理系统消息时,

调用actorCell.systemInvoke方法,将所有的系统消息顺序全部处理完;处理用户消息时,调用actorCell.invoke方法,根据dispatcher

的throughput决定本次处理多少条消息,根据dispatcher的throughputDeadlineTime决定本次处理多长时间,时间长度在处理

完一条消息后检查一次。

对于ForkJoinPool这种executor,每次执行execute(mbox)时,实际上都是先创建一个继承自ForkJoinTask的MailboxExecutionTask,

其中的exec方法调用mbox.run方法,因此每次执行都会创建一个ForkJoinTask对象。

还有一点,消息队列都是放到actor对应的mailbox中(以Envelope的形式封装消息本身和sender),而执行的task对象会放到Executor的

每个线程对应的工作队列中,task和消息分别使用不同的队列。

4. 定时处理

actorSystem在初始化时,会创建scheduler。scheduler内部维护HashedWheelTimer定时器,schedular提供schedule、scheduleOnce等方法,

可以在指定时间之后执行一个task,或者向某个actor发送一个消息。执行task时,使用system.dispatcher执行。

schedule主要在状态机FSM、actor.receive接收超时中使用。actor.receive中使用时,首先实现actor.preStart方法,其中调用setReceiveTimeout设置超时时间,在每个receive方法中,需要能够处理ReceiveTimeout事件,如果需要再次超时时,需要再次设置超时事件。只有receive处理完了所有的事件并且设置了超时事件后,超时才会被再次设置

内部实现上,actorCell通过调用checkReceiveTimeout方法调用系统scheduler设置一个一次性的超时事件。在actorCell处理Create系统消息时,创建了actor后,首先调用其actor.preStart方法,然后执行checkReceiveTimeout判断是否设置超时。

5. FSM的实现

akka提供了FSM的实现,该实现基于actor模型,提供了状态与状态数据定义、超时等一系列状态机相关的模型和方法

6. akka如何与耗时系统进行交互,即akka如何与外部系统进行适配(待续)

7. 在play中的应用(待续)

总结:

akka中重点的类都在akka.actor和akka.dispatch两个包中。前者提供了actor模型的抽象和语义,后者提供了底层执行机制。

ActorSystem是系统的控制中心,这里汇聚了用于线程调度的dispatcher,用于定时处理的scheduler,用于创建actor的provider。

dispatcher提供了dispatch/dispatchSystem/execute等多种执行轻量级任务的方法

akka中,还有监控(supervise)、Promise/Future、与外部系统交互、Patterns、路由还没有看,暂时不看了。

时间: 2024-10-13 12:07:07

akka源码分析的相关文章

Akka源码分析-Cluster-ActorSystem

前面几篇博客,我们依次介绍了local和remote的一些内容,其实再分析cluster就会简单很多,后面关于cluster的源码分析,能够省略的地方,就不再贴源码而是一句话带过了,如果有不理解的地方,希望多翻翻之前的博客. 在使用cluster时,配置文件中的akka.actor.provider值是cluster,所以ActorSystem对应的provider就是akka.cluster.ClusterActorRefProvider. /** * INTERNAL API * * The

Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster

在ClusterClient源码分析中,我们知道,他是依托于"Distributed Publish Subscribe in Cluster"来实现消息的转发的,那本文就来分析一下Pub/Sub是如何实现的. 还记得之前分析Cluster源码的文章吗?其实Cluster只是把集群内各个节点的信息通过gossip协议公布出来,并把节点的信息分发出来.但各个actor的地址还是需要开发者自行获取或设计的,比如我要跟worker通信,那就需要知道这个actor在哪个节点,通过actorPa

Akka源码分析-Actor创建

上一篇博客我们介绍了ActorSystem的创建过程,下面我们就研究一下actor的创建过程. val system = ActorSystem("firstActorSystem",ConfigFactory.load()) val helloActor= system.actorOf(Props(new HelloActor),"HelloActor") helloActor ! "Hello" 普通情况下,我们一般使用ActorSystem

Akka源码分析-Actor&ActorContext&ActorRef&ActorCell

分析源码的过程中我们发现,Akka出现了Actor.ActorRef.ActorCell.ActorContext等几个相似的概念,它们之间究竟有什么区别和联系呢? /** * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': * <a href="http://en.wikipedia.org/wiki/Actor

Akka源码分析-Remote-发消息

上一篇博客我们介绍了remote模式下Actor的创建,其实与local的创建并没有太大区别,一般情况下还是使用LocalActorRef创建了Actor.那么发消息是否意味着也是相同的呢? 既然actorOf还是委托给了LocalActorRef,那么在本地创建的Actor发消息还是跟以前一样的,那么如果如何给远程的Actor发消息呢?我们一般是通过actorSelection或者给远程Actor发送一个Identify消息,来接收对应的ActorRef,然后再发消息.我们来分析一下这两者的区

Akka源码分析-Remote-收消息

上一遍博客中,我们分析了网络链接建立的过程,一旦建立就可以正常的收发消息了.发送消息的细节不再分析,因为对于本地的actor来说这个过程相对简单,它只是创立链接然后给指定的netty网路服务发送消息就好了.接收消息就比较麻烦了,因为这对于actor来说是透明的,netty收到消息后如何把消息分发给指定的actor呢?这个分发的过程值得研究研究. 之前分析过,在监听创立的过程中,有一个对象非常关键:TcpServerHandler.它负责链接建立.消息收发等功能.TcpServerHandler继

Akka源码分析-CircuitBreaker(熔断器)

熔断器,在很多技术栈中都会出现的一种技术.它是在分布式系统中提供一个稳定的阻止嵌套失败的机制. 该怎么理解呢?简单来说,在分布式环境中,如果某个计算节点出现问题,很容易出现失败的逆向传到或整个系统的雪崩.什么意思呢?比如某个服务按照顺序依次调用了其他的三个服务,分别为A/B/C.如果B服务由于某种原因,响应变慢了,本来100毫秒就完成了,现在是1秒.此时A就会等待B服务的时间也就变成了1秒,那么就意味着会有很多的A服务调用在等待,如果并发量非常大,很容易就会造成A服务所在的节点出现问题,也就是说

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反