Akka(18): Stream:组合数据流,组件-Graph components

akka-stream的数据流可以由一些组件组合而成。这些组件统称数据流图Graph,它描述了数据流向和处理环节。Source,Flow,Sink是最基础的Graph。用基础Graph又可以组合更复杂的复合Graph。如果一个Graph的所有端口(输入、输出)都是连接的话就是一个闭合流图RunnableGraph,否则就属于·开放流图PartialGraph。一个完整的(可运算的)数据流就是一个RunnableGraph。Graph的输出出入端口可以用Shape来描述:

/**
 * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the
 * philosophy that a Graph is a freely reusable blueprint, everything that
 * matters from the outside are the connections that can be made with it,
 * otherwise it is just a black box.
 */
abstract class Shape {
  /**
   * Scala API: get a list of all input ports
   */
  def inlets: immutable.Seq[Inlet[_]]

  /**
   * Scala API: get a list of all output ports
   */
  def outlets: immutable.Seq[Outlet[_]]

...

Shape类型的抽象函数inlets,outlets分别代表Graph形状的输入、输出端口。下面列出了aka-stream提供的几个现有形状Shape:

final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {...}
final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {...}
final case class SinkShape[-T](in: Inlet[T @uncheckedVariance]) extends Shape {...}
sealed abstract class ClosedShape extends Shape
/**
 * A bidirectional flow of elements that consequently has two inputs and two
 * outputs, arranged like this:
 *
 * {{{
 *        +------+
 *  In1 ~>|      |~> Out1
 *        | bidi |
 * Out2 <~|      |<~ In2
 *        +------+
 * }}}
 */
final case class BidiShape[-In1, +Out1, -In2, +Out2](
  in1:  Inlet[In1 @uncheckedVariance],
  out1: Outlet[Out1 @uncheckedVariance],
  in2:  Inlet[In2 @uncheckedVariance],
  out2: Outlet[Out2 @uncheckedVariance]) extends Shape {...}
object UniformFanInShape {
  def apply[I, O](outlet: Outlet[O], inlets: Inlet[I]*): UniformFanInShape[I, O] =
    new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList))
}
object UniformFanOutShape {
  def apply[I, O](inlet: Inlet[I], outlets: Outlet[O]*): UniformFanOutShape[I, O] =
    new UniformFanOutShape(outlets.size, FanOutShape.Ports(inlet, outlets.toList))
}

Shape是Graph类型的一个参数:

trait Graph[+S <: Shape, +M] {
  /**
   * Type-level accessor for the shape parameter of this graph.
   */
  type Shape = S @uncheckedVariance
  /**
   * The shape of a graph is all that is externally visible: its inlets and outlets.
   */
  def shape: S
...

RunnableGraph类型的Shape是ClosedShape:

/**
 * Flow with attached input and output, can be executed.
 */
final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
  override def shape = ClosedShape

  /**
   * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
   */
  def mapMaterializedValue[Mat2](f: Mat ? Mat2): RunnableGraph[Mat2] =
    copy(traversalBuilder.transformMat(f.asInstanceOf[Any ? Any]))

  /**
   * Run this flow and return the materialized instance from the flow.
   */
  def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
...

我们可以用akka-stream提供的GraphDSL来构建Graph。GraphDSL继承了GraphApply的create方法,GraphDSL.create(...)就是构建Graph的方法:

object GraphDSL extends GraphApply {...}
trait GraphApply {
  /**
   * Creates a new [[Graph]] by passing a [[GraphDSL.Builder]] to the given create function.
   */
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] ? S): Graph[S, NotUsed] = {
    val builder = new GraphDSL.Builder
    val s = buildBlock(builder)

    createGraph(s, builder)
  }
...
def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] ? (g1.Shape) ? S): Graph[S, Mat] = {...}
def create[S <: Shape, Mat, M1, M2](g1: Graph[Shape, M1], g2: Graph[Shape, M2])(combineMat: (M1, M2) ? Mat)(buildBlock: GraphDSL.Builder[Mat] ? (g1.Shape, g2.Shape) ? S): Graph[S, Mat] = {...}
...
def create[S <: Shape, Mat, M1, M2, M3, M4, M5](g1: Graph[Shape, M1], g2: Graph[Shape, M2], g3: Graph[Shape, M3], g4: Graph[Shape, M4], g5: Graph[Shape, M5])(combineMat: (M1, M2, M3, M4, M5) ? Mat)(buildBlock: GraphDSL.Builder[Mat] ? (g1.Shape, g2.Shape, g3.Shape, g4.Shape, g5.Shape) ? S): Graph[S, Mat] = {
...}

