Python: How to create a conditional task in Airflow
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow.
Original URL: http://stackoverflow.com/questions/43678408/
How to create a conditional task in Airflow
Asked by Alexis.Rolland
I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:
- Task 1 executes
- If Task 1 succeeds, then execute Task 2a
- Else, if Task 1 fails, then execute Task 2b
- Finally, execute Task 3
All tasks above are SSHExecuteOperators.
I'm guessing I should be using the ShortCircuitOperator and/or XCom to manage the condition, but I am not clear on how to implement that. Could you please describe the solution?
Accepted answer by Jean S
You have to use Airflow trigger rules.
All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered.
The trigger rule possibilities:
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
Here is the idea to solve your problem:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
    task_id='task_1',
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)

# task_2 is the task whose outcome decides which branch runs
task_2 = SSHExecuteOperator(
    task_id='conditional_task',
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)

# runs only if conditional_task succeeds
task_2a = SSHExecuteOperator(
    task_id='task_2a',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    ssh_hook=sshHook,
    dag=dag)

# runs only if conditional_task fails
task_2b = SSHExecuteOperator(
    task_id='task_2b',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ALL_FAILED,
    ssh_hook=sshHook,
    dag=dag)

# runs as soon as either branch succeeds
task_3 = SSHExecuteOperator(
    task_id='task_3',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    ssh_hook=sshHook,
    dag=dag)

task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
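For reference, on Airflow versions that support bitshift composition with lists (1.10+), the same dependencies can be declared in one line, equivalent to the set_upstream calls above:

# equivalent wiring: task_2 fans out to both branches, which converge on task_3
task_1 >> task_2 >> [task_2a, task_2b] >> task_3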
Answered by villasv
Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.
The docs describe its use:
The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.
...
If you want to skip some tasks, keep in mind that you can't have an empty path, if so make a dummy task.
Code Example
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

def dummy_test():
    # decide which branch to follow by returning its task_id
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag)

branch_task >> A_task
branch_task >> B_task
EDIT:
If you're running an Airflow version >= 1.10.3, the python_callable can also return a list of task ids, allowing you to skip multiple downstream paths in a single operator without needing a dummy task before joining.
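A minimal sketch of that variant, assuming hypothetical downstream tasks with ids 'branch_a' and 'branch_c':

def choose_branches():
    # every task_id in the returned list is followed;
    # all other direct downstream paths are skipped
    return ['branch_a', 'branch_c']

multi_branch_task = BranchPythonOperator(
    task_id='multi_branching',
    python_callable=choose_branches,
    dag=dag)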