如何构建一个flink sql平台

我们都知道,离线计算有Hive,使用过的知道,需要先定义一个schema,比如针对HDFS这种存储对标mysql定义一个schema,schema的本质是什么?主要描述下面这些信息

1)当前存储的物理位置的描述

2)数据格式的组成形式

然后Hive可以让用户定义一段sql,针对上面定义的schema进行,sql的本质是什么,是业务逻辑的描述。然后Hive内部会将这段sql进行编译转化为原生的底层MapReduce操作,通过这种方式,屏蔽底层技术原理,让业务开发人员集中精力在schema和sql业务逻辑上,flink sql平台也正是做同样的事情。

一开始经过跟上海同事的讨论,选择Uber的Athenax作为技术选型,通过翻阅源码,发现还是有很多不完善的地方,比如配置文件采用yaml,如果做多集群调度,平台代码优化,多存储扩展机制,都没有考虑得很清楚,所以代码拿过来之后基本上可以说按照对yarn和flink的理解重新写了一遍。

下面是平台代码的思路

1)通过springboot提供HTTP API,提供多集群定义,存储在mysql里

一个集群需要定义的信息点如下:

2)提供HTTP API让业务进行Job定义

这里的Job定义包含3个方面:job的输出输出的schema定义,job的业务逻辑定义(sql),job需要的yarn资源定义,具体来说如下所示:

Job定义


文中的sql定义

SELECT SUM(nested.number) as nestedNumber,

hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`,

COUNT(`value`) as `count`,

AVG(CAST(`value` AS DOUBLE)) as `avg`,

MAX(CAST(`value` AS DOUBLE)) as `max`,

MIN(CAST(`value` AS DOUBLE)) as `min`,

TUMBLE_END(`time`, INTERVAL ‘3‘ SECOND)  as `time`  

FROM input.`ymm-appmetric-dev-self1`  

WHERE metric IS NOT NULL AND `value` IS NOT NULL 

and `time` IS NOT NULL

GROUP BY metric,TUMBLE(`time`, INTERVAL ‘3‘ SECOND)

输入/输出schema定义,以kafka为例,输入和输出格式差不多

{

"brokerAddress":"略",

"topic":"dev-metric",

"schemas":[

                   {"key":"sum","type":"double"},

                   {"key":"count","type":"int"},

                   {"key":"avg","type":"double"},

                   {"key":"max","type":"double"},

                   {"key":"min","type":"double"},

                   {"key":"time","type":"timestamp"},

                   {"key":"nestedNumber","type":"int"}

                 ]

}

对于业务来说,“打开IDE->了解flink语法写java代码->打包成jar->提交到yarn集群”这一环节省去了,直接打开界面,点击按钮定义sql,写一段业务逻辑sql,提交此业务到mysql,关闭浏览器即可.由平台进行调度(秒级),永远不用担心这个任务某一天挂了怎么办,平台会自动发现自动拉起.提交一次永远不需要再人工干预,除非逻辑发生变化,在逻辑发生变化时也简单,打开任务修改再提交,关闭浏览器,结束,平台会发现job变化杀死老任务拉起新任务.

下面讲一下平台内部是如何实现的

3)集群自动发现

如果平台维护方想增加一个集群,通过界面直接定义一个存在mysql即可,后台线程会自动发现,为每个集群创建一个线程,多节点情况下,整个环境中某个特定集群的多个线程通过ZK进行抢占决定哪个线程当前为这个集群服务.

增加JVM关闭钩子,在JVM退出时,主动关闭ZK客户端,释放ZK上的临时节点.

4)UDF的支持&自动发现

平台支持平台级UDF的定义,由平台人员进行维护,平台人员编写脚本,通过base64编码存在mysql里,归属到某个集群,这个集群的扫描线程发现有必要进行编译时,实时编译成class常驻内存,同时,打包成jar包上传到远程HDFS,后面会将此路径放入到具体job的classpath路径下. job就可以正确发现UDF.

当UDF没有发生变化时,线程不会编译,而是复用上一次的编译结果.

5)程序可以任意部署,不依赖大数据环境

程序本身不依赖大数据环境的配置,具体是指不需要依赖当前宿主机.../etc/hadoop/*.xml文件

通过读取cluster的配置,动态生成XML配置,再生成HDFS/YARN的客户端client,这样,平台代码可以任意部署到物理机/容器中,只要环境可以通过TCP连接到对应域名/ip即可.

6)如何做任务调度-任务的自动发现

这里的任务调度是指:哪些任务需要下线,哪些任务需要第一次上线,哪些任务需要重新上线,

这里的业务逻辑就是比较mysql里job的时间戳和yarn集群里任务的时间戳

yarn集群里任务的时间戳是通过提交时打上Tag标记,就是为了下一次比较用。

这里有一个细节,就是Athenax的做法是先算出所有要杀死的任务,杀死,再拉起所有要拉起的任务,个人认为这里不妥,优化之后的做法是:按照任务级别,算出(killaction,startaction),对于单个job来说,二者至少存在1个action,然后以任务为级别进行调度,不再是之前的大一统提交方式,这样就算单个任务调度异常,也不影响其它任务,做到了任务之间做隔离.

通过时间戳的方式,就不难理解业务一旦修改任务,平台发现时间戳有变化,就可以自动杀死老任务,拉起新任务,不需要人工操作.

7)拉起任务中的编译工作

一个job需要拉起时,会实时结合(输入schema,SQL业务逻辑,输出schema)进行编译,

正如hive会翻译成原生的mapreduce操作,flink sql编译工作会翻译成原生的flink jobgraph

这部分是抽取了athenax里的编译工作做2开

代码如下:

private JobCompilerResult compile(Map inputs, String originSql,
                                      ExternalCatalog output, ResourceDTO resourceDTO,
                                      ClusterDTO athenaxCluster,
                                      Configuration flinkConf) throws Exception {
        // 解析sql
        LoggerUtil.info("to be compiled sql : [{}]", originSql);
        SqlNodeList stmts = (SqlNodeList) new CalciteSqlParser().parse(originSql);
        Validator validator = new Validator();
        validator.validateQuery(stmts);
        HashMap udfMap = validator.getUserDefinedFunctions();
        String selectSql = validator.getStatement().toString();
        List additionalResources = validator.getAdditionalResources();
        LoggerUtil.info("succeed to parse sql,result is : [{}]", stmts);
        LoggerUtil.info("udf {}", udfMap);
        LoggerUtil.info("statement {}", selectSql);
        LoggerUtil.info("additionalResources {}", additionalResources);
        // 准备编译,输出Flink的JobGraph
        LoggerUtil.info("begin to create execution environment");
        StreamExecutionEnvironment localExecEnv = StreamExecutionEnvironment
            .createLocalEnvironment();
        //非常重要
        setFeature(localExecEnv,
            resourceDTO.getTaskManagerCount() * resourceDTO.getSlotPerTaskManager(), flinkConf);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(localExecEnv);
        LoggerUtil.info("tableEnv : {} ", tableEnv);
        // 注册UDF,收归到平台了,也就是说,只支持平台开发人员预定义,暂时不支持业务自定义
        for (Map.Entry e : udfMap.entrySet()) {
            final String name = e.getKey();
            String clazzName = e.getValue();
            LoggerUtil.info("used udf specified by business : {}", name);
        }
        registerSDF(athenaxCluster, tableEnv);
        LoggerUtil.info("all udf registerd , bingo");
        // 开始注册所有的input相关的schema
        for (Map.Entry e : inputs.entrySet()) {
            LoggerUtil.info("Registering input catalog {}", e.getKey());
            tableEnv.registerExternalCatalog(e.getKey(), e.getValue());
        }
        LoggerUtil.info("all input catalog registerd , bingo");
        Table table = tableEnv.sqlQuery(selectSql);
        LoggerUtil.info("succeed to execute tableEnv.sqlQuery(...)");
        LoggerUtil.info("table {}", table);
        LoggerUtil.info("bingo! input work done completely,let us handle output work now!!!");
        // 开始注册output
        List outputTables = output.listTables();
        for (String t : outputTables) {
            table.writeToSink(getOutputTable(output.getTable(t)));
        }
        LoggerUtil.info("handle output ok");
        // 生成JobGraph
        StreamGraph streamGraph = localExecEnv.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        // this is required because the slots are allocated lazily
        //如果为true就会报错,然后flink内部就是一直重启,所以设置为false
        jobGraph.setAllowQueuedScheduling(false);
        LoggerUtil.info("create flink job ok {}", jobGraph);
        JobGraphTool.analyze(jobGraph);
        // 生成返回结果
        JobCompilerResult jobCompilerResult = new JobCompilerResult();
        jobCompilerResult.setJobGraph(jobGraph);
        ArrayList paths = new ArrayList();
        Collection values = udfMap.values();
        for (String value : values) {
            paths.add(value);
        }
        jobCompilerResult.setAdditionalJars(paths);
        return jobCompilerResult;
    }

这部分工作要理解,需要对Calcite有基础

8)多存储的支持

平台在一开始编写的时候,就考虑到了多存储支持,虽然很多任务是从kafka->计算->Kafka

但是平台并不只满足于这一点,因为写到kafka之后,可能还需要业务再去维护一段代码取读取kafka的消息进行消费,如果有的业务希望直接能把结果写到mysql,这个时候就是需要对多存储进行扩展

通过设计和扩展机制,平台开发人员只需要定义储存相关的类,针对schema定义的解析工作已经再父类中完成,所有存储类共用,这样可以灵活支持多存储,平台开发人员只需要把重点放在特定存储性质的支撑即可.

PS:编写此类存储类需要对fink job内部的运行机制,否则会造成资源泄露和浪费.

平台内部已经针对每种类型进行了定义

    // 存储类型
    //排名不分先后    
    public static int STORAGE_REDIS    = 1 << 0; //1
    public static int STORAGE_MYSQL    = 1 << 1; //2
    public static int STORAGE_ROCKETMQ = 1 << 2; //4
    public static int STORAGE_KAFKA    = 1 << 3; //8
    public static int STORAGE_PULSAR   = 1 << 4; //16
    public static int STORAGE_OTHER0   = 1 << 5; //32
    public static int STORAGE_OTHER1   = 1 << 6; //64
    public static int STORAGE_OTHER2   = 1 << 7; //128
    public static int STORAGE_RABBITMQ = 1 << 8; //256
    public static int STORAGE_HBASE    = 1 << 9; //512
    public static int STORAGE_ES       = 1 << 10;//1024
    public static int STORAGE_HDFS     = 1 << 11;//2048

目前支持的情况如下:

输入:Kafka

输出:Kafka/Mysql

PS:输出mysql是基于flink官方的提供类实现的第一版,经过分析源码,mysql sink官方这部分代码写得太随意,差评.

后续当业务有需求时,需要结合zebra做2次开发.毕竟运维不会提供生产环境的ip和端口等信息,只会提供一个数据源字符串标识.这样更贴合公司内部的运行环境

9)任务提交

一旦生成flink原生的job,就可以准备提交工作

这部分需要对yarn的运行机制比较清楚,比如任务提交到RM上经过哪些状态变化,ApplicationMaster如何申请资源启动TaskManager, 具体的job是如何提交给JobManager的,平台开发人员需要对此有基本的原理掌握,当初也是0基础开始学习,通过快速翻阅源代码掌握一些运行机制,方可安心进行平台开发.

10)其它优化

针对yarn client的参数优化,保证可在一定时间内返回,否则可能一直卡死

针对flink job的平台级优化,比如禁止缓存,让信息立刻传输到下一个环节(默认100毫秒延迟)

定义flink job的重启次数,当发生异常时可自行恢复等

11)压测结果

输入:本地启动7个线程,发送速度

每秒发送到kafka 十几万条

接收topic描述

ymm-appmetric-dev-self1 开发环境   partitions 6  replication 1

flink任务描述

2个TaskManager进程   每个进程800M内存   每个进程3个线程,

并行度 2*3=6

flink计算任务所用sql

SELECT SUM(nested.number) as nestedNumber,

hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`,

