Airflow using template files for PythonOperator

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/41749974/

Date: 2020-08-20 01:33:38  Source: igfitidea


Tags: python, jinja2, airflow

Asked by dlamblin

The method of getting a BashOperator or SqlOperator to pick up an external file for its template is somewhat clearly documented, but looking at the PythonOperator, my test of what I understand from the docs is not working. I am not sure how the templates_exts and templates_dict parameters would correctly interact to pick up a file.


In my dags folder I've created pyoptemplate.sql and pyoptemplate.t, as well as test_python_operator_template.py:


pyoptemplate.sql:


SELECT * FROM {{params.table}};

pyoptemplate.t:


SELECT * FROM {{params.table}};

test_python_operator_template.py:


# coding: utf-8
# vim:ai:si:et:sw=4 ts=4 tw=80
"""
# A Test of Templates in PythonOperator
"""

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def templated_function(ds, **kwargs):
    """This function will try to use templates loaded from external files"""
    pp.pprint(ds)
    pp.pprint(kwargs)


# Define the DAG
dag = DAG(dag_id='test_python_operator_template_dag',
          default_args={"owner": "lamblin",
                        "start_date": datetime.now()},
          template_searchpath=['/Users/daniellamblin/airflow/dags'],
          schedule_interval='@once')


# Define the single task in this controller example DAG
op = PythonOperator(task_id='test_python_operator_template',
                    provide_context=True,
                    python_callable=templated_function,
                    templates_dict={
                        'pyoptemplate': '',
                        'pyoptemplate.sql': '',
                        'sql': 'pyoptemplate',
                        'file1':'pyoptemplate.sql',
                        'file2':'pyoptemplate.t',
                        'table': '{{params.table}}'},
                    templates_exts=['.sql','.t'],
                    params={'condition_param': True,
                            'message': 'Hello World',
                            'table': 'TEMP_TABLE'},
                    dag=dag)

The result from a run shows that table was templated correctly as a string, but the others did not pull in any files for templating.


dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
[2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
[2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
[2017-01-18 23:58:07,620] {models.py:1196} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
'2017-01-18'
{   u'END_DATE': '2017-01-18',
    u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
    u'dag': <DAG: test_python_operator_template_dag>,
    u'dag_run': None,
    u'ds_nodash': u'20170118',
    u'end_date': '2017-01-18',
    u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
    u'latest_date': '2017-01-18',
    u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
    u'params': {   'condition_param': True,
                   'message': 'Hello World',
                   'table': 'TEMP_TABLE'},
    u'run_id': None,
    u'tables': None,
    u'task': <Task(PythonOperator): test_python_operator_template>,
    u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
    u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
    'templates_dict': {   'file1': u'pyoptemplate.sql',
                          'file2': u'pyoptemplate.t',
                          'pyoptemplate': u'',
                          'pyoptemplate.sql': u'',
                          'sql': u'pyoptemplate',
                          'table': u'TEMP_TABLE'},
    u'test_mode': True,
    u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
    u'tomorrow_ds': '2017-01-19',
    u'tomorrow_ds_nodash': u'20170119',
    u'ts': '2017-01-18T00:00:00',
    u'ts_nodash': u'20170118T000000',
    u'yesterday_ds': '2017-01-17',
    u'yesterday_ds_nodash': u'20170117'}
[2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None

Accepted answer by Ardan

As of Airflow 1.8, the way the PythonOperator replaces its template_ext field in __init__ doesn't work. Tasks only check template_ext on the __class__. To create a PythonOperator that picks up SQL template files, you only need to do the following:


class SQLTemplatedPythonOperator(PythonOperator):
    template_ext = ('.sql',)

And then to access the SQL from your task when it runs:


SQLTemplatedPythonOperator(
    templates_dict={'query': 'my_template.sql'},
    params={'my_var': 'my_value'},
    python_callable=my_func,
    provide_context=True,
)

def my_func(**context):
    query = context['templates_dict']['query']  # rendered contents of my_template.sql
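The distinction between instance-level and class-level template_ext can be seen without Airflow at all. The sketch below (all class names are made up for illustration) mimics the class-attribute lookup the answer describes:

```python
# Minimal, Airflow-free illustration of why setting template_ext on the
# instance in __init__ has no effect when the framework reads it via __class__.
class FakeBaseOperator:
    template_ext = ()

class PatchedInInit(FakeBaseOperator):
    def __init__(self):
        # Instance attribute: invisible to a lookup done on the class itself.
        self.template_ext = ('.sql',)

class SubclassOverride(FakeBaseOperator):
    # Class attribute: visible to a lookup done on the class itself.
    template_ext = ('.sql',)

op = PatchedInInit()
# Normal instance lookup finds the __init__ assignment...
assert op.template_ext == ('.sql',)
# ...but a lookup on the class, as described for Airflow 1.8, does not:
assert op.__class__.template_ext == ()
assert SubclassOverride.template_ext == ('.sql',)
```

This is why the subclass with a class-level template_ext works while patching the field in __init__ does not.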

Answered by P. Xie

Recently I came across the same issue and finally solved it. @Ardan's solution is correct, but I just want to repeat it with a more complete answer, with some details on how Airflow works for newcomers.


Of course, you first need something like this:


from airflow.operators.python_operator import PythonOperator

class SQLTemplatedPythonOperator(PythonOperator):

    # somehow ('.sql',) doesn't work but tuple of two works...
    template_ext = ('.sql','.abcdefg')

Assuming you have a sql template file like below:


# stored at path: $AIRFLOW_HOME/sql/some.sql
select {{some_params}} from my_table;

First make sure you add your folder to the search path in your dag params.


Do not pass template_searchpath to args and then pass args to DAG!!!! It doesn't work.


dag = DAG(
    dag_id= "some_name",
    default_args=args,
    schedule_interval="@once",
    template_searchpath='/Users/your_name/some_path/airflow_home/sql'
)

Then your operator call will be


SQLTemplatedPythonOperator(
        templates_dict={'query': 'some.sql'},
        op_kwargs={"args_directly_passed_to_your_function": "some_value"},
        task_id='dummy',
        params={"some_params":"some_value"},
        python_callable=your_func,
        provide_context=True,
        dag=dag,
    )

Your function will be:


def your_func(args_directly_passed_to_your_function=None, **context):
    query = context['templates_dict']['query']
    do_some_thing(query)

Some explanations:


  1. Airflow uses values from the context to render your template. To manually add it to the context, you can use the params field like above.

  2. PythonOperator no longer takes the template file extension from the template_ext field, as @Ardan mentioned. The source code is here. It only takes the extension from self.__class__.template_ext.

  3. Airflow loops through the template_dict field and if value.endswith(file_extension) == True, then it renders the template.

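The loop described in point 3 can be sketched without Airflow. The helper below is a hypothetical stand-in for Airflow's template-file resolution, not its actual code: values in templates_dict whose names end with a registered extension are read from disk, everything else passes through unchanged.

```python
import os

def resolve_template_values(templates_dict, template_ext, searchpath):
    """Toy model of Airflow's pre-render step: swap matching file names
    for their file contents; leave other values alone."""
    resolved = {}
    for key, value in templates_dict.items():
        if isinstance(value, str) and value.endswith(tuple(template_ext)):
            # Name matches a template extension: load the file's contents.
            with open(os.path.join(searchpath, value)) as f:
                resolved[key] = f.read()
        else:
            # Not a template file reference: keep the raw value.
            resolved[key] = value
    return resolved
```

Under this model, 'query': 'some.sql' is replaced by the file's text (which is then rendered), while a value like 'table': 'TEMP_TABLE' is left as an ordinary string.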

Answered by Will Fitzgerald

I don't think this is really possible. But the following workaround might be helpful:


def templated_function(ds, **kwargs):
    kwargs['ds'] = ds                                        # put ds into 'context'
    task = kwargs['task']                                    # get a handle on the task
    tmpl = open(kwargs['templates_dict']['file1']).read()    # read the template file
    sql = task.render_template('', tmpl, kwargs)             # render it
    pp.pprint(sql)
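What render_template does in this workaround is ordinary Jinja substitution over the file's text. As a rough, Airflow-free illustration, the toy renderer below (a made-up helper standing in for real Jinja, which supports far more than this) shows the effect on the question's template:

```python
import re

def render(template, params):
    """Toy renderer: substitute simple '{{ params.key }}' placeholders only."""
    def sub(match):
        return str(params[match.group(1)])
    return re.sub(r"\{\{\s*params\.(\w+)\s*\}\}", sub, template)

sql = render("SELECT * FROM {{params.table}};", {"table": "TEMP_TABLE"})
# sql is now "SELECT * FROM TEMP_TABLE;"
```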

Would love a better solution, though!


Answered by Saurabh Mishra

I was unable to get a templated script file working in Python (I'm new to Python), but an example with the BashOperator follows; maybe it can give you some hints.


from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    #'start_date': airflow.utils.dates.days_ago(2),
    'email': ['[email protected]']}

dag = DAG('sr5', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20),
          catchup=False, # so that on scheduler restart, it doesn't try to catch up on all the missed runs
          template_searchpath=['/Users/my_name/Desktop/utils/airflow/resources'])

t1 = BashOperator(
    task_id='t1',
    depends_on_past=False,
    params={
        'ds1': 'hie'},
    bash_command="01.sh",
    dag=dag)

the 01.sh script looks like follows


#!/bin/sh

echo {{ ds }}
echo {{ params.ds1 }}

This gives the following output on a test execution:


[2017-05-12 08:31:52,981] {bash_operator.py:91} INFO - Output:

[2017-05-12 08:31:52,984] {bash_operator.py:95} INFO - 2017-05-05

[2017-05-12 08:31:52,984] {bash_operator.py:95} INFO - hie
