Warning: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/41517798/
Proper way to create dynamic workflows in Airflow
Asked by costrouc
Problem
Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at subdags, but they appear to work only with a static set of tasks that must be determined at DAG creation.
Would DAG triggers work? And if so, could you please provide an example?
I have an issue where it is impossible to know the number of task B's that will be needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute and cannot be combined.
             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |      ....      |
             |---> Task B.N --|
Idea #1
I don't like this solution because I would have to create a blocking ExternalTaskSensor, and all the Task B.* will take between 2 and 24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (a DAG created dynamically through the python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Edit 1:
As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.
Answered by Oleg Yamin
Here is how I did it with a similar request without any subdags:
First, create a method that returns whatever values you want:
def values_function():
    # Return whatever values you need; this must be resolvable at DAG-parse time
    return values
Next, create a method that will generate the jobs dynamically:
def group(number, **kwargs):
    # Load the values if needed in the command you plan to execute
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return BashOperator(
        task_id='JOB_NAME_{}'.format(number),
        bash_command='script.sh {} {}'.format(dyn_value, number),
        dag=dag)
And then combine them:
push_func = PythonOperator(
    task_id='push_func',
    provide_context=True,
    python_callable=values_function,
    dag=dag)

complete = DummyOperator(
    task_id='All_jobs_completed',
    dag=dag)

for i in values_function():
    push_func >> group(i) >> complete
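Note that the for loop runs when the scheduler parses the DAG file, not when push_func executes, so values_function() must be able to return its list at parse time. A minimal sketch of one way to satisfy that (the job_values Variable name is hypothetical) is to read the list from an Airflow Variable that an earlier run or an external process keeps up to date:

from airflow.models import Variable

def values_function():
    # Hypothetical Variable holding the fan-out list; default to an empty
    # list so the DAG file still parses before the Variable exists.
    return Variable.get('job_values', default_var=[], deserialize_json=True)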
Answered by Christopher Beck
I have figured out a way to create workflows based on the result of previous tasks.
Basically what you want to do is have two subdags with the following:
1. XCom-push a list (or whatever you need to create the dynamic workflow later) in the subdag that gets executed first (see def return_list() in test1.py).
2. Pass the main dag object as a parameter to your second subdag.
3. Now if you have the main dag object, you can use it to get a list of its task instances. From that list of task instances, you can filter out a task of the current run by using parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]; one could probably add more filters here.
4. With that task instance, you can use xcom pull to get the value you need by specifying the dag_id of the first subdag: dag_id='%s.%s' % (parent_dag_name, 'test1')
5. Use the list/value to create your tasks dynamically.
Now I have tested this in my local Airflow installation and it works fine. I don't know whether the xcom pull part will have any problems if more than one instance of the dag runs at the same time; in that case you would probably either use a unique key or something similar to uniquely identify the xcom value you want. One could probably optimize step 3 to be 100% sure to get a specific task of the current main dag, but for my use this performs well enough; I think one only needs one task_instance object to use xcom_pull.
Also I clean the xcoms for the first subdag before every execution, just to make sure that I don't accidentally get any wrong value.
I'm pretty bad at explaining, so I hope the following code will make everything clear:
test1.py
from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag
test2.py
from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag
and the main workflow:
test.py
from datetime import datetime

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator

from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
Answered by Kyle Bridenstine
Yes, this is possible; I've created an example DAG that demonstrates it.
import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):
    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):
    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):
    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):
    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)
Before you run the DAG, create these three Airflow Variables:
airflow variables --set DynamicWorkflow_Group1 1
airflow variables --set DynamicWorkflow_Group2 0
airflow variables --set DynamicWorkflow_Group3 0
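One caveat worth noting: the module-level Variable.get calls above run every time the scheduler parses the DAG file, so the import fails until these Variables exist. A hedged workaround, using the default_var parameter that Variable.get accepts, is to fall back to zero tasks:

# Fall back to 0 if the Variable has not been created yet, so the DAG file
# still imports cleanly before the first run.
DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1", default_var=0)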
After it has run, you'll see the DAG expand from its initial shape to include the dynamically created task groups (the original post includes before and after screenshots of the graph view).
You can see more information on this DAG in my article on creating Dynamic Workflows On Airflow.
Answered by Ena
Original question: "Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A?"
The short answer is no: Airflow builds the DAG before starting to run it.
That said, we came to a simple conclusion: we don't actually have such a need. When you want to parallelize some work, you should evaluate the resources you have available, not the number of items to process.
We did it like this: we dynamically generate a fixed number of tasks, say 10, that split the job between them. For example, if we need to process 100 files, each task will process 10 of them. I will post the code later today.
Update
Here is the code, sorry for the delay.
from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    # A PythonOperator is needed here; the original snippet said DummyOperator,
    # which does not accept a python_callable.
    return PythonOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,  # defined by you; see the sketch after the explanation below
        # your task will take as input the current number and the total number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end
Code explanation:
Here we have a single start task and a single end task (both dummy).
Then, starting from the start task, the for loop creates 10 tasks with the same Python callable. The tasks are created in the function create_dynamic_task.
To each python callable we pass as arguments the total number of parallel tasks and the current task index.
Suppose you have 1000 items to elaborate: the first task receives as input that it should elaborate the first of 10 chunks. It divides the 1000 items into 10 chunks and elaborates the first one.
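The parallelTask callable itself is never shown in the answer; a minimal sketch of the chunking logic it describes (the item source and the process() helper are placeholders) might look like this:

def parallelTask(current_task_number, total_parallel_tasks, **kwargs):
    # Placeholder item source; in practice this might query a database
    # or list files in a bucket.
    items = list(range(1000))

    # Elaborate every item whose index falls into this task's chunk.
    for index, item in enumerate(items):
        if index % total_parallel_tasks == current_task_number:
            process(item)  # process() stands in for your real work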
Answered by flinz
I think I have found a nicer solution to this at https://github.com/mastak/airflow_multi_dagrun, which uses simple enqueuing of DagRuns by triggering multiple dagruns, similar to TriggerDagRuns. Most of the credit goes to https://github.com/mastak, although I had to patch some details to make it work with the most recent Airflow.
The solution uses a custom operator that triggers several DagRuns:
from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)

            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()
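The usage example below imports the operator from airflow.operators, which implies it has been registered as an Airflow plugin; a minimal sketch of that registration under Airflow 1.x (the plugin class and name are assumptions) would be:

from airflow.plugins_manager import AirflowPlugin

class MultiDagRunPlugin(AirflowPlugin):
    # Any unique name works here; this one is hypothetical.
    name = 'multi_dagrun_plugin'
    operators = [TriggerMultiDagRunOperator]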
You can then submit several DagRuns from the callable function passed to the operator, for example:
from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order


args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)
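The triggered DAG (common_target) receives each payload through its run's conf. The answer does not show that DAG; a hedged sketch of how a task in it might read the payload could be:

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

def handle_payload(**context):
    # dag_run.conf carries the payload set via DagRunOrder in the trigger DAG.
    my_variable = context['dag_run'].conf['my_variable']
    print('Processing item %s' % my_variable)

target_dag = DAG(
    dag_id='common_target',
    schedule_interval=None,  # runs only when triggered
    start_date=days_ago(1),
)

process = PythonOperator(
    task_id='process_payload',
    provide_context=True,
    python_callable=handle_payload,
    dag=target_dag,
)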
I created a fork with the code at https://github.com/flinz/airflow_multi_dagrun
Answered by rotten
The jobs graph is not generated at run time. Rather, the graph is built when it is picked up by Airflow from your dags folder. Therefore it isn't really possible to have a different graph for the job every time it runs. You can configure a job to build a graph based on a query at load time, but that graph will then remain the same for every run after that, which is probably not very useful.
You can design a graph which executes different tasks on every run based on query results by using a Branch Operator.
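To make that concrete, here is a minimal sketch of the branching idea (the task ids and the decision logic are illustrative): a BranchPythonOperator returns the task_id to follow, and the branches it does not return are skipped.

from airflow.operators.python_operator import BranchPythonOperator

def choose_branch(**context):
    # Hypothetical decision based on a count an earlier task pushed to XCom.
    row_count = context['ti'].xcom_pull(task_ids='count_rows')
    return 'heavy_path' if row_count and int(row_count) > 1000 else 'light_path'

branch = BranchPythonOperator(
    task_id='branch_on_row_count',
    provide_context=True,
    python_callable=choose_branch,
    dag=dag,  # assumes a dag object like the ones defined elsewhere on this page
)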
What I've done is to pre-configure a set of tasks and then take the query results and distribute them across the tasks. This is probably better anyway, because if your query returns a lot of results, you probably don't want to flood the scheduler with a lot of concurrent tasks. To be even safer, I also used a pool to ensure my concurrency doesn't get out of hand with an unexpectedly large query.
"""
This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select
    o.id,
    o.customer
from
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################


# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator(
        task_id=f'order_processing_task_{bucket_number}',
        python_callable=runOrderProcessing,
        pool='order_processing_pool',
        op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
        provide_context=True,
        dag=dag
    )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order['order_id']}, customer ID {order['customer_id']}")
            doStuff(**order)  # doStuff() stands in for the real per-order processing


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):
        resultCounter += 1
        bucket = (resultCounter % totalBuckets)
        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    # iterate over a copy of the keys because we delete entries as we go
    for task in list(tasks.keys()):
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del tasks[task]

    return list(tasks.keys())


###################################################################################################

# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)

# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
    task_id='get_open_orders',  # must match the task_ids used in the xcom_pull above
    python_callable=getOpenOrders,
    provide_context=True,
    dag=dag
)

get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
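One practical note on the code above: the order_processing_pool has to exist before the tasks run. Under the Airflow 1.x CLI it can be created like this (the slot count of 5 is just an example; check airflow pool --help for the exact flags on your version):

airflow pool -s order_processing_pool 5 "Limit concurrent order processing"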
Answered by Muhammad Bin Ali
What I think you are looking for is creating a DAG dynamically. I encountered this type of situation a few days ago, and after some searching I found this blog.
Dynamic Task Generation
start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task
Setting the DAG workflow
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

# Extract table names and fields to be processed
tables = configFile['tables']

# In this loop tasks are created for each table defined in the YAML file
for table in tables:
    for table, fieldName in table.items():
        # In our example, first step in the workflow for each table is to get SQL data from db.
        # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
        get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                             'getSQLData',
                                             {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                              'dbname': configFile['dbname']})

        # Second step is upload data to s3
        upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                             'uploadDataToS3',
                                             {'previous_task_id': '{}-getSQLData'.format(table),
                                              'bucket_name': configFile['bucket_name'],
                                              'prefix': configFile['prefix']})

        # This is where the magic lies. The idea is that
        # once tasks are generated they should linked with the
        # dummy operators generated in the start and end tasks.
        # Then you are done!
        start >> get_sql_data_task
        get_sql_data_task >> upload_to_s3_task
        upload_to_s3_task >> end
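The YAML file itself is not shown in the blog excerpt; a plausible shape for dynamicDagConfigFile.yaml, inferred from the keys the loop reads (tables as a list of table-to-field mappings, plus dbname, bucket_name, and prefix), might be:

# Hypothetical config; table and field names are placeholders.
dbname: my_database
bucket_name: my-s3-bucket
prefix: exports/
tables:
  - customers: customer_id
  - orders: order_id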
This is how our DAG file looks after putting the code together:
import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

# The original snippet never defines `dag`; a minimal definition is assumed
# here (name and schedule are placeholders) so the file actually parses.
dag = DAG('dynamic_etl_dag',
          schedule_interval=None,
          start_date=datetime(2019, 1, 1))

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

# Extract table names and fields to be processed
tables = configFile['tables']

# In this loop tasks are created for each table defined in the YAML file
for table in tables:
    for table, fieldName in table.items():
        # In our example, first step in the workflow for each table is to get SQL data from db.
        # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
        get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                             'getSQLData',
                                             {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                              'dbname': configFile['dbname']})

        # Second step is upload data to s3
        upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                             'uploadDataToS3',
                                             {'previous_task_id': '{}-getSQLData'.format(table),
                                              'bucket_name': configFile['bucket_name'],
                                              'prefix': configFile['prefix']})

        # This is where the magic lies. The idea is that
        # once tasks are generated they should linked with the
        # dummy operators generated in the start and end tasks.
        # Then you are done!
        start >> get_sql_data_task
        get_sql_data_task >> upload_to_s3_task
        upload_to_s3_task >> end
It was very helpful; I hope it will also help someone else.
Answered by MarMat
I found this Medium post, which is very similar to this question. However, it is full of typos and did not work when I tried implementing it.
My answer to the above is as follows:
If you are creating tasks dynamically, you must do so by iterating over something that is not created by an upstream task, or that can be defined independently of that task. I learned that you can't pass execution dates or other Airflow variables to something outside of a template (e.g., a task), as many others have pointed out before. See also this post.
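To make that rule concrete, here is a small sketch (names are illustrative): the loop below runs when the scheduler parses the file, so it can iterate over an Airflow Variable, which is available at parse time, but not over an XCom value, which only exists after an upstream task has run.

from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

# Works: the list is available when the DAG file is parsed.
table_names = Variable.get('table_list', default_var=[], deserialize_json=True)

for name in table_names:
    task = DummyOperator(task_id='process_' + name, dag=dag)  # assumes a dag object

# Does NOT work: a value pushed to XCom by an upstream task does not exist
# at parse time, so there would be nothing to iterate over here.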