Flink中TaskManager端执行用户逻辑过程(源码分析)

TaskManager接收到来自JobManager的jobGraph转换得到的TDD对象,启动了任务,在StreamInputProcessor类的processInput()方法中

通过一个while(true)中不停的拉取上游的数据,然后调用streamOperator.processElement(record)调用用户实现的方法去处理数据拉取的数据

首先先来看下这个operator对象

然后看看OneInputStreamOperator类的UML

这里所有的实现类没有全部列出,只列了一些代表

看到这里,写过Flink的streamAPI的同学,肯定感觉到很熟悉!!!!!!

这里!不就是我们常写flink代码的那些算子嘛

对没有错,我们程序中实现的那些算子逻辑,最后都会被封装成一个OneInputStreamOperator,这里具体看一个最熟悉的Fliter

来看一下StreamFilter的processElement方法

!!!这里传入一个数据后,这个userFunction调用了filter方法并且把数据放进去了

当返回true通过这个output.collect发送出去了

这不就对应了我们用户自己实现的filter算子嘛,没错这个方法其实就是客户端的filter方法,这个userFunction包含了用户实现filter算子的逻辑

(!!!!!就是说这个processElement方法会调用用户的逻辑)

(所以这个userFunction可以带上client的方法实现,这对我们很重要,特别是对flink源码修改,为clientApi添加新功能方法,运行时可以通过这里拿到)

继续

来看看这个output.collect()方法

然后

看到这个,等等等等

我不是从这个processElement()方法进来的吗,怎么又开始调processElement()方法了

难道递归了? 不对不对

这里operator不是上一个operator了,而是这个output对象的(这里是chainOutPut)

看下这个output对象

看下UML类图,也是只列举了重要的

先看chainingOutPut的属性

发现了又出现了OneInputStreamOperator对象

看到这个实现类的名字!chain联想起了什么

Flink会将可以chain在一起的算子在streamGraph转换成jobGraph的时候根据条件chain在一起

一惊!

来分别看一下ChainingOutPut和RecordWriterOutput的collect()方法有什么区别

在chain中

在RecordWriter中

这里chain的ouput,又继续调用了下一个operator的processElement方法,然后又在processElement方法中又调用output.collect( ),collect中又调用了下一个operator的processElement方法

整个过程就是个无限的循环,直到,某一个operator的ouput不为ChainingOutPut,当变为RecordWriterOutput时

上面看到RecordWriterOutput的processElement直接emit发送出去了这个数据,再也没有继续调用processElement方法了

这里也就对应了,flink中的责任链,chain在一起的算子会一个接着一个执行,直到无法chain,就会往下游发送emit了

来看一下UML类图帮助理解

里中有我,我中有你,一直相互调用直到无法chain,然后emit往下游发送(这里肯定就有发送端的反压逻辑,以后随缘更新)

那这里的循环调用理解了就会想,那如何确定第一个operator调用,然后进入整个调用链呢

回到TaskManager接收到JobManager的TDD以后初始化整个任务的时候

StreamTask.java中invoke方法中

先是初始化了一个OperatorChain,里面其实就是一个数组StreamOperator

在他初始化的时候,其实就是为我们所有的streamOutputs设置了他的output以及会根据jobManager发送过来的TDD(包含信息)

设置成对应的ChainingOutPut还是RecordWriterOutput,chainOutput会设置他的的operator

然后获取了getHeadOperator()其实就是获取了他调用连中的第一个

然后在

将这个第一个operator关联到了inputProcessor对象里面

后面就简单了在inputProcessor.processInput中就进入了while(true)循环拉取上游数据的逻辑

然后

在这里调用的第一个processElement方法就是我们的那个headOperator

这样整个调用责任链就开始从第一个Operator运行起来了

原文地址:https://www.cnblogs.com/ljygz/p/11504220.html

时间: 2024-10-31 00:41:34

Flink中TaskManager端执行用户逻辑过程(源码分析)的相关文章

spring启动component-scan类扫描加载过程---源码分析

有朋友最近问到了 spring 加载类的过程,尤其是基于 annotation 注解的加载过程,有些时候如果由于某些系统部署的问题,加载不到,很是不解!就针对这个问题,我这篇博客说说spring启动过程,用源码来说明,这部分内容也会在书中出现,只是表达方式会稍微有些区别,我将使用spring 3.0的版本来说明(虽然版本有所区别,但是变化并不是特别大),另外,这里会从WEB中使用spring开始,中途会穿插自己通过newClassPathXmlApplicationContext 的区别和联系.

