根据需求我们需要创建部分所需的配置表,各表说明如下:
1、文件FTP主机配置表:SHELL_HOST_CFG
COLUMN_NAME |
DATA_TYPE |
COMMENTS |
LATN_ID |
NUMBER |
|
HOST_ID |
NUMBER |
主机ID |
HOST_NAME |
VARCHAR2 |
主机名 |
HOST_IP |
VARCHAR2 |
主机IP |
HOST_PORT |
VARCHAR2 |
主机端口 |
USERNAME |
VARCHAR2 |
用户名 |
PASSWD |
VARCHAR2 |
密码 |
HOST_PATH |
VARCHAR2 |
默认路径 |
NOTE |
VARCHAR2 |
备注 |
STATE_DATE |
DATE |
时间 |
create table SHELL_HOST_CFG ( host_id NUMBER(2) not null, host_name VARCHAR2(30), host_ip VARCHAR2(15) not null, host_port VARCHAR2(10), username VARCHAR2(20), passwd VARCHAR2(20), host_path VARCHAR2(60), note VARCHAR2(50), state_date DATE, latn_id NUMBER(4) ); comment on column SHELL_HOST_CFG.host_id is '主机ID'; comment on column SHELL_HOST_CFG.host_name is '主机名'; comment on column SHELL_HOST_CFG.host_ip is '主机IP'; comment on column SHELL_HOST_CFG.host_port is '主机端口'; comment on column SHELL_HOST_CFG.username is '用户名'; comment on column SHELL_HOST_CFG.passwd is '密码'; comment on column SHELL_HOST_CFG.host_path is '默认路径'; comment on column SHELL_HOST_CFG.note is '备注'; comment on column SHELL_HOST_CFG.state_date is '时间'; -- Create/Recreate primary, unique and foreign key constraints alter table SHELL_HOST_CFG add constraint PK_SHELL_HOST_CFG primary key (HOST_ID) using index tablespace TBS_CSS_INT_IDX;
2、文件入库配置表 SHELL_PROC_GATHER_CFG
COLUMN_NAME |
DATA_TYPE |
COMMENTS |
PROC_ID |
NUMBER |
流程ID |
OWNER_NAME |
VARCHAR2 |
目标库模式名 |
FILE_HEAD |
VARCHAR2 |
文件头如果要添加日志如:BML.MKT_LO_D.${l_date} |
DATE_FORMAT |
VARCHAR2 |
日期格式 如:yyyymmdd |
TARGET_TABLE |
VARCHAR2 |
目标表 |
FIELD_LIST |
VARCHAR2 |
控制文件中的字段 |
HOST_ID |
NUMBER |
主机ID,配置表见:shell_host_cfg |
SRC_DIR |
VARCHAR2 |
源文件目录 |
FILE_TAIL |
VARCHAR2 |
文件后缀 |
CHECK_FILE |
VARCHAR2 |
check文件 |
LATN_ID |
NUMBER |
本地网标识 |
DESC_DIR |
VARCHAR2 |
目标目录 |
TERMINATE |
VARCHAR2 |
分隔符 |
IS_CHECK |
CHAR |
是否需要check文件 |
AFTER_FTP_SHELL |
VARCHAR2 |
FTP之后需要执行的shell命令 |
BEFORE_LOAD_SQL |
VARCHAR2 |
SQLLDR之前预操作 |
DATA_FILES |
VARCHAR2 |
数据文件名称 |
IF_CHECK_DDL |
CHAR |
是否检查表结构 |
TAB_DDL |
VARCHAR2 |
表结构来源 |
SHORT_PROC_ID |
VARCHAR2 |
流程短号 |
CHARACTERSET |
VARCHAR2 |
导入使用的字符集,默认UTF8 |
SHELL_AFTER_ALL |
VARCHAR2 |
导入完成后需要执行的SHELL脚本 |
create table SHELL_PROC_GATHER_CFG ( proc_id NUMBER(20) not null, owner_name VARCHAR2(200), file_head VARCHAR2(200), date_format VARCHAR2(100), target_table VARCHAR2(100), field_list VARCHAR2(4000), host_id NUMBER(12), src_dir VARCHAR2(1000), file_tail VARCHAR2(200), check_file VARCHAR2(100), latn_id NUMBER(4), desc_dir VARCHAR2(1000), terminate VARCHAR2(100), is_check CHAR(1), after_ftp_shell VARCHAR2(4000), before_load_sql VARCHAR2(4000), data_files VARCHAR2(1000), if_check_ddl CHAR(1), tab_ddl VARCHAR2(4000), short_proc_id as (SUBSTR(TO_CHAR("PROC_ID"),1,4)), characterset VARCHAR2(100) default 'UTF8', shell_after_all VARCHAR2(4000) ); -- Add comments to the columns comment on column SHELL_PROC_GATHER_CFG.proc_id is '流程ID'; comment on column SHELL_PROC_GATHER_CFG.owner_name is '目标库模式名'; comment on column SHELL_PROC_GATHER_CFG.file_head is '文件头如果要添加日志如:BML.MKT_LO_D.${l_date}'; comment on column SHELL_PROC_GATHER_CFG.date_format is '日期格式 如:yyyymmdd'; comment on column SHELL_PROC_GATHER_CFG.target_table is '目标表'; comment on column SHELL_PROC_GATHER_CFG.field_list is '控制文件中的字段'; comment on column SHELL_PROC_GATHER_CFG.host_id is '主机ID,配置表见:shell_host_cfg'; comment on column SHELL_PROC_GATHER_CFG.src_dir is '源文件目录'; comment on column SHELL_PROC_GATHER_CFG.file_tail is '文件后缀'; comment on column SHELL_PROC_GATHER_CFG.check_file is 'check文件'; comment on column SHELL_PROC_GATHER_CFG.latn_id is '本地网标识'; comment on column SHELL_PROC_GATHER_CFG.desc_dir is '目标目录'; comment on column SHELL_PROC_GATHER_CFG.terminate is '分隔符'; comment on column SHELL_PROC_GATHER_CFG.is_check is '是否需要check文件'; comment on column SHELL_PROC_GATHER_CFG.after_ftp_shell is 'FTP之后需要执行的shell命令'; comment on column SHELL_PROC_GATHER_CFG.before_load_sql is 'SQLLDR之前预操作'; comment on column SHELL_PROC_GATHER_CFG.data_files is '数据文件名称'; comment on column SHELL_PROC_GATHER_CFG.if_check_ddl is '是否检查表结构'; comment on column SHELL_PROC_GATHER_CFG.tab_ddl is '表结构来源'; comment on column SHELL_PROC_GATHER_CFG.short_proc_id is '流程短号'; comment on column SHELL_PROC_GATHER_CFG.characterset is '导入使用的字符集,默认UTF8'; comment on column SHELL_PROC_GATHER_CFG.shell_after_all is '导入完成后需要执行的SHELL脚本'; -- Create/Recreate primary, unique and foreign key constraints alter table SHELL_PROC_GATHER_CFG add constraint PK_SHELL_PROC_GATHER_CFG primary key (PROC_ID) using index tablespace TBS_CSS_INT_IDX; alter table SHELL_PROC_GATHER_CFG add constraint FK_SHELL_PROC_GATHER_CFG foreign key (PROC_ID) references SHELL_PROC_RUN_CFG (PROC_ID) on delete cascade; alter table SHELL_PROC_GATHER_CFG add constraint FK_SHELL_PROC_GATHER_CFG_HOST foreign key (HOST_ID) references SHELL_HOST_CFG (HOST_ID);
3、流程节点配置表SHELL_PROC_RUN_CFG
COLUMN_NAME |
DATA_TYPE |
COMMENTS |
PROC_ID |
NUMBER |
流程ID |
PROC_NAME |
VARCHAR2 |
流程名称 |
REPORT_ID |
NUMBER |
报表ID |
REPORT_NAME |
VARCHAR2 |
报表名称 |
RUN_DAY |
VARCHAR2 |
运行日期,月报需要配置 |
RUN_HOUR |
VARCHAR2 |
运行小时,月报,日报,周报需要配置 |
RUN_MINUTE |
VARCHAR2 |
运行分钟,月报,日报,周报,小时报表需要配置 |
LAST_SUCC_CYCLE |
VARCHAR2 |
上次执行成功账期 |
RUN_TYPE |
VARCHAR2 |
运行类型(1:月报 2:日报 3:周报 4:小时报 5:分钟报表 6:只执行一次) |
RUN_SQL |
VARCHAR2 |
运行SQL |
LATN_ID |
NUMBER |
本地网标识 |
STATE |
CHAR |
流程状态 |
MODIFY_DT |
DATE |
新增修改时间 |
OWNER |
VARCHAR2 |
所有者 |
ORDER_ID |
NUMBER |
执行顺序 |
ERR_CNT |
NUMBER |
同一账期内允许重复运行的报错次数 |
IF_EXECUTED |
CHAR |
针对只执行一次的调度 |
ETL_TYPE |
VARCHAR2 |
调度类型 SHELL:需要在RUN_SQL中填写 SHELL文件全路径以及需要传入的参数 |
IF_CONVERT |
CHAR |
是否需要转换数据,如后续抽取整合到其他用户下的结果表 |
SHORT_PROC_ID |
VARCHAR2 |
|
GATHER_TYPE |
VARCHAR2 |
EDW方式或者ODSO方式 |
APPOINT_TIME |
VARCHAR2 |
约定时间 |
RUN_WEEK_DAY |
VARCHAR2 |
|
create table SHELL_PROC_RUN_CFG ( proc_id NUMBER(20) not null, proc_name VARCHAR2(4000), report_id NUMBER(12), report_name VARCHAR2(200), run_day VARCHAR2(2), run_hour VARCHAR2(2), run_minute VARCHAR2(2), last_succ_cycle VARCHAR2(100), run_type VARCHAR2(20), run_sql VARCHAR2(4000), latn_id NUMBER(4), state CHAR(1) default 'A', modify_dt DATE default SYSDATE, owner VARCHAR2(100), order_id NUMBER(12), err_cnt NUMBER(12), if_executed CHAR(1) default 'N', etl_type VARCHAR2(100), if_convert CHAR(1) default 'Y', short_proc_id as (SUBSTR(TO_CHAR("PROC_ID"),1,4)), gather_type VARCHAR2(20) default 'EDW', appoint_time VARCHAR2(100), run_week_day VARCHAR2(2) default 0 ); -- Add comments to the columns comment on column SHELL_PROC_RUN_CFG.proc_id is '流程ID'; comment on column SHELL_PROC_RUN_CFG.proc_name is '流程名称'; comment on column SHELL_PROC_RUN_CFG.report_id is '报表ID'; comment on column SHELL_PROC_RUN_CFG.report_name is '报表名称'; comment on column SHELL_PROC_RUN_CFG.run_day is '运行日期,月报需要配置'; comment on column SHELL_PROC_RUN_CFG.run_hour is '运行小时,月报,日报,周报需要配置'; comment on column SHELL_PROC_RUN_CFG.run_minute is '运行分钟,月报,日报,周报,小时报表需要配置'; comment on column SHELL_PROC_RUN_CFG.last_succ_cycle is '上次执行成功账期'; comment on column SHELL_PROC_RUN_CFG.run_type is '运行类型(1:月报 2:日报 3:周报 4:小时报 5:分钟报表 6:只执行一次)'; comment on column SHELL_PROC_RUN_CFG.run_sql is '运行SQL'; comment on column SHELL_PROC_RUN_CFG.latn_id is '本地网标识'; comment on column SHELL_PROC_RUN_CFG.state is '流程状态'; comment on column SHELL_PROC_RUN_CFG.modify_dt is '新增修改时间'; comment on column SHELL_PROC_RUN_CFG.owner is '所有者'; comment on column SHELL_PROC_RUN_CFG.order_id is '执行顺序'; comment on column SHELL_PROC_RUN_CFG.err_cnt is '同一账期内允许重复运行的报错次数'; comment on column SHELL_PROC_RUN_CFG.if_executed is '针对只执行一次的调度'; comment on column SHELL_PROC_RUN_CFG.etl_type is '调度类型 SHELL:需要在RUN_SQL中填写 SHELL文件全路径以及需要传入的参数'; comment on column SHELL_PROC_RUN_CFG.if_convert is '是否需要转换数据,如后续抽取整合到其他用户下的结果表'; comment on column SHELL_PROC_RUN_CFG.gather_type is 'EDW方式或者ODSO方式'; comment on column SHELL_PROC_RUN_CFG.appoint_time is '约定时间'; -- Create/Recreate primary, unique and foreign key constraints alter table SHELL_PROC_RUN_CFG add constraint PK_SHELL_PROC_RUN_CFG primary key (PROC_ID) using index tablespace TBS_CSS_INT_IDX;
4、入库日志表SHELL_PROC_RUN_LOG
COLUMN_NAME |
DATA_TYPE |
COMMENTS |
PROC_ID |
NUMBER |
流程ID |
STAT_CYCLE_ID |
NUMBER |
账期 |
RUN_STATE |
VARCHAR2 |
运行状态 0:运行成功 1:未运行 2:运行中 3:报错终止 其他:运行报错 |
RUN_MSG |
VARCHAR2 |
运行情况 |
START_TIME |
DATE |
开始时间 |
END_TIME |
DATE |
结束时间 |
ERR_CNT |
NUMBER |
报错次数 |
LATN_ID |
NUMBER |
本地网标识 |
SID |
NUMBER |
会话ID |
SERIAL# |
NUMBER |
系列号 |
INST_ID |
NUMBER |
数据库实例ID(针对多实例) |
RUN_TYPE |
VARCHAR2 |
|
SPID |
NUMBER |
|
IS_SYNC |
VARCHAR2 |
日志状态是否已经同步 |
SHORT_PROC_ID |
VARCHAR2 |
create table SHELL_PROC_RUN_LOG ( proc_id NUMBER(20), stat_cycle_id NUMBER(20), run_state VARCHAR2(100), run_msg VARCHAR2(4000), start_time DATE default SYSDATE, end_time DATE, err_cnt NUMBER(12), latn_id NUMBER(4), sid NUMBER, serial# NUMBER, inst_id NUMBER, run_type VARCHAR2(20), spid NUMBER, is_sync VARCHAR2(100) default 'N', short_proc_id as (SUBSTR(TO_CHAR("PROC_ID"),1,4)) ); -- Add comments to the columns comment on column SHELL_PROC_RUN_LOG.proc_id is '流程ID'; comment on column SHELL_PROC_RUN_LOG.stat_cycle_id is '账期'; comment on column SHELL_PROC_RUN_LOG.run_state is '运行状态 0:运行成功 1:未运行 2:运行中 3:报错终止 其他:运行报错'; comment on column SHELL_PROC_RUN_LOG.run_msg is '运行情况'; comment on column SHELL_PROC_RUN_LOG.start_time is '开始时间'; comment on column SHELL_PROC_RUN_LOG.end_time is '结束时间'; comment on column SHELL_PROC_RUN_LOG.err_cnt is '报错次数'; comment on column SHELL_PROC_RUN_LOG.latn_id is '本地网标识'; comment on column SHELL_PROC_RUN_LOG.sid is '会话ID'; comment on column SHELL_PROC_RUN_LOG.serial# is '系列号'; comment on column SHELL_PROC_RUN_LOG.inst_id is '数据库实例ID(针对多实例)';
以上几个配置表是文件调度的基础表,当然如果有前后置关系的话,还可以根据需要创建如下关系表:
create table SHELL_PROC_RUN_RELA ( proc_id NUMBER(20), sum_proc_id NUMBER(20), comments VARCHAR2(200), modify_dt DATE default SYSDATE ) tablespace TBS_CSS_RPT_4
ETL调度开发(2)——配置表说明
时间: 2024-11-20 13:03:54