Python: How to create a conditional task in Airflow

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same license and attribute it to the original authors (not me). Original post: http://stackoverflow.com/questions/43678408/

How to create a conditional task in Airflow

python, conditional-statements, airflow

Asked by Alexis.Rolland

I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:

  • Task 1 executes
  • If Task 1 succeeds, then execute Task 2a
  • Else if Task 1 fails, then execute Task 2b
  • Finally execute Task 3

[Schema image: Conditional Task]

All tasks above are SSHExecuteOperator. I'm guessing I should be using the ShortCircuitOperator and/or XCom to manage the condition but I am not clear on how to implement that. Could you please describe the solution?

Accepted answer by Jean S

You have to use Airflow trigger rules.

All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered.

The trigger rule possibilities:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
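
For context (this note is not part of the original answer and paraphrases the Airflow documentation), a trigger rule is just another keyword argument on an operator. A hypothetical example using a BashOperator:

from airflow.operators.bash_operator import BashOperator

# all_success (default): run only when every upstream task succeeded
# all_failed:   run only when every upstream task failed
# all_done:     run once every upstream task has finished, whatever the outcome
# one_success / one_failed: run as soon as one upstream task succeeds / fails
# dummy:        ignore dependencies entirely and trigger at will
alert = BashOperator(
    task_id='alert_on_any_failure',  # hypothetical task, not from the original answer
    bash_command='echo "an upstream task failed"',
    trigger_rule='one_failed',  # plain strings and TriggerRule constants are equivalent
    dag=dag)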

Here is the idea to solve your problem:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

# task_2a runs only when its upstream task (conditional_task) succeeds
task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

# task_2b runs only when its upstream task (conditional_task) fails
task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

# task_3 runs as soon as either task_2a or task_2b succeeds
task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
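
If your Airflow version supports the bitshift composition operators, the same wiring can also be written as follows; this is purely a stylistic equivalent of the set_upstream calls above:

task_1 >> task_2
task_2 >> task_2a
task_2 >> task_2b
task_2a >> task_3
task_2b >> task_3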

Answered by villasv

Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.

The docs describe its use:

The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.

...

If you want to skip some tasks, keep in mind that you can't have an empty path, if so make a dummy task.

Code Example

# Airflow 1.x import paths
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

def dummy_test():
    # return the task_id of the branch to follow; all other paths are skipped
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task
branch_task >> B_task
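
If the two branches later need to re-join into a common downstream task, note that the skipped branch will also cause a join task using the default all_success rule to be skipped; a trigger rule like the ones in the accepted answer works around this. A minimal sketch, with a hypothetical join task:

join_task = DummyOperator(
    task_id='join',  # hypothetical task downstream of both branches
    trigger_rule='one_success',  # fire once either branch has succeeded
    dag=dag)

A_task >> join_task
B_task >> join_task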

EDIT:

If you're running an Airflow version >= 1.10.3, you can also return a list of task ids, allowing you to skip multiple downstream paths with a single operator and avoid using a dummy task before joining.

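A minimal sketch of that variant (the callable, task id and returned ids below are illustrative, reusing the tasks from the example above):

def choose_paths():
    # with Airflow >= 1.10.3, any subset of the directly downstream task_ids may be returned
    return ['branch_a', 'branch_false']

multi_branch_task = BranchPythonOperator(
    task_id='multi_branching',
    python_callable=choose_paths,
    dag=dag,
)

multi_branch_task >> A_task
multi_branch_task >> B_task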