How to Load Data into Amazon Redshift via Python Boto3?

Warning: this Q&A is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/34983043/



python, amazon-web-services, amazon-s3, amazon-redshift, boto3

Asked by Chris

In Amazon Redshift's Getting Started Guide, data is pulled from Amazon S3 and loaded into an Amazon Redshift Cluster utilizing SQLWorkbench/J. I'd like to mimic the same process of connecting to the cluster and loading sample data into the cluster utilizing Boto3.


However, in Boto3's documentation of Redshift, I'm unable to find a method that would allow me to upload data into an Amazon Redshift cluster.


I've been able to connect to Redshift using Boto3 with the following code:


client = boto3.client('redshift')

But I'm not sure what method would allow me to either create tables or upload data to Amazon Redshift the way it's done in the tutorial with SQLWorkbench/J.


Accepted answer by Mark B

Go back to step 4 in that tutorial you linked. See where it shows you how to get the URL of the cluster? You have to connect to that URL with a PostgreSQL driver. The AWS SDKs such as Boto3 provide access to the AWS API. You need to connect to Redshift over a PostgreSQL API, just like you would connect to a PostgreSQL database on RDS.

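For illustration, here is a minimal sketch of such a connection (this is not from the answer itself; the endpoint, port, database name, and credentials below are placeholders):

import psycopg2

# Connect to the cluster endpoint shown in the Redshift console (all values are placeholders)
conn = psycopg2.connect(
    host='examplecluster.abc123xyz789.us-west-2.redshift.amazonaws.com',
    port=5439,  # Redshift's default port
    dbname='dev',
    user='awsuser',
    password='my_password'
)
cur = conn.cursor()
cur.execute("SELECT current_database();")
print(cur.fetchone())
conn.close()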

Answered by Alex B

Right, you need the psycopg2 Python module to execute the COPY command.


My code looks like this:


import psycopg2

# Amazon Redshift connection string
conn_string = "dbname='***' port='5439' user='***' password='***' host='mycluster.***.redshift.amazonaws.com'"

# connect to Redshift (the cluster must be reachable from this host)
con = psycopg2.connect(conn_string)

# Build the COPY statement. Placeholders:
#   to_table - target Redshift table
#   fn       - s3://path_to__input_file.gz
#   delim    - field delimiter, e.g. ','
#   quote    - optional quoting clause, or ''
#   gzip     - 'gzip' for compressed input, '' otherwise
# Note: the credentials string must not contain a space after the ';'.
sql = """COPY %s FROM '%s' credentials
    'aws_access_key_id=%s;aws_secret_access_key=%s'
    delimiter '%s' FORMAT CSV %s %s; commit;""" % (
    to_table, fn, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, delim, quote, gzip)

cur = con.cursor()
cur.execute(sql)
con.close()

I used boto3/psycopg2 to write CSV_Loader_For_Redshift.

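For reference, the two-step idea behind that loader in miniature - a sketch, not the loader's actual code, with placeholder bucket and key names: stage the file in S3 with boto3, then point the COPY above at it.

import boto3

# Upload the local file to S3 so Redshift's COPY can read it from there
s3 = boto3.client('s3')
s3.upload_file('data.csv.gz', 'my-bucket', 'staging/data.csv.gz')
# ...then run the COPY above with fn = 's3://my-bucket/staging/data.csv.gz'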

Answered by Golokesh Patra

Using psycopg2 & get_cluster_credentials


Prerequisites -

  • An IAM role attached to the respective user

    The IAM role must carry the get_cluster_credentials policy LINK

  • Or: run on cloud (EC2) with the appropriate IAM role attached

The code below will work only if you deploy it on a PC/VM where a user's AWS credentials are already configured [ CLI - aws configure ], or if you are on an instance in the same account/VPC.
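As a quick sanity check (my addition, not part of the answer), you can verify which credentials boto3 actually sees before running the code; sts.get_caller_identity needs no special permissions:

import boto3

# Prints the ARN of the user/role behind the configured credentials
print(boto3.client('sts').get_caller_identity()['Arn'])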

  1. Have a config.ini file -

    [Redshift]
    port = 3389
    username = please_enter_username
    database_name = please_enter_database_name
    cluster_id = please_enter_cluster_id_name
    url = please_enter_cluster_endpoint_url
    region = us-west-2

  2. My Redshift_Connection.py

    import logging
    import psycopg2
    import boto3
    import ConfigParser  # Python 2; on Python 3 this is configparser


    def db_connection():
        logger = logging.getLogger(__name__)

        parser = ConfigParser.ConfigParser()
        parser.read('config.ini')

        RS_PORT = parser.get('Redshift', 'port')
        RS_USER = parser.get('Redshift', 'username')
        DATABASE = parser.get('Redshift', 'database_name')
        CLUSTER_ID = parser.get('Redshift', 'cluster_id')
        RS_HOST = parser.get('Redshift', 'url')
        REGION_NAME = parser.get('Redshift', 'region')

        client = boto3.client('redshift', region_name=REGION_NAME)

        # Fetch temporary database credentials for this user/cluster
        cluster_creds = client.get_cluster_credentials(DbUser=RS_USER,
                                                       DbName=DATABASE,
                                                       ClusterIdentifier=CLUSTER_ID,
                                                       AutoCreate=False)

        try:
            conn = psycopg2.connect(
                host=RS_HOST,
                port=RS_PORT,
                user=cluster_creds['DbUser'],
                password=cluster_creds['DbPassword'],
                database=DATABASE
            )
            return conn
        except psycopg2.Error:
            logger.exception('Failed to open database connection.')
            print "Failed"
    
  3. Query Execution script -

    from Redshift_Connection import db_connection

    def executescript(redshift_cursor):
        query = "SELECT * FROM <SCHEMA_NAME>.<TABLENAME>"
        cur = redshift_cursor
        cur.execute(query)

    conn = db_connection()
    conn.set_session(autocommit=False)
    cursor = conn.cursor()
    executescript(cursor)
    conn.close()
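To tie this back to the original question: once db_connection() returns a connection, the COPY pattern from the earlier answer works over it as well. A hypothetical sketch (the table, bucket, and role ARN are placeholders), using an IAM role attached to the cluster instead of embedded access keys:

from Redshift_Connection import db_connection

conn = db_connection()
cur = conn.cursor()
# COPY from S3, authorized via an IAM role attached to the cluster
cur.execute("""
    COPY my_schema.my_table
    FROM 's3://my-bucket/staging/data.csv.gz'
    IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftCopyRole'
    FORMAT CSV GZIP;
""")
conn.commit()
conn.close()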
