How to execute multiple SQL queries into pandas dataframes in parallel

Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same CC BY-SA terms and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/17913754/

How to execute multiple SQL queries into pandas dataframes in parallel

python, sql, parallel-processing, pandas

Asked by

Hi all Python Pandas gurus. I'm looking for a way to run some SQL in parallel with Python, returning several Pandas dataframes. I have code similar to the below that serially runs 4 SQL queries against a MS SQL Server database. Two of the queries have much longer execution time than IO (network) time to get the results, so I'm thinking parallelizing would make the code run ~2x faster. Is there an easy way to execute the queries in parallel?

Ideally, I would like to be able to read all the *.sql files in a sub dir of a project, then fire off the queries to run in parallel and return the four dataframes in an easy-to-use format (a list?) for further operations (indexing, joining, aggregating).

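For the "read all the *.sql files in a sub dir" part, here is a minimal sketch (my illustration, not from the original post) built on the glob module that the code below already imports; the load_queries helper name is hypothetical:

# Hypothetical helper: read every .sql file in a directory into a dict
# keyed by file name (use queries.values() if a plain list is wanted).
import glob
import os

def load_queries(sql_dir):
    queries = {}
    for path in glob.glob(os.path.join(sql_dir, '*.sql')):
        with open(path, 'r') as f:
            queries[os.path.basename(path)] = f.read()
    return queries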

Thanks in advance, Randall

# imports
import ceODBC
import numpy as np
import pandas as pd
import pandas.io.sql as psql
from ConfigParser import ConfigParser  
import os
import glob

# db connection string
cnxn_string = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes'

# directories (also should be moved to config)
dataDir = os.path.join(os.getcwd(), 'data')
sqlDir = os.path.join(os.getcwd(), 'sql')

# read sql from external .sql files. Possible to read all *.sql files in a sql dir into a list (or other structure...)?
with open(os.path.join(sqlDir, 'q1.sql'), 'r') as f: q1sql = f.read()
with open(os.path.join(sqlDir, 'q2.sql'), 'r') as f: q2sql = f.read()
with open(os.path.join(sqlDir, 'q3.sql'), 'r') as f: q3sql = f.read()
with open(os.path.join(sqlDir, 'q4.sql'), 'r') as f: q4sql = f.read()

# Connect to db, run SQL, assign result into dataframe, close connection. 
cnxn = ceODBC.connect(cnxn_string)
cursor = cnxn.cursor()

# execute the queries and close the connection. Parallelize?
df1 = psql.frame_query(q1sql, cnxn)
df2 = psql.frame_query(q2sql, cnxn) 
df3 = psql.frame_query(q3sql, cnxn)
df4 = psql.frame_query(q4sql, cnxn) 

# close connection
cnxn.close()
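
For reference, a common way to parallelize IO-bound work like the above is a thread pool. The sketch below is my illustration, not part of the original question: it uses concurrent.futures (standard library in Python 3; on the Python 2 used here it needs the futures backport) and assumes the cnxn_string and q1sql..q4sql variables defined above.

# Hypothetical sketch: run the four queries concurrently in a thread pool.
# Each worker opens its own connection, since DB connections are generally
# not safe to share between threads.
from concurrent.futures import ThreadPoolExecutor

def run_query(sql):
    conn = ceODBC.connect(cnxn_string)
    try:
        return psql.frame_query(sql, conn)
    finally:
        conn.close()

with ThreadPoolExecutor(max_workers=4) as executor:
    df1, df2, df3, df4 = executor.map(run_query, [q1sql, q2sql, q3sql, q4sql])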

Answered by eri

Use N connections in N threads, then join the threads and process the results.

# imports
import ceODBC
import numpy as np
import pandas as pd
import pandas.io.sql as psql
from ConfigParser import ConfigParser  
import os
import glob
import threading

# db connection string
cnxn_string = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes'

# directories (also should be moved to config)
dataDir = os.path.join(os.getcwd(), 'data')
sqlDir = os.path.join(os.getcwd(), 'sql')

# shared dict to store results, keyed by sql file name
responses={}
responses_lock=threading.Lock()

maxconnections = 8
pool_sema = threading.BoundedSemaphore(value=maxconnections)


def task(fname):

    with open(fname, 'r') as f: sql = f.read()

    # Connect to db, run SQL, assign result into dataframe, close connection. 
    # a semaphore limits the number of simultaneous DB connections
    pool_sema.acquire()
    cnxn = ceODBC.connect(cnxn_string)
    cursor = cnxn.cursor()
    # execute the query and load the result into a dataframe
    df = psql.frame_query(sql, cnxn)
    # close connection
    cnxn.close()
    pool_sema.release()

    # lock so that only one thread modifies the shared dict at a time
    responses_lock.acquire()
    responses[fname] = df
    responses_lock.release()


pool = []

# find sql files and spawn threads
for fname in glob.glob(os.path.join(sqlDir, '*.sql')):
    # create a new thread running task for this file
    thread = threading.Thread(target=task, args=(fname,))
    thread.daemon = True
    # keep the thread so it can be joined later
    pool.append(thread)
    # start the thread
    thread.start()

# wait for all threads to finish
for thread in pool:
    thread.join()

# results of each execution stored in responses dict

Each file executes in a separate thread, and each result is stored in the shared responses dict under its file name.

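For example (an illustrative snippet, not from the answer), after the joins complete the dataframes can be looked up in responses; the keys are the paths that glob returned:

# Illustrative only: fetch one result by path, then inspect them all.
df1 = responses[os.path.join(sqlDir, 'q1.sql')]
for fname in responses:
    print fname, responses[fname].shape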

The equivalent task function using with statements, which guarantee that the semaphore and lock are released even if frame_query raises:

def task(fname):

    with open(fname, 'r') as f: sql = f.read()

    # Connect to db, run SQL, assign result into dataframe, close connection. 
    # a semaphore limits the number of simultaneous DB connections
    with pool_sema:
        cnxn = ceODBC.connect(cnxn_string)
        cursor = cnxn.cursor()
        # execute the query and load the result into a dataframe
        df = psql.frame_query(sql, cnxn)
        # close connection
        cnxn.close()


    # only one thread may modify the shared dict at a time
    with responses_lock:
        responses[fname] = df

multiprocessing.Pool is easy for distributing heavy (CPU-bound) tasks, but it carries more IO overhead itself, since every result has to be serialized and passed back to the parent process.

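A rough sketch of that multiprocessing.Pool variant (again my illustration, not from the answer). Each worker process must open its own connection, and every result dataframe is pickled and shipped back to the parent, which is the extra IO cost mentioned above:

# Illustrative sketch: processes instead of threads.
import multiprocessing

def run_file(fname):
    # each worker opens its own connection; connections cannot be shared
    # across processes
    with open(fname, 'r') as f:
        sql = f.read()
    conn = ceODBC.connect(cnxn_string)
    try:
        return fname, psql.frame_query(sql, conn)
    finally:
        conn.close()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    # map returns (fname, dataframe) pairs; each dataframe is serialized
    # between processes
    results = dict(pool.map(run_file, glob.glob(os.path.join(sqlDir, '*.sql'))))
    pool.close()
    pool.join()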