Python: How to set dependencies between DAGs in Airflow?
Notice: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/38022323/
How to set dependencies between DAGs in Airflow?
Asked by Conor
I am using Airflow to schedule batch jobs. I have one DAG (A) that runs every night and another DAG (B) that runs once per month. B depends on A having completed successfully. However, B takes a long time to run, so I would like to keep it in a separate DAG to allow better SLA reporting.
How can I make running DAG B dependent on a successful run of DAG A on the same day?
Answered by p.magalhaes
You can achieve this behavior using an operator called ExternalTaskSensor. Your task (B1) in DAG (B) will be scheduled and will wait for a success on task (A2) in DAG (A).
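One detail worth spelling out for the nightly/monthly case in the question: by default, ExternalTaskSensor looks for a run of the external DAG with the same logical (execution) date as the waiting task, and a monthly run rarely shares that date with a nightly one. The sensor accepts an `execution_date_fn` callable to translate between the two. Below is a minimal sketch of such a callable in plain Python. The function name `matching_daily_run` is hypothetical, and the mapping assumes that B runs on a monthly schedule whose logical date is the first day of the period, and that the A run it should wait for is the one whose logical date is the last day of that same month. Adjust the mapping if your schedules differ.

```python
from datetime import datetime, timedelta


def matching_daily_run(monthly_logical_date):
    """Map DAG B's logical date to the logical date of the DAG A run to wait for.

    Assumption (not from the original answer): B's logical date falls on the
    first day of a month, and the matching A run is the one for the last day
    of that month.
    """
    # Jump safely into the next month (32 days past the 1st always lands
    # there), then snap back to the first day of that month.
    first_of_next_month = (
        monthly_logical_date.replace(day=1) + timedelta(days=32)
    ).replace(day=1)
    # One day earlier is the last day of the original month.
    return first_of_next_month - timedelta(days=1)


# Quick illustration with plain datetimes.
january = matching_daily_run(datetime(2020, 1, 1))
february = matching_daily_run(datetime(2020, 2, 1))
december = matching_daily_run(datetime(2020, 12, 1))
```

The callable would then be passed to the sensor as `execution_date_fn=matching_daily_run`. If the offset between the two schedules is constant, the sensor's `execution_delta` parameter (a `timedelta`) is a simpler alternative.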
Answered by nono
It looks like a TriggerDagRunOperator can be used as well, and you can use a Python callable to add some logic, as explained here: https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
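To make the "Python callable" idea concrete, here is a minimal sketch of the kind of logic such a callable could hold. It follows the Airflow 1.x convention, where the callable receives the task context and a run-order object, and returns that object to trigger the target DAG or None to skip it. This `python_callable` argument was removed from TriggerDagRunOperator in Airflow 2.0, so treat the sketch as 1.x only. The function name and the month-end condition are assumptions for illustration, and the `SimpleNamespace` object is a stand-in for what Airflow would pass in.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace


def trigger_b_at_month_end(context, dag_run_obj):
    """Return dag_run_obj to trigger DAG B, or None to skip (Airflow 1.x style)."""
    logical_date = context["execution_date"]
    # Hypothetical rule: only the last nightly run of a month triggers B,
    # i.e. the run whose following day is the 1st.
    if (logical_date + timedelta(days=1)).day != 1:
        return None
    # The payload is handed to the triggered run; its content here is made up.
    dag_run_obj.payload = {"triggered_by": logical_date.isoformat()}
    return dag_run_obj


# Stand-ins for the objects Airflow would supply, for illustration only.
order = SimpleNamespace(run_id=None, payload=None)
skipped = trigger_b_at_month_end({"execution_date": datetime(2020, 1, 30)}, order)
fired = trigger_b_at_month_end({"execution_date": datetime(2020, 1, 31)}, order)
```

With this approach DAG A decides when B starts, so B would normally have no schedule of its own. That is the opposite direction from the sensor approach, where B is scheduled independently and waits for A.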
Answered by doraemon
When cross-DAG dependency is needed, there are often two requirements:
1. Task B1 on DAG B needs to run after task A1 on DAG A is done. This can be achieved using ExternalTaskSensor, as others have mentioned:
    B1 = ExternalTaskSensor(task_id="B1", external_dag_id='A', external_task_id='A1', mode="reschedule")
2. When a user clears task A1 on DAG A, we want Airflow to clear task B1 on DAG B to let it re-run. This can be achieved using ExternalTaskMarker (since Airflow v1.10.8):
    A1 = ExternalTaskMarker(task_id="A1", external_dag_id="B", external_task_id="B1")
Please see the doc about cross-DAG dependencies for more details: https://airflow.apache.org/docs/stable/howto/operator/external.html