如何使用 Pandas DataFrame 对 db 表的现有行执行更新?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/42461959/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How do I perform an UPDATE of existing rows of a db table using a Pandas DataFrame?
提问by D Clancy
I am attempting to query a subset of a MySql database table, feed the results into a Pandas DataFrame, alter some data, and then write the updated rows back to the same table. My table size is ~1MM rows, and the number of rows I will be altering will be relatively small (<50,000) so bringing back the entire table and performing a df.to_sql(tablename,engine, if_exists='replace')
isn't a viable option. Is there a straightforward way to UPDATE the rows that have been altered without iterating over every row in the DataFrame?
我正在尝试查询 MySql 数据库表的子集,将结果提供给 Pandas DataFrame,更改一些数据,然后将更新的行写回同一个表。我的表大小为 ~1MM 行,我将更改的行数相对较小(<50,000),因此带回整个表并执行 adf.to_sql(tablename,engine, if_exists='replace')
不是一个可行的选择。是否有一种直接的方法来更新已更改的行而无需遍历 DataFrame 中的每一行?
I am aware of this project, which attempts to simulate an "upsert" workflow, but it seems it only accomplishes the task of inserting new non-duplicate rows rather than updating parts of existing rows:
我知道这个项目试图模拟“upsert”工作流,但它似乎只完成了插入新的非重复行而不是更新现有行的部分的任务:
Here is a skeleton of what I'm attempting to accomplish on a much larger scale:
这是我试图在更大范围内完成的工作的骨架:
import pandas as pd
from sqlalchemy import create_engine
import threading
#Get sample data
d = {'A' : [1, 2, 3, 4], 'B' : [4, 3, 2, 1]}
df = pd.DataFrame(d)
engine = create_engine(SQLALCHEMY_DATABASE_URI)
#Create a table with a unique constraint on A.
engine.execute("""DROP TABLE IF EXISTS test_upsert """)
engine.execute("""CREATE TABLE test_upsert (
A INTEGER,
B INTEGER,
PRIMARY KEY (A))
""")
#Insert data using pandas.to_sql
df.to_sql('test_upsert', engine, if_exists='append', index=False)
#Alter row where 'A' == 2
df_in_db.loc[df_in_db['A'] == 2, 'B'] = 6
Now I would like to write df_in_db
back to my 'test_upsert'
table with the updated data reflected.
现在我想写df_in_db
回我的'test_upsert'
表,并反映更新的数据。
This SO question is very similar, and one of the comments recommends using an "sqlalchemy table class" to perform the task.
这个 SO 问题非常相似,其中一个评论建议使用“sqlalchemy 表类”来执行任务。
Update table using sqlalchemy table class
Can anyone expand on how I would implement this for my specific case above if that is the best (only?) way to implement it?
如果这是实现它的最佳(唯一?)方法,任何人都可以扩展我将如何针对上述特定情况实施此操作吗?
采纳答案by MaxU
I think the easiest way would be to:
我认为最简单的方法是:
first delete those rows that are going to be "upserted". This can be done in a loop, but it's not very efficient for bigger data sets (5K+ rows), so i'd save this slice of the DF into a temporary MySQL table:
首先删除将要“插入”的那些行。这可以在循环中完成,但对于更大的数据集(5K+ 行)来说效率不是很高,所以我将这部分 DF 保存到一个临时的 MySQL 表中:
# assuming we have already changed values in the rows and saved those changed rows in a separate DF: `x`
x = df[mask] # `mask` should help us to find changed rows...
# make sure `x` DF has a Primary Key column as index
x = x.set_index('a')
# dump a slice with changed rows to temporary MySQL table
x.to_sql('my_tmp', engine, if_exists='replace', index=True)
conn = engine.connect()
trans = conn.begin()
try:
# delete those rows that we are going to "upsert"
engine.execute('delete from test_upsert where a in (select a from my_tmp)')
trans.commit()
# insert changed rows
x.to_sql('test_upsert', engine, if_exists='append', index=True)
except:
trans.rollback()
raise
PS i didn't test this code so it might have some small bugs, but it should give you an idea...
PS我没有测试这段代码所以它可能有一些小错误,但它应该给你一个想法......
回答by patrick
A MySQL specific solution using Panda's to_sql"method" arg and sqlalchemy's mysql insert on_duplicate_key_updatefeatures:
使用Panda 的 to_sql"method" arg 和sqlalchemy的mysql insert on_duplicate_key_update功能的MySQL 特定解决方案:
def create_method(meta):
def method(table, conn, keys, data_iter):
sql_table = db.Table(table.name, meta, autoload=True)
insert_stmt = db.dialects.mysql.insert(sql_table).values([dict(zip(keys, data)) for data in data_iter])
upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted})
conn.execute(upsert_stmt)
return method
engine = db.create_engine(...)
conn = engine.connect()
with conn.begin():
meta = db.MetaData(conn)
method = create_method(meta)
df.to_sql(table_name, conn, if_exists='append', method=method)