Flink 环境部署

Flink 原理与实现:架构和拓扑概览

架构

要了解一个系统,一般都是从架构开始。我们关心的问题是:系统部署成功后各个节点都启动了哪些服务,各个服务之间又是怎么交互和协调的。下方是 Flink 集群启动后架构图。

当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。

  • Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回。
  • JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。
  • TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

可以看到 Flink 的任务调度是多线程模型,并且不同Job/Task混合在一个 TaskManager 进程中。虽然这种方式可以有效提高 CPU 利用率,但是个人不太喜欢这种设计,因为不仅缺乏资源隔离机制,同时也不方便调试。类似 Storm 的进程模型,一个JVM 中只跑该 Job 的 Tasks 实际应用中更为合理。

Job 例子

本文所示例子为 flink-1.0.x 版本

我们使用 Flink 自带的 examples 包中的 SocketTextStreamWordCount,这是一个从 socket 流中统计单词出现次数的例子。

  • 首先,使用 netcat 启动本地服务器:

    $ nc -l 9000
  • 然后提交 Flink 程序
    $ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \--hostname 10.218.130.9 \--port 9000

在netcat端输入单词并监控 taskmanager 的输出可以看到单词统计的结果。

SocketTextStreamWordCount 的具体代码如下:

public static void main(String[] args) throws Exception {// 检查输入final ParameterTool params = ParameterTool.fromArgs(args);...// set up the execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// get input dataDataStream<String> text =env.socketTextStream(params.get("hostname"), params.getInt("port"), ‘\n‘, 0);DataStream<Tuple2<String, Integer>> counts =// split up the lines in pairs (2-tuples) containing: (word,1)text.flatMap(new Tokenizer())// group by the tuple field "0" and sum up tuple field "1".keyBy(0).sum(1);counts.print();// execute programenv.execute("WordCount from SocketTextStream Example");}

我们将最后一行代码 env.execute 替换成 System.out.println(env.getExecutionPlan()); 并在本地运行该代码(并发度设为2),可以得到该拓扑的逻辑执行计划图的 JSON 串,将该 JSON 串粘贴到http://flink.apache.org/visualizer/ 中,能可视化该执行图。

但这并不是最终在 Flink 中运行的执行图,只是一个表示拓扑节点关系的计划图,在 Flink 中对应了 SteramGraph。另外,提交拓扑后(并发度设为2)还能在 UI 中看到另一张执行计划图,如下所示,该图对应了 Flink 中的 JobGraph。

Graph

看起来有点乱,怎么有这么多不一样的图。实际上,还有更多的图。Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

例如上文中的2个并发度(Source为1个并发度)的 SocketTextStreamWordCount 四层执行图的演变过程如下图所示(点击查看大图):

这里对一些名词进行简单的解释。

  • StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。

    • StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
    • StreamEdge:表示连接两个StreamNode的边。
  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。
    • JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
    • IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集。producer是JobVertex,consumer是JobEdge。
    • JobEdge:代表了job graph中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
    • ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有和并发度一样多的 ExecutionVertex。
    • ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。
    • IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。一个IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。
    • IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。
    • ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一个。
    • Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
    • Task:Execution被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。
    • ResultPartition:代表由一个Task的生成的数据,和ExecutionGraph中的IntermediateResultPartition一一对应。
    • ResultSubpartition:是ResultPartition的一个子分区。每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定。
    • InputGate:代表Task的输入封装,和JobGraph中JobEdge一一对应。每个InputGate消费了一个或多个的ResultPartition。
    • InputChannel:每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。

那么 Flink 为什么要设计这4张图呢,其目的是什么呢?Spark 中也有多张图,数据依赖图以及物理执行的DAG。其目的都是一样的,就是解耦,每张图各司其职,每张图对应了 Job 不同的阶段,更方便做该阶段的事情。我们给出更完整的 Flink Graph 的层次图。

首先我们看到,JobGraph 之上除了 StreamGraph 还有 OptimizedPlan。OptimizedPlan 是由 Batch API 转换而来的。StreamGraph 是由 Stream API 转换而来的。为什么 API 不直接转换成 JobGraph?因为,Batch 和 Stream 的图结构和优化方法有很大的区别,比如 Batch 有很多执行前的预分析用来优化图的执行,而这种优化并不普适于 Stream,所以通过 OptimizedPlan 来做 Batch 的优化会更方便和清晰,也不会影响 Stream。JobGraph 的责任就是统一 Batch 和 Stream 的图,用来描述清楚一个拓扑图的结构,并且做了 chaining 的优化,chaining 是普适于 Batch 和 Stream 的,所以在这一层做掉。ExecutionGraph 的责任是方便调度和各个 tasks 状态的监控和跟踪,所以 ExecutionGraph 是并行化的 JobGraph。而“物理执行图”就是最终分布式在各个机器上运行着的tasks了。所以可以看到,这种解耦方式极大地方便了我们在各个层所做的工作,各个层之间是相互隔离的。

后续的文章,将会详细介绍 Flink 是如何生成这些执行图的。由于我目前关注 Flink 的流处理功能,所以主要有以下内容:

  1. 如何生成 StreamGraph
  2. 如何生成 JobGraph
  3. 如何生成 ExecutionGraph
  4. 如何进行调度(如何生成物理执行图)

Flink官方文档翻译:安装部署(本地模式)

本文主要介绍如何将Flink以本地模式运行在单机上。

下载

进入下载页面。如果你想让Flink与Hadoop进行交互(如HDFS或者HBase),请选择一个与你的Hadoop版本相匹配的Flink包。当你不确定或者只是想运行在本地文件系统上,请选择Hadoop 1.2.x对应的包。

环境准备

Flink 可以运行在 Linux、Mac OS X 和 Windows 上。本地模式的安装唯一需要的只是 Java 1.7.x或更高版本。接下来的指南假定是类Unix环境,Windows用户请参考 Flink on Windows。

你可以执行下面的命令来查看是否已经正确安装了Java了。


java -version

这条命令会输出类似于下面的信息:


java version "1.8.0_51"

Java(TM) SE Runtime Environment (build 1.8.0_51-b16)

Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)

