Airflow 从入门到精通-06-Dynamic DAGs 和可视化 DAG

一、Dynamic DAGs(动态生成DAG)

由于 DAG 是由 Python 代码定义的,因此不需要纯粹是声明性的;您可以自由使用循环、函数等来定义您的 DAG。 例如,这里有一个 DAG,它使用 for 循环来定义一些任务:

例如,这里有一个 DAG,它使用 for 循环来定义一些任务:
源文件:dynamic_dags.py

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'kaiyi',
    'email': ['407544577@qq.com'],
}

with DAG(
        dag_id='loop_dags',
        default_args=default_args,
        description='here is a DAG that uses a for loop to define some Tasks',
        start_date=days_ago(2),
        tags=['quantdata'],
        catchup=False,   # 新增,一个任务执行完了再执行下一个DAG任务
        max_active_runs=1,  # 一个任务执行完了再执行下一个DAG任务
) as dag:

    first = DummyOperator(task_id="first")
    last = DummyOperator( task_id="last")

    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
    for option in options:
        t = DummyOperator(task_id=option)
        first >> t >> last

然后再手动执行 DAG:

file

file

实战

def create_dag(symbol):
    with DAG(
        "email_{}_price".format(symbol.lower()),
        default_args={"start_date": "2020-01-01"},
        schedule_interval="0 0 * * *",
    ) as dag:
        get_price_task = PythonOperator(
            task_id="get_price",
            python_callable=get_price,
            op_kwargs=dict(
                symbol="BTC",
            ),
        )
        email_price_task = PythonOperator(
            task_id="email_price",
            python_callable=email_price,
        )
        (
            get_price_task
            >> email_price_task
        )
        return dag

for symbol in ("BTC", "ETH", "LTC", "XLM"):
    dag = create_dag(symbol=symbol)
        globals()["{}_dag".format(symbol.lower())] = dag

二、DAG Visualization

如果您想查看 DAG 的可视化表示,您有两个选择:

  • 您可以加载 Airflow UI,导航到您的 DAG,然后选择“Graph View”
  • 您可以运行 airflow dags show,将其呈现为图像文件

我们通常建议您使用图形视图,因为它还会显示您选择的任何 DAG 运行中所有任务实例的状态。
当然,随着您开发 DAG,它们会变得越来越复杂,因此我们提供了一些方法来修改这些 DAG 视图,使它们更易于理解


相关文章:
Dynamic DAGs
Dynamically Generating DAGs in Airflow

为者常成,行者常至