Original URL: http://stackoverflow.com/questions/52026405/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
How to Create Dataframe from AWS Athena using Boto3 get_query_results method
Question by Niv Cohen
I'm using AWS Athena to query raw data from S3. Since Athena writes the query output into an S3 output bucket, I used to do:
df = pd.read_csv(OutputLocation)
But this seems like an expensive approach. Recently I noticed the get_query_results method of boto3, which returns a complex dictionary of the results.
client = boto3.client('athena')
response = client.get_query_results(
    QueryExecutionId=res['QueryExecutionId']
)
I'm facing two main issues:
- How can I format the results of get_query_results into a pandas data frame?
- get_query_results only returns 1000 rows. How can I use it to get two million rows?
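For orientation, here is a trimmed, hand-written dictionary in the shape that get_query_results returns (field names follow the Athena GetQueryResults response; the values are invented), plus two small hypothetical helpers that pull out the column labels and the raw cell strings. Every cell arrives as a string under VarCharValue, and for a SELECT the first row of the first page repeats the column names.

```python
# Hand-written sample mimicking the shape of an Athena GetQueryResults response.
# Field names follow the API; the values are invented for illustration.
sample_response = {
    "ResultSet": {
        "Rows": [
            {"Data": [{"VarCharValue": "id"}, {"VarCharValue": "name"}]},  # header row
            {"Data": [{"VarCharValue": "1"}, {"VarCharValue": "alice"}]},
            {"Data": [{"VarCharValue": "2"}, {}]},  # a cell with no value has no VarCharValue
        ],
        "ResultSetMetadata": {
            "ColumnInfo": [
                {"Name": "id", "Label": "id", "Type": "integer"},
                {"Name": "name", "Label": "name", "Type": "varchar"},
            ]
        },
    },
    "NextToken": "opaque-token",  # present only while more rows remain
}


def column_labels(response):
    """Return the column labels from the result metadata."""
    return [col["Label"] for col in response["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]]


def raw_rows(response):
    """Return every row as a list of strings (None where a cell has no value)."""
    return [[cell.get("VarCharValue") for cell in row["Data"]]
            for row in response["ResultSet"]["Rows"]]


print(column_labels(sample_response))  # ['id', 'name']
print(raw_rows(sample_response))       # [['id', 'name'], ['1', 'alice'], ['2', None]]
```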
Answer by Eric Bellet
get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?
If you try to add:
client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)
You will get the following error:
An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.
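Within that 1000-row cap you can still reach every row by following NextToken yourself: each response carries a NextToken while more rows remain, and you pass it back on the next call. A minimal sketch, assuming `client` is a boto3 Athena client and the query has already succeeded; iter_result_pages is a hypothetical helper name.

```python
def iter_result_pages(client, query_execution_id, page_size=1000):
    """Yield successive get_query_results responses until NextToken runs out.

    page_size must not exceed 1000, the API's MaxResults limit.
    """
    kwargs = {"QueryExecutionId": query_execution_id, "MaxResults": page_size}
    while True:
        page = client.get_query_results(**kwargs)
        yield page
        token = page.get("NextToken")
        if not token:
            break  # last page reached
        kwargs["NextToken"] = token  # resume where this page ended
```

For example, `for page in iter_result_pages(boto3.client('athena'), res['QueryExecutionId']): ...`. The boto3 paginator for get_query_results does the same token bookkeeping for you; the manual loop just makes it explicit.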
You can obtain millions of rows if you read the result file directly from your S3 bucket (in the following example, into a pandas DataFrame):
import io

import boto3
import pandas as pd


def obtain_data_from_s3(self):
    self.resource = boto3.resource('s3',
                                   region_name=self.region_name,
                                   aws_access_key_id=self.aws_access_key_id,
                                   aws_secret_access_key=self.aws_secret_access_key)
    response = self.resource \
        .Bucket(self.bucket) \
        .Object(key=self.folder + self.filename + '.csv') \
        .get()
    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
Here self.filename can be set to:
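self.filename = response['QueryExecutionId']
This works because Athena names the result file after the QueryExecutionId (<QueryExecutionId>.csv); the '.csv' extension is appended when the S3 key is built, so it must not be added here as well. Here is my full code, which takes a query and returns a DataFrame with all the rows and columns: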
import time
import boto3
import pandas as pd
import io


class QueryAthena:

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output = 's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        try:
            self.client = boto3.client('athena',
                                       region_name=self.region_name,
                                       aws_access_key_id=self.aws_access_key_id,
                                       aws_secret_access_key=self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString=q,
                QueryExecutionContext={
                    'Database': self.database
                },
                ResultConfiguration={
                    'OutputLocation': self.s3_output,
                }
            )
            # Athena writes the result to <OutputLocation>/<QueryExecutionId>.csv
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])
        except Exception as e:
            print(e)
            raise  # without this, `response` would be undefined below
        return response

    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:
            # Poll until the query leaves the QUEUED/RUNNING states
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "{}" finished.'.format(self.query))

            df = self.obtain_data()
            return df

        except Exception as e:
            print(e)

    def obtain_data(self):
        try:
            self.resource = boto3.resource('s3',
                                           region_name=self.region_name,
                                           aws_access_key_id=self.aws_access_key_id,
                                           aws_secret_access_key=self.aws_secret_access_key)

            # Read the CSV that Athena wrote for this query execution
            response = self.resource \
                .Bucket(self.bucket) \
                .Object(key=self.folder + self.filename + '.csv') \
                .get()

            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
        except Exception as e:
            print(e)


if __name__ == "__main__":
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()
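Two refinements of the class above are possible, sketched here under the assumption that `client` is a boto3 Athena client: poll only while the query is still QUEUED or RUNNING (the loop above sleeps once more after success), and take the result path from the QueryExecution.ResultConfiguration.OutputLocation field that get_query_execution returns, instead of rebuilding it from folder and filename. The function names and the poll interval are illustrative, not part of the original answer.

```python
import time


def wait_for_query(client, query_execution_id, poll_seconds=5.0, sleep=time.sleep):
    """Poll get_query_execution until the query leaves QUEUED/RUNNING.

    Returns the QueryExecution dict on SUCCEEDED; raises on FAILED/CANCELLED.
    """
    while True:
        execution = client.get_query_execution(
            QueryExecutionId=query_execution_id)["QueryExecution"]
        state = execution["Status"]["State"]
        if state == "SUCCEEDED":
            return execution
        if state in ("FAILED", "CANCELLED"):
            reason = execution["Status"].get("StateChangeReason", "")
            raise RuntimeError(f"Query {query_execution_id} {state}: {reason}")
        sleep(poll_seconds)  # still QUEUED or RUNNING


def split_s3_uri(uri):
    """Split 's3://bucket/path/to/key.csv' into ('bucket', 'path/to/key.csv')."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {uri}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key
```

For example, `execution = wait_for_query(client, res['QueryExecutionId'])` followed by `bucket, key = split_s3_uri(execution['ResultConfiguration']['OutputLocation'])` gives the bucket and key to pass to `boto3.resource('s3').Bucket(bucket).Object(key).get()`.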
Answer by Niv Cohen
I have a solution for my first question, using the following function:
def results_to_df(results):
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]

    listed_results = []
    # Skip the first row: for a SELECT it repeats the column names
    for res in results['ResultSet']['Rows'][1:]:
        values = []
        for field in res['Data']:
            try:
                values.append(list(field.values())[0])
            except IndexError:
                # a field with no value (e.g. NULL) is an empty dict
                values.append(None)
        listed_results.append(
            dict(zip(columns, values))
        )

    return listed_results
and then:
t = results_to_df(response)
pd.DataFrame(t)
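When results span several pages, only the first page starts with the header row, so a per-page converter needs to know which page it is handling. A sketch assuming pandas and the standard GetQueryResults response shape; page_to_frame and its is_first_page flag are hypothetical names. It reads each cell with .get('VarCharValue') so that cells without a value become None.

```python
import pandas as pd


def page_to_frame(page, is_first_page):
    """Convert one get_query_results response into a DataFrame.

    Column names come from ResultSetMetadata; on the first page the leading
    header row is dropped because it only repeats those names.
    """
    columns = [col["Label"] for col in page["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]]
    rows = page["ResultSet"]["Rows"]
    if is_first_page:
        rows = rows[1:]
    data = [[cell.get("VarCharValue") for cell in row["Data"]] for row in rows]
    return pd.DataFrame(data, columns=columns)
```

With the boto3 paginator this could be used as `[page_to_frame(p, i == 0) for i, p in enumerate(paginator.paginate(QueryExecutionId=query_id))]`, followed by a single `pd.concat`.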
As for my second question, and at the request of @EricBellet, I'm also adding my approach for pagination, which I find inefficient and slower compared to loading the results from the Athena output in S3:
def run_query(query, database, s3_output):
    '''
    Start an Athena query and return the start_query_execution response
    (which contains the QueryExecutionId)
    '''
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output,
        }
    )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response
def format_result(results):
    '''
    Format one page of get_query_results output as a list of dicts (one per row).
    Note: [0:] keeps every row, so on the first page the header row is included.
    '''
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]

    formatted_results = []

    for result in results['ResultSet']['Rows'][0:]:
        values = []
        for field in result['Data']:
            try:
                values.append(list(field.values())[0])
            except IndexError:
                # a field with no value (e.g. NULL) is an empty dict
                values.append(None)

        formatted_results.append(
            dict(zip(columns, values))
        )
    return formatted_results
import time
import boto3
import pandas as pd

client = boto3.client('athena')
res = run_query(query_2, database, s3_output)  # query Athena (query_2, database, s3_output defined elsewhere)
query_id = res['QueryExecutionId']
# The query must have SUCCEEDED before its results can be read;
# poll get_query_execution first if necessary.

start_time = time.time()
paginator = client.get_paginator('get_query_results')
response_iterator = paginator.paginate(
    QueryExecutionId=query_id,
    PaginationConfig={'PageSize': 1000})  # 1000 is the maximum page size

frames = []
for i, page in enumerate(response_iterator):
    format_page = format_result(page)
    if i == 0:
        # the first row of the first page holds the column names
        format_page = format_page[1:]
    frames.append(pd.DataFrame(format_page))

# DataFrame.append was removed in pandas 2.0; concatenate once at the end
formatted_results = pd.concat(frames, ignore_index=True)
print("My program took", time.time() - start_time, "to run")
It's not formatted very well, but I think it does the job.
Answer by Hammad Usmani
A very simple solution is to use a list comprehension with the boto3 Athena paginator. The list comprehension can then simply be passed to pd.DataFrame() to create a DataFrame:
pd.DataFrame([[data.get('VarCharValue') for data in row['Data']] for row in
              results['ResultSet']['Rows']])
Boto3 Athena to Pandas DataFrame
import pandas as pd
import boto3

result = get_query_results( . . . ) # your code here

def cleanQueryResult(result):
    '''
    This will take the dictionary of the raw Boto3 Athena results and turn it into a
    2D array for further processing

    Parameters
    ----------
    result : dict
        The dictionary from the boto3 Athena client function get_query_results

    Returns
    -------
    list(list())
        2D list which is essentially the table result. The first row holds the column names.
    '''
    return [[data.get('VarCharValue') for data in row['Data']]
            for row in result['ResultSet']['Rows']]

# note that row 1 is the header, so use it for the column names
rows = cleanQueryResult(result)
df = pd.DataFrame(rows[1:], columns=rows[0])
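Every value obtained this way is a string (VarCharValue), so numeric columns arrive as text. A sketch of casting them using the Type reported in ResultSetMetadata.ColumnInfo, assuming a single first page that starts with the header row; typed_frame is a hypothetical name, and the type-to-conversion mapping covers only a few common Athena types (anything unmapped stays a string), so extend it as needed.

```python
import pandas as pd

# Assumed set of Athena numeric type names; extend for your schema.
_NUMERIC = {"tinyint", "smallint", "integer", "bigint", "float", "real", "double", "decimal"}


def typed_frame(result):
    """Build a DataFrame from one get_query_results response, casting by column type."""
    info = result["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
    rows = [[cell.get("VarCharValue") for cell in row["Data"]]
            for row in result["ResultSet"]["Rows"][1:]]  # skip the header row
    df = pd.DataFrame(rows, columns=[col["Label"] for col in info])
    for col in info:
        name, athena_type = col["Label"], col["Type"].lower()
        if athena_type in _NUMERIC:
            df[name] = pd.to_numeric(df[name])
        elif athena_type == "boolean":
            df[name] = df[name].map({"true": True, "false": False})
        elif athena_type in ("date", "timestamp"):
            df[name] = pd.to_datetime(df[name])
    return df
```

Reading the CSV from S3 with pd.read_csv gets type inference for free; this mapping is only needed when going through get_query_results.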
Millions of Results
This requires the paginator object: https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/athena.html#paginators
As a hint, here's how you can append after each page:
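# later pages have no header row; DataFrame.append was removed in pandas 2.0, so use pd.concat
df = pd.concat([df, pd.DataFrame(cleanQueryResult(next_page), columns=df.columns)], ignore_index=True)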
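Growing a DataFrame page by page copies it on every step, which adds up over thousands of pages. A common alternative, sketched here assuming `pages` is any iterable of get_query_results responses (for example `client.get_paginator('get_query_results').paginate(QueryExecutionId=...)`), is to accumulate plain row lists and build the DataFrame once. pages_to_frame is a hypothetical name.

```python
import pandas as pd


def pages_to_frame(pages):
    """Build one DataFrame from an iterable of get_query_results responses.

    The first row of the first non-empty page is treated as the header.
    """
    header, rows = None, []
    for page in pages:
        page_rows = [[cell.get("VarCharValue") for cell in row["Data"]]
                     for row in page["ResultSet"]["Rows"]]
        if header is None and page_rows:
            header, page_rows = page_rows[0], page_rows[1:]
        rows.extend(page_rows)  # plain lists: no DataFrame copy per page
    return pd.DataFrame(rows, columns=header)
```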