Managing Airflow

### Process manager: Supervisord

  1. Install the Supervisord process manager to control the Airflow processes:

    easy_install supervisor  # does not work on Python 3 (many problems); Supervisor 4.x supports Python 3 and can be installed with pip instead
    echo_supervisord_conf > /etc/supervisord.conf
  2. Edit supervisord.conf and add the start-up entries:

    vi /etc/supervisord.conf

```ini
[program:airflow_web]
command=/usr/bin/airflow webserver -p 8080

[program:airflow_worker]
command=/usr/bin/airflow worker

[program:airflow_scheduler]
command=/usr/bin/airflow scheduler
```


>  3. Start the supervisord service:

```bash
/usr/bin/supervisord -c /etc/supervisord.conf
```


>  4. supervisorctl can now be used to manage the Airflow services:

```bash
supervisorctl start airflow_web
supervisorctl stop airflow_web
supervisorctl restart airflow_web
supervisorctl stop all
```
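
Two more subcommands are handy for routine use: status lists every managed process, and reread/update apply edits made to supervisord.conf without restarting supervisord:

```bash
supervisorctl status    # list all managed processes and their states
supervisorctl reread    # re-parse supervisord.conf after editing it
supervisorctl update    # restart only the programs whose config changed
```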


### Process manager: systemd
>  1. vim /etc/sysconfig/airflow  # systemd reads this file; it typically defines Airflow's environment variables

```bash
AIRFLOW_CONFIG=/root/airflow/airflow.cfg
AIRFLOW_HOME=/root/airflow
```


>  2. vim /usr/lib/systemd/system/airflow-webserver.service  # the unit name that systemctl will manage
>  The other services can be defined the same way; a scheduler sketch follows the unit file below.

```ini
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service
Wants=postgresql.service mysql.service redis.service

[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=root
Group=root
Type=simple
ExecStart=/bin/bash -c "export PATH=${PATH}:/usr/local/python3/bin/ ; /usr/local/python3/bin/airflow webserver -p 8080 --pid /root/airflow/service/webserver.pid -A /root/airflow/service/webserver.out -E /root/airflow/service/webserver.err -l /root/airflow/service/webserver.log"

KillMode=process
Restart=on-failure
RestartSec=5s
PrivateTmp=true

[Install]
WantedBy=multi-user.target
```

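As noted in step 2, the other daemons can be defined the same way. Here is a minimal sketch of a matching scheduler unit, assuming the same paths as the webserver unit; only the command and file names change:

```ini
# /usr/lib/systemd/system/airflow-scheduler.service -- sketch, adjust paths to your install
[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service redis.service
Wants=postgresql.service mysql.service redis.service

[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=root
Group=root
Type=simple
ExecStart=/bin/bash -c "export PATH=${PATH}:/usr/local/python3/bin/ ; /usr/local/python3/bin/airflow scheduler --pid /root/airflow/service/scheduler.pid -l /root/airflow/service/scheduler.log"
KillMode=process
Restart=on-failure
RestartSec=5s
PrivateTmp=true

[Install]
WantedBy=multi-user.target
```
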
>  3. systemctl daemon-reload  # load the new unit
>  4. systemctl status airflow-webserver.service  # check the service state; from now on the service is managed this way
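
For day-to-day use, a typical session with standard systemd commands looks like this:

```bash
systemctl daemon-reload                             # pick up new/changed unit files
systemctl enable --now airflow-webserver.service    # start now and on every boot
systemctl status airflow-webserver.service          # verify it is running
journalctl -u airflow-webserver.service -f          # follow the daemon's output
```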

### Managing Airflow with a shell script
```bash
#!/bin/bash
# A tool for starting/stopping/checking the airflow services.
usage(){
    echo -e "\nUsage: airflow.sh {webserver|worker|scheduler|flower|all} {start|stop|status}"
}
#=== This is the function about airflow webserver service ===
webserver_status(){
    echo -e "\e[36m  Checking service status, please wait ... \e[0m"
    sleep  3
    Status=`ps -elf| grep "airflow[ -]webserver" |wc -l`
    if [ $Status -eq 0 ] ;then
        echo -e "\e[31m webserver is stop !!! \e[0m"
    else
        echo -e "\e[32m webserver is running... \e[0m"
    fi
}
webserver_start(){
    echo  -e "\e[36m Starting airflow webserver ... \e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow  webserver >> /root/airflow/service/webserver.log 2>&1 &
    webserver_status
}
webserver_stop(){
    echo  -e "\e[36m Stopping airflow webserver ... \e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf| grep "airflow[ -]webserver" | grep -v grep |awk -F" " '{ print $4 }'`
    rm -rf /root/airflow/airflow-webserver.pid
    webserver_status
}
#=== This is the function about airflow scheduler service ===
scheduler_status(){
    echo -e "\e[36m  Checking service status, please wait ... \e[0m"
    sleep  3
    Status=`ps -elf| grep "airflow[ -]scheduler" |wc -l`
    if [ $Status -eq 0 ] ;then
        echo -e "\e[31m scheduler is stop !!! \e[0m"
    else
        echo -e "\e[32m scheduler is running... \e[0m"
    fi
}
scheduler_start(){
    echo  -e "\e[36m Starting airflow scheduler ... \e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow  scheduler >> /root/airflow/service/scheduler.log 2>&1 &
    scheduler_status
}
scheduler_stop(){
    echo  -e "\e[36m Stopping airflow scheduler ... \e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf| grep "airflow[ -]scheduler" | grep -v grep |awk -F" " '{ print $4 }'`
    rm -rf /root/airflow/airflow-scheduler.pid
    scheduler_status
}
#=== This is the function about airflow flower service ===
flower_status(){
    echo -e "\e[36m  Checking service status, please wait ... \e[0m"
    sleep  3
    Status=`netstat  -anputl| grep 5555 | grep LISTEN | awk -F" " '{ print $7 }' | awk -F"/" '{ print $1 }' |wc -l`
    if [ $Status -eq 0 ] ;then
        echo -e "\e[31m flower is stop !!! \e[0m"
    else
        echo -e "\e[32m flower is running... \e[0m"
    fi
}
flower_start(){
    echo  -e "\e[36m Starting airflow flower ... \e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow  flower >> /root/airflow/service/flower.log 2>&1 &
    flower_status
}
flower_stop(){
    echo  -e "\e[36m Stopping airflow flower ... \e[0m"
    sleep 1
    /usr/bin/kill -9 `netstat  -anputl| grep 5555 | grep LISTEN | awk -F" " '{ print $7 }' | awk -F"/" '{ print $1 }'`
    rm -rf /root/airflow/airflow-flower.pid
    flower_status
}
#=== This is the function about airflow worker service ===
worker_status(){
    echo -e "\e[36m  Checking service status, please wait ... \e[0m"
    sleep  3
    Status=`ps -elf| grep "airflow serve_logs" | grep -v grep | wc -l`
    celeryStatus=`ps -elf| grep celery |grep -v grep | wc -l`
    if [ $Status -eq 0 ] ;then
        if [ $celeryStatus -eq 0 ]; then
            echo -e "\e[31m worker is stop !!! \e[0m"
        else
           echo -e "\e[32m worker is running... \e[0m"
        fi
    else
        echo -e "\e[32m worker is running... \e[0m"
    fi
}
worker_start(){
    echo  -e "\e[36m Starting airflow worker ... \e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow  worker >> /root/airflow/service/worker.log 2>&1 &
    worker_status
}
worker_stop(){
    echo  -e "\e[36m Stopping airflow worker ... \e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf| grep "airflow serve_logs" | grep -v grep |awk -F" " '{ print $4 }'`
    /usr/bin/kill -9 `ps -elf| grep celery |grep -v grep |awk -F" " '{ print $4 }'`
    rm -rf /root/airflow/airflow-worker.pid
    worker_status
}

#=== This is the startup option for the airflow service ===
case "$2" in
  start)
    case "$1" in
      webserver)
        webserver_start
        ;;
      worker)
        worker_start
        ;;
      scheduler)
        scheduler_start
        ;;
      flower)
        flower_start
        ;;
      all)
        webserver_start
        scheduler_start
        flower_start
        worker_start
        ;;
      *)
        usage
        exit 2
      esac
    ;;
  stop)
    case "$1" in
      webserver)
        webserver_stop
        ;;
      worker)
        worker_stop
        ;;
      scheduler)
        scheduler_stop
        ;;
      flower)
        flower_stop
        ;;
      all)
        worker_stop
        flower_stop
        scheduler_stop
        webserver_stop
        ;;
      *)
        usage
        exit 3
      esac
    ;;
  status)
    case "$1" in
      webserver)
        webserver_status
        ;;
      worker)
        worker_status
        ;;
      scheduler)
        scheduler_status
        ;;
      flower)
        flower_status
        ;;
      all)
        webserver_status
        scheduler_status
        flower_status
        worker_status
        ;;
      *)
        usage
        exit 4
      esac
    ;;
  *)
    usage
    exit 1
esac
```
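
Saved as airflow.sh (the name its own usage text assumes) and made executable, the script takes the service first and the action second:

```bash
chmod +x airflow.sh
./airflow.sh all start          # start webserver, scheduler, flower, then worker
./airflow.sh webserver status   # check a single service
./airflow.sh all stop           # stop worker, flower, scheduler, then webserver
```
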
### Reworking log retrieval

  1. Go to incubator-airflow/airflow/www/
  2. Edit views.py and add the method below inside class Airflow(BaseView) (it relies on imports already present in views.py):
    
```python
@expose('/logs')
@login_required
@wwwutils.action_logging
def logs(self):
    BASE_LOG_FOLDER = os.path.expanduser(
        conf.get('core', 'BASE_LOG_FOLDER'))
    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    execution_date = request.args.get('execution_date')
    dag = dagbag.get_dag(dag_id)
    log_relative = "{dag_id}/{task_id}/{execution_date}".format(
        **locals())
    loc = os.path.join(BASE_LOG_FOLDER, log_relative)
    loc = loc.format(**locals())
    log = ""
    TI = models.TaskInstance
    session = Session()
    dttm = dateutil.parser.parse(execution_date)
    ti = session.query(TI).filter(
        TI.dag_id == dag_id, TI.task_id == task_id,
        TI.execution_date == dttm).first()
    dttm = dateutil.parser.parse(execution_date)
    form = DateTimeForm(data={'execution_date': dttm})
    if ti:
        host = ti.hostname
        log_loaded = False

        if os.path.exists(loc):
            try:
                f = open(loc)
                log += "".join(f.readlines())
                f.close()
                log_loaded = True
            except:
                log = "*** Failed to load local log file: {0}.\n".format(loc)
        else:
            WORKER_LOG_SERVER_PORT = \
                conf.get('celery', 'WORKER_LOG_SERVER_PORT')
            url = os.path.join(
                "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
            ).format(**locals())
            log += "*** Log file isn't local.\n"
            log += "*** Fetching here: {url}\n".format(**locals())
            try:
                import requests
                timeout = None  # No timeout
                try:
                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
                except (AirflowConfigException, ValueError):
                    pass

                response = requests.get(url, timeout=timeout)
                response.raise_for_status()
                log += '\n' + response.text
                log_loaded = True
            except:
                log += "*** Failed to fetch log file from worker.\n".format(
                    **locals())

        if not log_loaded:
            # load remote logs
            remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
            remote_log = os.path.join(remote_log_base, log_relative)
            log += '\n*** Reading remote logs...\n'

            # S3
            if remote_log.startswith('s3:/'):
                log += log_utils.S3Log().read(remote_log, return_error=True)

            # GCS
            elif remote_log.startswith('gs:/'):
                log += log_utils.GCSLog().read(remote_log, return_error=True)

            # unsupported
            elif remote_log:
                log += '*** Unsupported remote log location.'

        session.commit()
        session.close()

    if PY2 and not isinstance(log, unicode):
        log = log.decode('utf-8')

    title = "Log"

    return wwwutils.json_response(log)
```
>  3. Restart the service and request a URL such as:

http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11

>  This returns that task's log for execution_date=2018-01-11.
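
For example, fetched from the command line (same URL as above):

```bash
curl -s "http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11"
```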

### Deleting a DAG
>  Airflow does not currently expose a direct API for deleting a DAG, and a complete delete touches several tables; the SQL below covers all of them.

```sql
set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
```

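To avoid pasting the statements by hand, they can be wrapped in a small helper script; a sketch assuming a MySQL metadata database named airflow and credentials supplied via ~/.my.cnf (both assumptions):

```bash
#!/bin/bash
# delete_dag.sh -- hypothetical helper wrapping the SQL above.
# Assumes a MySQL metadata DB named "airflow" and credentials in ~/.my.cnf.
# Usage: ./delete_dag.sh BAD_DAG
dag_id="$1"
[ -z "$dag_id" ] && { echo "Usage: $0 <dag_id>"; exit 1; }
mysql airflow <<SQL
delete from xcom          where dag_id = '$dag_id';
delete from task_instance where dag_id = '$dag_id';
delete from sla_miss      where dag_id = '$dag_id';
delete from log           where dag_id = '$dag_id';
delete from job           where dag_id = '$dag_id';
delete from dag_run       where dag_id = '$dag_id';
delete from dag           where dag_id = '$dag_id';
SQL
```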

### Cluster management scripts
#### Cluster service start-up script
```bash
#!/usr/bin/env bash
function usage() {
    echo -e "\n A tool used for starting airflow services
Usage: 200.sh {webserver|worker|scheduler|flower}
"
}

PORT=8081
ROLE=webserver
ENV_ARGS=""
check_alive() {
    PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
    [ -n "$PID" ] && return 0 || return 1
}

check_scheduler_alive() {
    PIDS=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
    [ -n "$PIDS" ] && return 0 || return 1
}

function get_host_ip(){
    local host=$(ifconfig | grep "inet " | grep "\-\->" | awk '{print $2}' | tail -1)
    if [[ -z "$host" ]]; then
        host=$(ifconfig | grep "inet " | grep "broadcast" | awk '{print $2}' | tail -1)
    fi
    echo "${host}"
}

start_service() {
    if [ $ROLE = 'scheduler' ];then
        check_scheduler_alive
    else
        check_alive
    fi
    if [ $? -ne 0 ];then
        nohup airflow $ROLE $ENV_ARGS > $BASE_LOG_DIR/$ROLE/$ROLE.log 2>&1 &
        sleep 5
        if [ $ROLE = 'scheduler' ];then
            check_scheduler_alive
        else
            check_alive
        fi
        if [ $? -ne 0 ];then
            echo "service start error"
            exit 1
        else
            echo "service start success"
            exit 0
        fi
    else
        echo "service alreay started"
        exit 0
    fi
}

function main() {
    if [ -z "${POOL}" ]; then
        echo "the environment variable POOL cannot be empty"
        exit 1
    fi
    source /data0/hcp/sbin/init-hcp.sh
    case "$1" in
        webserver)
            echo "starting airflow webserver"
            ROLE=webserver
            PORT=8081
            start_service
            ;;
        worker)
            echo "starting airflow worker"
            ROLE=worker
            PORT=8793
            local host_ip=$(get_host_ip)
            ENV_ARGS="-cn ${host_ip}@${host_ip}"
            start_service
            ;;
        flower)
            echo "starting airflow flower"
            ROLE=flower
            PORT=5555
            start_service
            ;;
        scheduler)
            echo "starting airflow scheduler"
            ROLE=scheduler
            start_service
            ;;
        *)
            usage
            exit 1
    esac
}

main "[email protected]"

集群服务下线脚本

#!/usr/bin/env bash
function usage() {
    echo -e "\n A tool used for stop airflow services
Usage: 200.sh {webserver|worker|scheduler|flower}
"
}

function get_host_ip(){
    local host=$(ifconfig | grep "inet " | grep "\-\->" | awk '{print $2}' | tail -1)
    if [[ -z "$host" ]]; then
        host=$(ifconfig | grep "inet " | grep "broadcast" | awk '{print $2}' | tail -1)
    fi
    echo "${host}"
}

function main() {
    if [ -z "${POOL}" ]; then
        echo "the environment variable POOL cannot be empty"
        exit 1
    fi
    source /data0/hcp/sbin/init-hcp.sh
    case "$1" in
        webserver)
            echo "stopping airflow webserver"
            cat $AIRFLOW_HOME/airflow-webserver.pid | xargs kill -9
            ;;
        worker)
            echo "stopping airflow worker"
            PORT=8793
            PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
            kill -9 $PID
            local host_ip=$(get_host_ip)
            ps -ef | grep celeryd | grep ${host_ip}@${host_ip} | awk '{print $2}' | xargs kill -9
            ;;
        flower)
            echo "stopping airflow flower"
            PORT=5555
            PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
            kill -9 $PID
            ;;
        scheduler)
            echo "stopping airflow scheduler"
            PID=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
            kill -9 $PID
            ;;
        *)
            usage
            exit 1
    esac
}

main "[email protected]"

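Both scripts require the POOL environment variable and source /data0/hcp/sbin/init-hcp.sh before doing anything. Assuming they are saved as start-200.sh and stop-200.sh (hypothetical names; their own usage text just says 200.sh), cycling a node looks like:

```bash
export POOL=airflow-prod      # assumed pool name; both scripts refuse to run without POOL
./start-200.sh webserver      # bring the webserver online on this node
./stop-200.sh webserver       # take it offline again
```
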
### Fixing the Airflow time zone

By default Airflow runs on UTC; in China local time is UTC+8. The steps below switch Airflow entirely over to the China time zone by patching its source. They target Airflow 1.10.0; other versions differ only in detail and can be adapted along the same lines.

  1. In the Airflow home directory, edit airflow.cfg and set:

    default_timezone = Asia/Shanghai

  2. Go to where the airflow package is installed (the site-packages directory); the files below are relative to that location. On this machine it is (adjust to your own install):

    cd /usr/local/python3/lib/python3.6/site-packages/airflow
  3. Edit utils/timezone.py. Below the line utc = pendulum.timezone('UTC') (line 27), add:

    from airflow import configuration as conf
    try:
        tz = conf.get("core", "default_timezone")
        if tz == "system":
            utc = pendulum.local_timezone()
        else:
            utc = pendulum.timezone(tz)
    except Exception:
        pass

    Then in utcnow() (line 69), change
    d = dt.datetime.utcnow()
    to
    d = dt.datetime.now()
  4. Edit utils/sqlalchemy.py. Below the line utc = pendulum.timezone('UTC') (line 37), add:

    from airflow import configuration as conf
    try:
        tz = conf.get("core", "default_timezone")
        if tz == "system":
            utc = pendulum.local_timezone()
        else:
            utc = pendulum.timezone(tz)
    except Exception:
        pass

Also in utils/sqlalchemy.py, comment out the line cursor.execute("SET time_zone = '+00:00'") (line 124).
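
In other words, the patched spot should end up looking roughly like this (a sketch of the two lines in question, not the full function):

```python
# utils/sqlalchemy.py, around line 124: stop forcing every DB session to UTC
# cursor.execute("SET time_zone = '+00:00'")
```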


>  5. Edit www/templates/admin/master.html (line 31):

```javascript
// change
var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
// to
var UTCseconds = x.getTime();

// and change
"timeFormat":"H:i:s %UTC%",
// to
"timeFormat":"H:i:s",
```

>  6. Finally, restart airflow-webserver.
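
With the systemd unit from earlier in place, that restart is just (assuming the unit name used above):

```bash
systemctl restart airflow-webserver.service
```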

Original article: https://blog.51cto.com/xiaoqiangjs/2462918


5s管理对加工厂公司管理员而言可以说是耳熟能详,5S管理以现场管理为关键.以产品服务为导向性的管理方案.那麼,工厂车间该怎样导入5S管理方法? 要成功地实行5S管理就要有一个非常系统的方法,只有全体人员掌了实行步聚,才能使5S管理方法完全地落到实处,关键有下列几个层面: 1.塑造5S管理气氛充足地运用标语.宣传语.宣传牌,让每一职工都能搞清楚5S促进是企业提高企业品牌形象.提升质量.替企业节省成本费的这项最好是的主题活动,是公司取得成功的有效途径. 2.要完全了解5S要旨5S管理的推动要表明管理