buildBlock函数类型:buildBlock: GraphDSL.Builder[Mat] ? (g1.Shape, g2.Shape,...,g5.Shape) ? S,g?代表合并处理后的开放型流图。下面是几个最基本的Graph构建试例:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

object SimpleGraphs extends App{

  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()

  val source = Source(1 to 10)
  val flow = Flow[Int].map(_ * 2)
  val sink = Sink.foreach(println)

  val sourceGraph = GraphDSL.create(){implicit builder =>
    import GraphDSL.Implicits._
    val src = source.filter(_ % 2 == 0)
    val pipe = builder.add(Flow[Int])
    src ~> pipe.in
    SourceShape(pipe.out)
  }

  Source.fromGraph(sourceGraph).runWith(sink).andThen{case _ => } // sys.terminate()}

  val flowGraph = GraphDSL.create(){implicit builder =>
    import GraphDSL.Implicits._

    val pipe = builder.add(Flow[Int])
    FlowShape(pipe.in,pipe.out)
  }

  val (_,fut) = Flow.fromGraph(flowGraph).runWith(source,sink)
  fut.andThen{case _ => } //sys.terminate()}

  val sinkGraph = GraphDSL.create(){implicit builder =>
     import GraphDSL.Implicits._
     val pipe = builder.add(Flow[Int])
     pipe.out.map(_ * 3) ~> Sink.foreach(println)
     SinkShape(pipe.in)
  }

  val fut1 = Sink.fromGraph(sinkGraph).runWith(source)

  Thread.sleep(1000)
  sys.terminate()

上面我们示范了Source,Flow,Sink的Graph编写,我们使用了Flow[Int]作为共同基础组件。我们知道:akka-stream的Graph可以用更简单的Partial-Graph来组合,而所有Graph最终都是用基础流图Core-Graph如Source,Flow,Sink组合而成的。上面例子里我们是用builder.add(...)把一个Flow Graph加入到一个空的Graph模版里,builder.add返回Shape pipe用于揭露这个被加入的Graph的输入输出端口。然后我们按目标Graph的功能要求把pipe的端口连接起来就完成了这个数据流图的设计了。测试使用证明这几个Graph的功能符合预想。下面我们还可以试着自定义一种类似的Pipe类型Graph来更细致的了解Graph组合的过程。所有基础组件Core-Graph都必须定义Shape来描述它的输入输出端口,定义GraphStage中的GraphStateLogic来描述对数据流元素具体的读写方式。

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.collection.immutable

case class PipeShape[In,Out](
    in: Inlet[In],
    out: Outlet[Out]) extends Shape {

  override def inlets: immutable.Seq[Inlet[_]] = in :: Nil

  override def outlets: immutable.Seq[Outlet[_]] = out :: Nil

  override def deepCopy(): Shape =
    PipeShape(
      in = in.carbonCopy(),
      out = out.carbonCopy()
    )
}

PipeShape有一个输入端口和一个输出端口。因为继承了Shape类所以必须实现Shape类的抽象函数。假设我们设计一个Graph,能把用户提供的一个函数用来对输入元素进行施用,如:source.via(ApplyPipe(myFunc)).runWith(sink)。当然,我们可以直接使用source.map(r => myFunc).runWith(sink),不过我们需要的是:ApplyPipe里可能涉及到许多预设定的共用功能,然后myFunc是其中的一部分代码。如果用map(...)的话用户就必须提供所有的代码了。ApplyPipe的形状是PipeShape,下面是它的GraphState设计:

  class Pipe[In, Out](f: In => Out) extends GraphStage[PipeShape[In, Out]] {
    val in = Inlet[In]("Pipe.in")
    val out = Outlet[Out]("Pipe.out")

    override def shape = PipeShape(in, out)

    override def initialAttributes: Attributes = Attributes.none

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {

        private def decider =
          inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)

        override def onPull(): Unit = pull(in)

        override def onPush(): Unit = {
          try {
            push(out, f(grab(in)))
          }
          catch {
            case NonFatal(ex) ? decider(ex) match {
              case Supervision.Stop ? failStage(ex)
              case _ ? pull(in)
            }
          }
        }

        setHandlers(in,out, this)
      }
  }

在这个Pipe GraphStage定义里首先定义了输入输出端口in,out,然后通过createLogic来定义GraphStageLogic,InHandler,outHandler。InHandler和OutHandler分别对应输入输出端口上数据元素的活动处理方式:

/**
 * Collection of callbacks for an input port of a [[GraphStage]]
 */
trait InHandler {
  /**
   * Called when the input port has a new element available. The actual element can be retrieved via the
   * [[GraphStageLogic.grab()]] method.
   */
  @throws(classOf[Exception])
  def onPush(): Unit

