Akka源码分析-Remote-发消息

  上一篇博客我们介绍了remote模式下Actor的创建,其实与local的创建并没有太大区别,一般情况下还是使用LocalActorRef创建了Actor。那么发消息是否意味着也是相同的呢?

  既然actorOf还是委托给了LocalActorRef,那么在本地创建的Actor发消息还是跟以前一样的,那么如果如何给远程的Actor发消息呢?我们一般是通过actorSelection或者给远程Actor发送一个Identify消息,来接收对应的ActorRef,然后再发消息。我们来分析一下这两者的区别。

  首先来看actorSelection,不管是用ActorSystem或者ActorContext的actorSelection方法,最终都是调用了ActorRefFactory对应的方法。

/**
   * Construct an [[akka.actor.ActorSelection]] from the given path, which is
   * parsed for wildcards (these are replaced by regular expressions
   * internally). No attempt is made to verify the existence of any part of
   * the supplied path, it is recommended to send a message and gather the
   * replies in order to resolve the matching set of actors.
   */
  def actorSelection(path: String): ActorSelection = path match {
    case RelativeActorPath(elems) ?
      if (elems.isEmpty) ActorSelection(provider.deadLetters, "")
      else if (elems.head.isEmpty) ActorSelection(provider.rootGuardian, elems.tail)
      else ActorSelection(lookupRoot, elems)
    case ActorPathExtractor(address, elems) ?
      ActorSelection(provider.rootGuardianAt(address), elems)
    case _ ?
      ActorSelection(provider.deadLetters, "")
  }

  我们发现它支持两种类型的path:RelativeActorPath、ActorPathExtractor。

/**
 * Extractor for so-called “relative actor paths” as in “relative URI”, not in
 * “relative to some actor”. Examples:
 *
 *  * "grand/child"
 *  * "/user/hello/world"
 */
object RelativeActorPath extends PathUtils {
  def unapply(addr: String): Option[immutable.Seq[String]] = {
    try {
      val uri = new URI(addr)
      if (uri.isAbsolute) None
      else Some(split(uri.getRawPath, uri.getRawFragment))
    } catch {
      case _: URISyntaxException ? None
    }
  }
}

  RelativeActorPath提取器比较简单,就是创建了一个URI对象,然后判断其是否为Absolute,如果是就返回None,如果不是就返回对应的elemes。对于远程Actor,我们一般会指定主机名、端口号,例如akka.tcp://[email protected]:2552/user/actorName,根据URI的定义,这个URI的schema是akka.tcp,很显然是Absolute,那就会返回None。

/**
 * Given an ActorPath it returns the Address and the path elements if the path is well-formed
 */
object ActorPathExtractor extends PathUtils {
  def unapply(addr: String): Option[(Address, immutable.Iterable[String])] =
    try {
      val uri = new URI(addr)
      uri.getRawPath match {
        case null ? None
        case path ? AddressFromURIString.unapply(uri).map((_, split(path, uri.getRawFragment).drop(1)))
      }
    } catch {
      case _: URISyntaxException ? None
    }
}

  ActorPathExtractor这个提取器的名称定义的是有问题的,既然actorSelection只支持两种类型的路径选择:本地和远程。第一个解析器定义成相对路径,那么后面一个就直接是绝对路径好了啊,为啥用ActorPathExtractor这样蹩脚的命名?难道本地模式下,就不是ActorPath提取器了?我们来看看对于akka.tcp://[email protected]:2552/user/actorName提取出了什么。经调试,address是akka.tcp://[email protected]:2552,elems就是后面的user、actorName了。

  也就是说remote模式下,如果有host、prot等信息就会返回ActorSelection(provider.rootGuardianAt(address), elems)这个类。不过好像无论哪种情况都返回这个类,好尴尬啊,但传入的第一个参数是不同的:provider.rootGuardianAt(address)。也就是说actorSelection这个函数是不区分当前的模式的,只要含有host/port就会传入provider.rootGuardianAt(address),否则就传入provider.rootGuardian。如果在local模式下,也强制用actorSelection查找远程Actor会发生什么呢?我们来看看LocalActorRefProvider。

  override def rootGuardianAt(address: Address): ActorRef =
    if (address == rootPath.address) rootGuardian
    else deadLetters

  local模式下,如果待查询actor的地址就是本地地址,则直接在本地返回查找;否则就返回deadLetters。其实是无法查找远程actor的。那么RemoteActorRefProvider呢?