COUNT(`value`) as `count`,

AVG(CAST(`value` AS DOUBLE)) as `avg`,

MAX(CAST(`value` AS DOUBLE)) as `max`,

MIN(CAST(`value` AS DOUBLE)) as `min`,

TUMBLE_END(`time`, INTERVAL ‘3‘ SECOND)  as `time`  

FROM input.`ymm-appmetric-dev-self1`  

WHERE metric IS NOT NULL AND `value` IS NOT NULL and `time` IS NOT NULL 

GROUP BY metric,     TUMBLE(`time`, INTERVAL ‘3‘ SECOND)

输出topic

ymm-appmetric-dev-result  partitions 3

观察flink consumer端的消费速度

每个线程的消费速度在24000上下浮动,并发度6,每秒可消费kafka消息14万+,应该说目前不会碰到性能瓶颈.

其它

本次测试发送数据条数:4.3 亿条

耗时:56分钟

对于业务开发人员来说,我觉得好处就是

1)不需要懂flink语法(你真的想知道flink的玩法?好吧我承认你很好学)

2)不需要打开IDE写java代码(你真的想写Java代码?好吧我承认你对Java是真爱)

3)提交一次,不再需要人工介入(你真的想在假期/晚上/过节/过年 担心任务挂掉?好吧我承认你很敬业)

