从 Python 多处理访问 MySQL 连接池

声明:本页面是 Stack Overflow 热门问题的中英对照翻译,遵循 CC BY-SA 4.0 协议。如果您需要使用它,必须同样遵循 CC BY-SA 许可,注明原文地址和作者信息,并将其归于原作者(不是我)。Stack Overflow 原文地址:http://stackoverflow.com/questions/24374058/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me): Stack Overflow


Accessing a MySQL connection pool from Python multiprocessing

python, connection-pooling, mysql-python

提问 by ensnare

I'm trying to set up a MySQL connection pool and have my worker processes access the already established pool instead of setting up a new connection each time.

我正在尝试设置一个 MySQL 连接池,并让我的工作进程访问已经建立的池,而不是每次都设置一个新连接。

I'm confused about whether I should pass the database cursor to each process, or if there's some other way to do this. Shouldn't mysql.connector do the pooling automatically? When I check my log files, many, many connections are opened and closed ... one for each process.

我很困惑是否应该将数据库游标传递给每个进程,或者是否有其他方法可以做到这一点?MySql.connector 不应该自动进行池化吗?当我检查我的日志文件时,打开和关闭了许多连接……每个进程一个。

My code looks something like this:

我的代码看起来像这样:

PATH = "/tmp"

class DB(object):
  def __init__(self):
    connected = False
    while not connected:
      try:
        cnxpool = mysql.connector.pooling.MySQLConnectionPool(pool_name = "pool1",
                                                          **config.dbconfig)
        self.__cnx = cnxpool.get_connection()
      except mysql.connector.errors.PoolError:
        print("Sleeping.. (Pool Error)")
        sleep(5)
      except mysql.connector.errors.DatabaseError:
        print("Sleeping.. (Database Error)")
        sleep(5)

    self.__cur = self.__cnx.cursor(cursor_class=MySQLCursorDict)

  def execute(self, query):
    return self.__cur.execute(query)

def isValidFile(self, name):
  return True

def readfile(self, fname):
  d = DB()
  d.execute("""INSERT INTO users (first_name) VALUES ('michael')""")

def main():
  queue = multiprocessing.Queue()
  pool = multiprocessing.Pool(None, init, [queue])
  for dirpath, dirnames, filenames in os.walk(PATH):

    full_path_fnames = map(lambda fn: os.path.join(dirpath, fn),
                           filenames)
    full_path_fnames = filter(is_valid_file, full_path_fnames)
    pool.map(readFile, full_path_fnames)

if __name__ == '__main__':
  sys.exit(main())

回答 by mata

First, you're creating a different connection pool for each instance of your DB class. Giving the pools the same name doesn't make them the same pool.

首先,您为 DB 类的每个实例都创建了一个不同的连接池。给这些池起相同的名称并不会使它们成为同一个池。

From the documentation:

来自文档:

It is not an error for multiple pools to have the same name. An application that must distinguish pools by their pool_name property should create each pool with a distinct name.

多个池具有相同名称并不是错误。必须通过池的pool_name属性来区分池的应用程序应该使用不同的名称创建每个池。
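A quick way to see this (a minimal sketch; it assumes the config.dbconfig settings from the question are valid) is that two pools constructed with the same pool_name are still two independent objects, each holding its own connections:

from mysql.connector.pooling import MySQLConnectionPool

import config  # the question's connection settings (assumed)

# same pool_name, but two separate pool objects with separate connections
pool_a = MySQLConnectionPool(pool_name="pool1", pool_size=3, **config.dbconfig)
pool_b = MySQLConnectionPool(pool_name="pool1", pool_size=3, **config.dbconfig)
print(pool_a is pool_b)  # False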

Besides that, sharing a database connection (or connection pool) between different processes would be a bad idea (and I highly doubt it would even work correctly), so each process using its own connections is actually what you should aim for.

除此之外,在不同进程之间共享数据库连接(或连接池)是一个坏主意(我非常怀疑它甚至能正常工作),因此,让每个进程使用自己的连接才是您应该追求的目标。

You could just initialize the pool in your init initializer as a global variable and use that instead.
Very simple example:

您可以在 init 初始化函数中把连接池初始化为一个全局变量,然后直接使用它。
非常简单的例子:

from multiprocessing import Pool
from mysql.connector.pooling import MySQLConnectionPool
from mysql.connector import connect
import os

pool = None

def init():
    global pool
    print("PID %d: initializing pool..." % os.getpid())
    pool = MySQLConnectionPool(...)

def do_work(q):
    con = pool.get_connection()
    print("PID %d: using connection %s" % (os.getpid(), con))
    c = con.cursor()
    c.execute(q)
    res = c.fetchall()
    con.close()
    return res

def main():
    p = Pool(initializer=init)
    for res in p.map(do_work, ['select * from test']*8):
        print(res)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

Or just use a simple connection instead of a connection pool, as only one connection will be active in each process at a time anyway.
The number of concurrently used connections is implicitly limited by the size of the multiprocessing.Pool.

或者干脆使用一个简单的连接而不是连接池,因为无论如何,每个进程中每次只会有一个连接处于活动状态。
并发使用的连接数隐式地受到 multiprocessing.Pool 大小的限制。
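A minimal sketch of that simpler variant (assuming the same config.dbconfig settings from the question): each worker process opens a single connection in its initializer and reuses it for every task it runs.

from multiprocessing import Pool
from mysql.connector import connect
import os

import config  # the question's connection settings (assumed)

conn = None  # one connection per worker process

def init():
    global conn
    print("PID %d: opening connection..." % os.getpid())
    conn = connect(**config.dbconfig)

def do_work(q):
    cur = conn.cursor()
    cur.execute(q)
    res = cur.fetchall()
    cur.close()
    return res

def main():
    p = Pool(initializer=init)
    try:
        for res in p.map(do_work, ['select * from test'] * 8):
            print(res)
    finally:
        p.close()
        p.join()

if __name__ == '__main__':
    main()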

回答 by ThomasYe

You created multiple DB object instances. In mysql.connector's pooling.py, pool_name is only an attribute that lets you tell which pool it is; there is no name-to-pool mapping in the pool itself.

您创建了多个 DB 对象实例。在 mysql.connector 的 pooling.py 中,pool_name 只是一个让您分辨是哪个池的属性,池本身并没有按名称建立映射。

So if you create multiple DB instances in def readfile(), you will end up with several connection pools.

因此,如果您在 def readfile() 中创建多个 DB 实例,就会得到多个连接池。

A Singleton is useful in this case.

在这种情况下,单例很有用。
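One common way to do that (a hedged sketch, not code from the original answer; it reuses the question's config.dbconfig settings) is a lazily created module-level pool that every DB instance in the same process shares:

import mysql.connector.pooling

import config  # the question's connection settings (assumed)

_pool = None  # one shared pool per process

def get_pool():
    # create the pool on the first call, reuse it on every later call
    global _pool
    if _pool is None:
        _pool = mysql.connector.pooling.MySQLConnectionPool(
            pool_name="pool1", pool_size=5, **config.dbconfig)
    return _pool

class DB(object):
    def __init__(self):
        # every DB instance in this process borrows from the same pool
        self.__cnx = get_pool().get_connection()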

(I spent several hours figuring this out. In the Tornado framework, each HTTP GET creates a new handler, which leads to a new connection being made.)

(我花了几个小时才弄清楚这一点。在 Tornado 框架中,每个 HTTP GET 都会创建一个新的处理程序,从而导致建立一个新的连接。)

回答 by buxizhizhoum

#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
from multiprocessing import Pool

import mysql.connector.pooling


dbconfig = {
    "host":"127.0.0.1",
    "port":"3306",
    "user":"root",
    "password":"123456",
    "database":"test",
}


class MySQLPool(object):
    """
    create a pool when connect mysql, which will decrease the time spent in 
    request connection, create connection and close connection.
    """
    def __init__(self, host="127.0.0.1", port="3306", user="root",
                 password="123456", database="test", pool_name="mypool",
                 pool_size=3):
        res = {}
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self._database = database

        res["host"] = self._host
        res["port"] = self._port
        res["user"] = self._user
        res["password"] = self._password
        res["database"] = self._database
        self.dbconfig = res
        self.pool = self.create_pool(pool_name=pool_name, pool_size=pool_size)

    def create_pool(self, pool_name="mypool", pool_size=3):
        """
        Create a connection pool, after created, the request of connecting 
        MySQL could get a connection from this pool instead of request to 
        create a connection.
        :param pool_name: the name of pool, default is "mypool"
        :param pool_size: the size of pool, default is 3
        :return: connection pool
        """
        pool = mysql.connector.pooling.MySQLConnectionPool(
            pool_name=pool_name,
            pool_size=pool_size,
            pool_reset_session=True,
            **self.dbconfig)
        return pool

    def close(self, conn, cursor):
        """
        A method used to close connection of mysql.
        :param conn: 
        :param cursor: 
        :return: 
        """
        cursor.close()
        conn.close()

    def execute(self, sql, args=None, commit=False):
        """
        Execute a sql, it could be with args and with out args. The usage is 
        similar with execute() function in module pymysql.
        :param sql: sql clause
        :param args: args need by sql clause
        :param commit: whether to commit
        :return: if commit, return None, else, return result
        """
        # get connection form connection pool instead of create one.
        conn = self.pool.get_connection()
        cursor = conn.cursor()
        if args:
            cursor.execute(sql, args)
        else:
            cursor.execute(sql)
        if commit is True:
            conn.commit()
            self.close(conn, cursor)
            return None
        else:
            res = cursor.fetchall()
            self.close(conn, cursor)
            return res

    def executemany(self, sql, args, commit=False):
        """
        Execute with many args. Similar with executemany() function in pymysql.
        args should be a sequence.
        :param sql: sql clause
        :param args: args
        :param commit: commit or not.
        :return: if commit, return None, else, return result
        """
        # get connection form connection pool instead of create one.
        conn = self.pool.get_connection()
        cursor = conn.cursor()
        cursor.executemany(sql, args)
        if commit is True:
            conn.commit()
            self.close(conn, cursor)
            return None
        else:
            res = cursor.fetchall()
            self.close(conn, cursor)
            return res


if __name__ == "__main__":
    mysql_pool = MySQLPool(**dbconfig)
    sql = "select * from store WHERE create_time < '2017-06-02'"
    p = Pool()
    for i in range(5):
        p.apply_async(mysql_pool.execute, args=(sql,))

The code above creates a connection pool at the beginning and gets connections from it in execute(). Once the connection pool has been created, the job is simply to keep it around; since the pool is created only once, this saves the time of requesting a new connection every time you want to connect to MySQL. Hope it helps!

上面的代码一开始就创建了一个连接池,并在 execute() 中从池里获取连接。连接池创建之后,要做的就是保留它;因为池只创建一次,这样每次想连接 MySQL 时都能节省请求连接的时间。希望能帮助到你!
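For completeness, a small usage sketch of the MySQLPool class above (the store table comes from the answer's own query; the name column used here is an assumption for illustration): writes pass commit=True, reads return the fetched rows.

mysql_pool = MySQLPool(**dbconfig)

# write: pass the values as args and commit the transaction
mysql_pool.execute(
    "INSERT INTO store (name, create_time) VALUES (%s, %s)",
    args=("demo", "2017-06-01"),  # the 'name' column is assumed for illustration
    commit=True,
)

# read: without commit=True, execute() returns cursor.fetchall()
rows = mysql_pool.execute(
    "SELECT * FROM store WHERE create_time < %s",
    args=("2017-06-02",),
)
print(rows)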