一、数据整合业务场景
1.1 背景
因GA某系统进行调整,导致原先从该系统备份数据库获取的相应数据资源无法正常获取,后续的数据统一由GA某系统每日将前一日的ZTRY照片数据压缩后上传到指定的FTP服务器上,需要从该FTP上下载下来,解压后将JPEG照片数据解析加载进入数据库。
1.2 目标
需要建立作业可以定时从FTP下载服务器将ZIP文件下载到本地指定文件夹,将ZIP文件解压后,获得JPEG格式照片,通过Kettle转换将照片进行解析,同时按照一定的规则将照片的名字等信息解析拼接成唯一主键,以实现将本地照片文件整合到数据库中进行存储,整合完毕后将ZIP压缩文件转移至备份文件夹,同时将解压后的文件夹及照片文件删除。
1.3 工具及组件
工具:IDI(基于Kettle二次开发的ETL工具)
组件:①转换:输入组件、输出组件、
②作业:
二、数据整合设计思路
2.1 作业调度设计思路
设计思路如下图流程所示:
1、获取时间变量
作业启动后调用时间变量转换进行时间变量的捕获,获取经过格式化处理的昨日日期。
2、检查目录情况
进入本地指定下载文件夹中进行查看,确认文件夹中是否已经下载昨日日期的ZIP文件。
3、执行判断
若下载文件夹中为空,或不存在昨日日期的ZIP文件,则跳转进入FTP下载;反之,跳出作业停止,等待下一次作业启动。
4、FTP下载文件
根据日期从FTP上下载符合日期要求的ZIP文件,并将其保存在本地指定的下载文件夹中。
5、解压缩ZIP文件
将ZIP文件解压缩到指定的解压文件夹。
6、数据清洗整合
调用数据清洗处理整合转换,捕获ZIP文件中的数据并针对数据进行清洗、转换、处理、加载。
7、删除目录
数据清洗转换完成,数据正常加载进入数据库后,将解压缩产生的目录和文件删除,释放空间。
8、移动文件
将ZIP文件转移到备份文件夹进行保存,以便后期数据出现问题进行恢复或者进行核查。
9、结束
结束整个作业,等待作业下一次启动。
2.2 转换抽取设计思路
2.2.1 捕获时间变量
设计思路如下图流程所示:
1、获取系统日期时间
获取当前系统的昨日零点时间,即若本日为2018年07月15日18时36分47秒,则捕获的时间应为2018/07/14 00:00:00.000。
2、日期时间格式化
将上述2018/07/14 00:00:00.000格式化为20180714。
3、设置时间变量
将格式化的日期数据作为变量传入变量中保存。
2.2.2 数据清洗整合
设计思路如下所示:
1、捕获照片文件名称
2、照片文件解析
将照片文件解析成为二进制数据。
3、照片数据清洗整合
将数据流中的数据通过组件进行清洗、转换,整合成目标数据流。
4、数据加载
将目标数据流加载进入数据库中存储。
三、数据整合解决方案
3.1 作业调度模型
3.1.1 作业调度流程
图示:作业调度模型
3.1.2 作业模型解析
3.1.2.1 启动
3.1.2.2 调用时间变量转换模型
3.1.2.3 检查目录是否为空
3.1.2.4 跳出作业
3.1.2.5 FTP下载
3.1.2.6 解压缩文件
3.1.2.7 调用数据清洗整合转换模型
3.1.2.8 删除解压缩目录
3.1.2.9 移动压缩文件
3.1.2.10 作业结束
3.2 整合转换模型
3.2.1 整合转换流程
图示:捕获时间变量转换。
图示:数据清洗整合转换
3.2.2 整合转换流程解析
3.2.2.1 捕获时间变量转换
3.2.2.1.1 获取系统日期时间
3.2.2.1.2 日期时间格式化
//Script here
var SYS_DATE = date2str(TODAY,‘yyyyMMdd‘);
3.2.2.1.3 设置时间变量
3.2.2.2 数据清洗整合转换
3.2.2.2.1 获取文件名
3.2.2.2.2 照片转换二进制
/Script here
var file = new java.io.File(filename);
var fileInputStream = new java.io.FileInputStream(file);
var Content = org.pentaho.di.core.Const.createByteArray(file.length());
fileInputStream.read(Content,0,file.length());
fileInputStream.close();
3.2.2.2.3 拆分字段
3.2.2.2.4 字符串替换
3.2.2.2.5 字段选择
3.2.2.2.6 列转行
3.2.2.2.7 数据修改
//Script here
var zp01 = ZP_1;
var zp02 = ZP_2;
var zp03 = ZP_3;
var zp04 = ZP_4;
var zp05 = ZP_5;
var zp06 = ZP_6;
var zp01modifytime = ZP_1_LASTMODIFYTIME;
var zp02modifytime = ZP_2_LASTMODIFYTIME;
var zp03modifytime = ZP_3_LASTMODIFYTIME;
var zp04modifytime = ZP_4_LASTMODIFYTIME;
var zp05modifytime = ZP_5_LASTMODIFYTIME;
var zp06modifytime = ZP_6_LASTMODIFYTIME;
var zpscrq = replace(dirname,‘_ZP‘,‘‘);
var dirname1 = ‘ftp://10.78.1.30/‘+ dirname + ‘.zip‘;
if(zp01==null && zp02==null && zp03==null){
zp01=zp04;
zp02=zp05;
zp03=zp06;
zp01modifytime=zp04modifytime;
zp02modifytime=zp05modifytime;
zp03modifytime=zp06modifytime;}
3.2.2.2.8 获取系统信息
3.2.2.2.9 字段选择
3.2.2.2.10 过滤记录
3.2.2.2.11 插入更新
3.2.2.2.12 抛弃无效数据
四、其他说明
暂无。
原文地址:https://www.cnblogs.com/badian/p/9319015.html