配置

对于本地模式,Flink是可以开箱即用的,你不用去更改任何的默认配置。

开箱即用的配置会使用默认的Java环境。如果你想更改Java的运行环境,你可以手动地设置环境变量JAVA_HOME或者conf/flink-conf.yaml中的配置项env.java.home。你可以查阅配置页面了解更多关于Flink的配置。

启动Flink

你现在就可以开始运行Flink了。解压已经下载的压缩包,然后进入新创建的flink目录。在那里,你就可以本地模式运行Flink了:


$ tar xzf flink-*.tgz

$ cd flink-*

$ bin/start-local.sh

Starting job manager

你可以通过观察logs目录下的日志文件来检查系统是否正在运行了:


$ tail log/flink-*-jobmanager-*.log

INFO ... - Initializing memory manager with 409 megabytes of memory

INFO ... - Trying to load org.apache.flinknephele.jobmanager.scheduler.local.LocalScheduler as scheduler

INFO ... - Setting up web info server, using web-root directory ...

INFO ... - Web info server will display information about nephele job-manager on localhost, port 8081.

INFO ... - Starting web info server for JobManager on port 8081

JobManager 同时会在8081端口上启动一个web前端,你可以通过 http://localhost:8081 来访问。

在windows上运行

Flink on Windows

如果你想要在 Windows 上运行 Flink,你需要如上文所述地下载、解压、配置 Flink 压缩包。之后,你可以使用使用 Windows 批处理文件(.bat文件)或者使用 Cygwin 运行 Flink 的 JobMnager。

使用 Windows 批处理文件启动

使用 Windows 批处理文件本地模式启动Flink,首先打开命令行窗口,进入 Flink 的 bin/ 目录,然后运行 start-local.bat 。

注意:Java运行环境必须已经加到了 Windows 的%PATH%环境变量中。按照本指南添加 Java 到%PATH%环境变量中。


$ cd flink

$ cd bin

$ start-local.bat

Starting Flink job manager. Webinterface by default on http://localhost:8081/.

Do not close this batch window. Stop job manager by pressing Ctrl+C.

之后,你需要打开新的命令行窗口,并运行flink.bat。

使用 Cygwin 和 Unix 脚本启动

使用 Cygwin 你需要打开 Cygwin 的命令行,进入 Flink 目录,然后运行start-local.sh脚本:


$ cd flink

$ bin/start-local.sh

Starting Nephele job manager

从 Git 安装 Flink

如果你是从 git 安装的 Flink,而且使用的 Windows git shell,Cygwin会产生一个类似于下面的错误:


c:/flink/bin/start-local.sh: line 30: $‘\r‘: command not found

这个错误的产生是因为 git 运行在 Windows 上时,会自动地将 UNIX 换行转换成 Windows 换行。问题是,Cygwin 只认 Unix 换行。解决方案是调整 Cygwin 配置来正确处理换行。步骤如下:

1. 打开 Cygwin 命令行

2. 确定 home 目录,通过输入

3. cd ;pwd

它会返回 Cygwin 根目录下的一个路径。

在home目录下,使用 NotePad, WordPad 或者其他编辑器打开.bash_profile文件,然后添加如下内容到文件末尾:(如果文件不存在,你需要创建它)

1. Export SHELLOPTS

2. Set  -o  igncr

保存文件,然后打开一个新的bash窗口。

时间: 2024-10-13 13:27:50

Flink 环境部署的相关文章

