Airflow from Beginner to Master - 05 - Macro Variables, Jinja, and Execution Date

1. Macro Variables

Variables and macros can be used in templates (see the Jinja Templating section below).

Default Variables

By default, the Airflow engine passes a set of variables that are accessible in all templates; you can think of them as system variables.
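The full table is in the official Airflow templates reference; a few of the most commonly used ones are listed below (descriptions only, not actual rendered output):

{{ ds }}              # the execution date as YYYY-MM-DD
{{ ds_nodash }}       # the execution date as YYYYMMDD
{{ ts }}              # the execution timestamp in ISO 8601 format
{{ execution_date }}  # the execution date as a datetime object
{{ dag }}             # the DAG object
{{ task }}            # the Task object
{{ ti }}              # the TaskInstance object
{{ params }}          # user-supplied params dictionary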


Macros

Macros are a way to expose objects to templates; they live under the macros namespace in templates and provide access to a number of commonly used libraries and methods.
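For example, the macros namespace exposes date helpers such as macros.ds_add and macros.ds_format, which can be used inside any templated field. A minimal sketch (the task_id and echo commands are made up for illustration):

templated_command = """
echo "seven days after the execution date: {{ macros.ds_add(ds, 7) }}"
echo "execution date reformatted: {{ macros.ds_format(ds, '%Y-%m-%d', '%d-%m-%Y') }}"
"""
t = BashOperator(
    task_id='macro_demo',
    bash_command=templated_command,
    dag=dag,
)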

Jinja Templating

Airflow leverages the power of Jinja templating, and this can be a powerful tool to use in combination with macros. For example, say you want to pass the execution date as an environment variable to a Bash script using the BashOperator:

# The execution date as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id='test_env',
    bash_command='/tmp/test.sh ',  # trailing space keeps Airflow from treating the .sh path as a template file to render
    dag=dag,
    env={'EXECUTION_DATE': date})

Here, {{ ds }} is a macro, and because the env parameter of the BashOperator is templated with Jinja, the execution date will be available as an environment variable named EXECUTION_DATE in your Bash script.
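Templating is not limited to the built-in variables: values passed through an operator's params argument are exposed as {{ params }} too. A small sketch (the task and parameter names are invented):

t = BashOperator(
    task_id='greet',
    bash_command='echo "Hello {{ params.name }}, running for {{ ds }}"',
    params={'name': 'quantdata'},
    dag=dag,
)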

2. Hands-On Practice

Add macro variables to the email-sending task from the previous lesson:

email_task = EmailOperator(
    task_id='send_email',
    to='407544577@qq.com',
    subject='Stock Price is downloaded',
    html_content="""<h2>Airflow Email Test.</h2>{{ ds_nodash }} <br/>{{ dag }}<br/> {{ conf }} <br/> {{ ds }} <br/> {{ ti }}""",
    dag=dag
)

The resulting email shows each placeholder rendered to its concrete value.
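Roughly what each placeholder renders to, assuming a run date of 2021-10-10 (illustrative values, not actual output):

{{ ds_nodash }} -> 20211010
{{ dag }}       -> <DAG: download_stock_price_v2>
{{ conf }}      -> the Airflow configuration object
{{ ds }}        -> 2021-10-10
{{ ti }}        -> <TaskInstance: download_stock_price_v2.send_email ...>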

3. Execution Date

So far the DAG has been defined with schedule_interval=None, which means it only runs when triggered manually. Setting a schedule interval lets the scheduler trigger runs automatically.

# [START instantiate_dag]
with DAG(
        dag_id='download_stock_price_v2',
        default_args=default_args,
        description='download stock price and save to local csv files and save to database',
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['quantdata'],
        catchup=False,   # new: don't backfill runs for past intervals
        max_active_runs=1,  # allow only one active DAG run at a time
) as dag:


Set the schedule:

# run every 2 minutes
schedule_interval='*/2 * * * *',
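One subtlety about execution_date is worth spelling out here: it marks the start of a run's scheduling interval, so the run itself is triggered at the end of that interval. With the two-minute schedule above:

# run stamped execution_date 10:00 covers 10:00-10:02 and is triggered at ~10:02
# run stamped execution_date 10:02 covers 10:02-10:04 and is triggered at ~10:04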

Once this is set, you can see the scheduler triggering new DAG runs automatically on the two-minute schedule.

The complete code:
download_stock_price_v2.py

"""Example DAG demonstrating the usage of the BashOperator."""

from datetime import timedelta
import yfinance as yf
import mysql.connector

from airflow import DAG
from airflow.operators.python import PythonOperator
# from airflow.operators.mysql_operator import MySqlOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.email import EmailOperator

from airflow.utils.dates import days_ago
from airflow.models import Variable

def download_price(*args, **context):
    stock_list = get_tickers(context)

    # keep only the tickers that actually returned data (skip delisted or invalid symbols)
    valid_tickers = []
    for ticker in stock_list:
        dat = yf.Ticker(ticker)
        hist = dat.history(period="1mo")
        if hist.shape[0] == 0:
            continue
        valid_tickers.append(ticker)

        with open(get_file_path(ticker), 'w') as writer:
            hist.to_csv(writer, index=True)
        print("Finished downloading price data for " + ticker)
    # the return value is pushed to XCom so downstream tasks can pull it
    return valid_tickers

def get_file_path(ticker):
    # NOTE: a relative path like this is not safe on a distributed / multi-worker setup
    return f'./{ticker}.csv'

def load_price_data(ticker):
    with open(get_file_path(ticker), 'r') as reader:
        lines = reader.readlines()
        # skip the CSV header row; keep Date, Open, High, Low, Close, prefixed by the ticker
        return [[ticker] + line.split(',')[:5] for line in lines if line[:4] != 'Date']

def get_tickers(context):
    # read the ticker list from the Airflow Variable
    stock_list = Variable.get("stock_list_json", deserialize_json=True)

    # parameters passed when triggering the DAG take precedence (Trigger DAG w/ config)
    stocks = context["dag_run"].conf.get("stocks")
    if stocks:
        stock_list = stocks
    return stock_list

def save_to_mysql_stage(*args, **context):
    # tickers = get_tickers(context)
    # pull the return value that download_prices pushed to XCom
    tickers = context['ti'].xcom_pull(task_ids='download_prices')
    print(f"received tickers:{tickers}")

    """
    # connect to the database with hard-coded credentials (shown for comparison only)
    mydb = mysql.connector.connect(
        host="98.14.13.15",
        user="root",
        password="Qun988",
        database="demodb",
        port=3307
    )
    """

    # fetch connection details dynamically from Airflow Connections
    from airflow.hooks.base import BaseHook
    conn = BaseHook.get_connection('demodb')

    mydb = mysql.connector.connect(
        host=conn.host,
        user=conn.login,
        password=conn.password,
        database=conn.schema,
        port=conn.port
    )

    mycursor = mydb.cursor()
    for ticker in tickers:
        val = load_price_data(ticker)
        print(f"{ticker} length={len(val)} {val[1]}")

        sql = """INSERT INTO stock_prices_stage
        (ticker, as_of_date, open_price, high_price, low_price, close_price)
        VALUES (%s,%s,%s,%s,%s,%s)"""
        mycursor.executemany(sql, val)

        mydb.commit()
        print(mycursor.rowcount, "record inserted.")

default_args = {
    'owner': 'kaiyi',
    'email': ['407544577@qq.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

# [START instantiate_dag]
with DAG(
        dag_id='download_stock_price_v2',
        default_args=default_args,
        description='download stock price and save to local csv files and save to database',
        schedule_interval='*/2 * * * *',
        start_date=days_ago(2),
        tags=['quantdata'],
        catchup=False,   # new: don't backfill runs for past intervals
        max_active_runs=1,  # allow only one active DAG run at a time
) as dag:
    # [END instantiate_dag]

    dag.doc_md = """
    This DAG downloads stock prices and saves them to local CSV files and a database.
    """

    download_task = PythonOperator(
        task_id="download_prices",
        python_callable=download_price,
        provide_context=True  # required on Airflow 1.x; deprecated and ignored on Airflow 2.x
    )

    save_to_mysql_task = PythonOperator(
        task_id="save_to_database",
        python_callable=save_to_mysql_stage,
        provide_context=True
    )

    mysql_task = MySqlOperator(
        task_id="merge_stock_price",
        mysql_conn_id='demodb',
        sql="merge_stock_price.sql",
        dag=dag,
    )

    email_task = EmailOperator(
        task_id='send_email',
        to='407544577@qq.com',
        subject='Stock Price is downloaded',
        html_content="""<h2>Airflow Email Test.</h2>{{ ds_nodash }} <br/>{{ dag }}<br/> {{ conf }} <br/> {{ ds }} <br/> {{ ti }}""",
        dag=dag
    )

    download_task >> save_to_mysql_task >> mysql_task >> email_task
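Since get_tickers() gives precedence to dag_run.conf, you can exercise that branch by triggering the DAG with parameters from the Airflow 2 CLI (the ticker list here is just an example):

airflow dags trigger download_stock_price_v2 --conf '{"stocks": ["AAPL", "MSFT"]}'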

Those who act often succeed; those who keep walking arrive.