Akka源码分析-Remote-收消息

  上一遍博客中,我们分析了网络链接建立的过程,一旦建立就可以正常的收发消息了。发送消息的细节不再分析,因为对于本地的actor来说这个过程相对简单,它只是创立链接然后给指定的netty网路服务发送消息就好了。接收消息就比较麻烦了,因为这对于actor来说是透明的,netty收到消息后如何把消息分发给指定的actor呢?这个分发的过程值得研究研究。

  之前分析过,在监听创立的过程中,有一个对象非常关键:TcpServerHandler。它负责链接建立、消息收发等功能。TcpServerHandler继承了ServerHandler

private[netty] abstract class ServerHandler(
  protected final val transport:               NettyTransport,
  private final val associationListenerFuture: Future[AssociationEventListener])
  extends NettyServerHelpers with CommonHandlers

  ServerHandler继承了NettyServerHelpers

private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers {

  final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    super.messageReceived(ctx, e)
    onMessage(ctx, e)
  }

  final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)

  final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelConnected(ctx, e)
    onConnect(ctx, e)
  }

  final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelOpen(ctx, e)
    onOpen(ctx, e)
  }

  final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelDisconnected(ctx, e)
    onDisconnect(ctx, e)
  }
}

  很明显NettyServerHelpers有一个messageReceived应该就是收到消息时回调的方法,那onMessage在哪里实现呢?TcpServerHandler还继承了TcpHandlers,我们来看看TcpHandlers的onMessage方法。

 override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
    if (bytes.length > 0) notifyListener(e.getChannel, InboundPayload(ByteString(bytes)))
  }

  它最终用InboundPayload封装了收到的数据,并调用了ChannelLocalActor.notifyListener方法。

private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEventListener]] {
  override def initialValue(channel: Channel): Option[HandleEventListener] = None
  def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel) foreach { _ notify msg }
}

  ChannelLocalActor可以先把它理解成一个ThreadLocal对象,其他的技术细节读者可以自行谷歌。notifyListener只调用了get,那具体是在哪里set的呢?通过channel变量get到的Option[HandleEventListener]又是在哪里赋值的呢?

  override def registerListener(
    channel:             Channel,
    listener:            HandleEventListener,
    msg:                 ChannelBuffer,
    remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(listener))

  很显然是在registerListener时set的值,那registerListener在哪里调用呢?如果读过上一篇的文章,一定会知道ServerHandler.initInbound函数,这个函数调用了CommonHandlers.init

final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, remoteAddress: Address, msg: ChannelBuffer)(
    op: (AssociationHandle ? Any)): Unit = {
    import transport._
    NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname), None) match {
      case Some(localAddress) ?
        val handle = createHandle(channel, localAddress, remoteAddress)
        handle.readHandlerPromise.future.foreach {
          listener ?
            registerListener(channel, listener, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress])
            channel.setReadable(true)
        }
        op(handle)

      case _ ? NettyTransport.gracefulClose(channel)
    }
  }

  看到没,上面的函数中调用了registerListener,那listener具体在哪里创建的呢,或者是哪个变量对应的值呢?这就需要研究createHandle对象及其返回值是什么了。经过分析还是找到了TcpHandlers这个trait,里面有createHandle的具体实现。

  override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle =
    new TcpAssociationHandle(localAddress, remoteAddress, transport, channel)

  TcpAssociationHandle源码如下