只需要

1)界面点击操作,定义你的schema

2)写一段你所擅长的sql
3)点击提交按钮

4)关闭浏览器

5)关闭电脑

其它的就交给平台吧!

原文地址:https://www.cnblogs.com/ybyqi/p/9857258.html

时间: 2024-11-07 18:52:13

如何构建一个flink sql平台的相关文章

如何构建一个企业的大数据分析平台

面对海量的各种来源的数据,如何对这些零散的数据进行有效的分析,得到有价值的信息一直是大数据领域研究的热点问题. 大数据分析处理平台就是整合当前主流的各种具有不同侧重点的大数据处理分析框架和工具,实现对数据的挖掘和分析,一个大数据分析平台涉及到的组件众多,如何将其有机地结合起来,完成海量数据的挖掘是一项复杂的工作.在搭建大数据分析平台之前,要先明确业务需求场景以及用户的需求,通过大数据分析平台,想要得到哪些有价值的信息,需要接入的数据有哪些,明确基于场景业务需求的大数据平台要具备的基本的功能,来决

Demo:基于 Flink SQL 构建流式应用

Flink 1.10.0 于近期刚发布,释放了许多令人激动的新特性.尤其是 Flink SQL 模块,发展速度非常快,因此本文特意从实践的角度出发,带领大家一起探索使用 Flink SQL 如何快速构建流式应用. 本文将基于 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 构建一个电商用户行为的实时分析应用.本文所有的实战演练都将在 Flink SQL CLI 上执行,全程只涉及 SQL 纯文本,无需一行 Java/Scala 代码,无需安装 I

如何基于Python构建一个可扩展的运维自动化平台

嘉宾简介 刘天斯 从事互联网运维工作已13年,目前就职于腾讯-互动娱乐部,负责游戏大数据的运营,曾就职于天涯社区,担任首席架构师/系统管理员. 热衷开源技术的研究,包括系统架构.运维开发.负载均衡.缓存技术.数据库.NOSQL.分布式存储.消息中间件.大数据及云计算.Mesos.Docker.DevOps等领域.擅长大规模集群的运维工作,尤其在自动化运维方面有着非常丰富的经验.同时热衷于互联网前沿技术的研究,活跃在国内社区.业界技术大会,充当一名开源技术的传播与分享者. 导言 受 Reboot

