Bigquery - Insert new data row into table by python
Disclaimer: this page is an English rendering of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Original source: http://stackoverflow.com/questions/36673456/
Asked by Luis Kang
I have read many documents about google bigquery-python, but I can't understand how to manage BigQuery data with Python code.
First, I create a new table as below.
# Assumed imports for the legacy Google API client used below
from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build

credentials = GoogleCredentials.get_application_default()
service = build('bigquery', 'v2', credentials=credentials)

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

project_ref = {'projectId': project_id}
dataset_ref = {'datasetId': dataset_id,
               'projectId': project_id}
table_ref = {'tableId': table_id,
             'datasetId': dataset_id,
             'projectId': project_id}

dataset = {'datasetReference': dataset_ref}
table = {'tableReference': table_ref}
table['schema'] = {'fields': [
    {'name': 'id', 'type': 'string'},
    ...
]}

table = service.tables().insert(body=table, **dataset_ref).execute()
Then I want to insert data into this table, so I tried the following.
fetch_list = []
patch = {'key': 'value'}
fetch_list.append(patch)
table = service.tables().patch(body=fetch_list, **table_ref).execute()
But nothing happened.
How can I add new data to a BigQuery table?
Please show me some example code.
Answered by Willian Fuks
EDIT Nov 2018:
This answer is already outdated, as the Google Cloud client has evolved considerably since this post was written.
The official docs already contain all the information needed; here you can find everything needed for streaming inserts, and this one has a complete overview of all the methods available so far (you'll also find Python code examples on each page and for each method).
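For reference, a minimal streaming-insert sketch with the newer google-cloud-bigquery client (the table path is a placeholder; this assumes the library is installed and application credentials are configured):

from google.cloud import bigquery

client = bigquery.Client()
table_id = 'my_project.my_dataset.my_table'  # placeholder full table path
rows = [{'id': 'abc-123'}]  # one dict per JSON row

# insert_rows_json() streams the rows and returns a list of per-row errors
errors = client.insert_rows_json(table_id, rows)
if errors:
    print('Encountered errors:', errors)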
Original Answer:
There are a few different ways you can insert data into BQ.
For a deeper understanding of how the python-api works, here's everything you'll need: bq-python-api (at first the docs are somewhat scary, but once you get the hang of it, it's rather simple).
There are 2 main methods that I use to insert data into BQ. The first one is data streaming, and it's supposed to be used when you can insert row by row in real time. Code example:
import uuid

def stream_data(self, table, data, schema):
    # first checks if table already exists. If it doesn't, then create it
    r = self.service.tables().list(projectId=your_project_id,
                                   datasetId=your_dataset_id).execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r['tables'] if
                    row['tableReference']['tableId'] == table]
    if not table_exists:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': your_project_id,
                'datasetId': your_dataset_id
            },
            'schema': schema
        }
        self.service.tables().insert(projectId=your_project_id,
                                     datasetId=your_dataset_id,
                                     body=body).execute()

    # with the table created, now we can stream the data
    # to do so we'll use the tabledata().insertAll() function.
    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.service.tabledata().insertAll(projectId=your_project_id,
                                       datasetId=your_dataset_id,
                                       tableId=table,
                                       body=body).execute(num_retries=5)
Here my self.service corresponds to your service object.
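One detail worth knowing: insertAll() reports per-row failures inside the response body rather than raising an exception. A small sketch of capturing and checking the result of the .execute() call above:

# Sketch: capture the insertAll() result; failed rows show up under
# 'insertErrors' in the response rather than raising an exception.
response = self.service.tabledata().insertAll(projectId=your_project_id,
                                              datasetId=your_dataset_id,
                                              tableId=table,
                                              body=body).execute(num_retries=5)
if 'insertErrors' in response:
    print('Some rows failed to insert:', response['insertErrors'])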
An example of the input data we have in our project:
data = {u'days_validated': '20',
        u'days_trained': '80',
        u'navigated_score': '1',
        u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5',
        u'init_cv_date': '2016-03-06',
        u'metric': 'rank',
        u'unix_date': '1461610020241117',
        u'purchased_score': '10',
        u'result': '0.32677139316724546',
        u'date': '2016-04-25',
        u'carted_score': '3',
        u'end_cv_date': '2016-03-25'}
And its corresponding schema:
schema = {u'fields': [
    {u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'},
    {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'},
    {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'},
    {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'},
    {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'},
    {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}]}
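With the data and schema above, a hypothetical call from inside the same class (the table name is a placeholder) would create the table if needed and then stream the row:

# Hypothetical usage: creates 'results' if missing, then streams one row
self.stream_data('results', data, schema)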
The other way to insert data is to use the job insert function. As you can see in the documentation, it accepts several sources for your data. Here's an example of how you can load the results of a query into another table:
import time

def create_table_from_query(self,
                            query,
                            dest_table,
                            how):
    body = {
        'configuration': {
            'query': {
                'destinationTable': {
                    'projectId': your_project_id,
                    'tableId': dest_table,
                    'datasetId': your_dataset_id
                },
                'writeDisposition': how,
                'query': query,
            },
        }
    }

    response = self.connector.jobs().insert(projectId=self._project_id,
                                            body=body).execute()
    self.wait_job_completion(response['jobReference']['jobId'])

def wait_job_completion(self, job_id):
    while True:
        response = self.connector.jobs().get(projectId=self._project_id,
                                             jobId=job_id).execute()
        if response['status']['state'] == 'DONE':
            return
        time.sleep(1)  # sleep between polls so we don't hammer the API
The how input accepts the options available for this field in the documentation (such as "WRITE_TRUNCATE" or "WRITE_APPEND").
You can load the data from a csv file, for instance; in this case, the configuration variable would be defined along these lines:
"configuration": {
"load": {
"fieldDelimiter": "\t"
"sourceFormat": "CSV"
"destinationTable": {
"projectId": your_project_id,
"tableId": table_id,
"datasetId": your_dataset_id
},
"writeDisposition": "WRITE_TRUNCATE"
"schema": schema,
"sourceUris": file_location_in_google_cloud_storage
},
}
(Using as an example a csv file delimited by tabs; it could be a json file as well, and the documentation will walk you through the available options.)
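To actually run the load, this fragment gets wrapped in a job body and submitted the same way as the query job above. A minimal sketch, assuming load_config holds the {"load": {...}} dict shown and service is the authorized API client; note that sourceUris expects a list of gs:// URIs:

# Sketch: submit the load configuration as a job. `load_config` stands for
# the {"load": {...}} dict above; sourceUris must be a list of gs:// URIs.
body = {'configuration': load_config}
response = service.jobs().insert(projectId=your_project_id,
                                 body=body).execute()
job_id = response['jobReference']['jobId']  # can be polled with wait_job_completion()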
Running jobs() requires some time to complete (which is why we created the wait_job_completion method). It should be cheaper, though, compared to real-time streaming.
Any questions, let us know!