  /**
   * Called when the input port is finished. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage()

  /**
   * Called when the input port has failed. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex)
}

/**
 * Collection of callbacks for an output port of a [[GraphStage]]
 */
trait OutHandler {
  /**
   * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
   * is now allowed to be called on this port.
   */
  @throws(classOf[Exception])
  def onPull(): Unit

  /**
   * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
   * be called for this port.
   */
  @throws(classOf[Exception])
  def onDownstreamFinish(): Unit = {
    GraphInterpreter
      .currentInterpreter
      .activeStage
      .completeStage()
  }
}

akka-stream Graph的输入输出处理实现了Reactive-Stream协议。所以我们最好使用akka-stream提供现成的pull,push来重写抽象函数onPull,onPush。然后用setHandlers来设定这个GraphStage的输入输出及处理函数handler:

  /**
   * Assign callbacks for linear stage for both [[Inlet]] and [[Outlet]]
   */
  final protected def setHandlers(in: Inlet[_], out: Outlet[_], handler: InHandler with OutHandler): Unit = {
    setHandler(in, handler)
    setHandler(out, handler)
  }
 /**
   * Assigns callbacks for the events for an [[Inlet]]
   */
  final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
    handlers(in.id) = handler
    if (_interpreter != null) _interpreter.setHandler(conn(in), handler)
  }
  /**
   * Assigns callbacks for the events for an [[Outlet]]
   */
  final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
    handlers(out.id + inCount) = handler
    if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
  }

有了Shape和GraphStage后我们就可以构建一个Graph:

def applyPipe[In,Out](f: In => Out) = GraphDSL.create() {implicit builder =>
    val pipe = builder.add(new Pipe(f))
    FlowShape(pipe.in,pipe.out)
  }

也可以直接用来组合一个复合Graph:

  RunnableGraph.fromGraph(
    GraphDSL.create(){implicit builder =>
      import GraphDSL.Implicits._

      val source = Source(1 to 10)
      val sink = Sink.foreach(println)
      val f: Int => Int = _ * 3
      val pipeShape = builder.add(new Pipe[Int,Int](f))
      source ~> pipeShape.in
      pipeShape.out~> sink
      ClosedShape

    }
  ).run()

整个例子源代码如下:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.ActorAttributes._
import akka.stream.stage._

import scala.collection.immutable
import scala.util.control.NonFatal

object PipeOps {

  case class PipeShape[In, Out](
                                 in: Inlet[In],
                                 out: Outlet[Out]) extends Shape {

    override def inlets: immutable.Seq[Inlet[_]] = in :: Nil

    override def outlets: immutable.Seq[Outlet[_]] = out :: Nil

    override def deepCopy(): Shape =
      PipeShape(
        in = in.carbonCopy(),
        out = out.carbonCopy()
      )
  }

  class Pipe[In, Out](f: In => Out) extends GraphStage[PipeShape[In, Out]] {
    val in = Inlet[In]("Pipe.in")
    val out = Outlet[Out]("Pipe.out")

    override def shape = PipeShape(in, out)

    override def initialAttributes: Attributes = Attributes.none

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {

        private def decider =
          inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)

        override def onPull(): Unit = pull(in)

        override def onPush(): Unit = {
          try {
            push(out, f(grab(in)))
          }
          catch {
            case NonFatal(ex) ? decider(ex) match {
              case Supervision.Stop ? failStage(ex)
              case _ ? pull(in)
            }
          }
        }

        setHandlers(in,out, this)
      }
  }

  def applyPipe[In,Out](f: In => Out) = GraphDSL.create() {implicit builder =>
    val pipe = builder.add(new Pipe(f))
    FlowShape(pipe.in,pipe.out)
  }

}

object ShapeDemo1 extends App {
import PipeOps._
  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()

  RunnableGraph.fromGraph(
    GraphDSL.create(){implicit builder =>
      import GraphDSL.Implicits._

      val source = Source(1 to 10)
      val sink = Sink.foreach(println)
      val f: Int => Int = _ * 3
      val pipeShape = builder.add(new Pipe[Int,Int](f))
      source ~> pipeShape.in
      pipeShape.out~> sink
      ClosedShape

    }
  ).run()

  val fut = Source(1 to 10).via(applyPipe[Int,Int](_ * 2)).runForeach(println)

  scala.io.StdIn.readLine()

  sys.terminate()

}
时间: 2024-10-07 06:13:28

Akka(18): Stream:组合数据流,组件-Graph components的相关文章

Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介

