Bulk insert with SQLAlchemy ORM in Python
Original question: http://stackoverflow.com/questions/3659142/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use and share them, but you must attribute them to the original authors (not me) on Stack Overflow.
Bulk insert with SQLAlchemy ORM
Asked by Nick Holden
Is there any way to get SQLAlchemy to do a bulk insert rather than inserting each object individually? That is, doing:
INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
rather than:
INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)
I've just converted some code to use SQLAlchemy rather than raw SQL, and although it is now much nicer to work with, it also seems to be slower (by up to a factor of 10). I'm wondering if this is the reason.
Maybe I could improve the situation by using sessions more efficiently. At the moment I have autoCommit=False and do a session.commit() after I've added some stuff. Although this seems to cause the data to go stale if the DB is changed elsewhere: even if I do a new query, I still get the old results back?
Thanks for your help!
Answered by dhaffey
As far as I know, there is no way to get the ORM to issue bulk inserts. I believe the underlying reason is that SQLAlchemy needs to keep track of each object's identity (i.e., new primary keys), and bulk inserts interfere with that. For example, assuming your foo table contains an id column and is mapped to a Foo class:
x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1
Since SQLAlchemy picked up the value for x.id without issuing another query, we can infer that it got the value directly from the INSERT statement. If you don't need subsequent access to the created objects via the same instances, you can skip the ORM layer for your insert:
Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))
SQLAlchemy can't match these new rows with any existing objects, so you'll have to query them anew for any subsequent operations.
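For example, a minimal sketch of fetching those rows back as ORM instances, reusing the Foo mapping and session from above (the filter values are just for illustration):
# Sketch only: the rows were inserted at the Core level, so the session's
# identity map knows nothing about them; query them back if you need
# ORM instances for subsequent operations.
new_foos = session.query(Foo).filter(Foo.bar.in_([1, 2, 3])).all()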
As far as stale data is concerned, it's helpful to remember that the session has no built-in way to know when the database is changed outside of the session. In order to access externally modified data through existing instances, the instances must be marked as expired. This happens by default on session.commit(), but can be done manually by calling session.expire_all() or session.expire(instance). An example (SQL omitted):
x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42
session.commit() expires x, so the first print statement implicitly opens a new transaction and re-queries x's attributes. If you comment out the first print statement, you'll notice that the second one now picks up the correct value, because the new query isn't emitted until after the update.
This makes sense from the point of view of transactional isolation - you should only pick up external modifications between transactions. If this is causing you trouble, I'd suggest clarifying or re-thinking your application's transaction boundaries instead of immediately reaching for session.expire_all().
Answered by user3805082
Direct support was added to SQLAlchemy as of version 0.8.
As per the docs, connection.execute(table.insert().values(data)) should do the trick. (Note that this is not the same as connection.execute(table.insert(), data), which results in many individual row inserts via a call to executemany.) On anything but a local connection, the difference in performance can be enormous.
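As a rough sketch of the difference (the engine URL, foo table definition, and 1.x-style implicit autocommit below are assumptions for illustration, not part of the original answer):
from sqlalchemy import create_engine, MetaData, Table, Column, Integer

# Assumed setup, for illustration only.
engine = create_engine('sqlite://')
metadata = MetaData()
foo = Table('foo', metadata,
            Column('id', Integer, primary_key=True),
            Column('bar', Integer))
metadata.create_all(engine)

data = [{'bar': 1}, {'bar': 2}, {'bar': 3}]

with engine.connect() as connection:
    # One multi-row statement: INSERT INTO foo (bar) VALUES (?), (?), (?)
    connection.execute(foo.insert().values(data))
    # By contrast, this goes through executemany(), one parameter set per row.
    connection.execute(foo.insert(), data)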
Answered by Pierre
SQLAlchemy introduced that in version 1.0.0:
Bulk operations - SQLAlchemy docs
With these operations, you can now do bulk inserts or updates!
For instance, you can do:
s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()
Here, a bulk insert will be made.
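For the update side, a similar sketch might use Session.bulk_update_mappings(); the assumption here is that User has an integer primary key column id and that rows 1-3 already exist:
s = Session()
# Each mapping must include the primary key so the rows can be matched.
s.bulk_update_mappings(User, [
    {"id": 1, "name": "u1-renamed"},
    {"id": 2, "name": "u2-renamed"},
    {"id": 3, "name": "u3-renamed"},
])
s.commit()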
Answered by Grant Humphries
The SQLAlchemy docs have a writeup on the performance of the various techniques that can be used for bulk inserts:
ORMs are basically not intended for high-performance bulk inserts - this is the whole reason SQLAlchemy offers the Core in addition to the ORM as a first-class component.
For the use case of fast bulk inserts, the SQL generation and execution system that the ORM builds on top of is part of the Core. Using this system directly, we can produce an INSERT that is competitive with using the raw database API directly.
Alternatively, the SQLAlchemy ORM offers the Bulk Operations suite of methods, which provide hooks into subsections of the unit of work process in order to emit Core-level INSERT and UPDATE constructs with a small degree of ORM-based automation.
The example below illustrates time-based tests for several different methods of inserting rows, going from the most automated to the least. With cPython 2.7, runtimes observed:
classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Script:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")


if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Answered by juanitogan
SQLAlchemy introduced that in version 1.0.0:
Bulk operations - SQLAlchemy docs
With these operations, you can now do bulk inserts or updates!
For instance (if you want the lowest overhead for simple table INSERTs), you can use Session.bulk_insert_mappings():
loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]
s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()
Or, if you like, skip the loadme tuples and write the dictionaries directly into dicts (but I find it easier to leave all the wordiness out of the data and load up a list of dictionaries in a loop).
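A sketch of that loop-based approach, where the source_rows iterable is an assumed placeholder for wherever the data actually comes from:
# Sketch: build the list of mappings in a loop rather than transcribing tuples.
dicts = []
for bar_value, fly_value in source_rows:  # source_rows: assumed data source
    dicts.append(dict(bar=bar_value, fly=fly_value))

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()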
Answered by Matthew Moisen
Pierre's answer is correct, but one issue is that bulk_save_objects by default does not return the primary keys of the objects, if that is of concern to you. Set return_defaults to True to get this behavior.
The documentation is here.
foos = [Foo(bar='a'), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
assert foo.id is not None
session.commit()
Answered by lelabo_m
The best answer I found so far was in the SQLAlchemy documentation:
There is a complete example of a benchmark of possible solutions.
As shown in the documentation:
bulk_save_objects is not the best solution, but its performance is reasonable.
The second-best implementation in terms of readability, I think, was with the SQLAlchemy Core:
def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
The context of this function is given in the documentation article.
Answered by chjortlund
All roads lead to Rome, but some of them cross mountains or require ferries; if you want to get there quickly, just take the motorway.
In this case, the motorway is to use the execute_batch() feature of psycopg2. The documentation says it best:
The current implementation of executemany() is (using an extremely charitable understatement) not particularly performing. These functions can be used to speed up the repeated execution of a statement against a set of parameters. By reducing the number of server roundtrips, the performance can be orders of magnitude better than using executemany().
In my own tests, execute_batch() is approximately twice as fast as executemany(), and it gives the option of configuring page_size for further tweaking (if you want to squeeze the last 2-3% of performance out of the driver).
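A minimal sketch of calling execute_batch() directly with psycopg2 (the connection string and customer table are assumptions for illustration):
import psycopg2
from psycopg2.extras import execute_batch

# Assumed connection parameters, for illustration only.
conn = psycopg2.connect("dbname=test user=postgres")
rows = [("NAME " + str(i),) for i in range(100000)]

with conn, conn.cursor() as cur:
    # Batches the parameter sets into fewer round trips; page_size controls
    # how many rows are grouped into each statement sent to the server.
    execute_batch(cur,
                  "INSERT INTO customer (name) VALUES (%s)",
                  rows,
                  page_size=1000)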
The same feature can easily be enabled if you are using SQLAlchemy, by setting use_batch_mode=True as a parameter when you instantiate the engine with create_engine().
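A minimal sketch of wiring that up; the PostgreSQL URL is an assumption and the Customer mapping is borrowed from the benchmark script above. Note that in later SQLAlchemy releases this flag was superseded by the executemany_mode engine parameter, so check the version you are on.
from sqlalchemy import create_engine

# use_batch_mode=True routes executemany()-style executions through
# psycopg2's execute_batch() helper.
engine = create_engine(
    "postgresql+psycopg2://user:password@localhost/mydb",  # assumed URL
    use_batch_mode=True,
)

# An executemany()-style insert now benefits from batching:
engine.execute(
    Customer.__table__.insert(),
    [{"name": "NAME " + str(i)} for i in range(100000)],
)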

