Airflow 从入门到精通-05-宏变量,Jinja 和 Execution Date
一、宏变量
默认变量
默认情况下,Airflow 引擎会传递一些可在所有模板中访问的变量,也可以理解为系统变量。
示例:
# The execution date as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id='test_env',
bash_command='/tmp/test.sh ',
dag=dag,
env={'EXECUTION_DATE': date})
宏(Macros)
宏(Macros)是一种向模板公开对象的方法,这些对象位于模板的 macros 命名空间之下,其中提供了一些常用的库和方法。
Jinja Templating
Airflow 利用 Jinja 模板的强大功能,这可以成为与宏结合使用的强大工具。 例如,假设您想使用 BashOperator 将执行日期作为环境变量传递给 Bash 脚本:
# The execution date as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id='test_env',
bash_command='/tmp/test.sh ',
dag=dag,
env={'EXECUTION_DATE': date})
这里,{{ ds }} 是一个宏(macro),因为 BashOperator 的 env 参数是用 Jinja 模板化的,所以执行日期将作为 Bash 脚本中名为 EXECUTION_DATE 的环境变量提供。
二、实战
对上一节的发送邮件task增加宏变量:
email_task = EmailOperator(
task_id='send_email',
to='407544577@qq.com',
subject='Stock Price is downloaded',
html_content="""<h2>Airflow Email Test.</h2>{{ ds_nodash }} <br/>{{ dag }}<br/> {{ conf }} <br/> {{ ds }} <br/> {{ ti }}""",
dag=dag
)
邮件内容:
三、Execution Date
定时任务调度,默认设置为 schedule_interval=None,这里我们可以将其设置为定时调度,就不需要手动触发了。
# [START instantiate_dag]
with DAG(
dag_id='download_stock_price_v2',
default_args=default_args,
description='download stock price and save to local csv files and save to database',
schedule_interval=None,
start_date=days_ago(2),
tags=['quantdata'],
catchup=False, # 新增,一个任务执行完了再执行下一个DAG任务
max_active_runs=1, # 一个任务执行完了再执行下一个DAG任务
) as dag:

设置定时调度:
# 每2分钟执行一次
schedule_interval='*/2 * * * *',
设置成功后,可以看到定时任务已经可以自动刷新了:

完整代码:download_stock_price_v2.py
"""Example DAG demonstrating the usage of the BashOperator."""
from datetime import timedelta
from textwrap import dedent
import yfinance as yf
import mysql.connector
from airflow import DAG
from airflow.operators.python import PythonOperator
# from airflow.operators.mysql_operator import MySqlOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
def download_price(*args, **context):
    """Download one month of price history for each configured ticker.

    The ticker list comes from ``get_tickers(context)``. Tickers that
    return no rows (delisted or unknown symbols) are skipped; every valid
    ticker's history is written to a local CSV via ``get_file_path``.

    Returns:
        list: the tickers that actually had data. As the task's return
        value it is pushed to XCom for downstream tasks to pull.
    """
    symbols = get_tickers(context)
    valid_tickers = []
    for symbol in symbols:
        history = yf.Ticker(symbol).history(period="1mo")
        if history.shape[0] == 0:
            # Empty frame -> no market data for this symbol; skip it.
            continue
        valid_tickers.append(symbol)
        with open(get_file_path(symbol), 'w') as writer:
            history.to_csv(writer, index=True)
        print("Finished downloading price data for " + symbol)
    # Returned value becomes this task's XCom payload.
    return valid_tickers
def get_file_path(ticker):
    """Return the local CSV path used to store *ticker*'s price history.

    NOTE: this is a relative path in the worker's CWD — not safe for a
    distributed deployment where tasks may run on different hosts.
    """
    return './{}.csv'.format(ticker)
def load_price_data(ticker):
    """Parse *ticker*'s downloaded CSV into rows for the staging table.

    Each returned row is ``[ticker, date, open, high, low, close]``
    (ticker prepended to the first five CSV fields). The header line,
    which starts with 'Date', is skipped.
    """
    rows = []
    with open(get_file_path(ticker), 'r') as reader:
        for line in reader:
            if line[:4] == 'Date':
                continue
            rows.append([ticker] + line.split(',')[:5])
    return rows
