4.airflow测试

1.测试sqoop任务
1.1 测试全量抽取
1.1.1.直接执行命令
1.1.2.以shell文件方式执行sqoop或hive任务
1.2 测试增量抽取
2.测试hive任务
3.总结

当前生产上的任务主要分为两部分:sqoop任务和hive计算任务,测试这两种任务,分别以shell文件和直接执行命令的方式来测试.

本次测试的表是airflow.code_library.

1.测试sqoop任务

1.1 测试全量抽取

1.1.1.直接执行命令

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    ‘owner‘: ‘yangxw‘,
    ‘depends_on_past‘: False,
    ‘start_date‘: datetime(2017, 5, 23),
}
dag = DAG(‘sqoop4‘, default_args=default_args,schedule_interval=None)
bash_cmd = ‘‘‘
sqoop import --connect jdbc:oracle:thin:@//XX.XX.XX.XX/aaaa --username bbbb --password ‘cccc‘ --query " select CODENO, ITEMNO, ITEMNAME, BANKNO, SORTNO, ISINUSE, ITEMDESCRIBE, ITEMATTRIBUTE, RELATIVECODE, ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3, ATTRIBUTE4, ATTRIBUTE5, ATTRIBUTE6, ATTRIBUTE7, ATTRIBUTE8, INPUTUSER, INPUTORG, INPUTTIME, UPDATEUSER, UPDATETIME, REMARK, HELPTEXT , to_char(SysDate,‘YYYY-MM-DD HH24:mi:ss‘) as etl_in_dt from XDGL.CODE_LIBRARY where \$CONDITIONS " --hcatalog-database airflow --hcatalog-table CODE_LIBRARY --hcatalog-storage-stanza ‘stored as ORC‘ --hive-overwrite --hive-delims-replacement " " -m 1
‘‘‘
t1 = BashOperator(
    task_id=‘sqoopshell‘,
    bash_command=bash_cmd,
    dag=dag)

测试成功,数据导入到表中.

1.1.2.以shell文件方式执行sqoop或hive任务

上述步骤虽然可以执行成功,但是如果要truncate 表,那么还要需要再增加一个task来执行truncate命令,这样一个ETL任务就要分成两个task很不方便.通过shell将truncate和import放在一起执行.

1)创建dag

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    ‘owner‘: ‘yangxw‘,
    ‘depends_on_past‘: False,
    ‘start_date‘: datetime(2017, 5, 23)
}

dag = DAG(‘sqoop7‘, default_args=default_args,schedule_interval=None)

bash_cmd = ‘sh /home/airflow/sqoop3.sh‘
t1 = BashOperator(
    task_id=‘sqoop7‘,
    bash_command=bash_cmd,
    dag=dag)

2)创建shell文件

hive -e "truncate table airflow.CODE_LIBRARY"
sqoop import --connect jdbc:oracle:thin:@//AAAA/BBB --username CCC --password ‘DDD‘ --query " select CODENO, ITEMNO, ITEMNAME, BANKNO, SORTNO, ISINUSE, ITEMDESCRIBE, ITEMATTRIBUTE, RELATIVECODE, ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3, ATTRIBUTE4, ATTRIBUTE5, ATTRIBUTE6, ATTRIBUT
E7, ATTRIBUTE8, INPUTUSER, INPUTORG, INPUTTIME, UPDATEUSER, UPDATETIME, REMARK, HELPTEXT , to_char(SysDate,‘YYYY-MM-DD HH24:mi:ss‘) as etl_in_dt from XDGL.CODE_LIBRARY where \$CONDITIONS " --hcatalog-database airflow --hcatalog-table CODE_LIBRARY --hcatalog-storage-stanza ‘stored as ORC‘ --hive-overwrite --hive-delims-replacement " " -m 1 

将这些文件分发到scheduler和worker节点上,然后执行:

查看日志会报错:

…………
[2017-05-24 10:55:52,853] {base_task_runner.py:95} INFO - Subtask:   File "/opt/anaconda2/lib/python2.7/site-packages/jinja2/loaders.py", line 187, in get_source
[2017-05-24 10:55:52,853] {base_task_runner.py:95} INFO - Subtask:     raise TemplateNotFound(template)
[2017-05-24 10:55:52,854] {base_task_runner.py:95} INFO - Subtask: jinja2.exceptions.TemplateNotFound: sh /home/airflow/sqoop3.sh

