Airflow - Python file NOT in the same DAG folder

Disclaimer: this page is based on a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/33510365/



python, celery, celery-task, airflow

Asked by p.magalhaes

I am trying to use Airflow to execute a simple Python task.


from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

If I try, for example:


airflow test python_test print 2015-01-01


It works!


Now I want to put my print_context(ds, **kwargs) function in another Python file. So I create another file called simple_test.py and change:


import simple_test

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
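
Here simple_test.py would sit next to the DAG file in the DAGs folder and hold the moved function, presumably something like:

# simple_test.py -- lives alongside the DAG file in the DAGs folder
from pprint import pprint


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'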

Now I try to run again:


airflow test python_test print 2015-01-01


And OK! It still works!


But if I create a module, for example a worker module containing the file SimplePython.py, import it (from worker import SimplePython) and try:


airflow test python_test print 2015-01-01


It gives the message:


ImportError: No module named worker


The questions:


  1. Is it possible to import a module inside a DAG definition?
  2. How is Airflow+Celery going to distribute all the necessary Python source files across the worker nodes?

Answered by Yongyiw

For your first question, it is possible.


And I guess you should create an empty file named __init__.py in the same directory as SimplePython.py (that is, the worker directory in your case). By doing that, the worker directory will be regarded as a Python package.


Then in your DAG definition, try from worker.SimplePython import print_context.

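For illustration, a minimal sketch of that layout, assuming the worker package sits directly inside the DAGs folder next to the DAG file (the exact paths and the fixed start_date are assumptions):

# Layout inside the DAGs folder (assumed):
#   python_test.py            <- the DAG definition below
#   worker/__init__.py        <- empty file, makes "worker" a package
#   worker/SimplePython.py    <- holds print_context(ds, **kwargs)

from datetime import datetime

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

from worker.SimplePython import print_context

dag = DAG(dag_id='python_test',
          default_args={'owner': 'airflow',
                        'start_date': datetime(2015, 1, 1)})

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)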

In your case, I guess it would be better if you wrote a plugin for Airflow, because you might want to upgrade the Airflow core project without removing your customized functions.


Answered by nono

For your second question: how is Airflow+Celery going to distribute all the necessary Python source files across the worker nodes?


From the documentation: The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means. A common setup would be to store your DAGS_FOLDER in a Git repository and sync it across machines using Chef, Puppet, Ansible, or whatever you use to configure machines in your environment. If all your boxes have a common mount point, having your pipelines files shared there should work as well.


http://pythonhosted.org/airflow/installation.html?highlight=chef


Answered by ImDarrenG

You can package dependencies of your DAG as per:


https://airflow.apache.org/concepts.html#packaged-dags


To allow this, you can create a zip file that contains the DAG(s) in the root of the zip file and has the extra modules unpacked in directories. For instance, you can create a zip file that looks like this:


my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py. It will not go into subdirectories as these are considered to be potential packages.

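As a rough sketch of producing such a packaged DAG (the archive name and the helper mentioned in the final comment are made up for the example):

from zipfile import ZipFile

# Put the DAG files at the root of the archive and the extra modules
# in subdirectories, matching the layout above.
with ZipFile('my_dags.zip', 'w') as zf:
    zf.write('my_dag1.py')
    zf.write('my_dag2.py')
    zf.write('package1/__init__.py')
    zf.write('package1/functions.py')

# Drop my_dags.zip into the DAGs folder. Inside my_dag1.py the package is
# then importable as, e.g.:
#   from package1.functions import some_helper   # hypothetical name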

When using the CeleryExecutor, you need to manually sync DAG directories; Airflow doesn't take care of that for you:


https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery


The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means


Answered by 7yl4r

While packaging your DAGs into a zip as covered in the docs is the only supported solution I have seen, you can also import modules that are inside the dags folder. This is useful if you sync the dags folder automatically using other tools like Puppet and Git.


I am not clear on your directory structure from the question, so here is an example dags folder based on a typical Python project structure:


└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root
        ├── my_dags  # python src root (usually named same as project)
        │   ├── my_test_globals.py  # file I want to import
        │   ├── dag_in_package.py
        │   └── dags 
        │        └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

I have left out the (required [1]) __init__.py files. Note the location of the three example DAGs. You would almost certainly use only one of these places for all your DAGs. I include them all here for the sake of example because it shouldn't matter for the import. To import my_test_globals from any of them:


from my_dags.my_dags import my_test_globals

I believe this means that Airflow runs with the Python path set to the dags directory, so each subdirectory of the dags folder can be treated as a Python package. In my case it was the additional intermediate project root directory getting in the way of doing a typical intra-package absolute import. Thus, we could restructure this Airflow project like this:


└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root & python src root
        ├── my_test_globals.py  # file I want to import
        ├── dag_in_package.py 
        ├── dags 
        │    └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

So that imports look as we expect them to:


from my_dags import my_test_globals
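
Putting it together, a rough sketch of how dag_in_project_root.py could then use the shared module (the SOME_SETTING attribute and the fixed start_date are made up for the example):

from datetime import datetime

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

# Resolves because Airflow puts the DAGs folder (airflow/dags) on the Python path,
# which makes my_dags importable as a package.
from my_dags import my_test_globals


def use_globals(ds, **kwargs):
    # SOME_SETTING is a hypothetical attribute of my_test_globals,
    # used here purely for illustration.
    print(my_test_globals.SOME_SETTING)
    print(ds)


dag = DAG(dag_id='dag_in_project_root',
          default_args={'owner': 'airflow',
                        'start_date': datetime(2015, 1, 1)})

run_this = PythonOperator(
    task_id='use_globals',
    provide_context=True,
    python_callable=use_globals,
    dag=dag)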