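Real-world streaming applications often need to interact with external systems. These may be Actor systems or systems of some other kind. Interacting with them means that a stream running on another thread can receive events pushed by the external system and change its behavior in response.
If an external system needs to control a GraphStage, that is, a processing stage of a running stream, the GraphStage must first define a control function internally, because only code inside the stage can reach and modify its internal state. The external system then calls this function to send messages to the GraphStage and control its behavior. Because akka-stream runs asynchronously across multiple threads, this function has to be exposed as an asynchronous callback. akka-stream provides getAsyncCallback for this purpose: it wraps a handler function in an AsyncCallback whose invoke() can be called safely from any thread, while the handler itself is executed later in a thread-safe way inside the stage: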
/**
 * Obtain a callback object that can be used asynchronously to re-enter the
 * current [[GraphStage]] with an asynchronous notification. The [[invoke()]] method of the returned
 * [[AsyncCallback]] is safe to be called from other threads and it will in the background thread-safely
 * delegate to the passed callback function. I.e. [[invoke()]] will be called by the external world and
 * the passed handler will be invoked eventually in a thread-safe way by the execution environment.
 *
 * This object can be cached and reused within the same [[GraphStageLogic]].
 */
final def getAsyncCallback[T](handler: T ⇒ Unit): AsyncCallback[T] = {
  new AsyncCallback[T] {
    override def invoke(event: T): Unit =
      interpreter.onAsyncInput(GraphStageLogic.this, event, handler.asInstanceOf[Any ⇒ Unit])
  }
}
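To make the contract in this doc comment more concrete, here is a minimal plain-Scala sketch of the idea: invoke() only enqueues the event, and a single worker thread runs the handler, so the handler can mutate state without locks. MiniCallback, MiniStage and AsyncCallbackModel are hypothetical names made up for illustration. This is a simplified model under those assumptions, not how Akka's interpreter is implemented.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

// Hypothetical model of an async callback: invoke() only enqueues the event.
final class MiniCallback[T](mailbox: LinkedBlockingQueue[Runnable], handler: T => Unit) {
  def invoke(event: T): Unit =
    mailbox.put(new Runnable { def run(): Unit = handler(event) })
}

// Hypothetical stand-in for a stage whose state is owned by one worker thread.
final class MiniStage {
  private val mailbox = new LinkedBlockingQueue[Runnable]()
  private val stopped = new CountDownLatch(1)
  private var received = Vector.empty[String] // written by the worker thread only

  def getAsyncCallback[T](handler: T => Unit): MiniCallback[T] =
    new MiniCallback[T](mailbox, handler)

  // The handler runs on the worker thread, so it may touch `received` freely.
  val callback: MiniCallback[String] = getAsyncCallback[String] {
    case "Stop" => stopped.countDown()
    case s      => received = received :+ s
  }

  def start(): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = while (stopped.getCount > 0) mailbox.take().run()
    })
    worker.setDaemon(true)
    worker.start()
  }

  // Blocks until "Stop" has been handled, then returns what was received.
  def awaitResult(): Vector[String] = { stopped.await(); received }
}

object AsyncCallbackModel {
  // Sends each message from its own thread, then stops the stage.
  def run(messages: Seq[String]): Vector[String] = {
    val stage = new MiniStage
    stage.start()
    val senders = messages.map { m =>
      new Thread(new Runnable { def run(): Unit = stage.callback.invoke(m) })
    }
    senders.foreach(_.start())
    senders.foreach(_.join())
    stage.callback.invoke("Stop")
    stage.awaitResult()
  }
}
```

Calling AsyncCallbackModel.run(Seq("hello", "world!")) returns both messages, in whichever order the sender threads happened to enqueue them.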
getAsyncCallback wraps a T => Unit function so that it runs asynchronously and returns an AsyncCallback through which it can be triggered. Here is an example of using getAsyncCallback:
import akka.stream._
import akka.stream.stage._

// External system (simulated): holds the callback registered by the stage
object Injector {
  var callback: AsyncCallback[String] = null
  def inject(m: String) = {
    if (callback != null)
      callback.invoke(m)
  }
}

class InjectControl(injector: Injector.type) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""

      // Register the async callback with the external system once the stage starts
      override def preStart(): Unit = {
        val callback = getAsyncCallback[String] { m =>
          if (m.length > 0)
            m match {
              case "Stop" => completeStage()
              case s: String => extMessage = s
            }
        }
        injector.callback = callback
      }

      setHandler(inport, new InHandler {
        // A pending external message is emitted in place of the current element
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage = ""
          }
          else
            push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }
}
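In the example above, object Injector simulates an external system. We override preStart() in the GraphStageLogic created by InjectControl.createLogic and register the callback for a String => Unit function with Injector there. The callback accepts an incoming String and stores it in the internal state extMessage, or terminates the stream when the String is "Stop". In onPush(), extMessage is eventually emitted into the stream as an element. Next we build a test program for this GraphStage: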
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._

object InteractWithStreams extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  // Emits "1", "2", "3", ... delayed by one second
  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new InjectControl(Injector)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

  // Inject messages from the main thread while the stream is running
  Thread.sleep(2000)
  Injector.inject("hello")
  Thread.sleep(2000)
  Injector.inject("world!")
  Thread.sleep(2000)
  Injector.inject("Stop")

  scala.io.StdIn.readLine()
  sys.terminate()
}
The test run prints:
1
2
hello
4
5
6
world!
8
9
10

Process finished with exit code 0
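The messages "hello" and "world!" were injected into the running stream as expected, each taking the place of the element being pushed at that moment (3 and 7 are missing from the output), and "Stop" finally terminated the stream.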
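The output skips 3 and 7 because onPush() emits a pending message without grabbing the element it was just handed. The following plain-Scala sketch models only that branch, with no Akka involved. OnPushModel, emit, simulate and the arrivals map are hypothetical names, and the choice of which element each message lands on is an assumption made to match the run above; in the real program it depends on timing.

```scala
object OnPushModel {
  // Mirrors the branch in onPush: returns (emitted value, remaining pending message).
  def emit(pending: String, element: String): (String, String) =
    if (pending.nonEmpty) (pending, "") else (element, "")

  // `arrivals` maps an element to the message assumed to be pending when it is pushed.
  def simulate(elements: Seq[String], arrivals: Map[String, String]): Vector[String] = {
    val start = (Vector.empty[String], "")
    val result = elements.foldLeft(start) { case ((out, pending), element) =>
      val current = arrivals.getOrElse(element, pending)
      val (emitted, next) = emit(current, element)
      (out :+ emitted, next)
    }
    result._1
  }
}
```

For example, OnPushModel.simulate((1 to 10).map(_.toString), Map("3" -> "hello", "7" -> "world!")) yields the same ten values as the run above. A stage that must not lose elements would need to emit both the message and the grabbed element instead of choosing one.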
A GraphStage can also be addressed from outside as if it were an Actor. Inside the GraphStage we define a function of type ((ActorRef, Any)) => Unit, and getStageActor(func).ref exposes that function in the form of an ActorRef:
/**
 * Initialize a [[StageActorRef]] which can be used to interact with from the outside world "as-if" an [[Actor]].
 * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify
 * internal state of this stage.
 *
 * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running,
 * for example from the [[preStart]] callback the graph stage logic provides.
 *
 * Created [[StageActorRef]] to get messages and watch other actors in synchronous way.
 *
 * The [[StageActorRef]]'s lifecycle is bound to the Stage, in other words when the Stage is finished,
 * the Actor will be terminated as well. The entity backing the [[StageActorRef]] is not a real Actor,
 * but the [[GraphStageLogic]] itself, therefore it does not react to [[PoisonPill]].
 *
 * @param receive callback that will be called upon receiving of a message by this special Actor
 * @return minimal actor with watch method
 */
// FIXME: I don't like the Pair allocation :(
@ApiMayChange
final protected def getStageActor(receive: ((ActorRef, Any)) ⇒ Unit): StageActor = {
  _stageActor match {
    case null ⇒
      val actorMaterializer = ActorMaterializerHelper.downcast(interpreter.materializer)
      _stageActor = new StageActor(actorMaterializer, getAsyncCallback, receive)
      _stageActor
    case existing ⇒
      existing.become(receive)
      existing
  }
}
...
/**
 * The ActorRef by which this StageActor can be contacted from the outside.
 * This is a full-fledged ActorRef that supports watching and being watched
 * as well as location transparent (remote) communication.
 */
def ref: ActorRef = functionRef
Here is an example implementation of the receive: ((ActorRef, Any)) => Unit function:
def behavior(m:(ActorRef,Any)): Unit = {
  val (sender, msg) = m

  // Assumes every message sent to the stage is a String
  msg.asInstanceOf[String] match {
    case "Stop" => completeStage()
    case s => extMessage = s
  }
}
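The argument (sender, msg) holds the Actor that sent the message and the message itself. As in the previous example, because it is a function internal to the GraphStage, it can read and update the stage's internal state. The GraphStage is implemented as follows: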
import akka.actor._
import akka.stream._
import akka.stream.stage._

class StageAsActor(extActor: ActorRef) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""

      // Hand the stage's ActorRef to the external actor once the stage starts
      override def preStart(): Unit = {
        extActor ! getStageActor(behavior).ref
      }

      def behavior(m:(ActorRef,Any)): Unit = {
        val (sender, msg) = m

        // Assumes every message sent to the stage is a String
        msg.asInstanceOf[String] match {
          case "Stop" => completeStage()
          case s => extMessage = s
        }
      }

      setHandler(inport, new InHandler {
        // A pending external message is emitted in place of the current element
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage = ""
          }
          else
            push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }
}
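The parameter extActor is the external controlling Actor. In preStart() of the logic returned by createLogic, we first send the StageActor's ActorRef to extActor. The external system can then control the stream's behavior through extActor: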
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._

class Messenger extends Actor with ActorLogging {
  var stageActor: ActorRef = _

  override def receive: Receive = {
    // The stage sends its own ActorRef from preStart()
    case r: ActorRef =>
      stageActor = r
      log.info("received stage actorRef")
    // Every String is forwarded to the stage
    case s: String =>
      stageActor forward s
      log.info(s"forwarding message:$s")
  }
}

object GetStageActorDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val stageActorMessenger = sys.actorOf(Props[Messenger],"forwarder")

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new StageAsActor(stageActorMessenger)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

  Thread.sleep(2000)
  stageActorMessenger ! "Hello"
  Thread.sleep(1000)
  stageActorMessenger ! "World!"
  Thread.sleep(2000)
  stageActorMessenger ! "Stop"

  scala.io.StdIn.readLine()
  sys.terminate()
}
Messenger is a pure intermediary: it forwards control messages to the running stream through the StageActor.
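Because a StageActor can receive any message, the behavior function's cast to String would throw if something other than a String arrived. As a hedged alternative, the sketch below decodes the (sender, msg) pair with a type pattern and ignores anything unexpected. StageMessageDecoder, Command, Complete, Replace and Ignore are hypothetical names, and the sender is typed as Any only to keep the sketch free of Akka dependencies.

```scala
object StageMessageDecoder {
  // What the stage should do in response to a message (hypothetical ADT).
  sealed trait Command
  case object Complete extends Command
  final case class Replace(text: String) extends Command
  case object Ignore extends Command

  // Takes a single tuple argument, like the receive function passed to getStageActor.
  def decode(m: (Any, Any)): Command = m match {
    case (_, "Stop")    => Complete   // would map to completeStage()
    case (_, s: String) => Replace(s) // would map to extMessage = s
    case _              => Ignore     // non-String messages are dropped
  }
}
```

Inside a stage, the same three cases could be written directly in behavior by matching on msg instead of casting it.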
The complete source code for this demonstration follows:
GetAsyncCallBack.scala
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.duration._

//external system
object Injector {
  var callback: AsyncCallback[String] = null
  def inject(m: String) = {
    if (callback != null)
      callback.invoke(m)
  }
}

class InjectControl(injector: Injector.type) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""

      override def preStart(): Unit = {
        val callback = getAsyncCallback[String] { m =>
          if (m.length > 0)
            m match {
              case "Stop" => completeStage()
              case s: String => extMessage = s
            }
        }
        injector.callback = callback
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage = ""
          }
          else
            push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }
}

object GetAsyncCallbackDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new InjectControl(Injector)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

  Thread.sleep(2000)
  Injector.inject("hello")
  Thread.sleep(2000)
  Injector.inject("world!")
  Thread.sleep(2000)
  Injector.inject("Stop")

  scala.io.StdIn.readLine()
  sys.terminate()
}
GetStageActorDemo.scala
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.duration._

class StageAsActor(extActor: ActorRef) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""

      override def preStart(): Unit = {
        extActor ! getStageActor(behavior).ref
      }

      def behavior(m:(ActorRef,Any)): Unit = {
        val (sender, msg) = m

        // Assumes every message sent to the stage is a String
        msg.asInstanceOf[String] match {
          case "Stop" => completeStage()
          case s => extMessage = s
        }
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage = ""
          }
          else
            push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }
}

class Messenger extends Actor with ActorLogging {
  var stageActor: ActorRef = _

  override def receive: Receive = {
    case r: ActorRef =>
      stageActor = r
      log.info("received stage actorRef")
    case s: String =>
      stageActor forward s
      log.info(s"forwarding message:$s")
  }
}

object GetStageActorDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val stageActorMessenger = sys.actorOf(Props[Messenger],"forwarder")

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new StageAsActor(stageActorMessenger)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

  Thread.sleep(2000)
  stageActorMessenger ! "Hello"
  Thread.sleep(1000)
  stageActorMessenger ! "World!"
  Thread.sleep(2000)
  stageActorMessenger ! "Stop"

  scala.io.StdIn.readLine()
  sys.terminate()
}