How to UPSERT (MERGE, INSERT ... ON DUPLICATE UPDATE) in PostgreSQL?

Note: this content is taken from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/17267417/



Tags: postgresql, insert-update, upsert, sql-merge

Question by Craig Ringer

A very frequently asked question here is how to do an upsert, which is what MySQL calls INSERT ... ON DUPLICATE KEY UPDATE and which the standard supports as part of the MERGE operation.


Given that PostgreSQL doesn't support it directly (before pg 9.5), how do you do this? Consider the following:


CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Now imagine that you want to "upsert" the tuples (2, 'Joe'), (3, 'Alan'), so the new table contents would be:


(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

That's what people are talking about when discussing an upsert. Crucially, any approach must be safe in the presence of multiple transactions working on the same table, either by using explicit locking or by otherwise defending against the resulting race conditions.


This topic is discussed extensively at Insert, on duplicate update in PostgreSQL?, but that's about alternatives to the MySQL syntax, and it's grown a fair bit of unrelated detail over time. I'm working on definitive answers.


These techniques are also useful for "insert if not exists, otherwise do nothing", i.e. "insert ... on duplicate key ignore".


Answer by Craig Ringer

9.5 and newer:


PostgreSQL 9.5 and newer support INSERT ... ON CONFLICT ... DO UPDATE (and ON CONFLICT DO NOTHING), i.e. upsert.


Comparison with ON DUPLICATE KEY UPDATE.

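Roughly, the two dialects line up like this (a minimal sketch; the dummy table and its columns here are illustrative, not from the question):

-- MySQL:
--   INSERT INTO dummy (id, name) VALUES (1, 'x')
--   ON DUPLICATE KEY UPDATE name = VALUES(name);

-- PostgreSQL 9.5+; the conflict target must name a unique index or
-- constraint, and "excluded" refers to the row proposed for insertion:
INSERT INTO dummy (id, name) VALUES (1, 'x')
ON CONFLICT (id) DO UPDATE SET name = excluded.name;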

Quick explanation.


For usage see the manual, specifically the conflict_action clause in the syntax diagram, and the explanatory text.


Unlike the solutions for 9.4 and older that are given below, this feature works with multiple conflicting rows and it doesn't require exclusive locking or a retry loop.

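For example, a minimal sketch using the question's own testtable:

INSERT INTO testtable (id, somedata) VALUES
    (2, 'Joe'),
    (3, 'Alan')
ON CONFLICT (id)
DO UPDATE SET somedata = excluded.somedata;

This updates row 2 and inserts row 3 in a single statement, with no explicit locking and no retry loop.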

The commit adding the feature is here and the discussion around its development is here.




If you're on 9.5 and don't need to be backward-compatible you can stop reading now.




9.4 and older:


PostgreSQL doesn't have any built-in UPSERT (or MERGE) facility, and doing it efficiently in the face of concurrent use is very difficult.


This article discusses the problem in useful detail.


In general you must choose between two options:


  • Individual insert/update operations in a retry loop; or
  • Locking the table and doing a batch merge

Individual row retry loop


Using individual row upserts in a retry loop is the reasonable option if you want many connections concurrently trying to perform inserts.


The PostgreSQL documentation contains a useful procedure that'll let you do this in a loop inside the database. It guards against lost updates and insert races, unlike most naive solutions. It will only work in READ COMMITTED mode and is only safe if it's the only thing you do in the transaction, though. The function won't work correctly if triggers or secondary unique keys cause unique violations.

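For reference, here is a sketch of that loop, adapted from the manual's merge_db example to the question's testtable; all the caveats above still apply:

CREATE FUNCTION merge_testtable(key integer, data text) RETURNS void AS
$$
BEGIN
    LOOP
        -- First try to update the key.
        UPDATE testtable SET somedata = data WHERE id = key;
        IF found THEN
            RETURN;
        END IF;
        -- Not there, so try to insert the key. If someone else
        -- inserts the same key concurrently, we get a unique-key
        -- failure and fall through to the exception handler.
        BEGIN
            INSERT INTO testtable (id, somedata) VALUES (key, data);
            RETURN;
        EXCEPTION WHEN unique_violation THEN
            -- Do nothing, and loop to try the UPDATE again.
        END;
    END LOOP;
END;
$$
LANGUAGE plpgsql;

Call it as SELECT merge_testtable(2, 'Joe');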

This strategy is very inefficient. Whenever practical you should queue up work and do a bulk upsert as described below instead.


Many attempted solutions to this problem fail to consider rollbacks, so they result in incomplete updates. Two transactions race with each other; one of them successfully INSERTs; the other gets a duplicate key error and does an UPDATE instead. The UPDATE blocks waiting for the INSERT to roll back or commit. When it rolls back, the UPDATE condition re-check matches zero rows, so even though the UPDATE commits it hasn't actually done the upsert you expected. You have to check the result row counts and re-try where necessary.


Some attempted solutions also fail to consider SELECT races. If you try the obvious and simple:


-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

then when two run at once there are several failure modes. One is the already discussed issue with an update re-check. Another is where both UPDATE at the same time, matching zero rows and continuing. Then they both do the EXISTS test, which happens before the INSERT. Both get zero rows, so both do the INSERT. One fails with a duplicate key error.


This is why you need a re-try loop. You might think that you can prevent duplicate key errors or lost updates with clever SQL, but you can't. You need to check row counts or handle duplicate key errors (depending on the chosen approach) and re-try.


Please don't roll your own solution for this. Like with message queuing, it's probably wrong.


Bulk upsert with lock


Sometimes you want to do a bulk upsert, where you have a new data set that you want to merge into an older existing data set. This is vastly more efficient than individual row upserts and should be preferred whenever practical.


In this case, you typically follow the following process:


  • CREATE a TEMPORARY table

  • COPY or bulk-insert the new data into the temp table

  • LOCK the target table IN EXCLUSIVE MODE. This permits other transactions to SELECT, but not make any changes to the table.

  • Do an UPDATE ... FROM of existing records using the values in the temp table;

  • Do an INSERT of rows that don't already exist in the target table;

  • COMMIT, releasing the lock.


For example, for the example given in the question, using a multi-valued INSERT to populate the temp table:


BEGIN;

-- Stage the new data in a temp table.
CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

-- Block concurrent writes; concurrent SELECTs are still allowed.
LOCK TABLE testtable IN EXCLUSIVE MODE;

-- Update the rows that already exist.
UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

-- Insert the rows that don't exist yet.
INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;


What about MERGE?


SQL-standard MERGE actually has poorly defined concurrency semantics and is not suitable for upserting without locking a table first.


It's a really useful OLAP statement for data merging, but it's not actually a useful solution for concurrency-safe upsert. There's lots of advice to people using other DBMSes to use MERGE for upserts, but it's actually wrong.



Answer by Renzo

I would like to contribute another solution for the single-insertion problem with the pre-9.5 versions of PostgreSQL. The idea is simply to try to perform the insertion first and, in case the record is already present, to update it:


do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

Note that this solution can be applied only if rows of the table are never deleted: if a concurrent transaction deletes the row after the INSERT fails but before the UPDATE runs, the UPDATE silently matches zero rows and the upsert is lost.


I do not know about the efficiency of this solution, but it seems to me reasonable enough.


Answer by Eric Wang

Here are some examples for insert ... on conflict ... (pg 9.5+):


  • Insert, on conflict - do nothing.
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;

  • Insert, on conflict - do update, specify conflict target via column.
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;

  • Insert, on conflict - do update, specify conflict target via constraint name.
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
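These examples hard-code the replacement values in the do update set clause. To reuse whatever values the failed insert itself supplied (usually what you want), reference the excluded pseudo-table instead, e.g.:

insert into dummy(id, name, size) values(1, 'new_name', 3)
on conflict(id)
do update set name = excluded.name, size = excluded.size;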
    

Answer by P.R.

SQLAlchemy upsert for Postgres >=9.5


Since the large post above covers many different SQL approaches for Postgres versions (not only non-9.5 as in the question), I would like to add how to do it in SQLAlchemy if you are using Postgres 9.5. Instead of implementing your own upsert, you can also use SQLAlchemy's functions (which were added in SQLAlchemy 1.1). Personally, I would recommend using these, if possible. Not only because of convenience, but also because it lets PostgreSQL handle any race conditions that might occur.


Cross-posting from another answer I gave yesterday (https://stackoverflow.com/a/44395983/2156909)


SQLAlchemy supports ON CONFLICT now with two methods, on_conflict_do_update() and on_conflict_do_nothing():


Copying from the documentation:


from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert


Answer by aristar

-- Try the UPDATE first; if it matched no row, the INS branch
-- produces the row, which the final INSERT then adds.
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2
RETURNING ID),
INS AS (SELECT 2, 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS;

Tested on PostgreSQL 9.3. Note that, like the other pre-9.5 approaches, this writable CTE is not concurrency-safe on its own: two simultaneous runs can both match zero rows in the UPDATE and both attempt the INSERT, so one can still fail with a duplicate key error.


Answer by reubano

Since this question was closed, I'm posting here for how you do it using SQLAlchemy. Via recursion, it retries a bulk insert or update to combat race conditions and validation errors.


First the imports


import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Now a couple helper functions


def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

And finally the upsert function


def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

Here's how you use it


>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

The advantage this has over bulk_save_objects is that it can handle relationships, error checking, etc. on insert (unlike bulk operations).