Cocos2dx-3.x 中CCCamera相机类详解及源码分析

Cocos2d-x 3.3版本中加入了相机这个类,该类在3D游戏中是必不可少的,在3D立体游戏中,往往需要视野角度的变化,通过相机的变换才能观察和体验整个游戏世界. CCCamera类基本使用 在游戏中一般有两种类型的相机:一种是透视相机,它在3D游戏中十分常见:另一种是正交相机,它没有透视相机的近大远小的效果而是相机内任何位置的物体大小比例都是一样的. 上图是透视相机的原理图,一般来说,我们通过以下代码创建: _camera = Camera::createPerspective(60, (G

Servlet容器Tomcat中web.xml中url-pattern的配置详解[附带源码分析]

前言 今天研究了一下tomcat上web.xml配置文件中url-pattern的问题. 这个问题其实毕业前就困扰着我,当时忙于找工作. 找到工作之后一直忙,也就没时间顾虑这个问题了. 说到底还是自己懒了,没花时间来研究. 今天看了tomcat的部分源码 了解了这个url-pattern的机制.  下面让我一一道来. tomcat的大致结构就不说了, 毕竟自己也不是特别熟悉. 有兴趣的同学请自行查看相关资料. 等有时间了我会来补充这部分的知识的. 想要了解url-pattern的大致配置必须了解

Java中线程局部变量ThreadLocal使用教程及源码分析

在Java多线程编程中有时候会遇见线程本地局部变量ThreadLocal这个类,下面就来讲讲ThreadLocal的使用及源码分析. ThreadLocal 是Thread Local Varial(线程局部变量)的意思,每个线程在使用线程局部变量的时候都会为使用这个线程局部变量的线程提供一个线程局部变量的副本,使得每个线程都可以完全独立地操作这个线程局部变量,而不会与其他线程发生冲突,从线程的角度来看,每个线程都好像独立地拥有了这个线程局部变量.这样,看似每个线程都在并发访问同一个资源(线程局

【Flume】flume中LoadBalancingSinkProcessor负载均衡实现机制的源码分析

基于上一篇文章http://blog.csdn.net/simonchi/article/details/42520193  相对比较细致的分析后,该文章将对LoadBalancingSinkProcessor源码进行选择性的重要逻辑代码进行讲解 首先读取配置,当然是重写congifure方法 public void configure(Context context) { Preconditions.checkState(getSinks().size() > 1, "The LoadB

Java中HashMap底层实现原理(JDK1.8)源码分析

这几天学习了HashMap的底层实现,但是发现好几个版本的,代码不一,而且看了Android包的HashMap和JDK中的HashMap的也不是一样,原来他们没有指定JDK版本,很多文章都是旧版本JDK1.6.JDK1.7的.现在我来分析一哈最新的JDK1.8的HashMap及性能优化. 在JDK1.6,JDK1.7中,HashMap采用位桶+链表实现,即使用链表处理冲突,同一hash值的链表都存储在一个链表里.但是当位于一个桶中的元素较多,即hash值相等的元素较多时,通过key值依次查找的效

ZGC gc策略及回收过程-源码分析

源码文件:/src/hotspot/share/gc/z/zDirector.cpp 一.回收策略 main入口函数: void ZDirector::run_service() { // Main loop while (_metronome.wait_for_tick()) { sample_allocation_rate(); const GCCause::Cause cause = make_gc_decision(); if (cause != GCCause::_no_gc) { Z

Netty源码分析 (八)----- write过程 源码分析

上一篇文章主要讲了netty的read过程,本文主要分析一下write和writeAndFlush. 主要内容 本文分以下几个部分阐述一个java对象最后是如何转变成字节流,写到socket缓冲区中去的 pipeline中的标准链表结构 java对象编码过程 write:写队列 flush:刷新写队列 writeAndFlush: 写队列并刷新 pipeline中的标准链表结构 一个标准的pipeline链式结构如下 数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务Handler

Live555源码分析[2]:RTSPServer中的用户认证

http://blog.csdn.net/njzhujinhua @20140601 说到鉴权,这是我多年来工作中的一部分,但这里rtsp中的认证简单多了,只是最基本的digest鉴权的策略. 在Live555的实现中, 用户信息由如下类维护,其提供增删查的接口.realm默认值为"LIVE555 Streaming Media" class UserAuthenticationDatabase { public: UserAuthenticationDatabase(char con