这是airflow的一个bug,默认会使用jinja2的语法来解析task.

bash_cmd = ‘sh /home/airflow/sqoop3.sh‘ 修改为
bash_cmd = ‘{{"sh /home/airflow/sqoop3.sh"}}‘ 即可

测试成功.或者使用:

bash_cmd = ‘‘‘
sh /home/airflow/sqoop3.sh
‘‘‘

也可以执行成功.

1.2 测试增量抽取

新建个dag,sqoop8.

dag = DAG(‘sqoop8‘, default_args=default_args,schedule_interval=None)

bash_cmd = ‘‘‘
sh /home/airflow/sqoop4.sh %s
‘‘‘ % ‘2017-05-24‘

t1 = BashOperator(
    task_id=‘sqoop8‘,
    bash_command=bash_cmd,
    dag=dag)

创建shell:

hive -e "alter table airflow.ACCT_FEE_ARCH drop partition(p_day=‘$1‘);"
sqoop import --connect jdbc:oracle:thin:@//AAA/BBB --username CCC --password ‘DDD‘ --query " select SERIALNO,  ……
to_char(SYNCHDATE, ‘YYYY-MM-DD HH24:mi:ss‘) as SYNCHDATE , to_char(SysDate,‘YYYY-MM-DD HH24:mi:ss‘) as ETL_IN_DT from XDGL.ACCT_FEE_ARCH where SYNCHDATE < (TO_DATE(‘$1‘, ‘YYYY-MM-DD‘) +1) and SYNCHDATE >= (TO_DATE(‘$1‘, ‘YYYY-MM-DD‘)) and \$CONDITIONS " --hcatalog-database airflow --hcatalog-table ACCT_FEE_ARCH --hcatalog-storage-stanza ‘stored as ORC‘ --hive-partition-key p_day --hive-partition-value $1  --hive-delims-replacement " " -m 1

2.测试hive任务

上面以shell方式执行了hive truncate任务,下面以命令的方式执行sql文件.

创建sqoop9:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable

default_args = {
    ‘owner‘: ‘yangxw‘,
    ‘depends_on_past‘: False,
    ‘start_date‘: datetime(2017, 5, 23)
}

dag = DAG(‘hivesh2‘, default_args=default_args,schedule_interval=None)
str1 = Variable.get("str1")
bash_cmd = ‘‘‘
hive -f "/home/airflow/hive1.sql"  -hivevar tbname=%s
‘‘‘ % str1

t1 = BashOperator(
    task_id=‘hivesh2‘,
    bash_command=bash_cmd,
    dag=dag)

创建hive sql文件:

insert overwrite table airflow.tab_cnt select ‘${tbname}‘,  count(*) from ${tbname}

在页面上创建变量 str1=airflow.ACCT_FEE_ARCH

执行成功.

3.总结

1.如果执行shell,一定要用jinja2语法或者‘‘‘ ‘‘‘:

bash_cmd = ‘{{" sh /home/airflow/sqoop1.sh"}}‘ 或者

bash_cmd = ‘‘‘

sh /home/airflow/sqoop1.sh

‘‘‘

2.所有的文件必须复制到所有节点

python文件\shell文件\sql文件,必须复制到所有的webserver scheduler worker节点

3.有时候使用python命令编译不出来pyc文件,在页面上只能看到dag名称,不能看到代码及调度等.这时使用

python -m py_compile XXX.py 来编译

4.airflow的dag一旦创建就无法删除,错误的或者多余的dag可以设置为pause模式并隐藏.

5.shell的方式适合执行sqoop任务,可以将truncate table\drop partition和import一步执行完成,不用起两个task来执行.命令的方式适合执行hive 任务,通过hive -f XXX.sql --hivevar a=%s b=%s的方式,动态的传递参数给hive.

来自为知笔记(Wiz)

时间: 2024-08-02 15:12:17

4.airflow测试的相关文章

1.airflow的安装