private[remote] class TcpAssociationHandle(
  val localAddress:    Address,
  val remoteAddress:   Address,
  val transport:       NettyTransport,
  private val channel: Channel)
  extends AssociationHandle {
  import transport.executionContext

  override val readHandlerPromise: Promise[HandleEventListener] = Promise()

  override def write(payload: ByteString): Boolean =
    if (channel.isWritable && channel.isOpen) {
      channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer))
      true
    } else false

  override def disassociate(): Unit = NettyTransport.gracefulClose(channel)
}

  由此可见,readHandlerPromise是一个Promise[HandleEventListener],并没有具体赋值的逻辑,这就要去使用TcpAssociationHandle的相关代码找相关的赋值逻辑了。TcpAssociationHandle在哪里使用呢?还记得handleInboundAssociation建立连接的过程吗?它最终调用了createAndRegisterEndpoint

  private def createAndRegisterEndpoint(handle: AkkaProtocolHandle): Unit = {
    val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)
    eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))

    val endpoint = createEndpoint(
      handle.remoteAddress,
      handle.localAddress,
      transportMapping(handle.localAddress),
      settings,
      Some(handle),
      writing)

    if (writing)
      endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), endpoint)
    else {
      endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid)
      if (!endpoints.hasWritableEndpointFor(handle.remoteAddress))
        endpoints.removePolicy(handle.remoteAddress)
    }
  }

  createAndRegisterEndpoint拿着一个连接实例AkkaProtocolHandle创建了一个endpoint,其中有个很关键的字段writing,它是true还是false呢?UsePassiveConnections默认为true,且经分析!endpoints.hasWritableEndpointFor(handle.remoteAddress)应该也是true,所以writing是true

 # Reuse inbound connections for outbound messages
    use-passive-connections = on

  ReliableDeliverySupervisor其实是对EndpointWriter的代理。在创建ReliableDeliverySupervisor的过程中AkkaProtocolHandle是作为参数传入的,也就监听到连接消息后创建的handle。而在创建EndpointWriter的过程中,这个handle又是作为第一个参数传入了EndpointWriter。我们来看看EndpointWriter是如何使用这个handle的。

 override def preStart(): Unit = {
    handle match {
      case Some(h) ?
        reader = startReadEndpoint(h)
      case None ?
        transport.associate(remoteAddress, refuseUid).map(Handle(_)) pipeTo self
    }
  }

  在preStart时,handle应该是有值的,如果有值,就调用了startReadEndpoint(h)方法。

private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = {
    val newReader =
      context.watch(context.actorOf(
        RARP(context.system).configureDispatcher(EndpointReader.props(localAddress, remoteAddress, transport, settings, codec,
          msgDispatch, inbound, handle.handshakeInfo.uid, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local),
        "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
    handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
    Some(newReader)
  }

  startReadEndpoint做了什么呢?它又创建了一个Actor:EndpointReader!!!好多中间的actor创建。创建之后,调用了handle.readHandlerPromise.success(ActorHandleEventListener(newReader))给handle.readHandlerPromise。还记得ActorHandleEventListener吗,它就是把收到的消息转发了对应的actor,此处就是newReader。

  EndpointReader如何处理InboundPayload消息呢?首先解码收到的消息,然后给创建它的reliableDelivery发送ack消息。

  override def decodeMessage(
    raw:          ByteString,
    provider:     RemoteActorRefProvider,
    localAddress: Address): (Option[Ack], Option[Message]) = {
    val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArray)

    val ackOption = if (ackAndEnvelope.hasAck) {
      import scala.collection.JavaConverters._
      Some(Ack(SeqNo(ackAndEnvelope.getAck.getCumulativeAck), ackAndEnvelope.getAck.getNacksList.asScala.map(SeqNo(_)).toSet))
    } else None

    val messageOption = if (ackAndEnvelope.hasEnvelope) {
      val msgPdu = ackAndEnvelope.getEnvelope
      Some(Message(
        recipient = provider.resolveActorRefWithLocalAddress(msgPdu.getRecipient.getPath, localAddress),
        recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath),
        serializedMessage = msgPdu.getMessage,
        senderOption =
          if (msgPdu.hasSender) OptionVal(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress))
          else OptionVal.None,
        seqOption =
          if (msgPdu.hasSeq) Some(SeqNo(msgPdu.getSeq)) else None))
    } else None

    (ackOption, messageOption)
  }

  上面是decodeMessage的源码,消息最终被decode成了Message对象。

  final case class Message(
    recipient:         InternalActorRef,
    recipientAddress:  Address,
    serializedMessage: SerializedMessage,
    senderOption:      OptionVal[ActorRef],
    seqOption:         Option[SeqNo]) extends HasSequenceNumber {

    def reliableDeliveryEnabled = seqOption.isDefined

    override def seq: SeqNo = seqOption.get
  }

  默认情况下reliableDeliveryEnabled是false的,因为发送出去的msgPdu是没有getSeq的,因为默认的tcp是保证消息发送的。所以EndpointReader收到消息后调用了msgDispatch.dispatch把消息分发出去了。根据上下文msgDispatch是在EndpointWriter创建的,代码如下。