def rootGuardianAt(address: Address): ActorRef = {
    if (hasAddress(address)) rootGuardian
    else try {
      new RemoteActorRef(transport, transport.localAddressForRemote(address),
        RootActorPath(address), Nobody, props = None, deploy = None)
    } catch {
      case NonFatal(e) ?
        log.error(e, "No root guardian at [{}]", address)
        new EmptyLocalActorRef(this, RootActorPath(address), eventStream)
    }
  }

  当然了,它也会判断一下本地地址是否包含待查询地址(防止多网卡或其他特殊情况),如果包含,则意味着是本地Actor交给rootGuardian;否则就创建RemoteActorRef。

  分析到这里我们知道了,其实在remote模式下,actorSelection返回了一个RemoteActorRef,还记得这个类的作用嘛?我们之前简单分析过,它其实是对远程Acotor的一个本地网络代理,也就是说所有通过actorSelection发送给远程actor的消息,都会经过他中转。

  我们继续分析ActorSelection的源码

/**
   * Construct an ActorSelection from the given string representing a path
   * relative to the given target. This operation has to create all the
   * matching magic, so it is preferable to cache its result if the
   * intention is to send messages frequently.
   */
  def apply(anchorRef: ActorRef, elements: Iterable[String]): ActorSelection = {
    val compiled: immutable.IndexedSeq[SelectionPathElement] = elements.collect({
      case x if !x.isEmpty ?
        if ((x.indexOf(‘?‘) != -1) || (x.indexOf(‘*‘) != -1)) SelectChildPattern(x)
        else if (x == "..") SelectParent
        else SelectChildName(x)
    })(scala.collection.breakOut)
    new ActorSelection with ScalaActorSelection {
      override val anchor = anchorRef
      override val path = compiled
    }
  }

  很显然这里的anchorRef是上面创建的RemoteActorRef实例,其中ActorSelection的anchor(锚定)是anchorRef。至此,一个ActorSelection创建完毕。那么如何发消息呢?这就需要分析tell或者!方法了。

  def tell(msg: Any, sender: ActorRef): Unit =
    ActorSelection.deliverSelection(anchor.asInstanceOf[InternalActorRef], sender,
      ActorSelectionMessage(msg, path, wildcardFanOut = false))

  其实乍一看,我们应该明白,这就是在deliverSelection函数内部,把消息封装成ActorSelectionMessage发送给了anchor。

  该函数首先判断sel的elements是否为空,很显然不为空,进入rec函数。该函数比较复杂而且还是一个尾递归函数,但我们知道此处的ref就是RemoteActorRef,那么RemoteActorRef是不是一个ActorRefWithCell呢?

private[akka] class RemoteActorRef private[akka] (
  remote:                RemoteTransport,
  val localAddressToUse: Address,
  val path:              ActorPath,
  val getParent:         InternalActorRef,
  props:                 Option[Props],
  deploy:                Option[Deploy])
  extends InternalActorRef with RemoteRef

  那么rec就会走到case _的逻辑,也就是把消息转发给了前面创建的RemoteActorRef,我们来看看这个示例是如何实现tell的。

override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
    if (message == null) throw InvalidMessageException("Message is null")
    try remote.send(message, OptionVal(sender), this) catch handleException(message, sender)
  }

  RemoteActorRef这个类,通过remote把消息发送出去了,那么remote是什么呢?RemoteTransport是不是很熟悉?在ActorSystem启动的时候我们分析过这个对象,它是Remoting类的实例,Remoting里面send方法是怎样的呢?

