Python: How to prevent Airflow from backfilling DAG runs?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and author information, and attribute it to the original authors (not me). Original StackOverflow URL: http://stackoverflow.com/questions/38751872/

Time: 2020-08-19 21:24:02  Source: igfitidea

How to prevent airflow from backfilling dag runs?

python scheduled-tasks airflow

Asked by m0meni

Say you have an airflow DAG that doesn't make sense to backfill, meaning that, after it's run once, running it subsequent times quickly would be completely pointless.

For example, if you're loading data from some source that is only updated hourly into your database, backfilling, which occurs in rapid succession, would just be importing the same data again and again.

This is especially annoying when you instantiate a new hourly task: it runs N times, once for each hour it missed, doing redundant work before it starts running on the interval you specified.
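To make the size of that backlog concrete, here is a minimal sketch of how the number of catch-up runs grows with the gap between the start date and the current time. It is a simplified model, not Airflow's scheduler code; the function name `missed_intervals` and the dates are hypothetical.

```python
from datetime import datetime, timedelta


def missed_intervals(start_date, now, interval):
    """Return the start of every complete interval between start_date and now."""
    runs = []
    current = start_date
    # A run is counted only once its whole interval has elapsed.
    while current + interval <= now:
        runs.append(current)
        current += interval
    return runs


# Hypothetical example: an hourly job whose start date is 6.5 hours in the past.
start = datetime(2016, 8, 1, 0, 0)
now = datetime(2016, 8, 1, 6, 30)
backlog = missed_intervals(start, now, timedelta(hours=1))
print(len(backlog))  # prints 6
```

Under this model, a start date weeks in the past would queue hundreds of hourly runs before the job reaches its normal schedule.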

The only solution I can think of is something that the FAQ in the docs specifically advises against:

We recommend against using dynamic values as start_date, especially datetime.now() as it can be quite confusing.
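One way to see why a dynamic start_date is confusing is the rough sketch below. It assumes a simplified model in which a run becomes due only after its full interval has elapsed and the start date is re-evaluated every time the DAG file is read. The helper `first_run_is_due` is hypothetical and is not part of Airflow.

```python
from datetime import datetime, timedelta


def first_run_is_due(start_date, interval, now):
    """A run is due only once its whole interval has elapsed."""
    return start_date + interval <= now


interval = timedelta(hours=1)
# Two moments at which the (hypothetical) scheduler looks at the DAG.
clock = [datetime(2016, 8, 1, 0, 30), datetime(2016, 8, 1, 1, 30)]

# Fixed start_date: as the clock advances, the first run becomes due.
fixed_start = datetime(2016, 8, 1, 0, 0)
fixed_results = [first_run_is_due(fixed_start, interval, now) for now in clock]

# Dynamic start_date: it is re-evaluated to "now" on every check, so the
# end of the first interval keeps moving into the future.
dynamic_results = [first_run_is_due(now, interval, now) for now in clock]

print(fixed_results, dynamic_results)
```

In this model the dynamic variant never becomes due, which is one reason a moving start date is hard to reason about.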

Is there any way to disable backfilling for a DAG, or should I do the above?

Accepted answer by sage88

Upgrade to Airflow version 1.8 and set catchup_by_default=False in airflow.cfg, or apply catchup=False to each of your DAGs.

https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default
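As a rough illustration of the global setting, the sketch below embeds a small airflow.cfg-style fragment and reads it with Python's standard `configparser`. The placement of the key under `[scheduler]` is an assumption based on Airflow's default configuration layout; check it against the airflow.cfg of your own installation.

```python
import configparser

# Assumed airflow.cfg fragment: the option is expected under [scheduler].
AIRFLOW_CFG_FRAGMENT = """
[scheduler]
catchup_by_default = False
"""

parser = configparser.ConfigParser()
parser.read_string(AIRFLOW_CFG_FRAGMENT)

# getboolean() understands True/False, yes/no, on/off and 1/0.
catchup_by_default = parser.getboolean("scheduler", "catchup_by_default")
print(catchup_by_default)
```

The per-DAG `catchup=False` argument is the narrower alternative when only some DAGs should skip their backlog.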

Answer by Ziggy Eunicien

This appears to be an unsolved Airflow problem. I know I would really like to have exactly the same feature. Here is as far as I've gotten; it may be useful to others.

There are UI features (at least in 1.7.1.3) which can help with this problem. If you go to the Tree view and click on a specific task (square boxes), a dialog will come up with a 'mark success' button. Clicking 'past', then clicking 'mark success' will label all the instances of that task in the DAG as successful, and they will not be run. The top-level DAG (circles on top) can also be labeled as successful in a similar fashion, but there doesn't appear to be a way to label multiple DAG instances.

I haven't looked into it deeply enough yet, but it may be possible to use the 'trigger_dag' subcommand to mark states of DAGs. See here: https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

A CLI feature to mark DAGs is in the works: http://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%[email protected]%3E and https://github.com/apache/incubator-airflow/pull/1590

UPDATE (9/28/2016): A new operator, 'LatestOnlyOperator', has been added (https://github.com/apache/incubator-airflow/pull/1752), which will only run the latest version of downstream tasks. It sounds very useful, and hopefully it will make it into the releases soon.

UPDATE 2: As of airflow 1.8, the LatestOnlyOperator has been released.
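The idea behind "only run the latest" can be sketched in plain Python. This is an assumed, simplified model of the behavior (a run counts as latest when the current time falls in the window immediately after its interval ends), not the operator's actual source. The function `is_latest_run` and the dates are hypothetical.

```python
from datetime import datetime, timedelta


def is_latest_run(execution_date, interval, now):
    """True if `now` falls in the window right after this run's interval."""
    left_window = execution_date + interval
    right_window = left_window + interval
    return left_window < now <= right_window


interval = timedelta(hours=1)
now = datetime(2016, 9, 28, 12, 30)

# Three hourly runs being caught up; only the most recent one should proceed.
runs = [datetime(2016, 9, 28, hour) for hour in (9, 10, 11)]
to_run = [run for run in runs if is_latest_run(run, interval, now)]
print(to_run)
```

In a real DAG the operator sits upstream of the tasks to protect, so older runs still appear in the UI but their downstream tasks are skipped.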

Answer by Ben Tallman

Setting catchup=False in your dag declaration will provide this exact functionality.

I don't have the "reputation" to comment, but I wanted to say that catchup=False was designed (by me) for this exact purpose. In addition, I can verify that in 1.10.1 it works when set explicitly in the DAG instantiation. However, I do not see it working when placed in the default args. I've been away from Airflow for 18 months though, so it will be a bit before I can take a look at why the default args aren't working for catchup.

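from datetime import datetime, timedelta
from airflow import DAG

# Placeholder values (not from the original answer); use a fixed start_date.
default_args = {'owner': 'airflow', 'start_date': datetime(2016, 1, 1)}
dag = DAG('example_dag',
          max_active_runs=3,
          catchup=False,  # set on the DAG itself, not in default_args
          schedule_interval=timedelta(minutes=5),
          default_args=default_args)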