Airflow from Beginner to Expert - 01 - Creating a Simple DAG

I. Airflow Version and Installation

Version notes

Tool      Version                  Purpose
Python    3.6.5 / 3.6.x / 3.7.x    Install Airflow and its dependencies; develop Airflow DAGs
Airflow   2.1.2                    Task scheduling platform
MySQL     5.7                      Airflow metadata database

Installing Airflow

1. Install Airflow

$ pip install apache-airflow==2.1.2
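The Airflow 2.x installation docs recommend installing a specific release together with its constraints file, which pins every dependency to a tested version. The URL follows the pattern constraints-<airflow version>/constraints-<python major.minor>.txt, and constraint files only exist for the Python versions that release supports (3.6 to 3.9 for 2.1.x). A minimal sketch that builds the command for the current interpreter; build_constraints_url and pip_install_command are hypothetical helper names, not part of Airflow.

```python
import sys
from typing import Optional, Tuple

# Constraint files live in the apache/airflow repository, one branch per release
CONSTRAINTS_BASE = "https://raw.githubusercontent.com/apache/airflow"


def build_constraints_url(airflow_version: str,
                          python_version: Optional[Tuple[int, int]] = None) -> str:
    """Return the constraints URL for an Airflow release and a Python major.minor."""
    if python_version is None:
        # Default to the interpreter that will run pip
        python_version = (sys.version_info.major, sys.version_info.minor)
    major, minor = python_version
    return f"{CONSTRAINTS_BASE}/constraints-{airflow_version}/constraints-{major}.{minor}.txt"


def pip_install_command(airflow_version: str,
                        python_version: Optional[Tuple[int, int]] = None) -> str:
    """Build the pip command that installs Airflow pinned to its constraints file."""
    url = build_constraints_url(airflow_version, python_version)
    return f'pip install "apache-airflow=={airflow_version}" --constraint "{url}"'


if __name__ == "__main__":
    print(pip_install_command("2.1.2"))
```

Copy the printed command into the shell instead of the bare pip install above if you want reproducible dependency versions.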

2. Initialize the metadata database

# initialize the database
airflow db init

3. Create a user

airflow users create \
    --username admin \
    --firstname Corwien \
    --lastname Wong \
    --role Admin \
    --email 407544577@qq.com

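The password entered for this user is: quant. Since --password is not passed, the command prompts for the password interactively.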

4. Start the web server

# airflow webserver --port 8080 -D
airflow webserver -p 8080 -D

5. Start the scheduler

# airflow scheduler 
# airflow scheduler &
# nohup command > myout.file 2>&1 &
# nohup command > /dev/null 2>&1 &
# nohup airflow scheduler > /dev/null 2>&1 &
airflow scheduler -D
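Because both daemons run in the background with -D, it is easy to miss that one of them failed to start. The Airflow 2 webserver exposes an unauthenticated /health endpoint that reports the status of the metadata database and the scheduler as JSON. A small standard-library sketch, assuming the webserver listens on localhost:8080 as started above; fetch_health, component_statuses and all_healthy are hypothetical helper names.

```python
import json
import urllib.request
from typing import Any, Dict


def fetch_health(base_url: str = "http://localhost:8080", timeout: float = 5.0) -> Dict[str, Any]:
    """GET the webserver's /health endpoint and decode its JSON body."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def component_statuses(payload: Dict[str, Any]) -> Dict[str, str]:
    """Map each reported component (e.g. metadatabase, scheduler) to its status string."""
    return {name: info.get("status", "unknown")
            for name, info in payload.items() if isinstance(info, dict)}


def all_healthy(payload: Dict[str, Any]) -> bool:
    """True only if at least one component is reported and every one is 'healthy'."""
    statuses = component_statuses(payload)
    return bool(statuses) and all(s == "healthy" for s in statuses.values())


if __name__ == "__main__":
    try:
        health = fetch_health()
        print(component_statuses(health), "OK" if all_healthy(health) else "NOT HEALTHY")
    except OSError as exc:  # connection refused, timeout, HTTP errors
        print(f"Webserver not reachable: {exc}")
```

If the scheduler shows as unhealthy while the metadata database is healthy, the scheduler daemon is the one to restart.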

II. PyCharm Configuration

Configure PyCharm to use the remote server's environment so that code can be run there. See this tutorial:
https://blog.csdn.net/yeler082/article/details/83818771


Set the remote path mapping to the airflow/dags directory.

III. Configuring the Airflow DAGs Folder

cd /root/airflow
[root@quant airflow]# ls -l
total 9148
-rw-r--r-- 1 root root   39333 Sep  1 23:54 airflow.cfg
-rw-r--r-- 1 root root 9302016 Sep  1 23:33 airflow.db
-rw-r--r-- 1 root root       6 Sep  1 01:33 airflow-webserver.pid
drwxr-xr-x 2 root root    4096 Sep  1 23:47 dags
drwxr-xr-x 6 root root    4096 Aug 31 23:47 logs
-rw-r--r-- 1 root root    4700 Apr 29 10:03 webserver_config.py
[root@quant airflow]# 

[root@quant airflow]# vi airflow.cfg
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /root/airflow/dags

# Hostname by providing a path to a callable, which will resolve the hostname.
# The format is "package.function".
#
# For example, default value "socket.getfqdn" means that result from getfqdn() of "socket"
# package will be used as hostname.
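Settings in airflow.cfg can also be overridden by environment variables named AIRFLOW__{SECTION}__{KEY}, so AIRFLOW__CORE__DAGS_FOLDER, if set, takes precedence over the dags_folder line above. The sketch below reproduces that lookup order for this one key with the standard configparser module, which helps when checking where the scheduler will look for DAG files. It is a simplified stand-in, not Airflow's own config parser, and resolve_dags_folder is a hypothetical name.

```python
import configparser
import os
from typing import Mapping, Optional


def resolve_dags_folder(cfg_text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Return dags_folder, letting AIRFLOW__CORE__DAGS_FOLDER win over airflow.cfg."""
    env = os.environ if env is None else env
    override = env.get("AIRFLOW__CORE__DAGS_FOLDER")
    if override:
        return override
    # interpolation=None: airflow.cfg contains '%' characters (e.g. in log formats)
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("core", "dags_folder")


if __name__ == "__main__":
    # AIRFLOW_HOME defaults to ~/airflow, i.e. /root/airflow for the root user
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    cfg_path = os.path.join(airflow_home, "airflow.cfg")
    if os.path.exists(cfg_path):
        with open(cfg_path) as fh:
            print(resolve_dags_folder(fh.read()))
```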

IV. Hands-On

1. Download a single stock's price data and save it to a CSV file

download_stock_price.py

"""DAG that downloads stock prices with yfinance using the PythonOperator."""

from datetime import timedelta
import yfinance as yf
import os

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

def download_price():
    ticker = "MSFT"
    msft = yf.Ticker(ticker)
    hist = msft.history(period="max")
    print(type(hist))
    print(hist.shape)

    # The relative path below is resolved against the task process's working directory
    print(os.getcwd())
    with open(f'./{ticker}.csv', 'w') as writer:
        hist.to_csv(writer, index=True)
    print("Finished downloading price data.")

# test
# download_price()

# [START instantiate_dag]
with DAG(
        dag_id='download_stock_price',
        default_args=args,
        description='download stock price and save to local csv files',
        schedule_interval='0 0 * * *',
        start_date=days_ago(2),
        dagrun_timeout=timedelta(minutes=60),
        tags=['quantdata'],
) as dag:
    # [END instantiate_dag]

    dag.doc_md = """
    This DAG downloads stock prices.
    """

    download_task = PythonOperator(
        task_id="download_prices",
        python_callable=download_price
    )

"""

if __name__ == "__main__":
    dag.cli()
"""
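schedule_interval='0 0 * * *' runs the DAG once a day at midnight. Airflow starts each run only after its interval has ended and labels it with the interval's start as the execution date. Because start_date=days_ago(2) lies in the past and catchup is enabled by default, the scheduler creates runs for the two already-completed days as soon as the DAG is switched on. The sketch below models that arithmetic in plain Python; it is a simplification of the scheduler's behaviour, not Airflow code, and completed_daily_runs is a hypothetical name.

```python
from datetime import datetime, timedelta
from typing import List


def completed_daily_runs(start_date: datetime, now: datetime) -> List[datetime]:
    """Execution dates of daily intervals [d, d + 1 day) that have fully ended by `now`.

    Assumes start_date is aligned to midnight (as days_ago() returns) and that the
    DAG uses '0 0 * * *' with catchup enabled.
    """
    one_day = timedelta(days=1)
    runs = []
    execution_date = start_date
    # A run is only created once its whole interval has passed
    while execution_date + one_day <= now:
        runs.append(execution_date)
        execution_date += one_day
    return runs


if __name__ == "__main__":
    now = datetime(2021, 9, 3, 10, 30)
    start = datetime(2021, 9, 1)  # what days_ago(2) yields on 2021-09-03
    for d in completed_daily_runs(start, now):
        print(d.date())
```

Set catchup=False on the DAG if only the most recent interval should run.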

Run the file on the server. If it prints nothing and raises no exception, the DAG file imports cleanly:

[root@quant dags]# python3 download_stock_price.py
[root@quant dags]# 

With the scheduler running, the DAG is loaded automatically and appears in the web UI.

2. Airflow Variables

The example above downloads only a single stock. Is there a way to fetch data for several stocks without changing the code? Yes: Airflow Variables.

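Variables are a generic way to store and retrieve arbitrary content or settings as a simple key-value store within Airflow. They can be listed, created, updated, and deleted from the UI, from code, or from the CLI, and are convenient to use when writing pipeline code, for example: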

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
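Variable values are stored as text; with deserialize_json=True, Variable.get passes the stored text through json.loads. That is why the stock list used below is saved as JSON. The standard-library sketch shows the same decoding plus a type check, which is handy for validating a value before pasting it into the UI; parse_stock_list is a hypothetical helper, not part of Airflow.

```python
import json
from typing import List


def parse_stock_list(raw_value: str) -> List[str]:
    """Decode a stored value the way deserialize_json=True does (json.loads),
    then check that it is a list of ticker strings."""
    value = json.loads(raw_value)  # malformed JSON raises JSONDecodeError, a ValueError
    if not isinstance(value, list) or not all(isinstance(t, str) for t in value):
        raise ValueError('expected a JSON array of ticker symbols, e.g. ["MSFT"]')
    return value
```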

You can also use variables in a Jinja template:

echo {{ var.value.<variable_name> }}

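Add the variable in the UI under Admin -> Variables. The code below reads a variable named stock_list_json and iterates over it, so its value should be a JSON array of ticker symbols.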


Update the code:

"""DAG that downloads stock prices with yfinance using the PythonOperator."""

from datetime import timedelta
import yfinance as yf
import os

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

def download_price():
    # stock_list_str = Variable.get("stock_list")
    stock_list_json = Variable.get("stock_list_json", deserialize_json=True)

    for ticker in stock_list_json:
        stock = yf.Ticker(ticker)
        hist = stock.history(period="max")
        # print(type(hist))
        # print(hist.shape)
        # print(os.getcwd())

        with open(f'./{ticker}.csv', 'w') as writer:
            hist.to_csv(writer, index=True)
        print("Finished downloading price data for " + ticker)

# test
# download_price()

# [START instantiate_dag]
with DAG(
        dag_id='download_stock_price',
        default_args=args,
        description='download stock price and save to local csv files',
        schedule_interval='0 0 * * *',
        start_date=days_ago(2),
        dagrun_timeout=timedelta(minutes=60),
        tags=['quantdata'],
) as dag:
    # [END instantiate_dag]

    dag.doc_md = """
    This DAG downloads stock prices.
    """

    download_task = PythonOperator(
        task_id="download_prices",
        python_callable=download_price
    )

"""

if __name__ == "__main__":
    dag.cli()
"""
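The loop writes each file to ./{ticker}.csv, that is, relative to whatever working directory the task process happens to have, which is why the first version printed os.getcwd(). Writing to an explicit absolute directory makes the output location predictable. The sketch below uses only the standard library; write_price_csvs and the row dictionaries are hypothetical stand-ins for the yfinance DataFrame, and in the DAG itself you would keep hist.to_csv(...) and simply pass it a path built from a fixed base directory.

```python
import csv
import shlex
import tempfile
from pathlib import Path
from typing import Dict, List


def write_price_csvs(prices: Dict[str, List[dict]], out_dir: str) -> List[Path]:
    """Write one <TICKER>.csv per ticker into out_dir, creating the directory if needed."""
    base = Path(out_dir)
    base.mkdir(parents=True, exist_ok=True)
    written = []
    for ticker, rows in prices.items():
        path = base / f"{ticker}.csv"
        fieldnames = list(rows[0].keys()) if rows else []
        with path.open("w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
        written.append(path)
    return written


if __name__ == "__main__":
    # Placeholder rows standing in for yfinance output (not real prices)
    sample = {"MSFT": [{"Date": "d1", "Close": "c1"}]}
    with tempfile.TemporaryDirectory() as tmp:
        for p in write_price_csvs(sample, tmp):
            print(shlex.quote(str(p)), p.read_text().splitlines())
```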

Trigger the DAG manually from the UI; the task log shows that it ran successfully.


V. Trigger DAG with parameters

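When triggering a DAG manually, you can choose "Trigger DAG w/ config" and supply configuration parameters as a JSON object.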


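Use case: this is useful when correcting or debugging a DAG run, because inputs can be changed at trigger time without editing the code. For example: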

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

dag = DAG("example_parameterized_dag", schedule_interval=None, start_date=days_ago(20))

parameterized_task = BashOperator(
    task_id='parameterized_task',
    # dag_run.conf holds the JSON configuration supplied when the run is triggered
    bash_command="echo value:{{ dag_run.conf['conf1'] }}",
    dag=dag
)
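The same configuration can also be passed from the command line: in Airflow 2, airflow dags trigger accepts a JSON string through --conf (short form -c). The Python sketch below only builds and, optionally, runs that command on a host where Airflow is installed; build_trigger_command and trigger_dag are hypothetical helpers, and {"conf1": "hello"} is an arbitrary example value.

```python
import json
import shlex
import subprocess
from typing import Any, Dict, List


def build_trigger_command(dag_id: str, conf: Dict[str, Any]) -> List[str]:
    """argv for `airflow dags trigger`, with the run configuration serialized as JSON."""
    return ["airflow", "dags", "trigger", "--conf", json.dumps(conf), dag_id]


def trigger_dag(dag_id: str, conf: Dict[str, Any]) -> int:
    """Run the CLI command and return its exit code (requires the airflow CLI on PATH)."""
    return subprocess.run(build_trigger_command(dag_id, conf), check=False).returncode


if __name__ == "__main__":
    argv = build_trigger_command("example_parameterized_dag", {"conf1": "hello"})
    # Print a shell-safe version of the command instead of running it
    print(" ".join(shlex.quote(arg) for arg in argv))
```

Passing the arguments as a list (rather than one shell string) avoids having to escape the quotes inside the JSON.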

Then pass the configuration when triggering the DAG.

Putting it into practice

Modify download_stock_price.py so that it reads the ticker list from the trigger configuration:

"""DAG that downloads stock prices with yfinance using the PythonOperator."""

from datetime import timedelta
import yfinance as yf

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

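def download_price(**context):
    # stock_list_str = Variable.get("stock_list")
    # Read the default ticker list from the Airflow Variable
    stock_list = Variable.get("stock_list_json", deserialize_json=True)

    # If the run was triggered with a config (Trigger DAG with parameters), use its "stocks" list instead.
    # dag_run.conf may be empty for scheduled runs, so fall back to an empty dict.
    stocks = (context["dag_run"].conf or {}).get("stocks")
    if stocks:
        stock_list = stocks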

    for ticker in stock_list:
        stock = yf.Ticker(ticker)
        hist = stock.history(period="max")
        # print(type(hist))
        # print(hist.shape)
        # print(os.getcwd())

        with open(f'./{ticker}.csv', 'w') as writer:
            hist.to_csv(writer, index=True)
        print("Finished downloading price data for " + ticker)

# test
# download_price()

# [START instantiate_dag]
with DAG(
        dag_id='download_stock_price',
        default_args=args,
        description='download stock price and save to local csv files',
        schedule_interval='0 0 * * *',
        start_date=days_ago(2),
        dagrun_timeout=timedelta(minutes=60),
        tags=['quantdata'],
) as dag:
    # [END instantiate_dag]

    dag.doc_md = """
    This DAG downloads stock prices.
    """

    download_task = PythonOperator(
        task_id="download_prices",
        # Airflow 2 passes the context into **context automatically; provide_context is deprecated
        python_callable=download_price,
    )

"""

if __name__ == "__main__":
    dag.cli()
"""
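The precedence rule inside download_price (trigger configuration first, Variable second) can be pulled out into a pure function so that it can be unit-tested without a scheduler or a metadata database. A sketch, with resolve_stock_list as a hypothetical name; inside the task it would be called as resolve_stock_list(context["dag_run"].conf, Variable.get("stock_list_json", deserialize_json=True)).

```python
from typing import Any, List, Mapping, Optional


def resolve_stock_list(conf: Optional[Mapping[str, Any]], default: List[str]) -> List[str]:
    """Tickers from the trigger config's "stocks" key, else the Variable's default list."""
    stocks = (conf or {}).get("stocks")
    # A missing or empty "stocks" entry falls back to the default, as in download_price
    return list(stocks) if stocks else list(default)
```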

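Set the configuration: choose "Trigger DAG w/ config" and enter a JSON object whose stocks key holds a list of tickers.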

Then trigger the run manually; the log shows that the tickers were taken from the configuration.