override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match {
    case Some(manager) ? manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender)
    case None          ? throw new RemoteTransportExceptionNoStackTrace("Attempted to send remote message but Remoting is not running.", null)
  }

  它又把消息转发给了manager,而manager就是endpointManager。endpointManager是不是也比较眼熟呢?前面文章中我们也见到过,这是一个EndpointManager实例,而EndpointManager是一个Actor。请注意这里用Send又对message进行了封装。EndpointManager是如何对Send消息进行反应的呢?

case s @ Send(message, senderOption, recipientRef, _) ?
      val recipientAddress = recipientRef.path.address

      def createAndRegisterWritingEndpoint(): ActorRef = {
        endpoints.registerWritableEndpoint(
          recipientAddress,
          uid = None,
          createEndpoint(
            recipientAddress,
            recipientRef.localAddressToUse,
            transportMapping(recipientRef.localAddressToUse),
            settings,
            handleOption = None,
            writing = true))
      }

      endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
        case Some(Pass(endpoint, _)) ?
          endpoint ! s
        case Some(Gated(timeOfRelease)) ?
          if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s
          else extendedSystem.deadLetters ! s
        case Some(Quarantined(uid, _)) ?
          // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have
          // the Quarantined tombstone and we know what UID we don‘t want to accept, so use it.
          createAndRegisterWritingEndpoint() ! s
        case None ?
          createAndRegisterWritingEndpoint() ! s

      }

  分析以上逻辑,简单来看,会先判断是不是存在一个endpoint,如果存在说明链接已经建立,可以直接发送,否则出于其他状态,就重新创建endpoint,然后把消息转发给该endpoint。

def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef =
      addressToWritable.get(address) match {
        case Some(Pass(e, _)) ?
          throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]")
        case _ ?
          // note that this overwrites Quarantine marker,
          // but that is ok since we keep the quarantined uid in addressToRefuseUid
          addressToWritable += address → Pass(endpoint, uid)
          writableToAddress += endpoint → address
          endpoint
      }

  registerWritableEndpoint没有太复杂的逻辑,就是查询addressToWritable这个HashMap,如果不存在则把对应的endpoint加入缓存,并返回endpoint。而endpoint是通过createEndpoint创建的。

