Akka(24): Stream: Controlling a running stream from an external system (control live stream from external system)

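In real-world stream applications we often need to interface with external systems. These may be Actor systems or other kinds of systems. Interfacing here means that a stream running on another thread can receive events pushed to it by the external system and respond by changing its behavior.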

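For an external system to control a functional component (a GraphStage) of a running stream, the GraphStage must first define a control function internally, because only code inside the GraphStage can access and change its internal state. The external system then sends information to the GraphStage, and thereby controls its behavior, by calling this control function. Because akka-stream runs asynchronously across multiple threads, this function can only be an asynchronous callback. akka-stream provides getAsyncCallback for this purpose: it wraps a handler function and returns an AsyncCallback whose invoke() may be called from any thread, while the handler itself is eventually run thread-safely inside the stage: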

  /**
   * Obtain a callback object that can be used asynchronously to re-enter the
   * current [[GraphStage]] with an asynchronous notification. The [[invoke()]] method of the returned
   * [[AsyncCallback]] is safe to be called from other threads and it will in the background thread-safely
   * delegate to the passed callback function. I.e. [[invoke()]] will be called by the external world and
   * the passed handler will be invoked eventually in a thread-safe way by the execution environment.
   *
   * This object can be cached and reused within the same [[GraphStageLogic]].
   */
  final def getAsyncCallback[T](handler: T ⇒ Unit): AsyncCallback[T] = {
    new AsyncCallback[T] {
      override def invoke(event: T): Unit =
        interpreter.onAsyncInput(GraphStageLogic.this, event, handler.asInstanceOf[Any ⇒ Unit])
    }
  }
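The contract described in this Scaladoc (invoke() may be called from any thread; the handler runs later, thread-safely, inside the stage) can be illustrated without Akka. The sketch below is a hypothetical model under simplifying assumptions: MiniStage, asyncCallback and control are invented names, and a dedicated Thread draining a LinkedBlockingQueue stands in for the stream interpreter, which Akka actually runs on the materializer's dispatcher.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical model, NOT Akka's implementation: a "stage" that owns mutable
// state and runs every state change on its own single thread.
final class MiniStage {
  // Events handed over by other threads; only the stage thread takes from it.
  private val mailbox = new LinkedBlockingQueue[() => Unit]()

  // Internal state, touched only by the stage thread.
  private var running = true
  private var extMessage = ""
  private var handled = Vector.empty[String]

  // Analogue of getAsyncCallback: the returned function may be called from any
  // thread, but it only enqueues; the handler itself runs on the stage thread.
  def asyncCallback[T](handler: T => Unit): T => Unit =
    (event: T) => mailbox.put(() => handler(event))

  // Same control logic as the InjectControl example: "Stop" ends the stage,
  // any other String replaces the pending external message.
  val control: String => Unit = asyncCallback[String] { m =>
    if (m == "Stop") running = false else extMessage = m
    handled :+= m
  }

  private val worker = new Thread(new Runnable {
    def run(): Unit = while (running) mailbox.take().apply()
  })

  def start(): Unit = worker.start()

  // Waits for "Stop"; Thread.join makes the stage thread's writes visible here.
  def awaitStop(): (String, Vector[String]) = {
    worker.join()
    (extMessage, handled)
  }
}
```

The point of this design is that extMessage and running are never touched by the calling thread: the external side only enqueues, so no lock is needed around the stage's state.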

getAsyncCallback turns a T => Unit function into an asynchronously invoked handler and returns it as an AsyncCallback[T]. Here is an example that uses getAsyncCallback:

//external system
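object Injector {
  // Assigned by the stream in preStart() and read by the caller's thread,
  // so it is @volatile to make the assignment visible across threads.
  @volatile var callback: AsyncCallback[String] = null

  def inject(m: String): Unit =
    if (callback != null) callback.invoke(m)
}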
class InjectControl(injector: Injector.type) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""
      override def preStart(): Unit = {
        val callback = getAsyncCallback[String] { m =>
          if (m.length > 0)
             m match {
               case "Stop" => completeStage()
               case s: String => extMessage = s
             }

        }
        injector.callback = callback
      }

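      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            // Emit the external message instead; the element just received is
            // never grabbed, so it is dropped rather than delayed.
            push(outport, extMessage)
            extMessage = ""
          }
          else
            push(outport, grab(inport))
      })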
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }

}

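In this example, object Injector simulates an external system. We override preStart() in InjectControl's createLogic and register there, with Injector, the AsyncCallback obtained from a String => Unit handler. This callback accepts an incoming String and either updates the internal state extMessage or, when the String is "Stop", completes the stage and thereby terminates the stream. In onPush(), a pending extMessage is pushed downstream in place of the element that has just arrived; otherwise the element is passed through unchanged. Here is a test program for this GraphStage: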

object InteractWithStreams extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new InjectControl(Injector)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()
  Thread.sleep(2000)
  Injector.inject("hello")
  Thread.sleep(2000)
  Injector.inject("world!")
  Thread.sleep(2000)
  Injector.inject("Stop")

  scala.io.StdIn.readLine()
  sys.terminate()

}

The test run prints:

1
2
hello
4
5
6
world!
8
9
10

Process finished with exit code 0

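"hello" and "world!" reached the running stream, and "Stop" terminated it at the end. Strictly speaking, though, the messages replaced elements rather than being inserted between them: 3 and 7 are missing from the output, because onPush() pushes extMessage without grabbing the element that has just arrived, so that element is lost.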
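The difference between replacing and inserting can be checked with a small model. The sketch below is a hypothetical, side-effect-free model of the output, not a GraphStage: InjectionModel, replace and insert are invented names, and the timing of an injection is reduced to the index of the upstream element that is pushed while the message is pending.

```scala
// Hypothetical, purely functional model of the stage's output, NOT Akka code.
// `injections` maps the index of an upstream element to the external message
// that is pending when that element is pushed.
object InjectionModel {
  // What InjectControl does: the pending message is pushed instead of the
  // element, which is never grabbed and therefore lost.
  def replace(upstream: List[String], injections: Map[Int, String]): List[String] =
    upstream.zipWithIndex.map { case (elem, i) => injections.getOrElse(i, elem) }

  // An insert-style alternative: emit the pending message, then the element.
  def insert(upstream: List[String], injections: Map[Int, String]): List[String] =
    upstream.zipWithIndex.flatMap { case (elem, i) => injections.get(i).toList :+ elem }
}
```

A real insert-style GraphStage would additionally have to respect downstream demand, for example by buffering pending messages and pushing them only when isAvailable(outport) is true; that part is not modelled here.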

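A GraphStage can also be addressed from the outside as if it were an Actor. We define a function of type ((ActorRef, Any)) => Unit inside the GraphStage and call getStageActor(func).ref, which exposes that function as an ActorRef: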

 /**
   * Initialize a [[StageActorRef]] which can be used to interact with from the outside world "as-if" an [[Actor]].
   * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify
   * internal state of this stage.
   *
   * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running,
   * for example from the [[preStart]] callback the graph stage logic provides.
   *
   * Created [[StageActorRef]] to get messages and watch other actors in synchronous way.
   *
   * The [[StageActorRef]]'s lifecycle is bound to the Stage, in other words when the Stage is finished,
   * the Actor will be terminated as well. The entity backing the [[StageActorRef]] is not a real Actor,
   * but the [[GraphStageLogic]] itself, therefore it does not react to [[PoisonPill]].
   *
   * @param receive callback that will be called upon receiving of a message by this special Actor
   * @return minimal actor with watch method
   */
  // FIXME: I don't like the Pair allocation :(
  @ApiMayChange
  final protected def getStageActor(receive: ((ActorRef, Any)) ⇒ Unit): StageActor = {
    _stageActor match {
      case null ⇒
        val actorMaterializer = ActorMaterializerHelper.downcast(interpreter.materializer)
        _stageActor = new StageActor(actorMaterializer, getAsyncCallback, receive)
        _stageActor
      case existing ⇒
        existing.become(receive)
        existing
    }
  }
...
    /**
     * The ActorRef by which this StageActor can be contacted from the outside.
     * This is a full-fledged ActorRef that supports watching and being watched
     * as well as location transparent (remote) communication.
     */
    def ref: ActorRef = functionRef
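The match in getStageActor above means the first call creates the stage actor and every later call keeps the same instance but swaps its behavior via become. The sketch below is a hypothetical model of just that logic, not Akka's classes: MiniStageActor and MiniStageLogic are invented names, the type parameter S stands in for ActorRef, and the asynchronous hop through getAsyncCallback is left out, so tell() runs the behavior directly.

```scala
// Hypothetical model of the getStageActor logic quoted above, NOT Akka code.
// S stands in for ActorRef; messages are delivered synchronously here.
final class MiniStageActor[S](initial: ((S, Any)) => Unit) {
  private var behavior = initial
  def become(next: ((S, Any)) => Unit): Unit = behavior = next
  def tell(sender: S, msg: Any): Unit = behavior((sender, msg))
}

final class MiniStageLogic[S] {
  private var stageActor: MiniStageActor[S] = null

  // First call creates the stage actor; later calls keep the same instance
  // and only swap its behavior, mirroring `existing.become(receive)`.
  def getStageActor(receive: ((S, Any)) => Unit): MiniStageActor[S] =
    stageActor match {
      case null =>
        stageActor = new MiniStageActor[S](receive)
        stageActor
      case existing =>
        existing.become(receive)
        existing
    }
}
```

Because the ref stays the same across calls, an external actor that was handed the ref once keeps working even if the stage later changes its behavior.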

Here is an example implementation of the receive: ((ActorRef, Any)) => Unit function:

      def behavior(m:(ActorRef,Any)): Unit = {
        val (sender, msg) = m

        msg match {
          case "Stop"    => completeStage()
          case s: String => extMessage = s
          case _         => // ignore anything that is not a String
        }
      }

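The parameter (sender, msg) holds the actor that sent the message and the message itself. As in the previous example, this function is defined inside the GraphStage, so it can read and update the GraphStage's internal state. The GraphStage is implemented as follows: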

class StageAsActor(extActor: ActorRef) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""
      override def preStart(): Unit = {
        extActor ! getStageActor(behavior).ref
      }

      def behavior(m:(ActorRef,Any)): Unit = {
        val (sender, msg) = m

        msg match {
          case "Stop"    => completeStage()
          case s: String => extMessage = s
          case _         => // ignore anything that is not a String
        }
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage=""
          }
          else
            push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }

}

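The parameter extActor is the external controlling actor. In createLogic's preStart() we first send the StageActor's ref to extActor. From then on, the external system can control the stream's behavior through extActor: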

class Messenger extends Actor with ActorLogging {
  var stageActor: ActorRef = _
  override def receive: Receive = {
    case r: ActorRef =>
      stageActor = r
      log.info("received stage actorRef")
    case s: String =>
      stageActor forward s
      log.info(s"forwarding message:$s")

  }
}
object GetStageActorDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val stageActorMessenger = sys.actorOf(Props[Messenger],"forwarder")

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new StageAsActor(stageActorMessenger)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

  Thread.sleep(2000)
  stageActorMessenger ! "Hello"
  Thread.sleep(1000)
  stageActorMessenger ! "World!"
  Thread.sleep(2000)
  stageActorMessenger ! "Stop"

  scala.io.StdIn.readLine()
  sys.terminate()

}

Messenger is a pure intermediary: it forwards control messages to the running stream through the StageActor.
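One caveat: stageActor in Messenger is null until the stage has sent its ref from preStart(), so a String that arrived earlier would make `stageActor forward s` throw a NullPointerException. The demo sidesteps this by sleeping two seconds before sending. If that ordering cannot be guaranteed, one option is to queue messages until the ref arrives (in a real actor, akka.actor.Stash or a local queue could serve). The sketch below models only that buffering logic and is not an Akka actor: BufferingMessenger, onRef and onString are invented names, Ref stands in for ActorRef, and deliver stands in for forward.

```scala
// Hypothetical model of a Messenger variant, NOT an Akka actor. Ref stands in
// for ActorRef and `deliver` for `forward`. Control messages that arrive
// before the stage's ref are queued instead of hitting a null field.
final class BufferingMessenger[Ref](deliver: (Ref, String) => Unit) {
  private var target: Option[Ref] = None
  private val pending = scala.collection.mutable.Queue.empty[String]

  // Corresponds to `case r: ActorRef`: remember the ref, flush the backlog.
  def onRef(ref: Ref): Unit = {
    target = Some(ref)
    while (pending.nonEmpty) deliver(ref, pending.dequeue())
  }

  // Corresponds to `case s: String`: forward now, or queue until onRef.
  def onString(s: String): Unit = target match {
    case Some(ref) => deliver(ref, s)
    case None      => pending.enqueue(s)
  }
}
```

Messages keep their original order, because the backlog is flushed before any later message can be delivered directly.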

The complete source code for this demonstration follows:

GetAsyncCallBack.scala

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.duration._

//external system
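object Injector {
  // Assigned by the stream in preStart() and read by the caller's thread,
  // so it is @volatile to make the assignment visible across threads.
  @volatile var callback: AsyncCallback[String] = null

  def inject(m: String): Unit =
    if (callback != null) callback.invoke(m)
}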
class InjectControl(injector: Injector.type) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""
      override def preStart(): Unit = {
        val callback = getAsyncCallback[String] { m =>
          if (m.length > 0)
             m match {
               case "Stop" => completeStage()
               case s: String => extMessage = s
             }

        }
        injector.callback = callback
      }

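      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            // Emit the external message instead; the element just received is
            // never grabbed, so it is dropped rather than delayed.
            push(outport, extMessage)
            extMessage = ""
          }
          else
            push(outport, grab(inport))
      })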
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }

}

object GetAsyncCallbackDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new InjectControl(Injector)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()
  Thread.sleep(2000)
  Injector.inject("hello")
  Thread.sleep(2000)
  Injector.inject("world!")
  Thread.sleep(2000)
  Injector.inject("Stop")

  scala.io.StdIn.readLine()
  sys.terminate()

}

GetStageActorDemo.scala

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.duration._

class StageAsActor(extActor: ActorRef) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""
      override def preStart(): Unit = {
        extActor ! getStageActor(behavior).ref
      }

      def behavior(m:(ActorRef,Any)): Unit = {
        val (sender, msg) = m

        msg match {
          case "Stop"    => completeStage()
          case s: String => extMessage = s
          case _         => // ignore anything that is not a String
        }
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage=""
          }
          else
            push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }

}

class Messenger extends Actor with ActorLogging {
  var stageActor: ActorRef = _
  override def receive: Receive = {
    case r: ActorRef =>
      stageActor = r
      log.info("received stage actorRef")
    case s: String =>
      stageActor forward s
      log.info(s"forwarding message:$s")

  }
}
object GetStageActorDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val stageActorMessenger = sys.actorOf(Props[Messenger],"forwarder")

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new StageAsActor(stageActorMessenger)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

  Thread.sleep(2000)
  stageActorMessenger ! "Hello"
  Thread.sleep(1000)
  stageActorMessenger ! "World!"
  Thread.sleep(2000)
  stageActorMessenger ! "Stop"

  scala.io.StdIn.readLine()
  sys.terminate()

}
Date: 2024-10-16 01:42:21
