Airflow 从入门到精通-04-增强 DAG 健壮性

一、使用airflow发送邮件

1、配置SMTP邮件服务

$ cd /root/airflow
$ vi airflow.cfg

修改配置文件 airflow.cfg email smtp 部分 :

...

[email]

# Configuration email backend and whether to
# send email alerts on retry or failure
# Email backend to use
email_backend = airflow.utils.email.send_email_smtp

# Whether email alerts should be sent when a task is retried
default_email_on_retry = True

# Whether email alerts should be sent when a task failed
default_email_on_failure = True

# File that will be used as the template for Email subject (which will be rendered using Jinja2).
# If not set, Airflow uses a base template.
# Example: subject_template = /path/to/my_subject_template_file
# subject_template =

# File that will be used as the template for Email content (which will be rendered using Jinja2).
# If not set, Airflow uses a base template.
# Example: html_content_template = /path/to/my_html_content_template_file
# html_content_template =

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.126.com
smtp_starttls = False
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = demo@126.com
# Example: smtp_password = airflow
smtp_password = demosjiofjoisdji
smtp_port = 25
smtp_mail_from = demo@126.com
smtp_timeout = 30
smtp_retry_limit = 5

[sentry]

# Sentry (https://docs.sentry.io) integration. Here you can supply
# additional configuration options based on the Python platform. See:
# https://docs.sentry.io/error-reporting/configuration/?platform=python.

...

2、测试邮件服务

保存之后,在 download_stock_price_v2.py 中加入 email 任务


# 引入EmailOperator
from airflow.operators.email import EmailOperator

...

  email_task = EmailOperator(
        task_id='send_email',
        to='407544577@qq.com',
        subject='Stock Price is downloaded',
        html_content="""<h2>Airflow Email Test.</h2>""",
        dag=dag
    )

    download_task >> save_to_mysql_task >> mysql_task >> email_task

杀掉airflow进程,重启:

# 批量杀进程
ps -ef | grep airflow | grep -v grep | awk '{print "kill -9 "$2}'  | sh

# 删除相关的pid文件,否则重启会报错
[root@quant airflow]# rm -rf airflow-webserver-monitor.pid
[root@quant airflow]# rm -rf airflow-scheduler.*
[root@quant airflow]# rm -rf airflow-webserver.*

# 重启
# 启动web服务
airflow webserver -p 8080 -D

# 启动定时任务
airflow scheduler -D

然后手动触发执行DAG,可以看到,send_email 邮件任务执行成功,邮箱也可以收到该邮件 ^_^

file

查看该任务日志:
file

查看邮件:
file

注意,在设置邮箱时,一定要检查配置,如果报如下错误:

smtplib.SMTPNotSupportedError: STARTTLS extension not supported by server

则说明 SMTP 服务器不支持 STARTTLS,需要在 smtp 配置中将 smtp_starttls 设置为 False(即 smtp_starttls = False)

二、执行失败邮件提示

如果需要在任务执行失败时收到邮件提示,则可在默认参数 default_args 中进行配置

