Airflow 从入门到精通-06-Dynamic DAGs 和可视化 DAG
一、Dynamic DAGs(动态生成DAG)
由于 DAG 是由 Python 代码定义的,因此不需要纯粹是声明性的;您可以自由使用循环、函数等来定义您的 DAG。 例如,这里有一个 DAG,它使用 for 循环来定义一些任务:
例如,这里有一个 DAG,它使用 for 循环来定义一些任务:
源文件:dynamic_dags.py
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'kaiyi',
'email': ['407544577@qq.com'],
}
with DAG(
dag_id='loop_dags',
default_args=default_args,
description='here is a DAG that uses a for loop to define some Tasks',
start_date=days_ago(2),
tags=['quantdata'],
catchup=False, # 新增,一个任务执行完了再执行下一个DAG任务
max_active_runs=1, # 一个任务执行完了再执行下一个DAG任务
) as dag:
first = DummyOperator(task_id="first")
last = DummyOperator( task_id="last")
options = ["branch_a", "branch_b", "branch_c", "branch_d"]
for option in options:
t = DummyOperator(task_id=option)
first >> t >> last
然后再手动执行 DAG:
实战
def create_dag(symbol):
with DAG(
"email_{}_price".format(symbol.lower()),
default_args={"start_date": "2020-01-01"},
schedule_interval="0 0 * * *",
) as dag:
get_price_task = PythonOperator(
task_id="get_price",
python_callable=get_price,
op_kwargs=dict(
symbol="BTC",
),
)
email_price_task = PythonOperator(
task_id="email_price",
python_callable=email_price,
)
(
get_price_task
>> email_price_task
)
return dag
for symbol in ("BTC", "ETH", "LTC", "XLM"):
dag = create_dag(symbol=symbol)
globals()["{}_dag".format(symbol.lower())] = dag
二、DAG Visualization
如果您想查看 DAG 的可视化表示,您有两个选择:
- 您可以加载
Airflow UI
,导航到您的 DAG,然后选择“Graph View” - 您可以运行
airflow dags show
,将其呈现为图像文件
我们通常建议您使用图形视图,因为它还会显示您选择的任何 DAG 运行中所有任务实例的状态。
当然,随着您开发 DAG,它们会变得越来越复杂,因此我们提供了一些方法来修改这些 DAG 视图,使它们更易于理解
相关文章:
Dynamic DAGs
Dynamically Generating DAGs in Airflow
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)