@[toc]
## 管理 AirFlow 方法
### 进程管理工具 Supervisord
- 安装进程管理工具Supervisord管理airflow进程
easy_install supervisor    # 此方法不适用于python3安装(会出现很多问题)
echo_supervisord_conf > /etc/supervisord.conf
- 编辑文件supervisord.conf,添加启动命令
vi /etc/supervisord.conf
[program:airflow_web]
command=/usr/bin/airflow webserver -p 8080
[program:airflow_worker]
command=/usr/bin/airflow worker
[program:airflow_scheduler]
command=/usr/bin/airflow scheduler
> 3. 启动supervisord服务
/usr/bin/supervisord -c /etc/supervisord.conf
> 4. 此时可以用 supervisorctl 来管理airflow服务了
supervisorctl start airflow_web
supervisorctl stop airflow_web
supervisorctl restart airflow_web
supervisorctl stop all
### 进程管理工具 systemd
> 1. vim /etc/sysconfig/airflow # systemd需要调用此文件,一般定义的是airflow的变量
AIRFLOW_CONFIG=/root/airflow/airflow.cfg
AIRFLOW_HOME=/root/airflow
> 2. vim /usr/lib/systemd/system/airflow-webserver.service #systemctl 管理的服务名
> 其他的服务也可以使用此方式进行定义
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service
Wants=postgresql.service mysql.service redis.service
[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=root
Group=root
Type=simple
ExecStart=/bin/bash -c "export PATH=${PATH}:/usr/local/python3/bin/ ; /usr/local/python3/bin/airflow webserver -p 8080 --pid /root/airflow/service/webserver.pid -A /root/airflow/service/webserver.out -E /root/airflow/service/webserver.err -l /root/airflow/service/webserver.log"
KillMode=process
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
> 3. systemctl daemon-reload #加载服务
> 4. systemctl status airflow-webserver.service #查看服务状态,后期就可以用这种方式进行管理
### 使用脚本管理airflow
```bash
#!/bin/bash
#=== This is the function about airflow webserver service ===
# Report whether an airflow webserver process shows up in the process table.
# Prints a coloured status line to stdout after a short settling delay.
webserver_status() {
  echo -e "\e[36m Checking service status, please wait ... \e[0m"
  sleep 3
  local hits
  # The "[ -]" bracket keeps the pattern from matching grep's own command line.
  hits=$(ps -elf | grep -c "airflow[ -]webserver")
  if (( hits > 0 )); then
    echo -e "\e[32m webserver is running... \e[0m"
  else
    echo -e "\e[31m webserver is stop !!! \e[0m"
  fi
}
# Launch the airflow webserver in the background, appending its output to the
# service log, then print the resulting status.
webserver_start() {
  local airflow_bin=/usr/local/python3/bin/airflow
  local log_file=/root/airflow/service/webserver.log
  echo -e "\e[36m Starting airflow webserver ... \e[0m"
  sleep 1
  nohup "$airflow_bin" webserver >> "$log_file" 2>&1 &
  webserver_status
}
# Force-stop every running airflow webserver process.
#
# PIDs are taken from the 4th column of `ps -elf`, killed with SIGKILL, the
# pid file is removed and the resulting status is printed.
# Fixes: the awk program used to be wrapped in typographic quotes, which the
# shell does not treat as quoting, so no PID was ever extracted; kill is now
# skipped when nothing matches instead of being run without arguments.
webserver_stop() {
  echo -e "\e[36m Stopping airflow webserver ... \e[0m"
  sleep 1
  local pids
  # The "[ -]" bracket keeps the pattern from matching grep's own command line.
  pids=$(ps -elf | grep "airflow[ -]webserver" | awk '{ print $4 }')
  if [[ -n "$pids" ]]; then
    # Intentionally unquoted: the list holds one PID per word.
    # shellcheck disable=SC2086
    kill -9 $pids
  fi
  rm -f -- /root/airflow/airflow-webserver.pid
  webserver_status
}
#=== This is the function about airflow scheduler service ===
# Report whether an airflow scheduler process shows up in the process table.
# Prints a coloured status line to stdout after a short settling delay.
scheduler_status() {
  echo -e "\e[36m Checking service status, please wait ... \e[0m"
  sleep 3
  local hits
  # The "[ -]" bracket keeps the pattern from matching grep's own command line.
  hits=$(ps -elf | grep -c "airflow[ -]scheduler")
  if (( hits > 0 )); then
    echo -e "\e[32m scheduler is running... \e[0m"
  else
    echo -e "\e[31m scheduler is stop !!! \e[0m"
  fi
}
# Launch the airflow scheduler in the background, appending its output to the
# service log, then print the resulting status.
scheduler_start() {
  local airflow_bin=/usr/local/python3/bin/airflow
  local log_file=/root/airflow/service/scheduler.log
  echo -e "\e[36m Starting airflow scheduler ... \e[0m"
  sleep 1
  nohup "$airflow_bin" scheduler >> "$log_file" 2>&1 &
  scheduler_status
}
# Force-stop every running airflow scheduler process.
#
# PIDs are taken from the 4th column of `ps -elf`, killed with SIGKILL, the
# pid file is removed and the resulting status is printed.
# Fixes: the awk program used to be wrapped in typographic quotes, which the
# shell does not treat as quoting, so no PID was ever extracted; kill is now
# skipped when nothing matches instead of being run without arguments.
scheduler_stop() {
  echo -e "\e[36m Stopping airflow scheduler ... \e[0m"
  sleep 1
  local pids
  # The "[ -]" bracket keeps the pattern from matching grep's own command line.
  pids=$(ps -elf | grep "airflow[ -]scheduler" | awk '{ print $4 }')
  if [[ -n "$pids" ]]; then
    # Intentionally unquoted: the list holds one PID per word.
    # shellcheck disable=SC2086
    kill -9 $pids
  fi
  rm -f -- /root/airflow/airflow-scheduler.pid
  scheduler_status
}
#=== This is the function about airflow flower service ===
# Report whether something is listening on the flower port (5555).
#
# Fixes: the awk programs used to be wrapped in typographic quotes, so the
# pipeline always produced zero lines and the function always reported
# "stop". The match is now restricted to LISTEN sockets whose local address
# ends in ":5555", so unrelated ports (e.g. 15555) or PIDs containing 5555
# no longer count.
flower_status() {
  echo -e "\e[36m Checking service status, please wait ... \e[0m"
  sleep 3
  local listeners
  # netstat columns: proto recv-q send-q local foreign state pid/program
  listeners=$(netstat -anputl \
    | awk '$6 == "LISTEN" && $4 ~ /:5555$/ { n++ } END { print n + 0 }')
  if (( listeners > 0 )); then
    echo -e "\e[32m flower is running... \e[0m"
  else
    echo -e "\e[31m flower is stop !!! \e[0m"
  fi
}
# Launch airflow flower in the background, appending its output to the
# service log, then print the resulting status.
flower_start() {
  local airflow_bin=/usr/local/python3/bin/airflow
  local log_file=/root/airflow/service/flower.log
  echo -e "\e[36m Starting airflow flower ... \e[0m"
  sleep 1
  nohup "$airflow_bin" flower >> "$log_file" 2>&1 &
  flower_status
}
# Force-stop the process listening on the flower port (5555).
#
# Fixes: the awk programs used to be wrapped in typographic quotes, so no PID
# was ever extracted. PIDs are now read from the "pid/program" column of
# LISTEN sockets whose local address ends in ":5555", de-duplicated (IPv4 and
# IPv6 listeners share a PID), and kill is skipped when nothing is found.
flower_stop() {
  echo -e "\e[36m Stopping airflow flower ... \e[0m"
  sleep 1
  local pids
  pids=$(netstat -anputl | awk '
    $6 == "LISTEN" && $4 ~ /:5555$/ {
      split($7, owner, "/")
      # netstat prints "-" when the owner is not visible; skip non-numeric ids
      if (owner[1] ~ /^[0-9]+$/ && !seen[owner[1]]++) print owner[1]
    }')
  if [[ -n "$pids" ]]; then
    # Intentionally unquoted: the list holds one PID per word.
    # shellcheck disable=SC2086
    kill -9 $pids
  fi
  rm -f -- /root/airflow/airflow-flower.pid
  flower_status
}
#=== This is the function about airflow worker service ===
# Report whether an airflow worker is present: either its "serve_logs"
# process or any celery process counts as running.
worker_status() {
  echo -e "\e[36m Checking service status, please wait ... \e[0m"
  sleep 3
  local log_servers celery_procs
  log_servers=$(ps -elf | grep "airflow serve_logs" | grep -vc grep)
  celery_procs=$(ps -elf | grep celery | grep -vc grep)
  if (( log_servers == 0 && celery_procs == 0 )); then
    echo -e "\e[31m worker is stop !!! \e[0m"
  else
    echo -e "\e[32m worker is running... \e[0m"
  fi
}
# Launch the airflow worker in the background, appending its output to the
# service log, then print the resulting status.
worker_start() {
  local airflow_bin=/usr/local/python3/bin/airflow
  local log_file=/root/airflow/service/worker.log
  echo -e "\e[36m Starting airflow worker ... \e[0m"
  sleep 1
  nohup "$airflow_bin" worker >> "$log_file" 2>&1 &
  worker_status
}
# Force-stop the airflow worker: its "serve_logs" process and all celery
# processes.
#
# Fixes: the awk programs used to be wrapped in typographic quotes, so no PID
# was ever extracted; kill is now skipped when nothing matches instead of
# being run without arguments.
worker_stop() {
  echo -e "\e[36m Stopping airflow worker ... \e[0m"
  sleep 1
  local pids
  # PID is the 4th column of `ps -elf`; grep -v drops the grep process itself.
  pids=$(ps -elf \
    | grep -E "airflow serve_logs|celery" \
    | grep -v grep \
    | awk '{ print $4 }')
  if [[ -n "$pids" ]]; then
    # Intentionally unquoted: the list holds one PID per word.
    # shellcheck disable=SC2086
    kill -9 $pids
  fi
  rm -f -- /root/airflow/airflow-worker.pid
  worker_status
}
#=== This is the startup option for the airflow service ===
# Command-line dispatch: airflow.sh <service> <action>
#   $1 - service: webserver | worker | scheduler | flower | all
#   $2 - action:  start | stop | status
# Exit codes: 1 for an unknown action; 2, 3 or 4 for an unknown service
# given to start, stop or status respectively.
#
# Fixes: the error branches used to call `usage`, which is not defined in
# this script, and the help text ran "services" and "Usage:" together.

# Print the command-line help.
_airflow_usage() {
  echo -e "\n A tool used for starting airflow services\n Usage: airflow.sh {webserver|worker|scheduler|flower|all} {start|stop|status}"
}

service=${1:-}
action=${2:-}

# Per action: the order used for "all" and the exit code for a bad service.
case "$action" in
  start)
    all_order="webserver scheduler flower worker"
    bad_service_rc=2
    ;;
  stop)
    # Stop in the reverse direction: workers first, webserver last.
    all_order="worker flower scheduler webserver"
    bad_service_rc=3
    ;;
  status)
    all_order="webserver scheduler flower worker"
    bad_service_rc=4
    ;;
  *)
    _airflow_usage
    exit 1
    ;;
esac

case "$service" in
  webserver | worker | scheduler | flower)
    # Functions are named <service>_<action>, e.g. webserver_start.
    "${service}_${action}"
    ;;
  all)
    for svc in $all_order; do
      "${svc}_${action}"
    done
    ;;
  *)
    _airflow_usage
    exit "$bad_service_rc"
    ;;
esac
```
### 获取日志信息的改造
- 进入incubator-airflow/airflow/www/
- 修改views.py
# Add the following method inside class Airflow(BaseView):
@expose('/logs')
@login_required
@wwwutils.action_logging
def logs(self):
    """Return the log text of one task instance as a JSON response.

    Query parameters: ``dag_id``, ``task_id`` and ``execution_date``.

    The log is looked up in this order: the local file under
    BASE_LOG_FOLDER, the log server of the worker that ran the task, and
    finally the remote store (S3 / GCS) under REMOTE_BASE_LOG_FOLDER.
    Failures along the way are appended to the returned text as
    ``*** ...`` lines instead of being raised.
    """
    base_log_folder = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    execution_date = request.args.get('execution_date')

    # NOTE(review): the three request parameters are joined into a file
    # path and a URL without validation (e.g. "../" segments) -- confirm
    # they are sanitised upstream or add a check here.
    log_relative = "{dag_id}/{task_id}/{execution_date}".format(
        dag_id=dag_id, task_id=task_id, execution_date=execution_date)
    loc = os.path.join(base_log_folder, log_relative)
    log = ""

    TI = models.TaskInstance
    dttm = dateutil.parser.parse(execution_date)
    session = Session()
    try:
        ti = session.query(TI).filter(
            TI.dag_id == dag_id,
            TI.task_id == task_id,
            TI.execution_date == dttm).first()

        if ti:
            host = ti.hostname
            log_loaded = False

            if os.path.exists(loc):
                try:
                    # "with" closes the file even when reading fails.
                    with open(loc) as f:
                        log += f.read()
                    log_loaded = True
                except Exception:
                    log = "*** Failed to load local log file: {0}.\n".format(loc)
            else:
                worker_log_server_port = conf.get(
                    'celery', 'WORKER_LOG_SERVER_PORT')
                url = os.path.join(
                    "http://{host}:{port}/log".format(
                        host=host, port=worker_log_server_port),
                    log_relative)
                log += "*** Log file isn't local.\n"
                log += "*** Fetching here: {url}\n".format(url=url)
                try:
                    import requests
                    timeout = None  # No timeout
                    try:
                        timeout = conf.getint(
                            'webserver', 'log_fetch_timeout_sec')
                    except (AirflowConfigException, ValueError):
                        pass
                    response = requests.get(url, timeout=timeout)
                    response.raise_for_status()
                    log += '\n' + response.text
                    log_loaded = True
                except Exception:
                    log += "*** Failed to fetch log file from worker.\n"

            if not log_loaded:
                # load remote logs
                remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
                remote_log = os.path.join(remote_log_base, log_relative)
                log += '\n*** Reading remote logs...\n'
                # S3
                if remote_log.startswith('s3:/'):
                    log += log_utils.S3Log().read(remote_log, return_error=True)
                # GCS
                elif remote_log.startswith('gs:/'):
                    log += log_utils.GCSLog().read(remote_log, return_error=True)
                # unsupported
                elif remote_log:
                    log += '*** Unsupported remote log location.'

            session.commit()
    finally:
        # Release the session on every path, including "task not found".
        session.close()

    if PY2 and not isinstance(log, unicode):
        log = log.decode('utf-8')

    return wwwutils.json_response(log)
> 3. 重启服务,访问url如:
> 就可以拿到这个任务在execution_date=2018-01-11的日志
### 删除DAG
> 由于dag的删除现在官方没有暴露直接的api,而完整的删除又牵扯到多个表,总结出删除dag的sql如下
-- Remove every row that belongs to one DAG from the Airflow metadata tables.
-- Set @dag_id to the id of the DAG to delete; the dependent tables are
-- cleared first and airflow.dag last.
-- (The string literal must use straight single quotes.)
set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
### 集群管理脚本
#### 集群服务上线脚本
```bash
#!/usr/bin/env bash
# Print the command-line help of the start script to stdout.
usage() {
  cat <<'EOF'

 A tool used for starting airflow services
Usage: 200.sh {webserver|worker|scheduler|flower}

EOF
}
# Script-wide defaults, overwritten per role in main():
#   PORT     - TCP port that check_alive looks for in netstat output
#   ROLE     - airflow subcommand launched by start_service
#   ENV_ARGS - extra command-line arguments appended to that subcommand
PORT=8081
ROLE=webserver
ENV_ARGS=""
# Check whether a process is listening on TCP port $PORT.
# Globals:  PORT (read), PID (written: owning PID(s), empty if none)
# Returns:  0 when a listener was found, 1 otherwise
#
# Fixes: the awk programs used to be wrapped in typographic quotes, so PID
# was always empty. The port is now matched against the end of the local
# address column only, instead of anywhere in the line.
check_alive() {
  # netstat -nlpt columns: proto recv-q send-q local foreign state pid/program
  PID=$(netstat -nlpt | awk -v port="$PORT" '
    $6 == "LISTEN" && $4 ~ (":" port "$") {
      split($7, owner, "/")
      # netstat prints "-" when the owner is not visible; skip non-numeric ids
      if (owner[1] ~ /^[0-9]+$/ && !seen[owner[1]]++) print owner[1]
    }')
  [[ -n "$PID" ]]
}
# Check whether an airflow scheduler process is running.
# Globals:  PIDS (written: matching PID(s), empty if none)
# Returns:  0 when at least one scheduler process was found, 1 otherwise
#
# Fixes: the awk program used to be wrapped in typographic quotes, so PIDS
# was always empty and the scheduler was always reported as down.
check_scheduler_alive() {
  # The second grep requires "python" in the command line, which also drops
  # the first grep's own process entry.
  PIDS=$(ps -ef \
    | grep "/usr/local/bin/airflow scheduler" \
    | grep "python" \
    | awk '{ print $2 }')
  [[ -n "$PIDS" ]]
}
# Print this host's IPv4 address as reported by ifconfig.
# The last "inet" line containing "-->" wins; when there is none, the last
# "inet" line containing "broadcast" is used. Prints an empty line when
# neither exists.
#
# Fixes: the awk programs used to be wrapped in typographic quotes, so no
# address was ever extracted.
get_host_ip() {
  local host
  host=$(ifconfig | awk '/inet / && /-->/ { ip = $2 } END { if (ip != "") print ip }')
  if [[ -z "$host" ]]; then
    host=$(ifconfig | awk '/inet / && /broadcast/ { ip = $2 } END { if (ip != "") print ip }')
  fi
  echo "${host}"
}
# Liveness probe for the current $ROLE: the scheduler opens no port, so it is
# checked by process; every other role is checked by its listening port.
_role_is_alive() {
  if [[ "$ROLE" == "scheduler" ]]; then
    check_scheduler_alive
  else
    check_alive
  fi
}

# Start the airflow service named by $ROLE unless it is already running.
# Globals:  ROLE, ENV_ARGS, BASE_LOG_DIR (read). BASE_LOG_DIR is not set in
#           this script -- presumably provided by the sourced init script;
#           confirm.
# Exits:    0 when the service is (or already was) running, 1 when it is
#           still not alive 5 seconds after launch.
#
# Fixes: $ROLE used to be compared against 'scheduler' wrapped in typographic
# quotes, which never matched, so the scheduler was probed through the port
# check with a stale PORT value.
start_service() {
  if _role_is_alive; then
    echo "service alreay started"
    exit 0
  fi

  # ENV_ARGS is intentionally unquoted: it may carry several arguments.
  # shellcheck disable=SC2086
  nohup airflow "$ROLE" $ENV_ARGS > "$BASE_LOG_DIR/$ROLE/$ROLE.log" 2>&1 &
  sleep 5

  if _role_is_alive; then
    echo "service start success"
    exit 0
  fi
  echo "service start error"
  exit 1
}
# Entry point of the start script.
# Arguments: $1 - role to start: webserver | worker | scheduler | flower
# Globals:   POOL (must be non-empty); ROLE, PORT, ENV_ARGS (written)
# Exits 1 when POOL is empty or the role is unknown; otherwise hands over to
# start_service, which exits on its own.
main() {
  if [[ -z "${POOL}" ]]; then
    echo "the environment variable POOL cannot be empty"
    exit 1
  fi
  source /data0/hcp/sbin/init-hcp.sh

  local role=$1
  case "$role" in
    webserver | worker | flower | scheduler) ;;
    *)
      usage
      exit 1
      ;;
  esac

  echo "starting airflow ${role}"
  ROLE=$role
  # The scheduler keeps whatever PORT is already set; it is probed by process.
  case "$role" in
    webserver) PORT=8081 ;;
    flower) PORT=5555 ;;
    worker)
      PORT=8793
      local host_ip=$(get_host_ip)
      ENV_ARGS="-cn ${host_ip}@${host_ip}"
      ;;
  esac
  start_service
}
# Forward all command-line arguments to main (the original "$@" had been
# mangled into an e-mail placeholder by the page it was copied from).
main "$@"
```
#### 集群服务下线脚本
```bash
#!/usr/bin/env bash
# Print the command-line help of the stop script to stdout.
usage() {
  cat <<'EOF'

 A tool used for stop airflow services
Usage: 200.sh {webserver|worker|scheduler|flower}

EOF
}
# Print this host's IPv4 address as reported by ifconfig.
# The last "inet" line containing "-->" wins; when there is none, the last
# "inet" line containing "broadcast" is used. Prints an empty line when
# neither exists.
#
# Fixes: the awk programs used to be wrapped in typographic quotes, so no
# address was ever extracted.
get_host_ip() {
  local host
  host=$(ifconfig | awk '/inet / && /-->/ { ip = $2 } END { if (ip != "") print ip }')
  if [[ -z "$host" ]]; then
    host=$(ifconfig | awk '/inet / && /broadcast/ { ip = $2 } END { if (ip != "") print ip }')
  fi
  echo "${host}"
}
# Print the PID(s) of the process listening on TCP port $1, one per line.
_listener_pids() {
  # netstat -nlpt columns: proto recv-q send-q local foreign state pid/program
  netstat -nlpt | awk -v port="$1" '
    $6 == "LISTEN" && $4 ~ (":" port "$") {
      split($7, owner, "/")
      # netstat prints "-" when the owner is not visible; skip non-numeric ids
      if (owner[1] ~ /^[0-9]+$/ && !seen[owner[1]]++) print owner[1]
    }'
}

# Send SIGKILL to every PID given as an argument; no-op without arguments.
_kill_pids() {
  if (( $# == 0 )); then
    echo "no matching process found, nothing to kill" >&2
    return 0
  fi
  kill -9 "$@"
}

# Entry point of the stop script.
# Arguments: $1 - role to stop: webserver | worker | scheduler | flower
# Globals:   POOL (must be non-empty), AIRFLOW_HOME (read for the webserver)
# Exits 1 when POOL is empty or the role is unknown.
#
# Fixes: the awk programs used to be wrapped in typographic quotes, so no PID
# was ever found; the flower branch called start_service, which does not
# exist in this script; kill was run even when no PID had been found.
main() {
  if [[ -z "${POOL}" ]]; then
    echo "the environment variable POOL cannot be empty"
    exit 1
  fi
  source /data0/hcp/sbin/init-hcp.sh

  # PID lists below are expanded unquoted on purpose: one PID per word.
  # shellcheck disable=SC2046
  case "$1" in
    webserver)
      echo "stopping airflow webserver"
      local pid_file="${AIRFLOW_HOME}/airflow-webserver.pid"
      if [[ -r "$pid_file" ]]; then
        _kill_pids $(< "$pid_file")
      else
        echo "pid file not found: ${pid_file}" >&2
      fi
      ;;
    worker)
      echo "stopping airflow worker"
      # Port 8793 is the worker-side listener; the celery processes are
      # matched by the "<ip>@<ip>" name given to them at start.
      _kill_pids $(_listener_pids 8793)
      local host_ip
      host_ip=$(get_host_ip)
      _kill_pids $(ps -ef \
        | grep celeryd \
        | grep -F -- "${host_ip}@${host_ip}" \
        | awk '{ print $2 }')
      ;;
    flower)
      echo "stopping airflow flower"
      _kill_pids $(_listener_pids 5555)
      ;;
    scheduler)
      echo "stopping airflow scheduler"
      _kill_pids $(ps -ef \
        | grep "/usr/local/bin/airflow scheduler" \
        | grep "python" \
        | awk '{ print $2 }')
      ;;
    *)
      usage
      exit 1
      ;;
  esac
}
# Forward all command-line arguments to main (the original "$@" had been
# mangled into an e-mail placeholder by the page it was copied from).
main "$@"
```
### 修改airflow 时区问题
airflow默认使用utc时间,在中国时区需要用+8小时就是本地时间,下面把airflow全面修改为中国时区,带大家改airflow源码,这里主要针对airflow版本是1.10.0 进行修改,其它版本大同小异,参照修改即可
- 在airflow家目录下修改airflow.cfg,设置
default_timezone = Asia/Shanghai
- 进入airflow包的安装位置,也就是site-packages的位置,以下修改文件均为相对位置
这是我安装airflow包的位置(大家自行参考)
cd /usr/local/python3/lib/python3.6/site-packages/airflow
- 修改 utils/timezone.py
# utils/timezone.py -- add the following directly below the line
#     utc = pendulum.timezone('UTC')          (line 27)
# so that "utc" is rebound to the zone configured as core.default_timezone.
from airflow import configuration as conf
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        utc = pendulum.local_timezone()
    else:
        utc = pendulum.timezone(tz)
except Exception:
    # Keep the UTC default when the option is missing or invalid.
    pass

# Then change the utcnow() function (line 69):
#     original:    d = dt.datetime.utcnow()
#     replace by:  d = dt.datetime.now()
- 修改 utils/sqlalchemy.py
# utils/sqlalchemy.py -- add the following directly below the line
#     utc = pendulum.timezone('UTC')          (line 37)
# so that "utc" is rebound to the zone configured as core.default_timezone.
from airflow import configuration as conf
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        utc = pendulum.local_timezone()
    else:
        utc = pendulum.timezone(tz)
except Exception:
    # Keep the UTC default when the option is missing or invalid.
    pass
注释掉 utils/sqlalchemy.py 中的 `cursor.execute("SET time_zone = '+00:00'")` (第124行)
> 5. 修改 www/templates/admin/master.html(第31行)
```javascript
把代码 var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
改为 var UTCseconds = x.getTime();
把代码 "timeFormat":"H:i:s %UTC%",
改为 "timeFormat":"H:i:s",
```
- 最后重启airflow-webserver即可
原文地址:https://blog.51cto.com/xiaoqiangjs/2462918