
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same CC BY-SA license and attribute the original authors (not me). Original question: http://stackoverflow.com/questions/38574151/

Date: 2020-09-08 22:21:46  Source: igfitidea

Airflow pass parameters to dependent task

Tags: bash, airflow

Asked by Carleto

What is the way to pass parameters to dependent tasks in Airflow? I have a lot of bash files, and I'm trying to migrate this approach to Airflow, but I don't know how to pass some properties between tasks.


This is a real example:


#sqoop bash template
sqoop_template = """
        sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/
    """

s3_template = """
        s3-dist-cp --src={{params.dir}} --dest={{params.s3}}
    """



#Task of extraction in EMR
t1 = BashOperator(
        task_id='extract_account', 
        bash_command=sqoop_template, 
        params={'job': 'job', 'dir': 'hdfs:///account/' + datetime.now().strftime("%Y-%m-%d-%H-%M-%S")},  # needs: from datetime import datetime
        dag=dag)
#Task to upload in s3 backup.
t2 = BashOperator(
        task_id='s3_upload',
        bash_command=s3_template,
        params={},  # here I need the dir name created in t1
        depends_on_past=True,
        dag=dag)

t2.set_upstream(t1)

In t2 I need to access the dir name created in t1.


Solution


import boto3  # used by sns_notify below

#Execute a valid sqoop job; dirpath, job_default_config and call are the author's own helpers
def sqoop_import(table_name, job_name):
    s3, hdfs = dirpath(table_name)
    sqoop_job = job_default_config(job_name, hdfs)
    #call(sqoop_job)
    return {'hdfs_dir': hdfs, 's3_dir': s3}

def s3_upload(**context):
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir']
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir']
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)]
    #call(s3_cpdist_job)
    return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import')

def sns_notify(**context):
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir']
    client = boto3.client('sns')
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg'
    response = client.publish(TargetArn=arn, Message=s3)
    return response
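The hand-off mechanism in this solution is that whatever a PythonOperator callable returns is stored as an XCom, which downstream tasks read back with `xcom_pull`. Below is a stand-alone sketch of that data flow with Airflow's `TaskInstance` replaced by a minimal stub, so the pattern can be followed without a running Airflow instance; all names and paths here are illustrative, not the author's real ones.

```python
# Minimal stub of Airflow's XCom store: return values go in, xcom_pull reads them out.
class StubTaskInstance:
    def __init__(self):
        self._xcoms = {}

    def record(self, task_id, return_value):
        # Airflow does this automatically when a PythonOperator callable returns a value.
        self._xcoms[task_id] = return_value

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]

def sqoop_import(table_name):
    # Illustrative paths only; the real task would also run the sqoop job.
    hdfs = 'hdfs:///%s/' % table_name
    s3 = 's3://backup/%s/' % table_name
    return {'hdfs_dir': hdfs, 's3_dir': s3}   # stored as this task's XCom

def s3_upload(**context):
    # Downstream task reads the upstream task's XCom by task id.
    dirs = context['task_instance'].xcom_pull(task_ids='sqoop_import')
    return ["s3-dist-cp", "--src=%s" % dirs['hdfs_dir'], "--dest=%s" % dirs['s3_dir']]

ti = StubTaskInstance()
ti.record('sqoop_import', sqoop_import('account'))
print(s3_upload(task_instance=ti))
# → ['s3-dist-cp', '--src=hdfs:///account/', '--dest=s3://backup/account/']
```

In a real DAG the stub disappears: Airflow passes the genuine `TaskInstance` into the callable's context (in Airflow 1.x this requires `provide_context=True` on the PythonOperator).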

That's not the definitive solution, so improvements are welcome. Thanks.


Accepted answer by Vineet Goel

Check out XComs - http://airflow.incubator.apache.org/concepts.html#xcoms. These are used for communicating state between tasks.

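The same idea applies to the original BashOperator setup: t2's `bash_command` is a Jinja template, so it can pull t1's XCom with `ti.xcom_pull(...)` at render time. The sketch below shows how such a template resolves, using Jinja2 directly (which Airflow uses internally for templating) and a stubbed `ti`; the task id and paths are assumptions for illustration.

```python
from jinja2 import Template

class StubTI:
    """Stand-in for Airflow's TaskInstance, available as `ti` in templates."""
    def xcom_pull(self, task_ids):
        # Pretend the upstream task pushed this dir as its XCom.
        return 'hdfs:///account/2020-09-08-22-21-46'

# Template as it would appear in a BashOperator's bash_command.
s3_template = 's3-dist-cp --src={{ ti.xcom_pull(task_ids="extract_account") }} --dest={{ params.s3 }}'

cmd = Template(s3_template).render(ti=StubTI(), params={'s3': 's3://backup/account/'})
print(cmd)
# → s3-dist-cp --src=hdfs:///account/2020-09-08-22-21-46 --dest=s3://backup/account/
```

Note that for a BashOperator to push anything in the first place, its XCom push flag must be enabled (in Airflow 1.x, `xcom_push=True` makes it push the last line of stdout).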