Python: using SQLAlchemy to load a CSV file into a database

Disclaimer: this page is a Chinese-English side-by-side translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverFlow. Original URL: http://stackoverflow.com/questions/31394998/

Time: 2020-08-19 09:55:26  Source: igfitidea

using sqlalchemy to load csv file into a database

python database sqlalchemy

Asked by alex chan

I would like to load CSV files into a database.

Accepted answer by Manuel J. Diaz

Because of the power of SQLAlchemy, I'm also using it on a project. Its power comes from the object-oriented way of "talking" to a database instead of hardcoding SQL statements that can be a pain to manage. Not to mention, it was also a lot faster in my comparison below (0.091 s vs. 3.604 s).

To answer your question bluntly, yes! Storing data from a CSV into a database using SQLAlchemy is a piece of cake. Here's a full working example (I used SQLAlchemy 1.0.6 and Python 2.7.6):


from numpy import genfromtxt
from time import time
from datetime import datetime
from sqlalchemy import Column, Integer, Float, Date
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()

Base = declarative_base()

class Price_History(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Price_History'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False) 
    date = Column(Date)
    opn = Column(Float)
    hi = Column(Float)
    lo = Column(Float)
    close = Column(Float)
    vol = Column(Float)

if __name__ == "__main__":
    t = time()

    #Create the database
    engine = create_engine('sqlite:///csv_test.db')
    Base.metadata.create_all(engine)

    #Create the session
    session = sessionmaker()
    session.configure(bind=engine)
    s = session()

    try:
        file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv
        data = Load_Data(file_name) 

        for i in data:
            record = Price_History(**{
                'date' : datetime.strptime(i[0], '%d-%b-%y').date(),
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            })
            s.add(record) #Add all the records

        s.commit() #Attempt to commit all the records
    except Exception:
        s.rollback() #Rollback the changes on error
        raise #Re-raise so the failure is not silently swallowed
    finally:
        s.close() #Close the connection
    print "Time elapsed: " + str(time() - t) + " s." #0.091s

(Note: this is not necessarily the "best" way to do this, but I think this format is very readable for a beginner; it's also very fast: 0.091s for 251 records inserted!)


I think if you go through it line by line, you'll see what a breeze it is to use. Notice the lack of SQL statements -- hooray! I also took the liberty of using numpy to load the CSV contents in two lines, but it can be done without it if you like.
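For the "without numpy" route mentioned above, here is a minimal sketch using only the standard library's csv module. The helper name load_rows and the sample row are made up for illustration; the column order and the '%d-%b-%y' date format are taken from the example above, and the code assumes Python 3.

```python
import csv
import io
from datetime import datetime

def load_rows(csv_file):
    """Parse price rows from an open CSV file object, skipping the header."""
    reader = csv.reader(csv_file)
    next(reader)  # skip the header row
    rows = []
    for line in reader:
        rows.append({
            'date': datetime.strptime(line[0], '%d-%b-%y').date(),
            'opn': float(line[1]),
            'hi': float(line[2]),
            'lo': float(line[3]),
            'close': float(line[4]),
            'vol': float(line[5]),
        })
    return rows

# Made-up sample data in the same column layout, for illustration only
sample = io.StringIO(
    "Date,Open,High,Low,Close,Volume\n"
    "14-Jul-15,34.50,35.00,34.25,34.75,1000\n"
)
rows = load_rows(sample)
```

Each dict in rows has the same keys as the Price_History columns, so it could be passed as Price_History(**row) in the loop above.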


If you want to compare against the traditional way of doing it, here's a full working example for reference:

import sqlite3
import time
from numpy import genfromtxt

def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d


def Create_DB(db):      
    #Create DB and format it as needed
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str

        cursor = conn.cursor()

        cursor.execute("CREATE TABLE [Price_History] ([id] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE, [date] DATE, [opn] FLOAT, [hi] FLOAT, [lo] FLOAT, [close] FLOAT, [vol] INTEGER);")


def Add_Record(db, data):
    #Insert record into table
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str

        cursor = conn.cursor()

        cursor.execute("INSERT INTO Price_History({cols}) VALUES({vals});".format(cols = str(data.keys()).strip('[]'), 
                    vals=str([data[i] for i in data]).strip('[]')
                    ))


def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()


if __name__ == "__main__":
    t = time.time() 

    db = 'csv_test_sql.db' #Database filename 
    file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv

    data = Load_Data(file_name) #Get data from CSV

    Create_DB(db) #Create DB

    #For every record, format and insert to table
    for i in data:
        record = {
                'date' : i[0],
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            }
        Add_Record(db, record)

    print "Time elapsed: " + str(time.time() - t) + " s." #3.604s

(Note: even in the "old" way, this is by no means the best way to do this, but it's very readable and a "1-to-1" translation from the SQLAlchemy way vs. the "old" way.)


Notice the SQL statements: one to create the table, the other to insert records. Also, notice that it's a bit more cumbersome to maintain long SQL strings vs. a simple class attribute addition. Liking SQLAlchemy so far?
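As a side note on the "old" way: the example above opens a new connection for every record and builds the INSERT by string formatting. A hedged sketch of the same insert with a single connection and named placeholders could look like the following. The helper name add_records, the in-memory database, and the sample records are assumptions for illustration, not part of the original answer.

```python
import sqlite3

def add_records(conn, records):
    """Insert many records in one transaction using named placeholders."""
    conn.executemany(
        "INSERT INTO Price_History (date, opn, hi, lo, close, vol) "
        "VALUES (:date, :opn, :hi, :lo, :close, :vol)",
        records,
    )
    conn.commit()

# In-memory database so the sketch leaves no file behind
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Price_History (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "date DATE, opn FLOAT, hi FLOAT, lo FLOAT, close FLOAT, vol INTEGER)"
)

# Made-up records, for illustration only
records = [
    {'date': '14-Jul-15', 'opn': 34.5, 'hi': 35.0, 'lo': 34.25, 'close': 34.75, 'vol': 1000},
    {'date': '13-Jul-15', 'opn': 34.0, 'hi': 34.6, 'lo': 33.9, 'close': 34.5, 'vol': 1200},
]
add_records(conn, records)
count = conn.execute("SELECT COUNT(*) FROM Price_History").fetchone()[0]
conn.close()
```

Letting the driver bind the values avoids quoting problems, and committing once rather than per record is likely where most of the time difference in the comparison above comes from.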

As for your foreign key inquiry, of course. SQLAlchemy has the power to do this too. Here's an example of what a class attribute would look like with a foreign key assignment (assuming the ForeignKey class has also been imported from the sqlalchemy module):

class Asset_Analysis(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Asset_Analysis'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False) 
    fid = Column(Integer, ForeignKey('Price_History.id'))

which points the "fid" column as a foreign key to Price_History's id column.
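To see the foreign key column in use, here is a small self-contained sketch. It assumes SQLAlchemy 1.4 or newer (for sqlalchemy.orm.declarative_base) and an in-memory SQLite database; the trimmed-down classes PriceHistory and AssetAnalysis are illustrative stand-ins for the ones above, not the original definitions.

```python
from sqlalchemy import Column, Float, ForeignKey, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class PriceHistory(Base):
    __tablename__ = 'Price_History'
    id = Column(Integer, primary_key=True)
    close = Column(Float)

class AssetAnalysis(Base):
    __tablename__ = 'Asset_Analysis'
    id = Column(Integer, primary_key=True)
    # fid references the id column of the Price_History table
    fid = Column(Integer, ForeignKey('Price_History.id'))

engine = create_engine('sqlite://')  # in-memory database
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

price = PriceHistory(close=34.75)  # made-up value
session.add(price)
session.flush()  # sends the INSERT so that price.id is populated

analysis = AssetAnalysis(fid=price.id)
session.add(analysis)
session.commit()

linked_id = analysis.fid
expected_id = price.id
session.close()
```

The flush() call is what makes price.id available before the second record is created.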


Hope that helps!


Answer by ARA1307

If your CSV is quite large, using INSERTs is very inefficient. You should use a bulk-loading mechanism, which differs from database to database. E.g., in PostgreSQL you should use the "COPY FROM" method:

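from sqlalchemy import create_engine

# Get the underlying psycopg2 connection from the engine
conn = create_engine('postgresql+psycopg2://...').raw_connection()
try:
    with open(csv_file_path, 'r') as f:
        cursor = conn.cursor()
        cmd = 'COPY tbl_name(col1, col2, col3) FROM STDIN WITH (FORMAT CSV, HEADER FALSE)'
        cursor.copy_expert(cmd, f)  # stream the file contents to the server
    conn.commit()
finally:
    conn.close()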

Answer by BehavioralScientist

I have had the exact same problem, and I found it paradoxically easier to use a 2-step process with pandas:


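import pandas as pd

# `engine` is assumed to be an existing SQLAlchemy engine, e.g. one returned by create_engine(...)
with open(csv_file_path, 'r') as file:
    data_df = pd.read_csv(file)
# Write the whole DataFrame to tbl_name, replacing the table if it already exists
data_df.to_sql('tbl_name', con=engine, index=True, index_label='id', if_exists='replace')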

Note that my approach is similar to this one, but somehow Google sent me to this thread instead, so I thought I would share.


Answer by Nickolay

To import a relatively small CSV file into a database using sqlalchemy, you can use engine.execute(my_table.insert(), list_of_row_dicts), as described in detail in the "Executing Multiple Statements" section of the sqlalchemy tutorial.

This is sometimes referred to as the "executemany" style of invocation, because it results in an executemany DBAPI call. The DB driver might execute a single multi-value INSERT .. VALUES (..), (..), (..) statement, which results in fewer round-trips to the DB and faster execution.

According to SQLAlchemy's FAQ, this is the fastest you can get without using DB-specific bulk loading methods, such as COPY FROM in Postgres, LOAD DATA LOCAL INFILE in MySQL, etc. In particular, it's faster than using the plain ORM (as in the answer by @Manuel J. Diaz here), bulk_save_objects, or bulk_insert_mappings.

import csv
from sqlalchemy import create_engine, Table, Column, Integer, MetaData

engine = create_engine('sqlite:///sqlalchemy.db', echo=True)

metadata = MetaData()
# Define the table with sqlalchemy:
my_table = Table('MyTable', metadata,
    Column('foo', Integer),
    Column('bar', Integer),
)
metadata.create_all(engine)
insert_query = my_table.insert()

# Or read the definition from the DB:
# metadata.reflect(engine, only=['MyTable'])
# my_table = Table('MyTable', metadata, autoload=True, autoload_with=engine)
# insert_query = my_table.insert()

# Or hardcode the SQL query:
# insert_query = "INSERT INTO MyTable (foo, bar) VALUES (:foo, :bar)"

with open('test.csv', 'r', encoding="utf-8") as csvfile:
    csv_reader = csv.reader(csvfile, delimiter=',')
    engine.execute(
        insert_query,
        [{"foo": row[0], "bar": row[1]} 
            for row in csv_reader]
    )