def get_tickers(context):
    """Resolve the ticker list for this DAG run.

    The base list is read from the Airflow Variable "stock_list_json".
    If the run was triggered with a ``stocks`` conf parameter
    (Trigger DAG w/ config), that list overrides the Variable.
    """
    # Read the Variable first so a missing/invalid Variable still fails
    # loudly, exactly as before, even when an override is supplied.
    ticker_list = Variable.get("stock_list_json", deserialize_json=True)
    override = context["dag_run"].conf.get("stocks")
    if override:
        ticker_list = override
    return ticker_list
def save_to_mysql_stage(*args, **context):
    """Bulk-insert each downloaded ticker's CSV rows into stock_prices_stage.

    Pulls the list of valid tickers from the ``download_prices`` task's
    XCom, parses each ticker's local CSV with ``load_price_data``, and
    inserts the rows. Database credentials are resolved dynamically from
    the Airflow Connection named "demodb" (never hard-coded in source).
    """
    # Pull the return_value XCom pushed by the upstream download task.
    tickers = context['ti'].xcom_pull(task_ids='download_prices')
    print(f"received tickers:{tickers}")

    # Resolve connection settings from Airflow's Connections store.
    from airflow.hooks.base_hook import BaseHook
    conn = BaseHook.get_connection('demodb')
    mydb = mysql.connector.connect(
        host=conn.host,
        user=conn.login,
        password=conn.password,
        database=conn.schema,
        port=conn.port
    )
    try:
        mycursor = mydb.cursor()
        try:
            for ticker in tickers:
                val = load_price_data(ticker)
                # Guard: indexing val[1] directly would raise IndexError
                # for a ticker whose CSV parsed to fewer than two rows.
                sample = val[1] if len(val) > 1 else None
                print(f"{ticker} length={len(val)} {sample}")
                sql = """INSERT INTO stock_prices_stage
(ticker, as_of_date, open_price, high_price, low_price, close_price)
VALUES (%s,%s,%s,%s,%s,%s)"""
                mycursor.executemany(sql, val)
                mydb.commit()
                print(mycursor.rowcount, "record inserted.")
        finally:
            mycursor.close()
    finally:
        # Always release the connection, even if parsing/insert fails.
        mydb.close()
# Default arguments applied to every task in the DAG defined below.
default_args = {
    'owner': 'kaiyi',
    # Recipient(s) of the failure/retry notification emails.
    'email': ['407544577@qq.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    # Retry a failed task once, waiting 30 seconds before the retry.
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}
# [START instantiate_dag]
with DAG(
    dag_id='download_stock_price_v2',
    default_args=default_args,
    description='download stock price and save to local csv files and save to database',
    # Cron schedule: run every 2 minutes.
    schedule_interval='*/2 * * * *',
    start_date=days_ago(2),
    tags=['quantdata'],
    catchup=False,  # do not backfill runs missed since start_date
    max_active_runs=1,  # at most one DAG run executes at a time
) as dag:
    # [END instantiate_dag]
    dag.doc_md = """
This DAG download stock price
"""
    # Downloads CSVs; its return value (valid tickers) is pushed to XCom.
    download_task = PythonOperator(
        task_id="download_prices",
        python_callable=download_price,
        provide_context=True  # Airflow 1.x flag; deprecated/no-op in 2.x — TODO confirm version
    )
    # Loads the CSVs into the MySQL staging table (pulls tickers from XCom).
    save_to_mysql_task = PythonOperator(
        task_id="save_to_database",
        python_callable=save_to_mysql_stage,
        provide_context=True
    )
    # Runs merge_stock_price.sql against the "demodb" connection to merge
    # staged rows into the final table.
    mysql_task = MySqlOperator(
        task_id="merge_stock_price",
        mysql_conn_id='demodb',
        sql="merge_stock_price.sql",
        dag=dag,
    )
    # Notification email; html_content is Jinja-templated, so the macros
    # ({{ ds }}, {{ ds_nodash }}, {{ ti }}, ...) are rendered at run time.
    email_task = EmailOperator(
        task_id='send_email',
        to='407544577@qq.com',
        subject='Stock Price is downloaded',
        html_content="""<h2>Airflow Email Test.</h2>{{ ds_nodash }} <br/>{{ dag }}<br/> {{ conf }} <br/> {{ ds }} <br/> {{ ti }}""",
        dag=dag
    )
    # Task ordering: download -> stage load -> SQL merge -> email.
    download_task >> save_to_mysql_task >> mysql_task >> email_task
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)