# Defaults applied to every task in the DAG: send email alerts to the
# listed address on failure and on retry; retry once after 30 seconds.
default_args = {
    'owner': 'kaiyi',
    'email': ['407544577@qq.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

然后修改保存的路径为不存在的路径,人为的制造一个bug,看下airflow是否会发送一个失败的邮件:
file

然后手动触发,任务报错:
file

file

可以看到,已经收到该任务报错的邮件了
file

三、关于重跑DagRun的配置

如果同时跑多个并行的 DagRun,可能会造成死锁。所以在运行 DAG 时,需要设置如下参数:catchup=False(不回填历史调度)和 max_active_runs=1(同一时刻最多一个 DagRun)

# [START instantiate_dag]
with DAG(
        dag_id='download_stock_price_v2',
        default_args=default_args,
        description='download stock price and save to local csv files and save to database',
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['quantdata'],
        catchup=False,   # 新增,一个任务执行完了再执行下一个DAG任务
        max_active_runs=1,  # 一个任务执行完了再执行下一个DAG任务
) as dag:

完整 download_stock_price_v2.py 文件:

"""Example DAG demonstrating the usage of the BashOperator."""

from datetime import timedelta
from textwrap import dedent
import yfinance as yf
import mysql.connector

from airflow import DAG
from airflow.operators.python import PythonOperator
# from airflow.operators.mysql_operator import MySqlOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.email import EmailOperator

from airflow.utils.dates import days_ago
from airflow.models import Variable

def download_price(*args, **context):
    """Download one month of price history for each configured ticker.

    Tickers come from get_tickers(context) (an Airflow Variable,
    optionally overridden by the DagRun conf).  Each ticker whose
    history is non-empty is written to a local CSV file.

    Returns:
        list[str]: tickers that actually had data.  The return value is
        pushed to XCom so the save_to_database task can pull it.
    """
    stock_list = get_tickers(context)

    # Collect only tickers that still trade (delisted / unknown symbols
    # come back with an empty history frame).
    valid_tickers = []
    for ticker in stock_list:
        dat = yf.Ticker(ticker)
        hist = dat.history(period="1mo")

        if hist.shape[0] == 0:
            # No rows — skip without writing a file.
            continue
        valid_tickers.append(ticker)

        with open(get_file_path(ticker), 'w') as writer:
            hist.to_csv(writer, index=True)
        print("Finished downloading price data for " + ticker)

    # Returned value becomes the task's XCom "return_value".
    return valid_tickers

def get_file_path(ticker):
    """Return the local CSV file path for *ticker*.

    NOTE: a relative path — NOT safe to rely on in a distributed
    deployment, where tasks may run on different workers.
    """
    return './' + ticker + '.csv'

def load_price_data(ticker):
    """Read the ticker's CSV and return rows shaped for the staging insert.

    Each row is [ticker, date, open, high, low, close]; the CSV header
    line (the one starting with 'Date') is skipped.
    """
    rows = []
    with open(get_file_path(ticker), 'r') as reader:
        for line in reader.readlines():
            if line[:4] == 'Date':
                continue
            rows.append([ticker] + line.split(',')[:5])
    return rows

def get_tickers(context):
    """Resolve the ticker list for this run.

    The base list comes from the Airflow Variable "stock_list_json";
    if the DagRun was triggered with a "stocks" conf parameter
    (Trigger DAG w/ config), that list takes precedence.
    """
    tickers = Variable.get("stock_list_json", deserialize_json=True)

    # A manual trigger may carry an override in the DagRun conf.
    override = context["dag_run"].conf.get("stocks")
    return override if override else tickers

def save_to_mysql_stage(*args, **context):
    """Insert each downloaded ticker's CSV rows into stock_prices_stage.

    The ticker list is pulled from the download_prices task's XCom
    return value, so only tickers that actually had data are processed.
    Credentials are resolved from the Airflow Connection 'demodb'
    rather than being hard-coded.
    """
    # Pull the return_value XCom pushed by the download_prices task.
    tickers = context['ti'].xcom_pull(task_ids='download_prices')
    print(f"received tickers:{tickers}")

    # NOTE(review): airflow.hooks.base_hook is deprecated in Airflow 2.x;
    # prefer `from airflow.hooks.base import BaseHook` once upgraded.
    from airflow.hooks.base_hook import BaseHook
    conn = BaseHook.get_connection('demodb')

    mydb = mysql.connector.connect(
        host=conn.host,
        user=conn.login,
        password=conn.password,
        database=conn.schema,
        port=conn.port
    )

    # Ensure cursor and connection are released even if an insert fails.
    try:
        mycursor = mydb.cursor()
        try:
            for ticker in tickers:
                val = load_price_data(ticker)
                # Guard val[1]: a ticker with a single data row would
                # otherwise raise IndexError in this log line.
                print(f"{ticker} length={len(val)} {val[1] if len(val) > 1 else val}")

                sql = """INSERT INTO stock_prices_stage
                (ticker, as_of_date, open_price, high_price, low_price, close_price)
                VALUES (%s,%s,%s,%s,%s,%s)"""
                mycursor.executemany(sql, val)

                # Commit per ticker so earlier loads survive a late failure.
                mydb.commit()
                print(mycursor.rowcount, "record inserted.")
        finally:
            mycursor.close()
    finally:
        mydb.close()

# Defaults applied to every task in the DAG: send email alerts to the
# listed address on failure and on retry; retry once after 30 seconds.
default_args = dict(
    owner='kaiyi',
    email=['407544577@qq.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=1,
    retry_delay=timedelta(seconds=30),
)

# [START instantiate_dag]
with DAG(
        dag_id='download_stock_price_v2',
        default_args=default_args,
        description='download stock price and save to local csv files and save to database',
        schedule_interval=None,  # no schedule — manual/API trigger only
        start_date=days_ago(2),
        tags=['quantdata'],
        catchup=False,   # do not backfill runs between start_date and now
        max_active_runs=1,  # at most one active DagRun at a time (avoids concurrent runs deadlocking on the DB)
) as dag:
    # [END instantiate_dag]

    dag.doc_md = """
    This DAG download stock price
    """

    # Downloads the CSVs; its return value (the list of valid tickers)
    # is pushed to XCom for the save_to_database task.
    # NOTE(review): provide_context=True is deprecated in Airflow 2.x —
    # the context is passed automatically; confirm before removing.
    download_task = PythonOperator(
        task_id="download_prices",
        python_callable=download_price,
        provide_context=True
    )

    # Loads each downloaded CSV into the MySQL staging table.
    save_to_mysql_task = PythonOperator(
        task_id="save_to_database",
        python_callable=save_to_mysql_stage,
        provide_context=True
    )

    # Merges staging rows into the final table via merge_stock_price.sql
    # (resolved from the DAG's template search path).
    mysql_task = MySqlOperator(
        task_id="merge_stock_price",
        mysql_conn_id='demodb',
        sql="merge_stock_price.sql",
        dag=dag,
    )

    # Notification email sent once the pipeline has finished.
    email_task = EmailOperator(
        task_id='send_email',
        to='407544577@qq.com',
        subject='Stock Price is downloaded',
        html_content="""<h2>Airflow Email Test.</h2>""",
        dag=dag
    )

    download_task >> save_to_mysql_task >> mysql_task >> email_task

为者常成,行者常至