Python: How to Run a Simple Airflow DAG

Notice: this page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original URL: http://stackoverflow.com/questions/41805265/

Time: 2020-08-20 01:38:55  Source: igfitidea

How to Run a Simple Airflow DAG

python airflow

Asked by djohon

I am totally new to Airflow. I would like to run a simple DAG on a specified date. I'm struggling to tell the difference between the start date, the execution date, and backfilling. And what is the command to run the DAG?

Here is what I've tried so far:

airflow run dag_1 task_1 2017-1-23

The first time I ran that command, the task executed correctly, but when I tried again it did not work.

Here is another command I ran:

airflow backfill dag_1 -s 2017-1-23 -e 2017-1-24

I don't know what to expect from this command. Will the DAG execute every day from the 23rd to the 24th?

Before running the two commands above, I did this:

airflow initdb
airflow scheduler 
airflow webserver -p 8085 --debug &

Here is my DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 23, 12),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_1', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='create_clients',
    bash_command='Rscript /scripts/Cli.r',
    dag=dag)

t2 = BashOperator(
    task_id='create_operation',
    bash_command='Rscript Operation.r',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

Screenshot: Tree View

UPDATE

airflow run dag_1 task_1 2017-1-23T10:34

Answered by Necravolver

Suppose you run it once with:

airflow run dag_1 task_1 2017-1-23

That run is saved, so running it again won't do anything. You can try to re-run it by forcing it:

airflow run --force dag_1 task_1 2017-1-23

The airflow backfill command runs every execution that would have been scheduled in the period between the specified start and end dates. How many runs that is depends on the schedule you set on the DAG: if it is set to trigger every hour, a one-day window should produce 24 runs. Backfill also won't re-execute runs that have already been executed.
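
As a rough illustration of the counting described above, here is a minimal pure-Python sketch. It does not call Airflow; `planned_runs` is a hypothetical helper, and treating the window as half-open (start included, end excluded) is an assumption made so that the hourly case yields 24 runs, not a statement about how Airflow handles the end date.

```python
from datetime import datetime, timedelta


def planned_runs(start, end, interval, already_run=()):
    """Return the execution dates in [start, end) spaced by `interval`,
    skipping any date that is listed in `already_run`.

    Hypothetical helper for illustration only; not part of Airflow.
    """
    done = set(already_run)
    runs = []
    current = start
    while current < end:
        if current not in done:
            runs.append(current)
        current += interval
    return runs


# The one-day window from the question's backfill command.
start = datetime(2017, 1, 23)
end = datetime(2017, 1, 24)

hourly = planned_runs(start, end, timedelta(hours=1))
daily = planned_runs(start, end, timedelta(days=1))

# Pretend the first five hourly runs already completed earlier.
rerun = planned_runs(start, end, timedelta(hours=1), already_run=hourly[:5])

print(len(hourly))  # 24
print(len(daily))   # 1
print(len(rerun))   # 19
```

With the daily schedule used in the question's DAG, the same window contains a single slot in this simplified model, which is why the schedule matters more than the width of the date range.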

You can clear the task, as if it had never run:

airflow clear dag_1 -s 2017-1-23 -e 2017-1-24

Also check the CLI docs here: https://airflow.incubator.apache.org/cli.html

Answered by Priyank Mehta

Difference between the start date, the execution date, and backfilling

Backfilling runs a DAG explicitly: to test it, to run it manually, or to re-run a DAG that errored out. You do this using the CLI:

airflow backfill -s <<start_date>> <<dag>> 
#optionally provide -1 as start_date to run it immediately

start_date is, as the name suggests, the date from which the DAG definition is valid.

execution_date is the date-time for which a run is to be executed. You provide it when testing individual tasks of a DAG, as below:

airflow test <<dag>> <<task>> <<exec_date>>
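
To make the two dates more concrete, here is a small pure-Python sketch using the start_date and schedule_interval from the question's DAG. It rests on an assumption that is not stated in this answer: that the scheduler labels a run with the start of its schedule period (the execution_date) and only triggers it once that period has ended. `earliest_trigger` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def earliest_trigger(execution_date, interval):
    """Assumed model: a run labeled `execution_date` becomes eligible
    only after its whole schedule period has elapsed.

    Hypothetical helper for illustration only; not part of Airflow.
    """
    return execution_date + interval


# Values taken from the DAG in the question.
start_date = datetime(2017, 1, 23, 12)
interval = timedelta(days=1)

# Under this model the first run carries the start_date as its execution_date.
first_execution_date = start_date
first_trigger = earliest_trigger(first_execution_date, interval)

print(first_execution_date.isoformat())  # 2017-01-23T12:00:00
print(first_trigger.isoformat())         # 2017-01-24T12:00:00
```

If that assumption holds for your Airflow version, a DAG whose start_date is today at noon would not be picked up by the scheduler until noon tomorrow, which is a common source of confusion when a new DAG appears to do nothing.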

What is the command to run the DAG?

Backfill is the command to run a DAG explicitly. Otherwise, you just put the DAG in the DAGBAG folder, and the scheduler will run it according to the schedule defined in the DAG definition.

airflow backfill -s <<start_date>> <<dag>> 
#optionally provide -1 as start_date to run it immediately