在大数据程序流行的今天,许多程序都面临着共同的难题:程序输入数据趋于无限大,抵达时间又不确定.一般的解决方法是采用回调函数(callback-function)来实现的,但这样的解决方案很容易造成“回调地狱(callback hell)”,即所谓的“goto-hell”:程序控制跳来跳去很难跟踪,特别是一些变量如果在回调函数中更改后产生不可预料的结果.数据流(stream)是一种解决问题的有效编程方式.Stream是一个抽象概念,能把程序数据输入过程和其它细节隐蔽起来,通过申明方式把数据处理过程

【ExtJS 4.x学习教程】(4)组件(Components)

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/27366477 1. 简介 一个Ext JS 应用的UI是由一个或多个叫做组件(Component)的小部件组成的.所有的组件都是Ext.Component的子类,Ext.Component可以使其子类参与自动化的声明周期管理,包括初始化.渲染.调整大小及位置和销毁.Ext JS提供了大量的直接可用的组

Java Swing界面编程(18)---单行文本输入组件:JTextField

以下的程序与上一例有一点区别,仔细对比不难发现其中的不同之处. package com.beyole.util; import javax.swing.JFrame; import javax.swing.JLabel; import javax.swing.JTextField; public class test17 { public static void main(String[] args) { JFrame frame = new JFrame("Crystal"); JT

Vue 中数据流组件

好久不见呀,这两年写了很多很多东西,也学到很多很多东西,没有时常分享是因为大多都是我独自思考.明年我想出去与更多的大神交流,再修筑自己构建的内容. 有时候我会想:我们遇到的问题,碰到的界限,是别人给的还是我们自己给的?其实我觉得自己选择的方向是对的,但是有时难免会怀疑,是否有人和我在做一样的事情,我希望找到有趣的伙伴,做一些有趣的事情. Vue 中数据流组件 又将年终了,该做年终总结了呀..最近花了一些时间,升级了一下我们的技术架构,使用 vue 作为我们的框架.延续传统,我们仍然需要开发一些

2015/9/22 Python基础(18):组合、派生和继承

一个类被定义后,目标就是把它当成一个模块来使用,并把这些对象嵌入到你的代码中去,同其他数据类型及逻辑执行流混合使用.有两种方法可以在你的代码中利用类.第一种是组合,就是让不同的类混合并加入到其他类中,来增强功能和代码重用性.你可以在一个大点的类中创建你自己的类的实例,实现一些其他属性和方法来增强原来的类对象.另一种是派生,通过子类从基类继承核心属性,不断地派生扩展功能实现. 组合举例来说,我们想对之前做过的地址本类作加强性设计.如果在设计的过程中,为names.addresses等创建了单独的类

聊聊React高阶组件(Higher-Order Components)

使用 react已经有不短的时间了,最近看到关于 react高阶组件的一篇文章,看了之后顿时眼前一亮,对于我这种还在新手村晃荡.一切朝着打怪升级看齐的小喽啰来说,像这种难度不是太高同时门槛也不是那么低的东西如今可不多见了啊,是个不可多得的 zhuangbility的利器,自然不可轻易错过,遂深入了解了一番. 概述 高阶组件的定义 React 官网上对高阶组件的定义: 高阶部件是一种用于复用组件逻辑的高级技术,它并不是 React API的一部分,而是从React 演化而来的一种模式. 具体地说,

Autofac怎样注册组件(Registering Components)

在上篇Autofac基本使用步骤中,我们知道Autofac通过创建一个ContainerBuilder对象来注册组件,这个builder是通过服务来暴露组件,后续我们只需要知道服务即可,得到具体组件实例. 在Autofac中的术语请点击此链接查看==>术语 什么是组件(components)? 分三类: 1. reflection(.NET 类型, 或者泛型) 2.现成的实例.(我们创建的类型的实例) 3.Lambda表达式(执行对象实例化的匿名函数) Auotfac 为ContainerBui

小白学Python(18)——pyecharts 关系图 Graph

Graph-基本示例 1 import json 2 import os 3 4 from pyecharts import options as opts 5 from pyecharts.charts import Graph, Page 6 7 8 nodes = [ 9 {"name": "结点1", "symbolSize": 10}, 10 {"name": "结点2", "symbo

关于MUI v0.18.0版本 Table组件里的复选框不能选的解决方案

前段时间在用MUI的时候,Table组件出现复选框不能选的bug(描述: 点击复选框,点击事件会触发,复选框勾选状态无变化). 解决方法: 用CheckBox组件代替Table组件自带的复选框. 解决思路: 1.将CheckBox分为两种,一种是表头里的全选框(以下称全选框),一种是列表行里普通的复选框(以下称普通框): 2.将普通框进行单独封装(原因: 1.便于单个普通框自己管理自己的勾选状态,2.当全选框的勾选状态发生变化时,可以通过props将全选框的状态赋给它,从而实现全选的功能): 关