如何构建一个有效的服务治理平台

本文我们重点讨论如何构建一个有效的服务治理平台,话不多说,直接切入整体.构建服务治理平台基于“管理”,“度量”,“管控”三个层面统筹考虑安排.具体来讲,又可以分为六个层次来考虑问,分别是:服务管理流程体系,服务治理平台,服务治理核心架构,服务协议规范,服务支撑工具,服务运行环境.六个层面的具体关系如下图所示: 接下来我们分别来看一下每个层面的具体内容. 01 服务治理框架 当下无论对于什么样类型的服务治理核心框架,无论是开源还是自建,在功能层面相差不大,但技术实现却有所差别.但就落地实践而言,自

NHibernate构建一个ASP.NET MVC应用程序

NHibernate构建一个ASP.NET MVC应用程序 什么是Nhibernate? NHibernate是一个面向.NET环境的对象/关系数据库映射工具.对象/关系数据库映射(object/relational mapping,ORM)这个术语表示一种技术,用来把对象模型表示的对象映射到基于SQL的关系模型数据结构中去. NHibernate 是一个基于.Net 的针对关系型数据库的对象持久化类库.NHibernate 来源于非常优秀的基于Java的Hibernate 关系型持久化工具.

基于Spark构建开放式的云计算平台第一阶段课程

在2014年6月30日到7月2日举行的Spark Summit是整个云计算大数据领域的Big Event,在会议上DataBricks公司提出了构建开放的Cloud平台,而且宣布该平台完全基于Spark,该平台功能类似于EC2,但比EC2更快.更灵活.更易用. 构建一个开发的云服务平台,需要存储技术.计算平台.消息驱动框架和开发API架构设计等,所以我们把课程主要分为两个阶段:1,Spark技术实战:2,构建开发云平他的消息驱动框架和开放API设计实现: 本课程是是整个系列课程的第一阶段课程,采

Apache Flink SQL

本篇核心目标是让大家概要了解一个完整的 Apache Flink SQL Job 的组成部分,以及 Apache Flink SQL 所提供的核心算子的语义,最后会应用 TumbleWindow 编写一个 End-to-End 的页面访问的统计示例. 1.Apache Flink SQL Job 的组成 我们做任何数据计算都离不开读取原始数据,计算逻辑和写入计算结果数据三部分,当然基于 ApacheFlink SQL 编写的计算 Job 也离不开这个三部分,如下所所示: &lt;img src=

基于rsyslog+mysql+loganalyzer构建一个小而美的日志服务器

前言: 每当我们遇到问题抓耳挠腮,一脸懵那啥的时候,也许看一下日志就瞬间豁然开朗,所以,一个易用的日志服务器还是很重要的.我们先不介绍elk,那个牛逼吊炸天的日志分析系统.今天我们先来构建一个小的日志服务器.elk我们以后再说~ 正文: 当我们配置完一个服务,启动报错时,那感觉就像吃了啥一样,别提多难受,除非服务程序本身有检测配置文件的程序,不然我们就得依赖linux自己的集中日志服务器rsyslog了.不过,有时候也许我们想搜索一下某条日志,或者向用可视化的界面进行统计分析.此时我们可以把日志

我是如何构建一个持续发展的项目

说起项目,每个程序员都应该搭建过自己的项目,而我也搭建过数十个企业级或互联网级项目:在做企业级项目时也抽象了一套通过的开发脚手架ES方便开发,也做过一些通用的代码生成工具来生成通用项目架子或一些CRUD的代码.做这些平台或项目的时候或多或少给我一些启示和原则,而这些启示和原则一直指导着我内心方向,时刻指导我不偏离航线. 启示录 心中有原则 代码规范化 代码审查 代码重构 代码注释 代码逻辑抽象 工具类 项目闭环 持续改进 自动化 心中有原则 我认为这是搭建和维护项目的灵魂,失去了灵魂,项目虽然能