Akka Stream之Graph

最近在项目中需要实现图的一些操作,因此,初步考虑使用Akka Stream的Graph实现。从而学习了下:

一、介绍

我们知道在Akka Stream中有三种简单的线性数据流操作:Source/Flow/Sink。但是当我们需要使用一些复杂的操作,例如扇入和扇出时,可能就需要使用图相关的流操作了。因此,我们可以这样认为,Akka Stream的Graph是一种运算方案,他可能是简单的线性数据流,也可以由基础的流图组合而成的复杂的数据流程。因为Graph只是对数据流运算的简单描述,所以它是可以重复利用的。

二、依赖

要使用Akka Stream的Graph,我们需要添加下面的依赖:

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_2.12</artifactId>
  <version>2.5.18</version>
</dependency>

三、构建Graph

Graph是由简单的Flow组成的,这些Flow用作图形中的线性连接以及用作Flow的扇入和扇出点的连接点。Akka Stream目前提供了下面这些连接点:

1、扇出:

(1)Broadcast[T]:(1输入,N输出)给定输入元件发射到每个输出

(2)Balance[T]:(1输入,N输出)给定输入元件发射到其输出端口之一

(3)UnzipWith[In,A,B,...]:(1个输入,N个输出)采用1个输入的函数,给定每个输入的值发出N个输出元素(其中N <= 20)

(4)UnZip[A,B]:(1个输入,2个输出)将元组流(A,B)拆分为两个流,一个是类型A,另一个是类型B

2、扇入:

(1)Merge[In]:(N个输入,1个输出)从输入中随机选取将它们逐个推入其输出

(2)MergePreferred[In]Merge但是如果元素在最受欢迎的端口上可用,它会从中选择,否则从中随机从其他端口上选

(3)MergePrioritized[In]Merge但是如果元素在所有输入端口上都可用,它会根据它们的优先级随机选择它们

(4)MergeLatest[In]:(N个输入,1个输出)发出List[In],当第i个输入流发出元素时,发出的列表中的第i个元素被更新

(5)ZipWith[A,B,...,Out]:(N个输入,1个输出),其取N个输入的函数,给出每个输入的值,发出1个输出元素

(6)Zip[A,B]:(2个输入,1个输出)是一个ZipWith专用于压缩和解的输入流AB成元组流(A,B)

(7)Concat[A]:(2个输入,1个输出)连接两个流(首先消耗一个,然后消耗第二个)

四、例子 

现在假设我们需要实现如下图所示的一个Graph

我们可以用akka-stream提供的GraphDSL来构建Graph。GraphDSL继承了GraphApply的create方法,GraphDSL.create(...)就是构建Graph的方法,因此,我们可以使用如下代码创建上图所示的Graph:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge
  ClosedShape
})

注意:在这个里面我们需要引入import GraphDSL.Implicits._。是为了将~>(读作边缘,通过或者到),以及他的相反操作<~引入到代码的范围内。

原文地址:https://www.cnblogs.com/junjiang3/p/10016003.html

时间: 2024-10-16 00:50:30

Akka Stream之Graph的相关文章

Akka Stream文档翻译:Quick Start Guide: Reactive Tweets

Quick Start Guide: Reactive Tweets 快速入门指南: Reactive Tweets (reactive tweets 大概可以理解为“响应式推文”,在此可以测试下GFW是否还在正常工作 Twitter) A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some other data fr

Akka Stream文档翻译:Motivation

想搞下Akka HTTP,用于用于我们的开源项目football,发现Akka HTTP的例子里大量用到Akka Stream,而且Akka HTTP是构建Akka Stream之上的. 所以还是先看下Akka Stream吧,先了解下这个Akka Stream这个项目的动机. 动机 Motivation The way we consume services from the internet today includes many instances of streaming data, b

Akka(18): Stream:组合数据流,组件-Graph components

akka-stream的数据流可以由一些组件组合而成.这些组件统称数据流图Graph,它描述了数据流向和处理环节.Source,Flow,Sink是最基础的Graph.用基础Graph又可以组合更复杂的复合Graph.如果一个Graph的所有端口(输入.输出)都是连接的话就是一个闭合流图RunnableGraph,否则就属于·开放流图PartialGraph.一个完整的(可运算的)数据流就是一个RunnableGraph.Graph的输出出入端口可以用Shape来描述: /** * A Shap

Akka(21): Stream:实时操控:人为中断-KillSwitch

akka-stream是多线程non-blocking模式的,一般来说,运算任务提交到另外线程后这个线程就会在当前程序控制之外自由运行了.任何时候如果需要终止运行中的数据流就必须采用一种任务柄(handler)方式来控制在其它线程内运行的任务.这个handler可以在提交运算任务时获取.akka-stream提供了KillSwitch trait来支持这项功能: /** * A [[KillSwitch]] allows completion of [[Graph]]s from the out

Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub

在现实中我们会经常遇到这样的场景:有一个固定的数据源Source,我们希望按照程序运行状态来接驳任意数量的下游接收方subscriber.又或者我需要在程序运行时(runtime)把多个数据流向某个固定的数据流终端Sink推送.这就涉及到动态连接合并型Merge或扩散型Broadcast的数据流连接点junction.从akka-stream的技术文档得知:一对多,多对一或多对多类型的复杂数据流组件必须用GraphDSL来设计,产生Graph类型结果.前面我们提到过:Graph就是一种运算预案,

Akka(26): Stream:异常处理-Exception handling

akka-stream是基于Actor模式的,所以也继承了Actor模式的“坚韧性(resilient)”特点,在任何异常情况下都有某种整体统一的异常处理策略和具体实施方式.在akka-stream的官方文件中都有详细的说明和示范例子.我们在这篇讨论里也没有什么更好的想法和范例,也只能略做一些字面翻译和分析理解的事了.下面列出了akka-stream处理异常的一些实用方法: 1.recover:这是一个函数,发出数据流最后一个元素然后根据上游发生的异常终止当前数据流 2.recoverWithR

Akka(24): Stream:从外部系统控制数据流-control live stream from external system

在数据流应用的现实场景中常常会遇到与外界系统对接的需求.这些外部系统可能是Actor系统又或者是一些其它类型的系统.与这些外界系统对接的意思是在另一个线程中运行的数据流可以接收外部系统推送的事件及做出行为改变的响应. 如果一个外界系统需要控制一个运行中数据流的功能环节GraphStage,首先必须在这个GraphStage内部构建一个控制函数,这样才能接触并更改GraphStage的内部状态.外部系统可以通过调用这个控制函数来向GraphStage发送信息,控制GraphStage行为.akka

Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages

从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的.这其中:Source和Sink是stream的两个独立端点,而Flow处于stream Source和Sink中间可能由多个通道式的节点组成,每个节点代表某些数据流元素转化处理功能,它们的链接顺序则可能代表整体作业的流程.一个完整的数据流(可运行数据流)必须是一个闭合的数据流,即:从外表上看,数据流两头必须连接一个Source和一个Sin

Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介

在大数据程序流行的今天,许多程序都面临着共同的难题:程序输入数据趋于无限大,抵达时间又不确定.一般的解决方法是采用回调函数(callback-function)来实现的,但这样的解决方案很容易造成“回调地狱(callback hell)”,即所谓的“goto-hell”:程序控制跳来跳去很难跟踪,特别是一些变量如果在回调函数中更改后产生不可预料的结果.数据流(stream)是一种解决问题的有效编程方式.Stream是一个抽象概念,能把程序数据输入过程和其它细节隐蔽起来,通过申明方式把数据处理过程