val msgDispatch = new DefaultMessageDispatcher(extendedSystem, provider, markLog)

  DefaultMessageDispatcher.dispatch不再具体分析,它就是把消息tell给了Message.recipient,而recipient是一个InternalActorRef,对的,你没有看错,这就是一个InternalActorRef,是不是很神奇,payload解码之后直接就有目标actor的InternalActorRef了??!!那我们就得好好看看是如何对payload进行解码的了。

  在decodeMessage函数中,有两处代码非常关键:“recipient = provider.resolveActorRefWithLocalAddress(msgPdu.getRecipient.getPath, localAddress)”、“if (msgPdu.hasSender) OptionVal(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress))”。都是调用provider.resolveActorRefWithLocalAddress函数通过actor的path转化成了对应actor的ActorRef,很显然provider就是RemoteActorRefProvider。

/**
   * INTERNAL API
   * Called in deserialization of incoming remote messages where the correct local address is known.
   */
  private[akka] def resolveActorRefWithLocalAddress(path: String, localAddress: Address): InternalActorRef = {
    path match {
      case ActorPathExtractor(address, elems) ?
        if (hasAddress(address))
          local.resolveActorRef(rootGuardian, elems)
        else try {
          new RemoteActorRef(transport, localAddress, RootActorPath(address) / elems, Nobody, props = None, deploy = None)
        } catch {
          case NonFatal(e) ?
            log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
            new EmptyLocalActorRef(this, RootActorPath(address) / elems, eventStream)
        }
      case _ ?
        log.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path)
        deadLetters
    }
  }

   resolveActorRefWithLocalAddress也很简单,如果目标address包含在本机范围,就调用local.resolveActorRef,否则就创建RemoteActorRef,关于RemoteActorRef的作用这里不再讲解。

  /**
   * INTERNAL API
   */
  private[akka] def resolveActorRef(ref: InternalActorRef, pathElements: Iterable[String]): InternalActorRef =
    if (pathElements.isEmpty) {
      log.debug("Resolve (deserialization) of empty path doesn‘t match an active actor, using deadLetters.")
      deadLetters
    } else ref.getChild(pathElements.iterator) match {
      case Nobody ?
        if (log.isDebugEnabled)
          log.debug(
            "Resolve (deserialization) of path [{}] doesn‘t match an active actor. " +
              "It has probably been stopped, using deadLetters.",
            pathElements.mkString("/"))
        new EmptyLocalActorRef(system.provider, ref.path / pathElements, eventStream)
      case x ? x
    }

  LocalActorRefProvider.resolveActorRef也比较简单,就是调用ref.getChild,而ref是LocalActorRefProvider.rootGuardian,其实就是在本地范围内从root向下查找对应的ActorRef。

  至此remote模式下收发消息的过程我们就分析清楚了,如果还有不清楚的小伙伴就再把之前的文章复习一下,当然还可以在下面留言讨论。

原文地址:https://www.cnblogs.com/gabry/p/9390621.html

时间: 2024-10-16 01:39:44

Akka源码分析-Remote-收消息的相关文章

Akka源码分析-Cluster-ActorSystem

