Apache NiFi Processor实战

1 前言

Apache NiFi是什么?NiFi官网给出如下解释:“一个易用、强大、可靠的数据处理与分发系统”。通俗的来说,即Apache NiFi 是一个易于使用、功能强大而且可靠的数据处理和分发系统,其为数据流设计,它支持高度可配置的指示图的数据路由、转换和系统中介逻辑。
为了对NiFi能够表述的更为清楚,下面通过NiFi的架构来做简要介绍,如下图所示。

根据官网对各个组件的说明,做摘要翻译:
? WebServer:其目的在于提供基于HTTP的命令和控制API。
? Flow Controller:这是操作的核心,以Processor为处理单元,提供了用于运行的扩展线程,并管理扩展接收资源时的调度。
? Extensions:在其他文档中描述了各种类型的NiFi扩展,Extensions的关键在于扩展在JVM中操作和执行。
? FlowFile Repository:FlowFile库的作用是NiFi跟踪记录当前在流中处于活动状态的给定流文件的状态,其实现是可插拔的,默认的方法是位于指定磁盘分区上的一个持久的写前日志。
? Content Repository:Content库的作用是给定流文件的实际内容字节所在的位置,其实现也是可插拔的。默认的方法是一种相对简单的机制,即在文件系统中存储数据块。
? Provenance Repository:Provenance库是所有源数据存储的地方,支持可插拔。默认实现是使用一个或多个物理磁盘卷,在每个位置事件数据都是索引和可搜索的。

2 NiFi Processer介绍

上一节说了那么多,主要通过NiFi的架构图介绍了NiFi的基本概念,由概念可知Flow Controller是NiFi的核心,那么Flow Controller具体是什么?Flow Controller扮演者文件交流的处理器角色,维持着多个处理器的连接并管理各个Processer,Processer则是实际处理单元。

那么,让我们通过NiFi的UI看下NiFi的Processor包含哪些?

通过上图可知,Processor包含各种类型的组件,如amazon、attributes、hadoop等,可通过前缀进行轻易辨识,如Get、Fetch开头代表获取,如getFile、getFTP、FetchHDFS,execute代表执行,如ExecuteSQL、ExecuteProcess、ExecuteFlumeSink等均可较容易知其简单用途。

3 NiFi Processer实战

说了那么多,介绍了NiFi的架构和Processor,那么说好的实战呢?那么,本文就以笔者的一个实际需求为例,进行Processor的实战。需求如下:选取一款数据处理调度工具,对服务器脚本实现定制调度执行。其中服务器的脚本涉及到对环境变量、oracle数据库、Hadoop生态圈组件的调度。当对服务器脚本调度执行完成后返回脚本运行状态,并提供失败重运行接口。
为了实现需求,曾调度过各种调度工具,如Apache Oozie、Azkaban、Pentaho等,最终比较了各种利弊尝试选用Apache NiFi作为尝试,通过查阅NiFi Processor API,能更好的支持远程操作的Processor为ExecuteProcess。下面将对需求进行实战讲解。

3.1 Processor的添加与配置

1. 点击“Add Processor”,选择ExecuteProcess后点击Add按钮完成添加,如下图。

2. 右击ExecuteProcess后选择Configure Processor,对Properties选项卡进行配置,其中每一个配置选项均提供了相关的说明,如下图。

