Bigquery - Insert new data row into table by python

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/36673456/

python, google-bigquery

Asked by Luis Kang

I have read many documents about google bigquery-python, but I can't understand how to manage BigQuery data with Python code.

First, I create a new table as shown below.

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
service = build('bigquery', 'v2', credentials=credentials)

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

project_ref = {'projectId': project_id}
dataset_ref = {'datasetId': dataset_id,
               'projectId': project_id}
table_ref = {'tableId': table_id,
             'datasetId': dataset_id,
             'projectId': project_id}

dataset = {'datasetReference': dataset_ref}
table = {'tableReference': table_ref}
table['schema'] = {'fields': [
    {'name': 'id', 'type': 'STRING'},
...
]}

table = service.tables().insert(body=table, **dataset_ref).execute()

Then I want to insert a row of data into this table, so I tried the following.

fetch_list = []
patch = {'key': 'value'}
fetch_list.append(patch)

table = service.tables().patch(body = fetch_list, **table_ref).execute()

But nothing happened.

How can I insert new data into a BigQuery table?

Please show me some example code.

Answered by Willian Fuks

EDIT Nov 2018:

This answer is already outdated, as the Google Cloud client has evolved considerably since it was posted.

The official docs already contain all the information needed; here you can find everything needed for streaming inserts, and this one has a complete overview of all methods available so far (you'll also find Python code examples on each page for each method).
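For reference, a minimal streaming-insert sketch using the modern google-cloud-bigquery client could look like this (the project, dataset, table, and row values here are hypothetical, and the destination table must already exist):

from google.cloud import bigquery

# uses application-default credentials, as before
client = bigquery.Client()

# hypothetical fully-qualified table id: project.dataset.table
table_id = 'my_project.my_dataset.my_table'

# row keys must match the table schema
rows = [{'id': 'a1', 'result': 0.32}]

errors = client.insert_rows_json(table_id, rows)
if errors:
    print('Encountered errors: {}'.format(errors))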

Original Answer:

There are a few different ways you can insert data into BQ.

For a deeper understanding of how the python-api works, here's everything you'll need: bq-python-api (at first the docs are somewhat intimidating, but once you get the hang of it, it's quite simple).

There are two main methods that I use to insert data into BQ. The first one is data streaming, and it's meant to be used when you need to insert rows one by one, in real time. Code example:

import uuid

def stream_data(self, table, data, schema):
    # first check if the table already exists; if it doesn't, create it
    r = self.service.tables().list(projectId=your_project_id,
                                   datasetId=your_dataset_id).execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r.get('tables', []) if
                    row['tableReference']['tableId'] == table]
    if not table_exists:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': your_project_id,
                'datasetId': your_dataset_id
            },
            'schema': schema
        }
        self.service.tables().insert(projectId=your_project_id,
                                     datasetId=your_dataset_id,
                                     body=body).execute()

    # with the table created, we can now stream the data;
    # to do so we use the tabledata().insertAll() method
    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.service.tabledata().insertAll(projectId=your_project_id,
                                       datasetId=your_dataset_id,
                                       tableId=table,
                                       body=body).execute(num_retries=5)

Here my self.service corresponds to your service object.

An example of the input data that we have in our project:

data = {u'days_validated': '20', u'days_trained': '80', u'navigated_score': '1', u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5', u'init_cv_date': '2016-03-06', u'metric': 'rank', u'unix_date': '1461610020241117', u'purchased_score': '10', u'result': '0.32677139316724546', u'date': '2016-04-25', u'carted_score': '3', u'end_cv_date': '2016-03-25'}

And its corresponding schema:

schema = {u'fields': [{u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'}, {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}]}
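Putting the pieces together, a hedged usage sketch might look like this (the connector instance and table name are hypothetical; self.service is assumed to be the authenticated BigQuery v2 service object from above):

# `connector` is assumed to be an instance of the class that defines
# stream_data() above, with self.service already authenticated
connector.stream_data(table='experiment_results',  # hypothetical table name
                      data=data,
                      schema=schema)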

The other way to insert data is to use the job insert function. As you can see in the documentation, it accepts several sources for your data. Here is an example of how you can load the results of a query into another table:

import time

def create_table_from_query(self,
                            query,
                            dest_table,
                            how):
    body = {
        'configuration': {
            'query': {
                'destinationTable': {
                    'projectId': your_project_id,
                    'tableId': dest_table,
                    'datasetId': your_dataset_id
                },
                'writeDisposition': how,
                'query': query,
            },
        }
    }

    # self.connector plays the same role as self.service above
    response = self.connector.jobs().insert(projectId=self._project_id,
                                            body=body).execute()
    self.wait_job_completion(response['jobReference']['jobId'])

def wait_job_completion(self, job_id):
    while True:
        response = self.connector.jobs().get(projectId=self._project_id,
                                             jobId=job_id).execute()
        if response['status']['state'] == 'DONE':
            return
        time.sleep(1)  # avoid busy-polling the API

The how input accepts the options available for this field in the documentation (such as "WRITE_TRUNCATE" or "WRITE_APPEND").
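For instance, a hypothetical call that appends a query result to another table could look like this (the query, table name, and connector instance are made up for illustration):

# `connector` is assumed to be an instance of the class defining
# create_table_from_query() above
connector.create_table_from_query(
    query='SELECT metric, AVG(result) AS avg_result '
          'FROM [my_dataset.my_table] GROUP BY metric',
    dest_table='metrics_summary',
    how='WRITE_APPEND')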

You can load the data from a CSV file, for instance; in that case, the configuration variable would be defined along these lines:

"configuration": {
  "load": {
    "fieldDelimiter": "\t"
    "sourceFormat": "CSV"
    "destinationTable": {
      "projectId": your_project_id,
      "tableId": table_id,
      "datasetId": your_dataset_id
    },
    "writeDisposition": "WRITE_TRUNCATE"
    "schema": schema,
    "sourceUris": file_location_in_google_cloud_storage
  },
}

(Using a tab-delimited CSV file as an example. It could be a JSON file as well; the documentation will walk you through the available options.)
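To actually run such a load, a hedged sketch: wrap the configuration above in the job body and submit it through jobs().insert(), just like the query job earlier (note that sourceUris should be a list of gs:// URI strings):

# `load_config` is assumed to be the {"configuration": {"load": {...}}}
# dict shown above; `service` is the authenticated BigQuery v2 service
response = service.jobs().insert(projectId=your_project_id,
                                 body=load_config).execute()
job_id = response['jobReference']['jobId']  # can be polled until 'DONE'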

Running jobs() requires some time to complete (that's why we created the wait_job_completion method). It should be cheaper, though, compared with real-time streaming.

Any questions, let us know.