前面几篇博客,我们依次介绍了local和remote的一些内容,其实再分析cluster就会简单很多,后面关于cluster的源码分析,能够省略的地方,就不再贴源码而是一句话带过了,如果有不理解的地方,希望多翻翻之前的博客. 在使用cluster时,配置文件中的akka.actor.provider值是cluster,所以ActorSystem对应的provider就是akka.cluster.ClusterActorRefProvider. /** * INTERNAL API * * The

Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster

在ClusterClient源码分析中,我们知道,他是依托于"Distributed Publish Subscribe in Cluster"来实现消息的转发的,那本文就来分析一下Pub/Sub是如何实现的. 还记得之前分析Cluster源码的文章吗?其实Cluster只是把集群内各个节点的信息通过gossip协议公布出来,并把节点的信息分发出来.但各个actor的地址还是需要开发者自行获取或设计的,比如我要跟worker通信,那就需要知道这个actor在哪个节点,通过actorPa

akka源码分析

看akka源码的一些体会,没有列出源码来.akka代码主要包括两块:底层分发(akka.dispatch包)和上层模型(akka.actor包),从底层线程调度(dispatch)往上看起 函数式语言主要处理表达式求值,面向对象语言主要处理对象间消息发送消息. 1. 底层线程调度 Doug Lea: ForkJoinTask ForkJoinTask是用少数线程执行海量独立任务的极好架构,这里的独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题. ForkJoinTask的实现包

Spring AMQP 源码分析 06 - 手动消息确认

### 准备 ## 目标 了解 Spring AMQP 如何手动确认消息已成功消费 ## 前置知识 <Spring AMQP 源码分析 04 - MessageListener> ## 相关资源 Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#message-listener-adapter> Sample code:<https:

Akka源码分析-Remote-发消息

上一篇博客我们介绍了remote模式下Actor的创建,其实与local的创建并没有太大区别,一般情况下还是使用LocalActorRef创建了Actor.那么发消息是否意味着也是相同的呢? 既然actorOf还是委托给了LocalActorRef,那么在本地创建的Actor发消息还是跟以前一样的,那么如果如何给远程的Actor发消息呢?我们一般是通过actorSelection或者给远程Actor发送一个Identify消息,来接收对应的ActorRef,然后再发消息.我们来分析一下这两者的区

Akka源码分析-Actor创建

上一篇博客我们介绍了ActorSystem的创建过程,下面我们就研究一下actor的创建过程. val system = ActorSystem("firstActorSystem",ConfigFactory.load()) val helloActor= system.actorOf(Props(new HelloActor),"HelloActor") helloActor ! "Hello" 普通情况下,我们一般使用ActorSystem

Akka源码分析-Actor&amp;ActorContext&amp;ActorRef&amp;ActorCell

分析源码的过程中我们发现,Akka出现了Actor.ActorRef.ActorCell.ActorContext等几个相似的概念,它们之间究竟有什么区别和联系呢? /** * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': * <a href="http://en.wikipedia.org/wiki/Actor

Akka源码分析-CircuitBreaker(熔断器)

熔断器,在很多技术栈中都会出现的一种技术.它是在分布式系统中提供一个稳定的阻止嵌套失败的机制. 该怎么理解呢?简单来说,在分布式环境中,如果某个计算节点出现问题,很容易出现失败的逆向传到或整个系统的雪崩.什么意思呢?比如某个服务按照顺序依次调用了其他的三个服务,分别为A/B/C.如果B服务由于某种原因,响应变慢了,本来100毫秒就完成了,现在是1秒.此时A就会等待B服务的时间也就变成了1秒,那么就意味着会有很多的A服务调用在等待,如果并发量非常大,很容易就会造成A服务所在的节点出现问题,也就是说

rocketmq源码分析4-事务消息实现原理

为什么消息要具备事务能力 参见还是比较清晰的.简单的说 就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是 此逻辑过程完全成功完成之后才能使订阅者收到消息.业务逻辑过程 假设是这样的:逻辑部分a-->发消息给MQ-->逻辑部分b假设我们在发送消息给MQ之后执行逻辑部分b时产生了异常,那如果MQ不具备事务消息能力时,订阅者也收到了消息.这是我们不希望见到的. 分布式事务基础概念 关于分布式事务.两阶段提交协议.三阶提交协议 理解分布式事务的两阶段提交2pc 分布式事务(一)两阶段