Python script scheduling in Airflow

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same license and attribute it to the original authors (not me). Original source: http://stackoverflow.com/questions/41730297/

Tags: python, apache-spark, scheduling, reload, airflow

Asked by Abhishek Pansotra

Hi everyone,

I need to schedule my Python files (which contain data extraction from SQL and some joins) using Airflow. I have successfully installed Airflow on my Linux server, and the Airflow webserver is available to me. But even after going through the documentation, I am not clear where exactly I need to write the script for scheduling, or how that script will become available in the Airflow webserver so that I can see its status.

As far as the configuration is concerned, I know where the dags folder is located in my home directory, and also where the example DAGs are located.

Note: please don't mark this as a duplicate of How to run bash script file in Airflow, as I need to run Python files lying in a different location.

Please find the configuration in the Airflow webserver below:

[screenshot]

Below is a screenshot of the dags folder in the AIRFLOW_HOME directory:

[screenshot]

Also see the screenshots below of the DAG creation and the Missing DAG error:

[screenshot]

[screenshot]

After I select simpleDAG, the following Missing DAG error appears:

[screenshot]

Answer by postrational

You should probably use the PythonOperator to call your function. If you want to define the function somewhere else, you can simply import it from a module, as long as it's accessible on your PYTHONPATH.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from my_script import my_python_function

# default_args must be defined before it is used; a minimal, assumed example:
default_args = {'start_date': datetime(2017, 1, 18)}

dag = DAG('tutorial', default_args=default_args)

PythonOperator(dag=dag,
               task_id='my_task_powered_by_python',
               provide_context=False,
               python_callable=my_python_function,
               op_args=['arguments_passed_to_callable'],
               op_kwargs={'keyword_argument': 'which will be passed to function'})

Say your function my_python_function lives in a script file /path/to/my/scripts/dir/my_script.py.
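For illustration, a hypothetical my_script.py is sketched below; the database path, tables, and join are placeholders standing in for the SQL extraction and joins the question mentions:

    # my_script.py -- hypothetical module; the database, tables, and query
    # below are placeholders, not part of the original answer.
    import sqlite3

    def my_python_function(*args, **kwargs):
        """Extract data from SQL and perform a join, per the question."""
        conn = sqlite3.connect('/path/to/example.db')
        try:
            rows = conn.execute(
                'SELECT a.id, b.value '
                'FROM table_a AS a JOIN table_b AS b ON a.id = b.a_id'
            ).fetchall()
        finally:
            conn.close()
        return rows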

Then, before starting Airflow, you could add the path to your scripts to the PYTHONPATH like so:

export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH
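A quick way to confirm the path was picked up (my_script being the hypothetical module imported above) is to try the import from the same shell; if this prints the file path, Airflow should be able to import it too:

    python -c "import my_script; print(my_script.__file__)"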

More information here: https://airflow.incubator.apache.org/code.html#airflow.operators.PythonOperator

Default args and other considerations as in the tutorial: https://airflow.incubator.apache.org/tutorial.html

Answer by liferacer

You can also use the BashOperator to execute Python scripts in Airflow. You can put your scripts in a folder inside the DAG folder. If your scripts live somewhere else, just give the path to those scripts.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': seven_days_ago,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('simple', default_args=default_args)

t1 = BashOperator(
    task_id='testairflow',
    bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
    dag=dag)

Answer by user7126545

Airflow parses all Python files in $AIRFLOW_HOME/dags (in your case /home/amit/airflow/dags). Each such Python script should return a DAG object, as shown in the answer from "postrational". When a DAG is reported as missing, it means there is some issue in the Python code and Airflow could not load it. Check the Airflow webserver or scheduler logs for more details, since stderr and stdout go there.
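One way to surface the underlying import error yourself is to build a DagBag over the folder and print the errors it collected; a minimal diagnostic sketch (Airflow 1.x), assuming the dags path from the question:

    from airflow.models import DagBag

    # Parse every Python file in the folder, the same way the scheduler does.
    dagbag = DagBag(dag_folder='/home/amit/airflow/dags')

    # Maps each file that failed to import to its traceback; an empty dict
    # means all DAG files loaded cleanly.
    print(dagbag.import_errors)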

Answer by Siddharth Kumar

  1. Install Airflow using the official Airflow documentation. It is a good idea to install it in a Python virtual environment: http://python-guide-pt-br.readthedocs.io/en/latest/dev/virtualenvs/
  2. When we start the Airflow webserver for the first time, using

     airflow webserver -p <port>

     it loads the example DAGs automatically. This can be disabled in $HOME/airflow/airflow.cfg:

     `load_examples = False`

  3. Create a dags folder in $HOME/airflow/ and put the tutorial.py file from https://airflow.incubator.apache.org/tutorial.html into that dags folder.
  4. Do some experiments and make changes in tutorial.py. If you are giving schedule_interval as cron syntax, then use a fixed 'start_date': datetime(2017, 7, 7) rather than 'start_date': datetime.now(). For example:

     dag = DAG('tutorial', default_args=default_args, schedule_interval="@once")

     or

     dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *")  # schedule each minute

     A complete minimal variant is sketched after this list.
  5. Start Airflow: $ airflow webserver -p <port>
  6. Start the scheduler: $ airflow scheduler
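Pulling step 4 together, a minimal tutorial.py variant is sketched below; the task is illustrative (it just prints the date), and the cron schedule can be swapped for "@once":

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2017, 7, 7),  # fixed start_date, per step 4
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    # schedule_interval accepts cron syntax; "* * * * *" runs each minute.
    dag = DAG('tutorial', default_args=default_args,
              schedule_interval="* * * * *")

    t1 = BashOperator(
        task_id='print_date',  # illustrative task
        bash_command='date',
        dag=dag)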