Python: How to prevent Airflow from backfilling DAG runs?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me) on StackOverflow.
Original URL: http://stackoverflow.com/questions/38751872/
How to prevent airflow from backfilling dag runs?
Asked by m0meni
Say you have an Airflow DAG that doesn't make sense to backfill, meaning that, after it's run once, running it again shortly afterwards would be completely pointless.
For example, if you're loading data into your database from a source that is only updated hourly, backfill runs, which happen in rapid succession, would just import the same data again and again.
This is especially annoying when you instantiate a new hourly task: it runs N times, once for each hour it missed, doing redundant work before it starts running on the interval you specified.
The only solution I can think of is something the docs specifically advise against in their FAQ:
We recommend against using dynamic values as start_date, especially datetime.now(), as it can be quite confusing.
Is there any way to disable backfilling for a DAG, or should I do the above?
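To make the scenario concrete, here is a rough sketch of the kind of hourly load DAG the question describes (the DAG id, start date, and callable below are invented for illustration). With default settings, enabling a DAG whose start_date lies in the past causes the scheduler to create one run for every missed hour:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def load_hourly_data():
    # placeholder for "import the latest hourly extract into the database"
    pass

# With a start_date in the past and no catchup setting, the scheduler
# will create one DAG run per missed hour as soon as the DAG is enabled.
dag = DAG('hourly_import',
          start_date=datetime(2016, 8, 1),
          schedule_interval='@hourly')

load = PythonOperator(task_id='load_data',
                      python_callable=load_hourly_data,
                      dag=dag)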
Accepted answer by sage88
Upgrade to Airflow 1.8 and use catchup_by_default=False in airflow.cfg, or apply catchup=False to each of your DAGs.
https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default
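As a rough sketch of the per-DAG option (the DAG id and start date below are invented; for the global option, the catchup_by_default key goes in the [scheduler] section of airflow.cfg):

from datetime import datetime
from airflow import DAG

# catchup=False tells the scheduler not to create runs for schedule
# intervals that were missed before the DAG was enabled.
# (Globally, the same effect comes from setting catchup_by_default = False
# under [scheduler] in airflow.cfg, available from Airflow 1.8.)
dag = DAG('my_hourly_dag',
          start_date=datetime(2016, 8, 1),
          schedule_interval='@hourly',
          catchup=False)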
Answered by Ziggy Eunicien
This appears to be an unsolved Airflow problem. I know I would really like to have exactly the same feature. Here is as far as I've gotten; it may be useful to others.
There are UI features (at least in 1.7.1.3) which can help with this problem. If you go to the Tree view and click on a specific task (the square boxes), a dialog will come up with a 'mark success' button. Clicking 'past', then clicking 'mark success', will label all the instances of that task in the DAG as successful and they will not be run. The top-level DAG (the circles on top) can also be labeled as successful in a similar fashion, but there doesn't appear to be a way to label multiple DAG instances.
I haven't looked into it deeply enough yet, but it may be possible to use the 'trigger_dag' subcommand to mark the states of DAGs. See here: https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d
A CLI feature to mark DAGs is in the works: http://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%[email protected]%3E and https://github.com/apache/incubator-airflow/pull/1590
UPDATE (9/28/2016): A new operator, LatestOnlyOperator, has been added (https://github.com/apache/incubator-airflow/pull/1752) which will only run the latest version of downstream tasks. Sounds very useful, and hopefully it will make it into the releases soon.
UPDATE 2: As of Airflow 1.8, the LatestOnlyOperator has been released.
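A minimal usage sketch, assuming the Airflow 1.8 import path (the DAG and task ids are made up): anything placed downstream of a LatestOnlyOperator is skipped for every run except the most recent scheduled one, so backlogged runs do no real work.

from datetime import datetime
from airflow import DAG
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('latest_only_example',
          start_date=datetime(2016, 8, 1),
          schedule_interval='@hourly')

# Downstream tasks run only for the latest scheduled execution;
# older (catch-up) runs are marked as skipped instead of executed.
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
do_work = DummyOperator(task_id='do_work', dag=dag)

latest_only.set_downstream(do_work)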
Answered by Ben Tallman
Setting catchup=False in your dag declaration will provide this exact functionality.
I don't have the "reputation" to comment, but I wanted to say that catchup=False was designed (by me) for this exact purpose. In addition, I can verify that in 1.10.1 it works when set explicitly in the DAG instantiation. However, I do not see it working when placed in the default args. I've been away from Airflow for 18 months, though, so it will be a while before I can take a look at why the default args aren't working for catchup.
from datetime import timedelta
from airflow import DAG

# default_args is assumed to be defined elsewhere in the file;
# note that catchup=False is passed to the DAG itself, not via default_args.
dag = DAG('example_dag',
          max_active_runs=3,
          catchup=False,
          schedule_interval=timedelta(minutes=5),
          default_args=default_args)