pandas: Use temp table with SQLAlchemy

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use it, you must likewise follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow original: http://stackoverflow.com/questions/44140632/

Date: 2020-09-14 03:40:47  Source: igfitidea


Tags: python, sql-server, pandas, sqlalchemy, temp-tables

Asked by Kris Harper

I am trying to use a temp table with SQLAlchemy and join it against an existing table. This is what I have so far:


# Excerpt from a function body in a Flask-SQLAlchemy app: `db` and the
# ExistingTable model are defined elsewhere.
from datetime import date

import pandas as pd

engine = db.get_engine(db.app, 'MY_DATABASE')
df = pd.DataFrame({"id": [1, 2, 3], "value": [100, 200, 300], "date": [date.today(), date.today(), date.today()]})
# '#' prefix: a local temp table, visible only to the session that created it
temp_table = db.Table('#temp_table',
                      db.Column('id', db.Integer),
                      db.Column('value', db.Integer),
                      db.Column('date', db.DateTime))
temp_table.create(engine)
df.to_sql(name='tempdb.dbo.#temp_table',
          con=engine,
          if_exists='append',
          index=False)
query = db.session.query(ExistingTable.id).join(temp_table, temp_table.c.id == ExistingTable.id)
out_df = pd.read_sql(query.statement, engine)
temp_table.drop(engine)
return out_df.to_dict('records')

This doesn't return any results because the INSERT statements that to_sql issues never get run (I think this is because they are run using sp_prepexec, but I'm not entirely sure about that).


I then tried writing out the SQL statements myself (CREATE TABLE #temp_table..., INSERT INTO #temp_table..., SELECT [id] FROM...) and running pd.read_sql(query, engine). I get the error message:


This result object does not return rows. It has been closed automatically.


I guess this is because the statement does more than just SELECT?


How can I fix this issue? Either solution would work, although the first would be preferable because it avoids hard-coded SQL. To be clear, I can't modify the schema in the existing database; it's a vendor database.


Answer by van

If the number of records to be inserted into the temporary table is small to moderate, one option is to use a literal subquery or a VALUES CTE instead of creating a temporary table.
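For the VALUES variant, a minimal sketch: it assumes SQLAlchemy 1.4 or later (which provides `sqlalchemy.values()`), uses a hypothetical `existing_table` plus sample rows, and only compiles the statement for the SQL Server dialect instead of executing it. Here the VALUES list is joined as a named derived table rather than declared as a CTE.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mssql

metadata = sa.MetaData()

# Hypothetical stand-in for the read-only vendor table (only used to build SQL).
existing_table = sa.Table(
    "existing_table",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String(50)),
)

# A VALUES row constructor takes the place of #temp_table.
temp_values = sa.values(
    sa.column("id", sa.Integer),
    sa.column("value", sa.Integer),
    name="temp_table",
).data([(1, 100), (3, 300), (5, 500)])

# Join the vendor table against the inline rows.
query = sa.select(existing_table.c.id, temp_values.c.value).join(
    temp_values, temp_values.c.id == existing_table.c.id
)

# Render the statement for SQL Server without connecting to a database.
sql = str(query.compile(dialect=mssql.dialect()))
print(sql)
```

Because the rows travel inside the statement itself, there is no separate INSERT step and nothing to clean up afterwards.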


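import datetime
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()  # declarative base for the mapped models
# MODEL
class ExistingTable(Base):
    __tablename__ = 'existing_table'
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String)
    # ...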

Assume also that the following data is to be inserted into the temp table:


# This data retrieved from another database and used for filtering
rows = [
    (1, 100, datetime.date(2017, 1, 1)),
    (3, 300, datetime.date(2017, 3, 1)),
    (5, 500, datetime.date(2017, 5, 1)),
]

Create a CTE or a sub-query containing that data:


stmts = [
    # @NOTE: optimization to reduce the size of the statement:
    # cast types only in the first row and let the DB engine infer them for the other rows.
    # (sa.select([...]) is SQLAlchemy 1.x syntax; 2.0 takes the columns positionally.)
    sa.select([
        sa.cast(sa.literal(i), sa.Integer).label("id"),
        sa.cast(sa.literal(v), sa.Integer).label("value"),
        sa.cast(sa.literal(d), sa.DateTime).label("date"),
    ]) if idx == 0 else
    sa.select([sa.literal(i), sa.literal(v), sa.literal(d)])  # no type cast

    for idx, (i, v, d) in enumerate(rows)
]
subquery = sa.union_all(*stmts)

# Choose one option below.
# I personally prefer B because one could reuse the CTE multiple times in the same query
# subquery = subquery.alias("temp_table")  # option A
subquery = subquery.cte(name="temp_table")  # option B

Create the final query with the required joins and filters:


# `session` is a SQLAlchemy Session bound to `engine`
query = (
    session
    .query(ExistingTable.id)
    .join(subquery, subquery.c.id == ExistingTable.id)
    # .filter(subquery.c.date >= XXX_DATE)
)

# TEMP: Test result output
for res in query:
    print(res)

Finally, load the result into a pandas DataFrame:


out_df = pd.read_sql(query.statement, engine)
result = out_df.to_dict('records')
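A self-contained sketch of the same literal-CTE technique that runs without SQL Server, assuming SQLAlchemy 1.4+ (2.0-style `select(a, b)` rather than `select([a, b])`) and an in-memory SQLite database as a stand-in. The `literal_cte` helper and the table contents are hypothetical, and the date column is left out to keep the example portable across databases.

```python
import sqlalchemy as sa

# In-memory SQLite stands in for the vendor database in this demo.
engine = sa.create_engine("sqlite://")
metadata = sa.MetaData()

# Hypothetical stand-in for the read-only vendor table.
existing_table = sa.Table(
    "existing_table",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String(50)),
)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        existing_table.insert(),
        [{"id": n, "name": f"item {n}"} for n in range(1, 5)],
    )

# Rows that would otherwise go into the temp table: (id, value).
rows = [(1, 100), (3, 300), (5, 500)]


def literal_cte(rows, name="temp_table"):
    """Build a UNION ALL of literal SELECTs and wrap it in a CTE."""
    stmts = []
    for idx, (i, v) in enumerate(rows):
        if idx == 0:
            # Only the first SELECT is cast and labelled; the CTE takes
            # its column names from it.
            stmts.append(
                sa.select(
                    sa.cast(sa.literal(i), sa.Integer).label("id"),
                    sa.cast(sa.literal(v), sa.Integer).label("value"),
                )
            )
        else:
            stmts.append(sa.select(sa.literal(i), sa.literal(v)))
    return sa.union_all(*stmts).cte(name=name)


temp = literal_cte(rows)
query = (
    sa.select(existing_table.c.id, temp.c.value)
    .join(temp, temp.c.id == existing_table.c.id)
    .order_by(existing_table.c.id)
)

with engine.connect() as conn:
    result = [tuple(r) for r in conn.execute(query)]

print(result)
```

Only ids 1 and 3 exist in the stand-in table, so the join keeps those two rows. With a real engine, the same `query` could be handed to `pd.read_sql(query, conn)` to get a DataFrame, as in the answer.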

Answer by Mikhail Lobanov

You can try another solution: a process-keyed table.


A process-keyed table is simply a permanent table that serves as a temp table. To let several processes use the table simultaneously, the table has an extra column that identifies the process. The simplest way to do this is to use the global variable @@spid (@@spid is the process ID in SQL Server).
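A minimal sketch of the @@spid variant, assuming SQLAlchemy 1.4+; the `temp_ids` table and its sample row are hypothetical, and the statements are only compiled for the SQL Server dialect, not executed. Because @@spid identifies the current session, the INSERT, the SELECT and the cleanup DELETE would all have to run on the same connection.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mssql

metadata = sa.MetaData()

# Hypothetical permanent table standing in for #temp_table; the extra
# "spid" column records which session inserted each row.
temp_ids = sa.Table(
    "temp_ids",
    metadata,
    sa.Column("spid", sa.Integer, nullable=False),
    sa.Column("id", sa.Integer, nullable=False),
    sa.Column("value", sa.Integer),
)

# @@SPID is evaluated by SQL Server, so it is emitted as raw SQL text.
current_spid = sa.literal_column("@@SPID")

insert_rows = sa.insert(temp_ids).values(spid=current_spid, id=1, value=100)
select_mine = sa.select(temp_ids.c.id, temp_ids.c.value).where(
    temp_ids.c.spid == current_spid
)
# Each session removes only its own rows when it is done.
cleanup = sa.delete(temp_ids).where(temp_ids.c.spid == current_spid)

dialect = mssql.dialect()
insert_sql, select_sql, cleanup_sql = (
    str(stmt.compile(dialect=dialect))
    for stmt in (insert_rows, select_mine, cleanup)
)
print(insert_sql)
print(select_sql)
print(cleanup_sql)
```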


...


One alternative for the process-key is to use a GUID (data type uniqueidentifier).
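A runnable sketch of the GUID variant, assuming SQLAlchemy 1.4+ and an in-memory SQLite database in place of SQL Server; `keyed_ids`, `ids_present` and the sample rows are hypothetical. Each call stages its rows under a fresh `uuid.uuid4()` key, joins on that key only, and deletes its own rows afterwards. On SQL Server the key column could use `sqlalchemy.dialects.mssql.UNIQUEIDENTIFIER` instead of a 36-character string.

```python
import uuid

import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # stand-in database for the demo
metadata = sa.MetaData()

# Hypothetical vendor table and hypothetical permanent process-keyed table.
existing_table = sa.Table(
    "existing_table", metadata,
    sa.Column("id", sa.Integer, primary_key=True),
)
keyed_ids = sa.Table(
    "keyed_ids", metadata,
    sa.Column("process_key", sa.String(36), nullable=False),  # GUID as text
    sa.Column("id", sa.Integer, nullable=False),
    sa.Column("value", sa.Integer),
)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(existing_table.insert(), [{"id": n} for n in range(1, 5)])


def ids_present(rows):
    """Stage rows under a fresh GUID, join against existing_table, then clean up."""
    key = str(uuid.uuid4())
    with engine.begin() as conn:
        conn.execute(
            keyed_ids.insert(),
            [{"process_key": key, "id": i, "value": v} for i, v in rows],
        )
        query = (
            sa.select(existing_table.c.id, keyed_ids.c.value)
            .join(keyed_ids, keyed_ids.c.id == existing_table.c.id)
            .where(keyed_ids.c.process_key == key)
            .order_by(existing_table.c.id)
        )
        result = [tuple(r) for r in conn.execute(query)]
        # Remove only this caller's rows; rows under other keys are untouched.
        conn.execute(sa.delete(keyed_ids).where(keyed_ids.c.process_key == key))
    return result


result = ids_present([(1, 100), (3, 300), (5, 500)])
print(result)
```

Because the key is generated in Python rather than taken from the session, the insert, join and cleanup do not strictly need to share one connection; the sketch keeps them in one transaction anyway.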


http://www.sommarskog.se/share_data.html#prockeyed
