Python Airflow: How to SSH and run BashOperator from a different server

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not the translator). Original: http://stackoverflow.com/questions/39457592/


Airflow: How to SSH and run BashOperator from a different server

python, ssh, airflow

Asked by CMPE

Is there a way to SSH to a different server and run a BashOperator using Airbnb's Airflow? I am trying to run a Hive SQL command with Airflow, but I need to SSH to a different box in order to run the Hive shell. My tasks should look like this:


  1. SSH to server1
  2. start Hive shell
  3. run Hive command

Thanks!


Answered by CMPE

I think that I just figured it out (a combined sketch of these steps follows the list below):


  1. Create an SSH connection in the UI under Admin > Connection. Note: the connection will be deleted if you reset the database

  2. In the Python file add the following

    from airflow.contrib.hooks import SSHHook
    sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
    
  3. Add the SSH operator task

    # SSHExecuteOperator lives under contrib in Airflow 1.x
    from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

    t1 = SSHExecuteOperator(
        task_id="task1",
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)
    

Thanks!
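Putting the three steps together, a minimal sketch of a full DAG file might look like the one below. This assumes an Airflow 1.x install where the contrib SSHExecuteOperator is still available; the connection id my_ssh_conn, the dag_id remote_hive_example, and the Hive command are placeholders for illustration, not values from the original answer.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.hooks import SSHHook
    from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

    # "my_ssh_conn" is a placeholder; use the connection id created under Admin > Connection
    sshHook = SSHHook(conn_id='my_ssh_conn')

    dag = DAG(
        dag_id='remote_hive_example',        # hypothetical DAG id
        start_date=datetime(2016, 9, 1),
        schedule_interval=None)

    # bash_command is executed on the remote host behind the SSH connection,
    # so the Hive CLI only needs to exist on that box
    run_hive = SSHExecuteOperator(
        task_id='run_hive_query',
        bash_command='hive -e "SHOW TABLES;"',   # placeholder Hive command
        ssh_hook=sshHook,
        dag=dag)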


Answered by politeauthority

One thing to note with Anton's answer is that the argument is actually ssh_conn_id, not conn_id, for the SSHOperator object. At least in version 1.10.


A quick example would look like:


from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(dag_id='testing_stuff',
          default_args=default_args,
          schedule_interval='0,10,20,30,40,50 * * * *',
          dagrun_timeout=timedelta(seconds=120))
# Step 1 - a trivial command to run on the remote host over SSH
t1_bash = """
echo 'Hello World'
"""
t1 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='test_ssh_operator',
    command=t1_bash,
    dag=dag)
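
Tying this back to the original question, the same SSHOperator can carry the Hive command so it runs on the remote box. The query and the table name some_table below are hypothetical; ssh_default is just the example connection id used above.

# Hypothetical follow-up task: run the Hive query on the remote host instead of echo
t2_bash = """
hive -e 'SELECT COUNT(*) FROM some_table;'
"""
t2 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='run_remote_hive_query',
    command=t2_bash,
    dag=dag)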