1.环境准备1.1 安装环境1.2 创建用户2.安装airflow2.1 安装python2.2 安装pip2.3 安装数据库2.4 安装airflow2.4.1 安装主模块2.4.2 安装数据库模块.密码模块2.5 配置airflown2.5.1 设置环境变量2.5.2 修改配置文件3. 启动airflow3.1 初始化数据库3.2 创建用户3.3 启动airflow4.执行任务5.安装celery5.1 安装celery模块5.2 安装celery broker5.2.1 使用RabbitM

【airflow实战系列】 基于 python 的调度和监控工作流的平台

简介 airflow 是一个使用python语言编写的data pipeline调度和监控工作流的平台.Airflow被Airbnb内部用来创建.监控和调整数据管道.任何工作流都可以在这个使用Python来编写的平台上运行. Airflow是一种允许工作流开发人员轻松创建.维护和周期性地调度运行工作流(即有向无环图或成为DAGs)的工具.在Airbnb中,这些工作流包括了如数据存储.增长分析.Email发送.A/B测试等等这些跨越多部门的用例. 这个平台拥有和 Hive.Presto.MySQL

airflow + CeleryExecutor 环境搭建

airflow整合环境搭建 1. 整体结构 mysql -> 后端数据库 redis -> 用于broker CeleryExecutor -> 执行器 2. 环境安装 安装python anaconda环境 添加py用户 # useradd py 设置密码 # passwd py 创建anaconda安装路径 # mkdir /anaconda 赋权 # chown -R py:py /anaconda 上传anaconda安装包并用py用户运行安装程序 $ chmod +x Anac

airflow常用命令

airflow中常用命令 airflow test dag_id task_id execution_date 测试task示例: airflow test example_hello_world_dag hello_task 20180516 airflow run dag_id task_id execution_date 运行task airflow run -A dag_id task_id execution_date 忽略依赖task运行task airflow webserver

开源数据流管道-Luigi vs Azkaban vs Oozie vs Airflow

原文链接:https://www.jianshu.com/p/4ae1faea733b 随着企业的发展,他们的工作流程变得更加复杂,越来越多的有着错综复杂依赖关系的工作流需要增加监控,故障排除.如果没有明确的血缘关系.就可能出现问责问题,对元数据的操作也可能丢失.这就是有向无环图(DAG),数据管道和工作流管理器发挥作用的地方. 复杂的工作流程可以通过DAG来表示.DAG是一种图结构.信息必须沿特定方向在顶点间传递,但信息无法通过循环返回起点.DAG的构建快是数据管道,或者是一个进程的输入成为下

Airflow自定义插件, 使用datax抽数

Airflow自定义插件 Airflow之所以受欢迎的一个重要因素就是它的插件机制.Python成熟类库可以很方便的引入各种插件.在我们实际工作中,必然会遇到官方的一些插件不足够满足需求的时候.这时候,我们可以编写自己的插件.不需要你了解内部原理,甚至不需要很熟悉Python, 反正我连蒙带猜写的. 插件分类 Airflow的插件分为Operator和Sensor两种.Operator是具体要执行的任务插件, Sensor则是条件传感器,当我需要设定某些依赖的时候可以通过不同的sensor来感知

Airflow的部署(全网图文结合最全)

本篇文章只讲Airflow的部署以及再部署中遇到的坑和解决方式 环境准备 Python的安装 python安装的过程中 你可能会遇到各种各样的问题,上网搜各种问题的解法也不尽相同,最关键的是基本没啥效果.在我安装的过程中总结了几点,再执行我下面的流程的时候,一定要一步不落,并且保证环境一定要干净,如果在执行某个步骤的时候出现 已存在的字眼,一定要删掉然后重新执行这一步.(这都是血与泪的教训) #python依赖 yum -y install zlib zlib-devel yum -y inst

AirFlow介绍

AirFlow介绍 一.AirFlow是什么    airflow 是一个编排.调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化.airflow 将workflow编排为由tasks组成的DAGs(有向无环图),调度器在一组workers上按照指定的依赖关系执行tasks.同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统.   Airflow的调度依赖于

AirFlow 安装配置

airflow 安装配置 airflow 相关软件安装 python 3.6.5 安装 安装依赖程序 : [[email protected] ~]# yum -y install zlib zlib-devel bzip2 bzip2-devel ncurses ncurses-devel readline readline-devel openssl openssl-devel openssl-static xz lzma xz-devel sqlite sqlite-devel gdbm