【原创】大数据基础之Airflow(2)生产环境部署airflow研究

一 官方 airflow官方分布式部署结构图 airflow进程 webserver scheduler flower(非必须) worker airflow缺点 scheduler单点 通过在scheduler的dags目录变动dag文件来提交流程 官方分布式部署方案 多个webserver 多个worker CeleryExecutor(依赖redis或rabbitmq) MesosExecutor(依赖mesos) 第三方开源方案ASFC 针对scheduler单点问题,有第三方方案:ht

wamp2.4+composer+rabbitmq环境部署-176

version wamp 2.4 1.打开openssl 分别更改php.ini的文件配置 E:\wamp\bin\apache\Apache2.4.4\bin\php.ini E:\wamp\bin\apache\Apache2.4.4\bin\php.ini php.ini的功能打开 extension=php_openssl.dll 2.Composer下载 https://getcomposer.org/Composer-Setup.exe 下载完成直接点下一步图型安装-- 3.php-

window7下 cocos2dx android交叉编译环境部署小结

上周被android交叉编译搞惨了,还好最后弄好了,写个小结以后备用吧. 步骤,1.下载cygwin的devel和shells模块 2. 2.设置环境变量 a.设置NDK_ROOT b.设置Path 编辑Path,在后面追加4个环境变量,以;号隔开(别忘了给前面的加上;),4个分别是android的sdk的platform-tools,android的sdk的tools,   cygwin的bin,上面添加的NDK_ROOT C:\Work\adt-bundle-windows-x86_64-2

生产环境部署squid服务

网络拓扑 该实验使用虚拟机模拟搭建,准备开启2台虚拟机,客户端用本记代替. 1.Squid服务器使用2块网卡.如下图显示: 2.Web服务器与squid服务器配置在同一个网段即可.如下图显示: 3.客户端IP与squid服务器网卡1是在同一网段上即可.如下图显示: 4.查看squid服务器上的eth0和eth1网卡ip地址 5.查看web服务器的ip地址 6.先测试客户端能否ping通squid服务器的网卡1. 7.测试客户端能否ping通web服务器,ping不通正常. 8.安装squid软件

php+mysql+nginx在linux上的环境部署

一直在linux上鼓捣,还没有完整在linux下配置过nginx服务器环境部署呢,这几天没什么事就部署了一下,遇到的问题也很多,现在把我的环境部署文档发出了,有什么问题大家可以一起讨论一下,希望大家采用后遇到问题多多沟通. || 安装所需各种依赖包 sudo -s LANG=C yum -y install gcc gcc-c++ autoconf libjpeg libjpeg-devel libpng libpng-devel freetype freetype-devel libxml2

微信公众号开发系列教程一(调试环境部署)

原文:微信公众号开发系列教程一(调试环境部署) 目录 C#微信公众号开发系列教程一(调试环境部署) C#微信公众号开发系列教程一(调试环境部署续:vs远程调试) C#微信公众号开发系列教程二(新手接入指南) 微信公众号火了好一阵子了,笔者算是比较早接触微信公众号开发的了,大概做了一年半了,从最开始的到处网上找demo到现在也开发一些公众号.园子里关于微信开发的教程已经数不胜数了,我也准备来凑凑热闹.一是梳理下这段时间开发的经验,二是希望能帮到想做微信开发的小伙伴们,希望大大神们吐槽的时候悠着点,

python 环境部署:

pre { direction: ltr; color: rgb(0, 0, 10); text-align: left } pre.western { font-family: "Liberation Mono", serif } pre.cjk { font-family: "Droid Sans" } pre.ctl { font-family: "Liberation Mono" } td p { margin-bottom: 0cm;

ThinkPHP搭建商城一,环境部署

一:配置虚拟主机 我使用的是Mac笔记本,所以以mac为例 1,首先找到Apache虚拟主机配置目录etc/apache2/extra/httpd-vhosts.conf或者private/etc/apache2/extra/httpd-vhosts.conf 2,打开之后添加以下语句,其中www.cshop.com是虚拟主机的域名,/myphp/cshop目录是项目的根目录,这个目录要自己创建 <VirtualHost *:80> ServerName www.cshop.com Docum

(二 )VMware workstation 部署虚拟集群实践——并行批量操作环境部署

在上一篇博客中,已经介绍了安装虚拟集群的过程和需要注意的细节问题. 这篇主要是介绍如何批量登陆远程主机和配置,这个过程中是在没有部署并行处理工具或者集群管理工具的前进行的. ------------首次登陆-------------- 首次登陆需要解决的问题就是: 1,信任远程主机公钥的问题,也就是key_word:yes/no? 2,然后就是远程主机的密码,key_word:password: 在自动化部署过程中,需要进行免交互和免密码登陆. 1,使用expect编写免交互登陆脚本(适用于te