Python、SQLite 和线程
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/524797/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Python, SQLite and threading
提问by daniels
I'm working on an application that will gather data through HTTP from several places, cache the data locally and then serve it through HTTP.
我正在开发一个应用程序,它将通过 HTTP 从多个地方收集数据,在本地缓存数据,然后通过 HTTP 提供服务。
So I was looking at the following. My application will first create several threads that will gather data at a specified interval and cache that data locally into a SQLite database.
所以我在看以下内容。我的应用程序将首先创建几个线程,这些线程将按指定的时间间隔收集数据并将该数据本地缓存到 SQLite 数据库中。
Then in the main thread start a CherryPy application that will query that SQLite database and serve the data.
然后在主线程中启动一个 CherryPy 应用程序,它将查询该 SQLite 数据库并提供数据。
My problem is: how do I handle connections to the SQLite database from my threads and from the CherryPy application?
我的问题是:如何处理从我的线程和 CherryPy 应用程序到 SQLite 数据库的连接?
If I'd do a connection per thread to the database will I also be able to create/use an in memory database?
如果我每个线程都连接到数据库,我还能创建/使用内存数据库吗?
采纳答案by Ali Afshar
Short answer: Don't use Sqlite3 in a threaded application.
简短回答:不要在线程应用程序中使用 Sqlite3。
Sqlite3 databases scale well for size, but rather terribly for concurrency. You will be plagued with "Database is locked" errors.
Sqlite3 数据库在规模上可以很好地扩展,但在并发性方面却非常糟糕。你会被“数据库被锁定”错误所困扰。
If you do, you will need a connection per thread, and you have to ensure that these connections clean up after themselves. This is traditionally handled using thread-local sessions, and is performed rather well (for example) using SQLAlchemy's ScopedSession. I would use this if I were you, even if you aren't using the SQLAlchemy ORM features.
如果这样做,则每个线程都需要一个连接,并且必须确保这些连接会自行清理。这在传统上是使用线程本地会话处理的,并且使用 SQLAlchemy 的 ScopedSession 执行得相当好(例如)。如果我是你,我会使用它,即使你没有使用 SQLAlchemy ORM 功能。
回答by S.Lott
"...create several threads that will gather data at a specified interval and cache that data locally into a sqlite database. Then in the main thread start a CherryPy app that will query that sqlite db and serve the data."
“...创建几个线程,以指定的时间间隔收集数据并将该数据本地缓存到 sqlite 数据库中。然后在主线程中启动一个 CherryPy 应用程序,它将查询该 sqlite 数据库并提供数据。”
Don't waste a lot of time on threads. The things you're describing are simply OS processes. Just start ordinary processes to do gathering and run Cherry Py.
不要在线程上浪费大量时间。您所描述的只是操作系统进程。只需启动普通进程来收集和运行 Cherry Py。
You have no real use for concurrent threads in a single process for this. Gathering data at a specified interval -- when done with simple OS processes -- can be scheduled by the OS very simply. Cron, for example, does a great job of this.
为此,您在单个进程中没有真正使用并发线程。以指定的时间间隔收集数据——当使用简单的操作系统进程完成时——可以由操作系统非常简单地安排。例如,Cron 在这方面做得很好。
A CherryPy App, also, is an OS process, not a single thread of some larger process.
CherryPy 应用程序也是一个操作系统进程,而不是某个更大进程的单个线程。
Just use processes -- threads won't help you.
只使用进程——线程不会帮助你。
回答by paprika
Depending on the application the DB could be a real overhead. If we are talking about volatile data, maybe you could skip the communication via DB completely and share the data between the data gathering processand the data serving process(es)via IPC. This is not an option if the data has to be persisted, of course.
根据应用程序,数据库可能是一个真正的开销。如果我们谈论易失性数据,也许您可以完全跳过通过 DB 的通信,并通过 IPC在数据收集进程和数据服务进程之间共享数据。当然,如果必须保留数据,这不是一个选项。
回答by Martin Beckett
Depending on the data rate sqlite could be exactly the correct way to do this. The entire database is locked for each write so you aren't going to scale to 1000s of simultaneous writes per second. But if you only have a few it is the safest way of assuring you don't overwrite each other.
根据数据速率,sqlite 可能正是执行此操作的正确方法。每次写入都会锁定整个数据库,因此您不会扩展到每秒 1000 次同时写入。但是,如果您只有几个,那么这是确保不会相互覆盖的最安全方法。
回答by PirateApp
This test is being done to determine the best way to write and read from SQLite database. We follow 3 approaches below
正在进行此测试以确定从 SQLite 数据库写入和读取的最佳方式。我们遵循以下 3 种方法
- Read and write without any threads (the methods with the word normal on it)
- Read and write with Threads
- Read and write with processes
- 无线程读写(带有normal字样的方法)
- 使用线程读取和写入
- 使用进程读写
Our sample dataset is a dummy generated OHLC dataset with a symbol, timestamp, and 6 fake values for ohlc and volumefrom, volumeto
我们的示例数据集是一个虚拟生成的 OHLC 数据集,带有符号、时间戳和 ohlc 和 volumefrom、volumeto 的 6 个假值
Reads
读取
- Normal method takes about 0.25 seconds to read
- Threaded method takes 10 seconds
- Processing takes 0.25 seconds to read
- 普通方法读取大约需要0.25秒
- 线程方法需要 10 秒
- 处理需要 0.25 秒读取
Winner: Processing and Normal
获胜者:处理和正常
Writes
写
- Normal method takes about 1.5 seconds to write
- Threaded method takes about 30 seconds
- Processing takes about 30 seconds
- 普通方法写入大约需要1.5秒
- 线程方法大约需要 30 秒
- 处理大约需要 30 秒
Winner: Normal
获胜者:普通
Note:All records are not written using the threaded and processed write methods. Threaded and processed write methods obviously run into database locked errors as the writes are queued up SQlite only queues up writes to a certain threshold and then throws sqlite3.OperationalError indicating database is locked. The ideal way is to retry inserting the same chunk again but there is no point as the method execution for parallel insertion takes more tine than a sequential read even without retrying the locked/failed inserts Without retrying, 97% of the rows were written and still took 10x more time than a sequential write
注意:并非所有记录都使用线程化和处理写入方法写入。线程化和处理写入方法显然会遇到数据库锁定错误,因为写入排队 SQlite 只将写入排队到某个阈值,然后抛出 sqlite3.OperationalError 指示数据库已锁定。理想的方法是再次重试插入相同的块,但没有意义,因为即使不重试锁定/失败的插入,并行插入的方法执行比顺序读取花费更多的时间 如果不重试,97% 的行已写入并且仍然比顺序写入花费的时间多 10 倍
Strategies to takeaway:
外卖策略:
Prefer reading SQLite and writing it in the same thread
If you must do multithreading, use multiprocessing to read which has more or less the same performance and defer to single threaded write operations
DO NOT USE THREADINGfor reads and writes as it is 10x slower on both, you can thank the GIL for that
更喜欢阅读 SQLite 并在同一线程中编写它
如果您必须进行多线程,请使用多处理读取具有或多或少相同性能并遵循单线程写入操作
不要使用线程进行读取和写入,因为两者都慢 10 倍,您可以为此感谢 GIL
Here is the code for the complete test
这是完整测试的代码
import sqlite3
import time
import random
import string
import os
import timeit
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading
import os
database_file = os.path.realpath('../files/ohlc.db')
create_statement = 'CREATE TABLE IF NOT EXISTS database_threading_test (symbol TEXT, ts INTEGER, o REAL, h REAL, l REAL, c REAL, vf REAL, vt REAL, PRIMARY KEY(symbol, ts))'
insert_statement = 'INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)'
select = 'SELECT * from database_threading_test'
def time_stuff(some_function):
def wrapper(*args, **kwargs):
t0 = timeit.default_timer()
value = some_function(*args, **kwargs)
print(timeit.default_timer() - t0, 'seconds')
return value
return wrapper
def generate_values(count=100):
end = int(time.time()) - int(time.time()) % 900
symbol = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
ts = list(range(end - count * 900, end, 900))
for i in range(count):
yield (symbol, ts[i], random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1e9, random.random() * 1e5)
def generate_values_list(symbols=1000,count=100):
values = []
for _ in range(symbols):
values.extend(generate_values(count))
return values
@time_stuff
def sqlite_normal_read():
"""
100k records in the database, 1000 symbols, 100 rows
First run
0.25139795300037804 seconds
Second run
Third run
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
try:
with conn:
conn.execute(create_statement)
results = conn.execute(select).fetchall()
print(len(results))
except sqlite3.OperationalError as e:
print(e)
@time_stuff
def sqlite_normal_write():
"""
1000 symbols, 100 rows
First run
2.279409104000024 seconds
Second run
2.3364172020001206 seconds
Third run
"""
l = generate_values_list()
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
try:
with conn:
conn.execute(create_statement)
conn.executemany(insert_statement, l)
except sqlite3.OperationalError as e:
print(e)
@time_stuff
def sequential_batch_read():
"""
We read all the rows for each symbol one after the other in sequence
First run
3.661222331999852 seconds
Second run
2.2836898810001003 seconds
Third run
0.24514851899994028 seconds
Fourth run
0.24082150699996419 seconds
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
try:
with conn:
conn.execute(create_statement)
symbols = conn.execute("SELECT DISTINCT symbol FROM database_threading_test").fetchall()
for symbol in symbols:
results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
except sqlite3.OperationalError as e:
print(e)
def sqlite_threaded_read_task(symbol):
results = []
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
try:
with conn:
results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
except sqlite3.OperationalError as e:
print(e)
finally:
return results
def sqlite_multiprocessed_read_task(symbol):
results = []
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
try:
with conn:
results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
except sqlite3.OperationalError as e:
print(e)
finally:
return results
@time_stuff
def sqlite_threaded_read():
"""
1000 symbols, 100 rows per symbol
First run
9.429676861000189 seconds
Second run
10.18928106400017 seconds
Third run
10.382290903000467 seconds
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall()
with ThreadPoolExecutor(max_workers=8) as e:
results = e.map(sqlite_threaded_read_task, symbols, chunksize=50)
for result in results:
pass
@time_stuff
def sqlite_multiprocessed_read():
"""
1000 symbols, 100 rows
First run
0.2484774920012569 seconds!!!
Second run
0.24322178500005975 seconds
Third run
0.2863524549993599 seconds
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall()
with ProcessPoolExecutor(max_workers=8) as e:
results = e.map(sqlite_multiprocessed_read_task, symbols, chunksize=50)
for result in results:
pass
def sqlite_threaded_write_task(n):
"""
We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
data = list(generate_values())
try:
with conn:
conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
except sqlite3.OperationalError as e:
print("Database locked",e)
finally:
conn.close()
return len(data)
def sqlite_multiprocessed_write_task(n):
"""
We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors
"""
conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
data = list(generate_values())
try:
with conn:
conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
except sqlite3.OperationalError as e:
print("Database locked",e)
finally:
conn.close()
return len(data)
@time_stuff
def sqlite_threaded_write():
"""
Did not write all the results but the outcome with 97400 rows written is still this...
Takes 20x the amount of time as a normal write
1000 symbols, 100 rows
First run
28.17819765000013 seconds
Second run
25.557972323000058 seconds
Third run
"""
symbols = [i for i in range(1000)]
with ThreadPoolExecutor(max_workers=8) as e:
results = e.map(sqlite_threaded_write_task, symbols, chunksize=50)
for result in results:
pass
@time_stuff
def sqlite_multiprocessed_write():
"""
1000 symbols, 100 rows
First run
30.09209805699993 seconds
Second run
27.502465319000066 seconds
Third run
"""
symbols = [i for i in range(1000)]
with ProcessPoolExecutor(max_workers=8) as e:
results = e.map(sqlite_multiprocessed_write_task, symbols, chunksize=50)
for result in results:
pass
sqlite_normal_write()