Accessing a MySQL connection pool from Python multiprocessing
Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must attribute it to the original authors (not me): StackOverflow.
Original URL: http://stackoverflow.com/questions/24374058/
Asked by ensnare
I'm trying to set up a MySQL connection pool and have my worker processes access the already established pool instead of setting up a new connection each time.
I'm not sure whether I should pass the database cursor to each process, or whether there's some other way to do this. Shouldn't MySql.connector do the pooling automatically? When I check my log files, many, many connections are opened and closed... one for each process.
My code looks something like this:
PATH = "/tmp"

class DB(object):
    def __init__(self):
        connected = False
        while not connected:
            try:
                cnxpool = mysql.connector.pooling.MySQLConnectionPool(pool_name = "pool1",
                                                                      **config.dbconfig)
                self.__cnx = cnxpool.get_connection()
            except mysql.connector.errors.PoolError:
                print("Sleeping.. (Pool Error)")
                sleep(5)
            except mysql.connector.errors.DatabaseError:
                print("Sleeping.. (Database Error)")
                sleep(5)

        self.__cur = self.__cnx.cursor(cursor_class=MySQLCursorDict)

    def execute(self, query):
        return self.__cur.execute(query)

    def isValidFile(self, name):
        return True

    def readfile(self, fname):
        d = DB()
        d.execute("""INSERT INTO users (first_name) VALUES ('michael')""")

def main():
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(None, init, [queue])

    for dirpath, dirnames, filenames in os.walk(PATH):
        full_path_fnames = map(lambda fn: os.path.join(dirpath, fn),
                               filenames)
        full_path_fnames = filter(is_valid_file, full_path_fnames)
        pool.map(readFile, full_path_fnames)

if __name__ == '__main__':
    sys.exit(main())
Answered by mata
First, you're creating a different connection pool for each instance of your DB class. Giving the pools the same name doesn't make them the same pool.

From the documentation:

    It is not an error for multiple pools to have the same name. An application that must distinguish pools by their pool_name property should create each pool with a distinct name.
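A quick way to see this (a minimal sketch; it assumes mysql-connector-python, a reachable MySQL server, and a hypothetical dbconfig dict standing in for the asker's config.dbconfig) is to build two pools with the same name and compare them:

import mysql.connector.pooling

# Hypothetical credentials; substitute your own.
dbconfig = {"host": "127.0.0.1", "user": "root", "password": "secret", "database": "test"}

pool_a = mysql.connector.pooling.MySQLConnectionPool(pool_name="pool1", pool_size=2, **dbconfig)
pool_b = mysql.connector.pooling.MySQLConnectionPool(pool_name="pool1", pool_size=2, **dbconfig)

# Two separate pool objects, each holding its own connections,
# even though they share the pool_name "pool1".
print(pool_a is pool_b)                      # False
print(pool_a.pool_name == pool_b.pool_name)  # True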
Besides that, sharing a database connection (or connection pool) between different processes would be a bad idea (and I highly doubt it would even work correctly), so each process using its own connections is actually what you should aim for.
You could just initialize the pool in your init initializer as a global variable and use that instead.

Very simple example:
from multiprocessing import Pool
from mysql.connector.pooling import MySQLConnectionPool
from mysql.connector import connect
import os

pool = None  # one pool per worker process, set up by init()

def init():
    global pool
    print("PID %d: initializing pool..." % os.getpid())
    pool = MySQLConnectionPool(...)

def do_work(q):
    con = pool.get_connection()
    print("PID %d: using connection %s" % (os.getpid(), con))
    c = con.cursor()
    c.execute(q)
    res = c.fetchall()
    con.close()  # returns the connection to the per-process pool
    return res

def main():
    p = Pool(initializer=init)
    for res in p.map(do_work, ['select * from test']*8):
        print(res)
    p.close()
    p.join()

if __name__ == '__main__':
    main()
Or just use a simple connection instead of a connection pool, as only one connection will be active in each process at a time anyway. The number of concurrently used connections is implicitly limited by the size of the multiprocessing.Pool.
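For reference, a minimal sketch of that simpler variant (hypothetical dbconfig values; assumes mysql-connector-python): each worker opens one plain connection in the Pool initializer and reuses it for all of its tasks.

from multiprocessing import Pool
import mysql.connector
import os

# Hypothetical credentials; substitute your own.
dbconfig = {"host": "127.0.0.1", "user": "root", "password": "secret", "database": "test"}

con = None

def init():
    global con
    # One plain connection per worker process, opened once when the worker starts.
    con = mysql.connector.connect(**dbconfig)
    print("PID %d: opened connection" % os.getpid())

def do_work(q):
    c = con.cursor()
    c.execute(q)
    res = c.fetchall()
    c.close()
    return res

def main():
    p = Pool(processes=4, initializer=init)
    for res in p.map(do_work, ['select 1'] * 8):
        print(res)
    p.close()
    p.join()

if __name__ == '__main__':
    main()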
Answered by ThomasYe
You created multiple DB object instances. In mysql.connector.pooling.py, pool_name is only an attribute that lets you tell which pool is which; there is no name-to-pool mapping inside the connector.

So, because you create a new DB instance in def readfile(), you end up with several connection pools.
A Singleton is useful in this case.
(I spent several hours figuring this out. In the Tornado framework, each HTTP GET creates a new handler, which led to a new connection being made every time.)
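A minimal sketch of the singleton idea described above (hypothetical names; assumes mysql-connector-python and a dbconfig dict): the pool is created at most once per process, no matter how many DB objects get constructed.

import mysql.connector.pooling

# Hypothetical credentials; substitute your own.
dbconfig = {"host": "127.0.0.1", "user": "root", "password": "secret", "database": "test"}

_pool = None  # module-level cache: at most one pool per process

def get_pool():
    """Create the connection pool on first use and reuse it afterwards."""
    global _pool
    if _pool is None:
        _pool = mysql.connector.pooling.MySQLConnectionPool(
            pool_name="pool1", pool_size=3, **dbconfig)
    return _pool

class DB(object):
    def __init__(self):
        # Every DB instance borrows from the same per-process pool.
        self._cnx = get_pool().get_connection()
        self._cur = self._cnx.cursor()

    def execute(self, query):
        return self._cur.execute(query)

    def close(self):
        self._cur.close()
        self._cnx.close()  # returns the connection to the pool, doesn't destroy it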
Answered by buxizhizhoum
#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
from multiprocessing import Pool  # needed for the example under __main__ below

import mysql.connector.pooling


dbconfig = {
    "host": "127.0.0.1",
    "port": "3306",
    "user": "root",
    "password": "123456",
    "database": "test",
}


class MySQLPool(object):
    """
    Create a pool when connecting to MySQL, which will decrease the time spent
    requesting, creating and closing connections.
    """
    def __init__(self, host="172.0.0.1", port="3306", user="root",
                 password="123456", database="test", pool_name="mypool",
                 pool_size=3):
        res = {}
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self._database = database
        res["host"] = self._host
        res["port"] = self._port
        res["user"] = self._user
        res["password"] = self._password
        res["database"] = self._database
        self.dbconfig = res
        self.pool = self.create_pool(pool_name=pool_name, pool_size=pool_size)

    def create_pool(self, pool_name="mypool", pool_size=3):
        """
        Create a connection pool. Afterwards, a request to connect to MySQL
        can get a connection from this pool instead of creating a new one.
        :param pool_name: the name of the pool, default is "mypool"
        :param pool_size: the size of the pool, default is 3
        :return: connection pool
        """
        pool = mysql.connector.pooling.MySQLConnectionPool(
            pool_name=pool_name,
            pool_size=pool_size,
            pool_reset_session=True,
            **self.dbconfig)
        return pool

    def close(self, conn, cursor):
        """
        Close a MySQL connection and its cursor.
        :param conn: connection to close (returned to the pool)
        :param cursor: cursor to close
        :return:
        """
        cursor.close()
        conn.close()

    def execute(self, sql, args=None, commit=False):
        """
        Execute an SQL statement, with or without args. The usage is similar
        to the execute() function in the pymysql module.
        :param sql: SQL clause
        :param args: args needed by the SQL clause
        :param commit: whether to commit
        :return: if commit, return None; else, return the result
        """
        # Get a connection from the connection pool instead of creating one.
        conn = self.pool.get_connection()
        cursor = conn.cursor()
        if args:
            cursor.execute(sql, args)
        else:
            cursor.execute(sql)
        if commit is True:
            conn.commit()
            self.close(conn, cursor)
            return None
        else:
            res = cursor.fetchall()
            self.close(conn, cursor)
            return res

    def executemany(self, sql, args, commit=False):
        """
        Execute with many args. Similar to the executemany() function in pymysql.
        args should be a sequence.
        :param sql: SQL clause
        :param args: args
        :param commit: commit or not
        :return: if commit, return None; else, return the result
        """
        # Get a connection from the connection pool instead of creating one.
        conn = self.pool.get_connection()
        cursor = conn.cursor()
        cursor.executemany(sql, args)
        if commit is True:
            conn.commit()
            self.close(conn, cursor)
            return None
        else:
            res = cursor.fetchall()
            self.close(conn, cursor)
            return res


if __name__ == "__main__":
    mysql_pool = MySQLPool(**dbconfig)
    sql = "select * from store WHERE create_time < '2017-06-02'"

    p = Pool()
    for i in range(5):
        p.apply_async(mysql_pool.execute, args=(sql,))
    p.close()
    p.join()  # wait for the workers to finish before exiting
The code above creates a connection pool once, at the start, and then gets connections from it in execute(). Once the pool has been created, the job is simply to keep it around: because the pool is built only once, you save the time of setting up a new connection every time you want to talk to MySQL. Hope it helps!