Write Large Pandas DataFrames to SQL Server database

Note: this page is a translation of a StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must keep the same license and attribute the original authors (not this site). Original: http://stackoverflow.com/questions/33816918/



python, sql-server, pandas, sqlalchemy

Asked by denvaar

I have 74 relatively large Pandas DataFrames (about 34,600 rows and 8 columns) that I am trying to insert into a SQL Server database as quickly as possible. After doing some research, I learned that the good ole pandas.to_sql function is not good for such large inserts into a SQL Server database, which was the initial approach that I took (very slow - almost an hour for the application to complete, versus about 4 minutes when using a MySQL database).

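For context, the slow baseline was essentially the plain to_sql call on each frame (a sketch, not the exact application code; the engine and table name come from the application):

df.to_sql('TimeSeriesResultValues', engine, if_exists='append', index=False)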

This article and many other StackOverflow posts have been helpful in pointing me in the right direction; however, I've hit a roadblock:


I am trying to use SQLAlchemy's Core rather than the ORM, for reasons explained in the link above. So I am converting the dataframe to a dictionary using pandas.to_dict and then doing an execute() and insert():


self._session_factory.engine.execute(
    TimeSeriesResultValues.__table__.insert(),
    data)
# 'data' is a list of dictionaries.

The problem is that the insert is not getting any values -- they appear as a bunch of empty parentheses, and I get this error:


(pyodbc.IntegrityError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot
insert the value NULL into the column...

There are values in the list of dictionaries that I passed in, so I can't figure out why the values aren't showing up.
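One thing worth double-checking, since the conversion code isn't shown here (an assumption on my part, not something stated in the question): pandas.DataFrame.to_dict() defaults to a column-oriented dictionary, while SQLAlchemy Core's executemany path expects a list of per-row dictionaries whose keys match the table's column names. A row-oriented conversion looks roughly like this:

data = df.to_dict(orient='records')   # one dict per row, keyed by column name
self._session_factory.engine.execute(TimeSeriesResultValues.__table__.insert(), data)

If the keys don't line up with the mapped table columns, those columns get no bound values, which would be consistent with the empty parentheses and the NULL error above.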

我传入的字典列表中有值,所以我不知道为什么这些值没有显示出来。

EDIT:


Here's the example I'm going off of:


def test_sqlalchemy_core(n=100000):
    # init_sqlalchemy(), engine, and Customer are defined in the benchmark this example comes from
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in range(n)]
    )
    print("SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")

Answered by maxymoo

I've got some sad news for you: SQLAlchemy actually doesn't implement bulk imports for SQL Server; it's actually just going to do the same slow individual INSERT statements that to_sql is doing. I would say that your best bet is to try and script something up using the bcp command line tool. Here is a script that I've used in the past, but no guarantees:


from subprocess import call

import numpy as np
import pandas as pd

# Module-level settings -- adjust these for your environment
pad = 0.1                                # extra headroom on varchar widths
tablename = 'sandbox.max.pybcp_test'
overwrite = True
raise_exception = True
server = 'P01'
trusted_connection = True
username = None
password = None
delimiter = '|'
df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False)


def get_column_def_sql(col):
    # map a pandas dtype to a SQL Server column definition
    if col.dtype == object:
        width = col.str.len().max() * (1 + pad)
        return '[{}] varchar({})'.format(col.name, int(width))
    elif np.issubdtype(col.dtype, np.floating):
        return '[{}] float'.format(col.name)
    elif np.issubdtype(col.dtype, np.integer):
        return '[{}] int'.format(col.name)
    else:
        if raise_exception:
            raise NotImplementedError('data type {} not implemented'.format(col.dtype))
        else:
            print('Warning: cast column {} as varchar; data type {} not implemented'.format(col.name, col.dtype))
            width = col.str.len().max() * (1 + pad)
            return '[{}] varchar({})'.format(col.name, int(width))


def create_table(df, tablename, server, trusted_connection, username, password, pad):
    # -E uses Windows authentication with sqlcmd; otherwise pass an explicit login
    if trusted_connection:
        login_string = '-E'
    else:
        login_string = '-U {} -P {}'.format(username, password)

    col_defs = [get_column_def_sql(df[col]) for col in df]

    query_string = 'CREATE TABLE {}\n({})\nGO\nQUIT'.format(tablename, ',\n'.join(col_defs))
    if overwrite:
        query_string = "IF OBJECT_ID('{}', 'U') IS NOT NULL DROP TABLE {};".format(tablename, tablename) + query_string

    # write the DDL to a temp file and run it through sqlcmd
    query_file = r'c:\pybcp_tempqueryfile.sql'
    with open(query_file, 'w') as f:
        f.write(query_string)

    o = call('sqlcmd -S {} {} -i {}'.format(server, login_string, query_file), shell=True)
    if o != 0:
        raise RuntimeError("Failed to create table")
    # o = call('del {}'.format(query_file), shell=True)


def call_bcp(df, tablename):
    # -T uses Windows authentication with bcp; otherwise pass an explicit login
    if trusted_connection:
        login_string = '-T'
    else:
        login_string = '-U {} -P {}'.format(username, password)
    temp_file = r'c:\pybcp_tempqueryfile.csv'

    # strip the delimiter out of string columns so it cannot corrupt the file,
    # then write the frame in latin-1 so SQL Server can read it; bcp loads by
    # position, so skip the header row
    str_cols = df.dtypes == object
    df.loc[:, str_cols] = df.loc[:, str_cols].apply(lambda col: col.str.replace(delimiter, '', regex=False))
    df.to_csv(temp_file, index=False, header=False, sep=delimiter, encoding='latin-1')

    o = call('bcp {} in {} -S {} {} -t"{}" -r"\\n" -c'.format(tablename, temp_file, server, login_string, delimiter),
             shell=True)
    if o != 0:
        raise RuntimeError("bcp load failed")
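
For completeness, a sketch of how the two helpers might be driven end to end, using the module-level settings defined at the top of the script (the path and table name there are placeholders, so adjust them before running anything):

create_table(df, tablename, server, trusted_connection, username, password, pad)
call_bcp(df, tablename)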

Answered by Hung Nguyen

This was recently updated as of SQLAlchemy version 1.3.0, in case anyone else needs to know. It should make your dataframe.to_sql statement much faster.


https://docs.sqlalchemy.org/en/latest/changelog/migration_13.html#support-for-pyodbc-fast-executemany


from sqlalchemy import create_engine

engine = create_engine(
    "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server",
    fast_executemany=True)

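With an engine created that way, the plain to_sql approach from the question becomes workable again. A minimal sketch, assuming the target table is the TimeSeriesResultValues table from the question (the chunksize is just an illustrative choice):

df.to_sql('TimeSeriesResultValues', engine, if_exists='append', index=False, chunksize=1000)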