Python 使用 SQLAlchemy 批量插入 Pandas 数据帧
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/31997859/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Bulk Insert A Pandas DataFrame Using SQLAlchemy
提问by none
I have some rather large pandas DataFrames and I'd like to use the new bulk SQL mappings to upload them to a Microsoft SQL Server via SQL Alchemy. The pandas.to_sql method, while nice, is slow.
我有一些相当大的 Pandas DataFrames,我想使用新的批量 SQL 映射通过 SQL Alchemy 将它们上传到 Microsoft SQL Server。pandas.to_sql 方法虽然不错,但速度很慢。
I'm having trouble writing the code...
我在写代码时遇到了麻烦...
I'd like to be able to pass this function a pandas DataFrame which I'm calling table
, a schema name I'm calling schema
, and a table name I'm calling name
. Ideally, the function will 1.) delete the table if it already exists. 2.) create a new table 3.) create a mapper and 4.) bulk insert using the mapper and pandas data. I'm stuck on part 3.
我希望能够将此函数传递给我正在调用的 Pandas DataFrame、我正在调用table
的模式名称和我正在调用schema
的表名称name
。理想情况下,该函数将 1.) 删除已存在的表。2.) 创建一个新表 3.) 创建一个映射器和 4.) 使用映射器和 Pandas 数据批量插入。我被困在第 3 部分。
Here's my (admittedly rough) code. I'm struggling with how to get the mapper function to work with my primary keys. I don't really need primary keys but the mapper function requires it.
这是我的(公认的粗略)代码。我正在努力解决如何让映射器函数与我的主键一起工作。我真的不需要主键,但映射器功能需要它。
Thanks for the insights.
感谢您的见解。
from sqlalchemy import create_engine Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase
def bulk_upload(table, schema, name):
e = create_engine('mssql+pyodbc://MYDB')
s = create_session(bind=e)
m = MetaData(bind=e,reflect=True,schema=schema)
Base = declarative_base(bind=e,metadata=m)
t = Table(name,m)
m.remove(t)
t.drop(checkfirst=True)
sqld = SQLDatabase(e, schema=schema,meta=m)
sqlt = SQLTable(name, sqld, table).table
sqlt.metadata = m
m.create_all(bind=e,tables=[sqlt])
class MyClass(Base):
return
mapper(MyClass, sqlt)
s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
return
回答by ansonw
I ran into a similar issue with pd.to_sql taking hours to upload data. The below code bulk inserted the same data in a few seconds.
我遇到了 pd.to_sql 需要几个小时才能上传数据的类似问题。下面的代码在几秒钟内批量插入了相同的数据。
from sqlalchemy import create_engine
import psycopg2 as pg
#load python script that batch loads pandas df to sql
import cStringIO
address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()
#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={'index':'Index'}, inplace =True)
#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone,
);'''
cursor.execute(command)
connection.commit()
#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
output.seek(0)
contents = output.getvalue()
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")
connection.commit()
cur.close()
回答by AkaGonjo
This might have been answered by then, but I found the solution by collating different answers on this site and aligning with SQLAlchemy's doc.
那时可能已经回答了这个问题,但我通过整理本网站上的不同答案并与 SQLAlchemy 的文档保持一致找到了解决方案。
- The table needs to already exist in db1; with an index set up with auto_increment on.
- The Class Currentneeds to align with the dataframe imported in the CSV and the table in the db1.
- 该表需要已经存在于 db1 中;使用 auto_increment 设置索引。
- Class Current需要与 CSV 中导入的数据框和 db1 中的表对齐。
Hope this helps whoever comes here and wants to mix Panda and SQLAlchemy in a quick way.
希望这可以帮助任何来到这里并希望以快速方式混合 Panda 和 SQLAlchemy 的人。
from urllib import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd
# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%s@localhost/db1' % urlquote('weirdPassword*withspcialcharacters'), echo=False)
conn = engine.connect()
Base = declarative_base()
#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy's doc.
class Current(Base):
__tablename__ = 'tableName'
id = Column(Integer, primary_key=True)
Date = Column(String(500))
Type = Column(String(500))
Value = Column(Numeric())
def __repr__(self):
return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)
# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'
# Panda to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# The orient='records' is the key of this, it allows to align with the format mentioned in the doc to insert in bulks.
listToWrite = df_to_be_written.to_dict(orient='records')
metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)
# Open the session
Session = sessionmaker(bind=engine)
session = Session()
# Inser the dataframe into the database in one bulk
conn.execute(table.insert(), listToWrite)
# Commit the changes
session.commit()
# Close the session
session.close()
回答by dgorissen
As this is an I/O heavy workload you can also use the python threading module through multiprocessing.dummy. This sped things up for me:
由于这是 I/O 繁重的工作负载,您还可以通过multiprocessing.dummy使用 python 线程模块。这对我来说加快了速度:
import math
from multiprocessing.dummy import Pool as ThreadPool
...
def insert_df(df, *args, **kwargs):
nworkers = 4
chunksize = math.floor(df.shape[0] / nworkers)
chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)]
chunks.append((chunksize * nworkers, df.shape[0]))
pool = ThreadPool(nworkers)
def worker(chunk):
i, j = chunk
df.iloc[i:j, :].to_sql(*args, **kwargs)
pool.map(worker, chunks)
pool.close()
pool.join()
....
insert_df(df, "foo_bar", engine, if_exists='append')
回答by Fabien Vauchelles
Based on @ansonw answers:
基于@ansonw 的回答:
def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
# Create Table
df[:0].to_sql(table, engine, if_exists=if_exists)
# Prepare data
output = cStringIO.StringIO()
df.to_csv(output, sep=sep, header=False, encoding=encoding)
output.seek(0)
# Insert data
connection = engine.raw_connection()
cursor = connection.cursor()
cursor.copy_from(output, table, sep=sep, null='')
connection.commit()
cursor.close()
I insert 200000 lines in 5 seconds instead of 4 minutes
我在 5 秒而不是 4 分钟内插入 200000 行
回答by mgoldwasser
My postgres specific solution below auto-creates the database table using your pandas dataframe, and performs a fast bulk insert using the postgres COPY my_table FROM ...
下面我的 postgres 特定解决方案使用您的 Pandas 数据框自动创建数据库表,并使用 postgres 执行快速批量插入 COPY my_table FROM ...
import io
import pandas as pd
from sqlalchemy import create_engine
def write_to_table(df, db_engine, schema, table_name, if_exists='fail'):
string_data_io = io.StringIO()
df.to_csv(string_data_io, sep='|', index=False)
pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema)
table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
index=False, if_exists=if_exists, schema=schema)
table.create()
string_data_io.seek(0)
string_data_io.readline() # remove header
with db_engine.connect() as connection:
with connection.connection.cursor() as cursor:
copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER '|' CSV" % (schema, table_name)
cursor.copy_expert(copy_cmd, string_data_io)
connection.connection.commit()
回答by cryanbhu
For anyone facing this problem and having the destination DB as Redshift, note that Redshift does not implement the full set of Postgres commands, and so some of the answers using either Postgres' COPY FROM
or copy_from()
will not work.
psycopg2.ProgrammingError: syntax error at or near "stdin" error when trying to copy_from redshift
对于任何面临这个问题并将目标数据库作为 Redshift 的人,请注意 Redshift 没有实现完整的 Postgres 命令集,因此使用 Postgres'COPY FROM
或 的一些答案copy_from()
将不起作用。
psycopg2.ProgrammingError:尝试 copy_from redshift 时出现“stdin”错误或接近“stdin”错误的语法错误
Solution for speeding up the INSERTs to Redshift is to use a file ingest or Odo.
加速向 Redshift 插入的解决方案是使用文件摄取或 Odo。
Reference:
About Odo
http://odo.pydata.org/en/latest/perf.html
Odo with Redshift
https://github.com/blaze/odo/blob/master/docs/source/aws.rst
Redshift COPY (from S3 file)
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html
参考:
关于 Odo
http://odo.pydata.org/en/latest/perf.html
Odo with Redshift
https://github.com/blaze/odo/blob/master/docs/source/aws.rst
Redshift COPY(来自 S3 文件)
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html
回答by bootstrap
This worked for me to connect to Oracle Database using cx_Oracle and SQLALchemy
这对我使用 cx_Oracle 和 SQLALchemy 连接到 Oracle 数据库有用
import sqlalchemy
import cx_Oracle
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String
from sqlalchemy.orm import sessionmaker
import pandas as pd
# credentials
username = "username"
password = "password"
connectStr = "connection:/string"
tableName = "tablename"
t0 = time.time()
# connection
dsn = cx_Oracle.makedsn('host','port',service_name='servicename')
Base = declarative_base()
class LANDMANMINERAL(Base):
__tablename__ = 'tablename'
DOCUMENTNUM = Column(String(500), primary_key=True)
DOCUMENTTYPE = Column(String(500))
FILENUM = Column(String(500))
LEASEPAYOR = Column(String(500))
LEASESTATUS = Column(String(500))
PROSPECT = Column(String(500))
SPLIT = Column(String(500))
SPLITSTATUS = Column(String(500))
engine = create_engine('oracle+cx_oracle://%s:%s@%s' % (username, password, dsn))
conn = engine.connect()
Base.metadata.bind = engine
# Creating the session
DBSession = sessionmaker(bind=engine)
session = DBSession()
# Bulk insertion
data = pd.read_csv('data.csv')
lists = data.to_dict(orient='records')
table = sqlalchemy.Table('landmanmineral', Base.metadata, autoreload=True)
conn.execute(table.insert(), lists)
session.commit()
session.close()
print("time taken %8.8f seconds" % (time.time() - t0) )
回答by freddy888
for people like me who are trying to implement the aforementioned solutions:
对于像我这样试图实施上述解决方案的人:
Pandas 0.24.0 has now to_sql with chunksize and method='multi' option that inserts in bulk...
Pandas 0.24.0 现在有 to_sql 和 chunksize 和 method='multi' 选项,可以批量插入......
回答by Suhas_Pote
Here is Simple Method
这是简单的方法
.
.
Download Drivers for SQL database connectivity
下载用于 SQL 数据库连接的驱动程序
For Linux and Mac OS:
对于 Linux 和 Mac 操作系统:
For Windows:
对于 Windows:
https://www.microsoft.com/en-us/download/details.aspx?id=56567
https://www.microsoft.com/en-us/download/details.aspx?id=56567
Creating Connection
创建连接
from sqlalchemy import create_engine
import urllib
server = '*****'
database = '********'
username = '**********'
password = '*********'
params = urllib.parse.quote_plus(
'DRIVER={ODBC Driver 17 for SQL Server};'+
'SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
#Checking Connection
connected = pd.io.sql._is_sqlalchemy_connectable(engine)
print(connected) #Output is True if connection established successfully
Data insertion
数据插入
df.to_sql('Table_Name', con=engine, if_exists='append', index=False)
"""
if_exists: {'fail', 'replace', 'append'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if does not exist.
"""
If there are many records
如果有很多记录
# limit based on sp_prepexec parameter count
tsql_chunksize = 2097 // len(bd_pred_score_100.columns)
# cap at 1000 (limit for number of rows inserted by table-value constructor)
tsql_chunksize = 1000 if tsql_chunksize > 1000 else tsql_chunksize
print(tsql_chunksize)
df.to_sql('table_name', con = engine, if_exists = 'append', index= False, chunksize=tsql_chunksize)
PS: You can change the parameters as per your requirement.
PS:您可以根据需要更改参数。
回答by Jonathan Perkins
Pandas 0.25.1 has a parameter to do multi-inserts, so it's no longer necessary to workaround this issue with SQLAlchemy.
Pandas 0.25.1 有一个参数可以进行多次插入,因此不再需要使用 SQLAlchemy 来解决这个问题。
Set method='multi'
when calling pandas.DataFrame.to_sql
.
method='multi'
调用时设置pandas.DataFrame.to_sql
。
In this example, it would be
df.to_sql(table, schema=schema, con=e, index=False, if_exists='replace', method='multi')
在这个例子中,它将是
df.to_sql(table, schema=schema, con=e, index=False, if_exists='replace', method='multi')
Answer sourced from docs here
答案来自此处的文档
Worth noting that I've only tested this with Redshift. Please let me know how it goes on other databases so I can update this answer.
值得注意的是,我只用 Redshift 对此进行了测试。请让我知道它在其他数据库上的运行情况,以便我可以更新此答案。