上一遍博客中,我们分析了网络链接建立的过程,一旦建立就可以正常的收发消息了。发送消息的细节不再分析,因为对于本地的actor来说这个过程相对简单,它只是创立链接然后给指定的netty网路服务发送消息就好了。接收消息就比较麻烦了,因为这对于actor来说是透明的,netty收到消息后如何把消息分发给指定的actor呢?这个分发的过程值得研究研究。
之前分析过,在监听创立的过程中,有一个对象非常关键:TcpServerHandler。它负责链接建立、消息收发等功能。TcpServerHandler继承了ServerHandler
private[netty] abstract class ServerHandler( protected final val transport: NettyTransport, private final val associationListenerFuture: Future[AssociationEventListener]) extends NettyServerHelpers with CommonHandlers
ServerHandler继承了NettyServerHelpers
private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers { final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { super.messageReceived(ctx, e) onMessage(ctx, e) } final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e) final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { super.channelConnected(ctx, e) onConnect(ctx, e) } final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { super.channelOpen(ctx, e) onOpen(ctx, e) } final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { super.channelDisconnected(ctx, e) onDisconnect(ctx, e) } }
很明显NettyServerHelpers有一个messageReceived应该就是收到消息时回调的方法,那onMessage在哪里实现呢?TcpServerHandler还继承了TcpHandlers,我们来看看TcpHandlers的onMessage方法。
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array() if (bytes.length > 0) notifyListener(e.getChannel, InboundPayload(ByteString(bytes))) }
它最终用InboundPayload封装了收到的数据,并调用了ChannelLocalActor.notifyListener方法。
private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEventListener]] { override def initialValue(channel: Channel): Option[HandleEventListener] = None def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel) foreach { _ notify msg } }
ChannelLocalActor可以先把它理解成一个ThreadLocal对象,其他的技术细节读者可以自行谷歌。notifyListener只调用了get,那具体是在哪里set的呢?通过channel变量get到的Option[HandleEventListener]又是在哪里赋值的呢?
override def registerListener( channel: Channel, listener: HandleEventListener, msg: ChannelBuffer, remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(listener))
很显然是在registerListener时set的值,那registerListener在哪里调用呢?如果读过上一篇的文章,一定会知道ServerHandler.initInbound函数,这个函数调用了CommonHandlers.init
final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, remoteAddress: Address, msg: ChannelBuffer)( op: (AssociationHandle ? Any)): Unit = { import transport._ NettyTransport.addressFromSocketAddress(channel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname), None) match { case Some(localAddress) ? val handle = createHandle(channel, localAddress, remoteAddress) handle.readHandlerPromise.future.foreach { listener ? registerListener(channel, listener, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress]) channel.setReadable(true) } op(handle) case _ ? NettyTransport.gracefulClose(channel) } }
看到没,上面的函数中调用了registerListener,那listener具体在哪里创建的呢,或者是哪个变量对应的值呢?这就需要研究createHandle对象及其返回值是什么了。经过分析还是找到了TcpHandlers这个trait,里面有createHandle的具体实现。
override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = new TcpAssociationHandle(localAddress, remoteAddress, transport, channel)
TcpAssociationHandle源码如下
private[remote] class TcpAssociationHandle( val localAddress: Address, val remoteAddress: Address, val transport: NettyTransport, private val channel: Channel) extends AssociationHandle { import transport.executionContext override val readHandlerPromise: Promise[HandleEventListener] = Promise() override def write(payload: ByteString): Boolean = if (channel.isWritable && channel.isOpen) { channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer)) true } else false override def disassociate(): Unit = NettyTransport.gracefulClose(channel) }
由此可见,readHandlerPromise是一个Promise[HandleEventListener],并没有具体赋值的逻辑,这就要去使用TcpAssociationHandle的相关代码找相关的赋值逻辑了。TcpAssociationHandle在哪里使用呢?还记得handleInboundAssociation建立连接的过程吗?它最终调用了createAndRegisterEndpoint
private def createAndRegisterEndpoint(handle: AkkaProtocolHandle): Unit = { val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress) eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true)) val endpoint = createEndpoint( handle.remoteAddress, handle.localAddress, transportMapping(handle.localAddress), settings, Some(handle), writing) if (writing) endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), endpoint) else { endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid) if (!endpoints.hasWritableEndpointFor(handle.remoteAddress)) endpoints.removePolicy(handle.remoteAddress) } }
createAndRegisterEndpoint拿着一个连接实例AkkaProtocolHandle创建了一个endpoint,其中有个很关键的字段writing,它是true还是false呢?UsePassiveConnections默认为true,且经分析!endpoints.hasWritableEndpointFor(handle.remoteAddress)应该也是true,所以writing是true
# Reuse inbound connections for outbound messages use-passive-connections = on
ReliableDeliverySupervisor其实是对EndpointWriter的代理。在创建ReliableDeliverySupervisor的过程中AkkaProtocolHandle是作为参数传入的,也就监听到连接消息后创建的handle。而在创建EndpointWriter的过程中,这个handle又是作为第一个参数传入了EndpointWriter。我们来看看EndpointWriter是如何使用这个handle的。
override def preStart(): Unit = { handle match { case Some(h) ? reader = startReadEndpoint(h) case None ? transport.associate(remoteAddress, refuseUid).map(Handle(_)) pipeTo self } }
在preStart时,handle应该是有值的,如果有值,就调用了startReadEndpoint(h)方法。
private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = { val newReader = context.watch(context.actorOf( RARP(context.system).configureDispatcher(EndpointReader.props(localAddress, remoteAddress, transport, settings, codec, msgDispatch, inbound, handle.handshakeInfo.uid, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local), "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next())) handle.readHandlerPromise.success(ActorHandleEventListener(newReader)) Some(newReader) }
startReadEndpoint做了什么呢?它又创建了一个Actor:EndpointReader!!!好多中间的actor创建。创建之后,调用了handle.readHandlerPromise.success(ActorHandleEventListener(newReader))给handle.readHandlerPromise。还记得ActorHandleEventListener吗,它就是把收到的消息转发了对应的actor,此处就是newReader。
EndpointReader如何处理InboundPayload消息呢?首先解码收到的消息,然后给创建它的reliableDelivery发送ack消息。
override def decodeMessage( raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message]) = { val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArray) val ackOption = if (ackAndEnvelope.hasAck) { import scala.collection.JavaConverters._ Some(Ack(SeqNo(ackAndEnvelope.getAck.getCumulativeAck), ackAndEnvelope.getAck.getNacksList.asScala.map(SeqNo(_)).toSet)) } else None val messageOption = if (ackAndEnvelope.hasEnvelope) { val msgPdu = ackAndEnvelope.getEnvelope Some(Message( recipient = provider.resolveActorRefWithLocalAddress(msgPdu.getRecipient.getPath, localAddress), recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath), serializedMessage = msgPdu.getMessage, senderOption = if (msgPdu.hasSender) OptionVal(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress)) else OptionVal.None, seqOption = if (msgPdu.hasSeq) Some(SeqNo(msgPdu.getSeq)) else None)) } else None (ackOption, messageOption) }
上面是decodeMessage的源码,消息最终被decode成了Message对象。
final case class Message( recipient: InternalActorRef, recipientAddress: Address, serializedMessage: SerializedMessage, senderOption: OptionVal[ActorRef], seqOption: Option[SeqNo]) extends HasSequenceNumber { def reliableDeliveryEnabled = seqOption.isDefined override def seq: SeqNo = seqOption.get }
默认情况下reliableDeliveryEnabled是false的,因为发送出去的msgPdu是没有getSeq的,因为默认的tcp是保证消息发送的。所以EndpointReader收到消息后调用了msgDispatch.dispatch把消息分发出去了。根据上下文msgDispatch是在EndpointWriter创建的,代码如下。
val msgDispatch = new DefaultMessageDispatcher(extendedSystem, provider, markLog)
DefaultMessageDispatcher.dispatch不再具体分析,它就是把消息tell给了Message.recipient,而recipient是一个InternalActorRef,对的,你没有看错,这就是一个InternalActorRef,是不是很神奇,payload解码之后直接就有目标actor的InternalActorRef了??!!那我们就得好好看看是如何对payload进行解码的了。
在decodeMessage函数中,有两处代码非常关键:“recipient = provider.resolveActorRefWithLocalAddress(msgPdu.getRecipient.getPath, localAddress)”、“if (msgPdu.hasSender) OptionVal(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress))”。都是调用provider.resolveActorRefWithLocalAddress函数通过actor的path转化成了对应actor的ActorRef,很显然provider就是RemoteActorRefProvider。
/** * INTERNAL API * Called in deserialization of incoming remote messages where the correct local address is known. */ private[akka] def resolveActorRefWithLocalAddress(path: String, localAddress: Address): InternalActorRef = { path match { case ActorPathExtractor(address, elems) ? if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems) else try { new RemoteActorRef(transport, localAddress, RootActorPath(address) / elems, Nobody, props = None, deploy = None) } catch { case NonFatal(e) ? log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) new EmptyLocalActorRef(this, RootActorPath(address) / elems, eventStream) } case _ ? log.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path) deadLetters } }
resolveActorRefWithLocalAddress也很简单,如果目标address包含在本机范围,就调用local.resolveActorRef,否则就创建RemoteActorRef,关于RemoteActorRef的作用这里不再讲解。
/** * INTERNAL API */ private[akka] def resolveActorRef(ref: InternalActorRef, pathElements: Iterable[String]): InternalActorRef = if (pathElements.isEmpty) { log.debug("Resolve (deserialization) of empty path doesn‘t match an active actor, using deadLetters.") deadLetters } else ref.getChild(pathElements.iterator) match { case Nobody ? if (log.isDebugEnabled) log.debug( "Resolve (deserialization) of path [{}] doesn‘t match an active actor. " + "It has probably been stopped, using deadLetters.", pathElements.mkString("/")) new EmptyLocalActorRef(system.provider, ref.path / pathElements, eventStream) case x ? x }
LocalActorRefProvider.resolveActorRef也比较简单,就是调用ref.getChild,而ref是LocalActorRefProvider.rootGuardian,其实就是在本地范围内从root向下查找对应的ActorRef。
至此remote模式下收发消息的过程我们就分析清楚了,如果还有不清楚的小伙伴就再把之前的文章复习一下,当然还可以在下面留言讨论。
原文地址:https://www.cnblogs.com/gabry/p/9390621.html