如上图所示,这里有必要对各选项进行相关说明。
? Command(执行命令): sh。
? Command Arguments(执行命令参数):-c;ssh [email protected] sh js/job/job_hourly.sh `date
? Batch Duration(执行间隔时间):不设置。//我们需求是通过定时调度,而并非按间隔时间执行。
? Redirect Error Stream(重定位流):不设置。
? Argument Delimiter(执行命令参数分隔符):; //以;对参数进行分割。

3.2 Processor调度

NiFi支持三种调度策略,包括Time Driven(时间驱动)、CRON Driven(CRON驱动)和Event Driven(事件驱动,非可选),根据我们实际需求选择CRON Driven,个人理解CRON即是Crontab的应用,CRON的各参数含义分别代表:秒、分、时、日、月、周、年,需要配合*、?和L共同执行(*代表字段的值都有效;?代表对于指定的字段不指定值;L代表长整形)。如:“0 0 13 * * ?”代表想要在每天下午1点进行调度执行。因此根据我们的需求进行参数的调度配置。如下图所示。

3.3 运行状态监控

NiFi通过Rest API供开发者调度,这里我们用Processor API对运行状态进行监控(状态参数获取、Processor的启动与停止)。
1) 运行状态监控参数获取:
命令如下:curl ‘http://IP/nifi-api/processors/processorsID ‘得到如下结果,可通过json解析器解析并获取状态。

2) Processor的启动与停止:
NiFi的Processor启动停止通过其Put方法实现,Put最有效的作用是改变其运行状态,NiFi的Process总共有三种状态,即Running、Stopped和Disabled。
那么我们将开始和停止两个命令Rest API的放在脚本中执行即可。
? 启动命令(使用Rest API的Put方法):
curl -i -X PUT -H ‘Content-Type:application/json’ -d ‘
{
“revision”: {
“clientId”: “586ec1d7-015d-1000-6459-28251212434e”,
“version”:17},
“component”: {
“id”: “39e0dafc-015d-1000-918d-bee89ae2226e”,
“state”: “RUNNING”
}
}’ http://IP/nifi-api/processors/processorsID
? 停止命令(使用Rest API的Put方法):
curl -i -X PUT -H ‘Content-Type:application/json’ -d ‘
{
“revision”: {
“clientId”: “586ec1d7-015d-1000-6459-28251212434e”,
“version”:17},
“component”: {
“id”: “39e0dafc-015d-1000-918d-bee89ae2226e”,
“state”: “STOPPED”
}
}’ http://IP/nifi-api/processors/processorsID

4 小结与后记

本文首先对Apache NiFi进行简介,后以笔者的实际需求为例,对NiFi核心组件Processor的实战说明。由于NiFi仍然属于Apache推出时间不长的一个顶级项目,虽功能十分强大,但可查阅资源仍然有限,本文更多的是一个抛砖的过程,其真正强大的功能还在数据处理上,欢迎感兴趣的各位进行互相探讨。

原文链接:http://www.cobub.com/actual-combat-of-apache-nifi-processor/

时间: 2024-08-10 22:50:17

Apache NiFi Processor实战的相关文章

Apache Nifi 组件开发

Apache NiFi是由美国过国家安全局(NSA)贡献给Apache基金会的开源项目,其设计目标是自动化系统间的数据流.基于其工作流式的编程理念,NiFi非常易于使用,强大,可靠及高可配置.两个最重要的特性是其强大的用户界面及良好的数据回溯工具(官网吹的) nifi 使用起来还比较方便,基于web 的控制界面能够比较好的管理和控制数据流转,数据处理的业务流程也比较明确,下面说一下 如何定制开发一个基于我们自身业务的组件 1.工程结构 nifi对工程结构做了规范,我们只要按照他的格式创创建工程包

【NIFI】 Apache NiFI 使用技巧

本章介绍NIFI组件的使用. 主要有:Nginx反向代理NIFI,配置SSLContextService Nginx反向代理NIFI 使用nginx反向代理NIFI配置如下 1 upstream nifi_server { 2 ip_hash; 3 server 127.0.0.1:18002; 4 } 5 6 server { 7 listen 80; 8 server_name nifi.example.com; 9 charset utf-8; 10 11 location / { 12

Apache NiFi之MySQL数据同步到HBase

一.说明 将Apache NiFi做为关系型数据与非关系型数据库的数据同步工具使用,在此场景中需要将mysql导出的avro数据格式转化为json入库HBase 二.开拔 Ⅰ).配置ExecuteSQLRecord a).选择ExecuteSQLRecord 在Processor中搜索ExecuteSQLRecord b).配置ExecuteSQLRecord 1.创建Database Connection Pool 2.创建JsonRecordSetWriter 3.配置SQL select

Apache NiFi之Kafka流数据到HBase

一.说明 在大数据平台的业务场景中,处理实时kafka数据流数据,以成为必要的能力:此篇将尝试通过Apache NiFi来接入Kafka数据然后处理后存储之HBase 二.开拔 Ⅰ).配置ConsumeKafka_0_10 测试使用了kafka0.10版本,所以NiFi中也选择对于版本 a).选择ConsumeKafka_0_10 在Processor中搜索ConsumeKafka_0_10 b).配置ConsumeKafka_0_10 1.Kafka Brokers: hostname1:90

Apache NiFi 概述

Apache NiFi概述 Apache NiFi团队[email protected] 什么是Apache NiFi? 简单地说,NiFi是为了自动化系统之间的数据流而构建的.虽然术语“数据流”在各种环境中使用,但我们在此处使用它来表示系统之间自动化和管理的信息流.这个问题空间一直存在,因为企业有多个系统,其中一些系统创建数据,一些系统消耗数据.已经讨论并广泛阐述了出现的问题和解决方案模式.企业集成模式 [eip]中提供了一个全面且易于使用的表单. 数据流的一些高级挑战包括: 系统失败 网络故

Apache NiFi 入门指南

本指南使用于谁? 本指南适用于从未使用过,在NiFi中有限度接触或仅完成特定任务的用户.本指南不是详尽的说明手册或参考指南.“ 用户指南”提供了大量信息,旨在提供更加详尽的资源,并且作为参考指南非常有用.相比之下,本指南旨在为用户提供所需的信息,以便了解如何使用NiFi,以便快速轻松地构建强大而灵活的数据流. 一些因为本指南中的某些信息仅适用于初次使用的用户,而其他信息可能适用于那些使用过NiFi的人,本指南分为几个不同的部分,其中一些可能对某些部分没用读者.随意跳转到最适合您的部分. 本指南确

Apache Curator入门实战

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] Apache Curator入门实战 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量. 1.Zookeeper安装部署 Zookeeper的部署很简单,如果已经有Java运行环境的话,下载tarball解压后即可运行. [root@vm Temp]$ wget http://mirror.bi

Apache Spark技术实战之1 -- KafkaWordCount

欢迎转载,转载请注明出处,徽沪一郎. 概要 Spark应用开发实践性非常强,很多时候可能都会将时间花费在环境的搭建和运行上,如果有一个比较好的指导将会大大的缩短应用开发流程.Spark Streaming中涉及到和许多第三方程序的整合,源码中的例子如何真正跑起来,文档不是很多也不详细. 本篇主要讲述如何运行KafkaWordCount,这个需要涉及Kafka集群的搭建,还是说的越仔细越好. 搭建Kafka集群 步骤1:下载kafka 0.8.1及解压 wget https://www.apach

Apache NiFi MiNiFI C++ 0.0.1 发布

Apache NiFi MiNiFI C++ 0.0.1 发布了,Apache NiFi 是一个易于使用.功能强大而且可靠的数据处理和分发系统(app开发ty300.com).Apache NiFi 是为数据流设计(基础教程qkxue.net).它支持高度可配置的指示图的数据路由.转换和系统中介逻辑. 一些提升: Initial framework implemented in C/C++Declarative configuration of processing flows through