Python: How to set dependencies between DAGs in Airflow?

Note: this page reproduces a popular Stack Overflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/38022323/

Date: 2020-08-19 20:14:09  Source: igfitidea

Tags: python, etl, airflow

Asked by Conor

I am using Airflow to schedule batch jobs. I have one DAG (A) that runs every night and another DAG (B) that runs once per month. B depends on A having completed successfully. However, B takes a long time to run, so I would like to keep it in a separate DAG to allow better SLA reporting.

How can I make running DAG B dependent on a successful run of DAG A on the same day?

Answer by p.magalhaes

You can achieve this behavior using a sensor called ExternalTaskSensor. Your sensor task (B1) in DAG B will be scheduled and will wait for task A2 in DAG A to succeed.

See the ExternalTaskSensor documentation.
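One detail matters for the schedules in the question: by default the sensor looks for the run of DAG A whose logical (execution) date equals the logical date of the sensor's own run, and the execution_delta parameter moves that target to "current logical date minus the delta". The sketch below is a toy model of that matching rule plus a success check, written in plain Python so it runs without Airflow; it is not Airflow's implementation. The schedules (DAG A daily at 01:00, DAG B at 03:00 on the first of the month) and the helper names target_logical_date and poke are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional, Sequence, Tuple


def target_logical_date(current: datetime,
                        execution_delta: Optional[timedelta] = None) -> datetime:
    """Logical date of the external run to check.

    Toy model of ExternalTaskSensor's documented rule: the same logical
    date by default, otherwise current - execution_delta.
    """
    if execution_delta is None:
        return current
    return current - execution_delta


def poke(states: Dict[Tuple[str, datetime], str],
         external_task_id: str,
         current: datetime,
         execution_delta: Optional[timedelta] = None,
         allowed_states: Sequence[str] = ("success",)) -> bool:
    """Return True once the external task instance is in an allowed state.

    `states` is a hypothetical stand-in for Airflow's metadata database,
    keyed by (task_id, logical_date).
    """
    dttm = target_logical_date(current, execution_delta)
    return states.get((external_task_id, dttm)) in allowed_states


# Hypothetical schedules: DAG A daily at 01:00, DAG B monthly at 03:00 on the 1st.
a_states = {("A2", datetime(2024, 3, 1, 1, 0)): "success"}
b_run = datetime(2024, 3, 1, 3, 0)

print(poke(a_states, "A2", b_run))                      # False (no run of A at 03:00)
print(poke(a_states, "A2", b_run, timedelta(hours=2)))  # True (checks A's 01:00 run)
```

When the offset is not a fixed timedelta, ExternalTaskSensor also accepts execution_date_fn, a callable that returns the date(s) to check instead.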

Answer by nono

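It looks like a TriggerDagRunOperator can be used as well, and you can use a Python callable to add some logic, as explained here: https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand (The python_callable argument exists in Airflow 1.x; Airflow 2.0 removed it from TriggerDagRunOperator in favor of the conf argument.)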
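In Airflow 1.x, the python_callable given to TriggerDagRunOperator is called with the task context and a placeholder DagRunOrder object; returning that object triggers the target DAG, and returning None skips the trigger. A minimal sketch for the question's setup, assuming DAG A's last task is a TriggerDagRunOperator pointing at DAG B: trigger B only when A's run is for the first day of a month. The first-of-month rule and the payload contents are illustrative assumptions, and the SimpleNamespace stub only stands in for the placeholder Airflow passes, so the function can be exercised without Airflow.

```python
from datetime import datetime
from types import SimpleNamespace


def conditionally_trigger(context, dag_run_obj):
    """Airflow 1.x-style python_callable for TriggerDagRunOperator.

    Returning dag_run_obj asks Airflow to trigger the target DAG (B);
    returning None means "criteria not met", so no run is created.
    """
    logical_date = context["execution_date"]
    if logical_date.day != 1:
        return None
    # Illustrative payload; Airflow 1.x passes it to DAG B as dag_run.conf.
    dag_run_obj.payload = {"triggered_by_run": logical_date.isoformat()}
    return dag_run_obj


# Stub standing in for the DagRunOrder placeholder Airflow passes in.
stub = SimpleNamespace(run_id="manual__test", payload=None)
print(conditionally_trigger({"execution_date": datetime(2024, 3, 1)}, stub) is stub)  # True
print(conditionally_trigger({"execution_date": datetime(2024, 3, 2)}, stub))          # None
```

On Airflow 2, where the callable is gone, one option is to put a ShortCircuitOperator with the same date check in front of the trigger task.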

Answer by doraemon

When cross-DAG dependency is needed, there are often two requirements:

  1. Task B1 in DAG B needs to run after task A1 in DAG A is done. This can be achieved using ExternalTaskSensor, as others have mentioned:

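    # Airflow 2.x import path; on 1.10.x use airflow.sensors.external_task_sensor
    from airflow.sensors.external_task import ExternalTaskSensor
    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id="A",
                            external_task_id="A1",
                            mode="reschedule")  # free the worker slot between pokes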
    
  2. When a user clears task A1 in DAG A, we want Airflow to also clear task B1 in DAG B so that it re-runs. This can be achieved using ExternalTaskMarker (available since Airflow v1.10.8).

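    # Airflow 2.x import path; on 1.10.x use airflow.sensors.external_task_sensor
    from airflow.sensors.external_task import ExternalTaskMarker
    A1 = ExternalTaskMarker(task_id="A1",
                            external_dag_id="B",
                            external_task_id="B1")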
    
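To make the clearing behaviour of item 2 concrete: clearing a marker task also clears the external task it points to (and, in a downstream clear, that task's downstream tasks), and markers among those tasks carry the clear further. ExternalTaskMarker's recursion_depth argument (default 10) caps how many such hops are allowed, which the Airflow docs describe as mainly a guard against cyclic dependencies. The sketch below is a toy model of that propagation in plain Python, not Airflow's clearing code; tasks_to_clear, downstream and markers are hypothetical names, and in this model a cycle keeps hopping until the cap is hit and then raises.

```python
from collections import deque


def tasks_to_clear(start, downstream, markers, recursion_depth=10):
    """Toy model of clearing `start` with downstream + recursive marker clearing.

    start:      (dag_id, task_id) the user clears.
    downstream: {(dag_id, task_id): [(dag_id, task_id), ...]} within one DAG.
    markers:    {(dag_id, task_id): (external_dag_id, external_task_id)}
                for tasks that are ExternalTaskMarkers.
    Raises RuntimeError once more than `recursion_depth` marker hops are needed.
    """
    cleared = set()
    queue = deque([(start, 0)])  # (task, number of marker hops so far)
    while queue:
        task, depth = queue.popleft()
        cleared.add(task)
        # Downstream tasks in the same DAG are cleared too.
        for child in downstream.get(task, []):
            queue.append((child, depth))
        # A marker pushes the clear into the external DAG.
        if task in markers:
            if depth >= recursion_depth:
                raise RuntimeError(
                    f"Maximum recursion depth {recursion_depth} reached at {task}")
            queue.append((markers[task], depth + 1))
    return cleared


# Item 2's setup: marker A1 in DAG A points at sensor B1 in DAG B,
# and (hypothetically) B1 has a downstream task B2.
print(sorted(tasks_to_clear(("A", "A1"),
                            {("B", "B1"): [("B", "B2")]},
                            {("A", "A1"): ("B", "B1")})))
# [('A', 'A1'), ('B', 'B1'), ('B', 'B2')]
```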
Please see the documentation on cross-DAG dependencies for more details: https://airflow.apache.org/docs/stable/howto/operator/external.html