private def createEndpoint(
    remoteAddress:    Address,
    localAddress:     Address,
    transport:        AkkaProtocolTransport,
    endpointSettings: RemoteSettings,
    handleOption:     Option[AkkaProtocolHandle],
    writing:          Boolean): ActorRef = {
    require(transportMapping contains localAddress, "Transport mapping is not defined for the address")
    // refuseUid is ignored for read-only endpoints since the UID of the remote system is already known and has passed
    // quarantine checks
    val refuseUid = endpoints.refuseUid(remoteAddress)

    if (writing) context.watch(context.actorOf(
      RARP(extendedSystem).configureDispatcher(ReliableDeliverySupervisor.props(
        handleOption,
        localAddress,
        remoteAddress,
        refuseUid,
        transport,
        endpointSettings,
        AkkaPduProtobufCodec,
        receiveBuffers)).withDeploy(Deploy.local),
      "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
    else context.watch(context.actorOf(
      RARP(extendedSystem).configureDispatcher(EndpointWriter.props(
        handleOption,
        localAddress,
        remoteAddress,
        refuseUid,
        transport,
        endpointSettings,
        AkkaPduProtobufCodec,
        receiveBuffers,
        reliableDeliverySupervisor = None)).withDeploy(Deploy.local),
      "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
  }

  createEndpoint最终创建了ReliableDeliverySupervisor这个Actor,也就是说RemoteActorRef最终又把消息发送给了ReliableDeliverySupervisor,ReliableDeliverySupervisor收到消息去调用handleSend方法。

  private def handleSend(send: Send): Unit =
    if (send.message.isInstanceOf[SystemMessage]) {
      val sequencedSend = send.copy(seqOpt = Some(nextSeq()))
      tryBuffer(sequencedSend)
      // If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it.
      // GotUid will kick resendAll() causing the messages to be properly written.
      // Flow control by not sending more when we already have many outstanding.
      if (uidConfirmed && resendBuffer.nonAcked.size <= settings.SysResendLimit)
        writer ! sequencedSend
    } else writer ! send

  除去特殊情况,用户发的普通消息又发送给了writer,艾玛我去,真是绕啊。writer是什么呢?

var writer: ActorRef = createWriter()
private def createWriter(): ActorRef = {
    context.watch(context.actorOf(RARP(context.system).configureDispatcher(EndpointWriter.props(
      handleOrActive = currentHandle,
      localAddress = localAddress,
      remoteAddress = remoteAddress,
      refuseUid,
      transport = transport,
      settings = settings,
      AkkaPduProtobufCodec,
      receiveBuffers = receiveBuffers,
      reliableDeliverySupervisor = Some(self))).withDeploy(Deploy.local), "endpointWriter"))
  }

  很显然这又是一个ACor!!!哎,继续查找EndpointWriter这个Actor喽

def receive = if (handle.isEmpty) initializing else writing
val writing: Receive = {
    case s: Send ?
      if (!writeSend(s)) {
        enqueueInBuffer(s)
        scheduleBackoffTimer()
        context.become(buffering)
      }

    // We are in Writing state, so buffer is empty, safe to stop here
    case FlushAndStop ?
      flushAndStop()

    case AckIdleCheckTimer if ackDeadline.isOverdue() ?
      trySendPureAck()
  }

  这个Actor会先判断是否已经初始化,这里就假设初始化吧,初始化之后就会进入writing这个偏函数,对send类型的消息,又调用了writeSend函数。

  这个函数简单来看,就是调用codec对消息进行序列化,然后创建了一个pdu,最终把pdu通过handle的write发送出去。handle又是什么呢?

var handle: Option[AkkaProtocolHandle] = handleOrActive
private[remote] class AkkaProtocolHandle(
  _localAddress:          Address,
  _remoteAddress:         Address,
  val readHandlerPromise: Promise[HandleEventListener],
  _wrappedHandle:         AssociationHandle,
  val handshakeInfo:      HandshakeInfo,
  private val stateActor: ActorRef,
  private val codec:      AkkaPduCodec)
  extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, AkkaScheme) {

  override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))

  override def disassociate(): Unit = disassociate(Unknown)

  def disassociate(info: DisassociateInfo): Unit = stateActor ! DisassociateUnderlying(info)
}

  handle最终是一个AkkaProtocolHandle,这个对象我们不再具体分析,我们可以认为这是一个本地与远程地址链接的通道,通过这个通道就可以与远程actor发送消息了。

  分析到这个地方,actorSelection与远程通信的过程大概就梳理清楚了。为了方便理解,作者特意辛苦的画了一个流程图,以供参考。细心的读者一定会问,那我的消息通过handle发送出去了,对方怎么接收呢?接收之后怎么发送到指定actor的邮箱呢?这一点我们后面再分析。

原文地址:https://www.cnblogs.com/gabry/p/9377182.html

时间: 2024-08-29 08:26:12

Akka源码分析-Remote-发消息的相关文章

Akka源码分析-Cluster-ActorSystem

前面几篇博客,我们依次介绍了local和remote的一些内容,其实再分析cluster就会简单很多,后面关于cluster的源码分析,能够省略的地方,就不再贴源码而是一句话带过了,如果有不理解的地方,希望多翻翻之前的博客. 在使用cluster时,配置文件中的akka.actor.provider值是cluster,所以ActorSystem对应的provider就是akka.cluster.ClusterActorRefProvider. /** * INTERNAL API * * The

Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster

在ClusterClient源码分析中,我们知道,他是依托于"Distributed Publish Subscribe in Cluster"来实现消息的转发的,那本文就来分析一下Pub/Sub是如何实现的. 还记得之前分析Cluster源码的文章吗?其实Cluster只是把集群内各个节点的信息通过gossip协议公布出来,并把节点的信息分发出来.但各个actor的地址还是需要开发者自行获取或设计的,比如我要跟worker通信,那就需要知道这个actor在哪个节点,通过actorPa

akka源码分析

看akka源码的一些体会,没有列出源码来.akka代码主要包括两块:底层分发(akka.dispatch包)和上层模型(akka.actor包),从底层线程调度(dispatch)往上看起 函数式语言主要处理表达式求值,面向对象语言主要处理对象间消息发送消息. 1. 底层线程调度 Doug Lea: ForkJoinTask ForkJoinTask是用少数线程执行海量独立任务的极好架构,这里的独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题. ForkJoinTask的实现包

Spring AMQP 源码分析 06 - 手动消息确认

### 准备 ## 目标 了解 Spring AMQP 如何手动确认消息已成功消费 ## 前置知识 <Spring AMQP 源码分析 04 - MessageListener> ## 相关资源 Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#message-listener-adapter> Sample code:<https:

Akka源码分析-Remote-收消息

上一遍博客中,我们分析了网络链接建立的过程,一旦建立就可以正常的收发消息了.发送消息的细节不再分析,因为对于本地的actor来说这个过程相对简单,它只是创立链接然后给指定的netty网路服务发送消息就好了.接收消息就比较麻烦了,因为这对于actor来说是透明的,netty收到消息后如何把消息分发给指定的actor呢?这个分发的过程值得研究研究. 之前分析过,在监听创立的过程中,有一个对象非常关键:TcpServerHandler.它负责链接建立.消息收发等功能.TcpServerHandler继

Akka源码分析-Actor创建

上一篇博客我们介绍了ActorSystem的创建过程,下面我们就研究一下actor的创建过程. val system = ActorSystem("firstActorSystem",ConfigFactory.load()) val helloActor= system.actorOf(Props(new HelloActor),"HelloActor") helloActor ! "Hello" 普通情况下,我们一般使用ActorSystem

Akka源码分析-Actor&amp;ActorContext&amp;ActorRef&amp;ActorCell

分析源码的过程中我们发现,Akka出现了Actor.ActorRef.ActorCell.ActorContext等几个相似的概念,它们之间究竟有什么区别和联系呢? /** * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': * <a href="http://en.wikipedia.org/wiki/Actor

Akka源码分析-CircuitBreaker(熔断器)

熔断器,在很多技术栈中都会出现的一种技术.它是在分布式系统中提供一个稳定的阻止嵌套失败的机制. 该怎么理解呢?简单来说,在分布式环境中,如果某个计算节点出现问题,很容易出现失败的逆向传到或整个系统的雪崩.什么意思呢?比如某个服务按照顺序依次调用了其他的三个服务,分别为A/B/C.如果B服务由于某种原因,响应变慢了,本来100毫秒就完成了,现在是1秒.此时A就会等待B服务的时间也就变成了1秒,那么就意味着会有很多的A服务调用在等待,如果并发量非常大,很容易就会造成A服务所在的节点出现问题,也就是说

rocketmq源码分析4-事务消息实现原理

为什么消息要具备事务能力 参见还是比较清晰的.简单的说 就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是 此逻辑过程完全成功完成之后才能使订阅者收到消息.业务逻辑过程 假设是这样的:逻辑部分a-->发消息给MQ-->逻辑部分b假设我们在发送消息给MQ之后执行逻辑部分b时产生了异常,那如果MQ不具备事务消息能力时,订阅者也收到了消息.这是我们不希望见到的. 分布式事务基础概念 关于分布式事务.两阶段提交协议.三阶提交协议 理解分布式事务的两阶段提交2pc 分布式事务(一)两阶段