Python in-memory cache with time to live
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse it, you must follow the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/31771286/
Asked by Louise McMahon
I have multiple threads running the same process that need to be able to notify each other that something should not be worked on for the next n seconds; it's not the end of the world if they do, however.
My aim is to be able to pass a string and a TTL to the cache and be able to fetch all the strings that are in the cache as a list. The cache can live in memory and the TTLs will be no more than 20 seconds.
Does anyone have any suggestions for how this can be accomplished?
Accepted answer by enrico.bacis
You can use the expiringdict module:
The core of the library is the ExpiringDict class, which is an ordered dictionary with auto-expiring values for caching purposes.
The description does not mention multithreading, so to be safe, use a Lock.
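To make the idea concrete without pulling in the library here, below is a minimal, hypothetical stand-in (not the real ExpiringDict API) showing what "an ordered dictionary with auto-expiring values" plus the suggested Lock looks like in pure stdlib terms:

```python
import time
from collections import OrderedDict
from threading import Lock


class SimpleExpiringDict:
    """Minimal sketch of an expiring dict: entries disappear
    max_age_seconds after insertion. All access goes through a Lock."""

    def __init__(self, max_age_seconds=20):
        self.max_age = max_age_seconds
        self._data = OrderedDict()   # key -> (value, inserted_at)
        self._lock = Lock()          # guard against concurrent mutation

    def __setitem__(self, key, value):
        with self._lock:
            self._data[key] = (value, time.time())

    def keys(self):
        """Return all live keys, evicting expired entries on the way."""
        now = time.time()
        with self._lock:
            expired = [k for k, (_, t) in self._data.items()
                       if now - t >= self.max_age]
            for key in expired:
                del self._data[key]
            return list(self._data)


cache = SimpleExpiringDict(max_age_seconds=0.2)
cache['a'] = 1
cache['b'] = 2
print(cache.keys())      # ['a', 'b']
time.sleep(0.3)
print(cache.keys())      # []
```

The real expiringdict package additionally caps the number of entries and evicts on reads and writes; this sketch only shows the TTL-plus-Lock shape.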
Answer by Dawid Gosławski
Something like this?
from time import time, sleep
import itertools
from threading import Thread, RLock
import signal


class CacheEntry:
    def __init__(self, string, ttl=20):
        self.string = string
        self.expires_at = time() + ttl
        self._expired = False

    def expired(self):
        if self._expired is False:
            return (self.expires_at < time())
        else:
            return self._expired


class CacheList:
    def __init__(self):
        self.entries = []
        self.lock = RLock()

    def add_entry(self, string, ttl=20):
        with self.lock:
            self.entries.append(CacheEntry(string, ttl))

    def read_entries(self):
        with self.lock:
            self.entries = list(itertools.dropwhile(lambda x: x.expired(), self.entries))
            return self.entries


def read_entries(name, slp, cachelist):
    while True:
        print("{}: {}".format(name, ",".join(map(lambda x: x.string, cachelist.read_entries()))))
        sleep(slp)


def add_entries(name, ttl, cachelist):
    s = 'A'
    while True:
        cachelist.add_entry(s, ttl)
        print("Added ({}): {}".format(name, s))
        sleep(1)
        s += 'A'


if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    cl = CacheList()
    print_threads = []
    print_threads.append(Thread(None, read_entries, args=('t1', 1, cl)))
    # print_threads.append(Thread(None, read_entries, args=('t2', 2, cl)))
    # print_threads.append(Thread(None, read_entries, args=('t3', 3, cl)))

    adder_thread = Thread(None, add_entries, args=('a1', 2, cl))
    adder_thread.start()

    for t in print_threads:
        t.start()

    for t in print_threads:
        t.join()

    adder_thread.join()
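One subtlety worth noting about the answer above: itertools.dropwhile only strips expired entries from the front of the list. That is safe as long as entries are appended in insertion order with the same TTL, so the oldest entries always expire first. A self-contained illustration of that eviction step (using hypothetical (inserted_at, value) tuples rather than the CacheEntry class above):

```python
import itertools
import time

ttl = 2
# (inserted_at, value) pairs, oldest first
entries = [(time.time() - 5, 'stale'), (time.time(), 'fresh')]

# dropwhile discards leading entries while the predicate holds,
# i.e. everything that has outlived its TTL
live = list(itertools.dropwhile(lambda e: time.time() - e[0] > ttl, entries))
print([v for _, v in live])  # ['fresh']
```

If entries could carry wildly different TTLs, a filter over the whole list (or a full scan as in the keys()-style eviction) would be needed instead.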
Answer by User
The OP is using Python 2.7, but if you're using Python 3, the ExpiringDict mentioned in the accepted answer is currently, well, expired. The last commit to the github repo was June 17, 2017, and there is an open issue that it doesn't work with Python 3.5.
There is a more recently maintained project, cachetools (last commit May 4, 2020):
pip install cachetools
from cachetools import TTLCache
cache = TTLCache(maxsize=10, ttl=360)
cache['apple'] = 'top dog'
...
>>> cache['apple']
'top dog'
... after 360 seconds...
>>> cache['apple']
KeyError exception thrown
ttl is the time to live in seconds.
Answer by Acumenus
Regarding an expiring in-memory cache for general-purpose use, a common design pattern to achieve this is not via a dictionary, but via a function or method decorator, with a cache dictionary managed behind the scenes. As such, this answer somewhat complements the answer by User, which uses a dictionary rather than a decorator.
The ttl_cache decorator in cachetools==3.1.0 works a lot like functools.lru_cache, but with a time to live.
import cachetools.func


@cachetools.func.ttl_cache(maxsize=128, ttl=10 * 60)
def example_function(key):
    return get_expensively_computed_value(key)


class ExampleClass:
    EXP = 2

    @classmethod
    @cachetools.func.ttl_cache()
    def example_classmethod(cls, i):
        return i * cls.EXP

    @staticmethod
    @cachetools.func.ttl_cache()
    def example_staticmethod(i):
        return i * 3
Answer by iutinvg
In case you don't want to use any third-party libraries, you can add one more parameter to your expensive function: ttl_hash=None. This new parameter is a so-called "time sensitive hash"; its only purpose is to affect lru_cache.
For example:
from functools import lru_cache
import time


@lru_cache()
def my_expensive_function(a, b, ttl_hash=None):
    del ttl_hash  # to emphasize we don't use it and to shut pylint up
    return a + b  # horrible CPU load...


def get_ttl_hash(seconds=3600):
    """Return the same value within `seconds` time period"""
    return round(time.time() / seconds)


# somewhere in your code...
res = my_expensive_function(2, 2, ttl_hash=get_ttl_hash())
# cache will be updated once in an hour
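To see the invalidation actually happen, here is a quick self-contained check of the same pattern with a deliberately tiny period (the function name now_cached is made up for this demo):

```python
from functools import lru_cache
import time


@lru_cache()
def now_cached(ttl_hash=None):
    del ttl_hash  # only there to key the cache
    return time.time()


def get_ttl_hash(seconds):
    """Return the same value within `seconds`-wide time windows."""
    return round(time.time() / seconds)


first = now_cached(ttl_hash=get_ttl_hash(seconds=0.2))
time.sleep(0.5)                      # guarantees the 0.2 s window rolls over
second = now_cached(ttl_hash=get_ttl_hash(seconds=0.2))
assert second > first                # cache miss: the function really re-ran
```

Because the hash changed between the two calls, lru_cache sees a new argument tuple and recomputes; within one window it would have returned the cached value.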
Answer by Javier Buzzi
I absolutely love the idea from @iutinvg; I just wanted to take it a little further: decouple it from having to know to pass the ttl, and just make it a decorator so you don't have to think about it. If you have django and py3 and don't feel like pip installing any dependencies, try this out.
import time

from django.utils.functional import lazy
from functools import lru_cache, partial, update_wrapper


def lru_cache_time(seconds, maxsize=None):
    """
    Adds time aware caching to lru_cache
    """
    def wrapper(func):
        # Lazy function that makes sure the lru_cache() invalidates after X secs
        ttl_hash = lazy(lambda: round(time.time() / seconds), int)()

        @lru_cache(maxsize)
        def time_aware(__ttl, *args, **kwargs):
            """
            Main wrapper; note that the first argument, ttl, is not passed
            down. This is because no function should have to know about it.
            """
            def wrapping(*args, **kwargs):
                return func(*args, **kwargs)
            return wrapping(*args, **kwargs)

        return update_wrapper(partial(time_aware, ttl_hash), func)
    return wrapper


@lru_cache_time(seconds=10)
def meaning_of_life():
    """
    This message should show up if you call help().
    """
    print('this better only show up once!')
    return 42


@lru_cache_time(seconds=10)
def multiply(a, b):
    """
    This message should show up if you call help().
    """
    print('this better only show up once!')
    return a * b


# This is a test; it prints a `.` every second. There should be 10s
# between each "this better only show up once!" (*2 because of the two functions).
for _ in range(20):
    meaning_of_life()
    multiply(50, 99991)
    print('.')
    time.sleep(1)
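If you don't have django handy, the same windowed-hash idea works with just functools. The sketch below is my own adaptation of the answer above (not code from it): instead of a lazy proxy, the wrapper computes the window hash at call time and passes it as the first cache-key argument.

```python
import time
from functools import lru_cache, wraps


def lru_cache_time(seconds, maxsize=None):
    """lru_cache whose entries go stale once the time window rolls over."""
    def wrapper(func):
        @lru_cache(maxsize)
        def time_aware(ttl_hash, *args, **kwargs):
            del ttl_hash  # only part of the cache key, never used
            return func(*args, **kwargs)

        @wraps(func)
        def newfunc(*args, **kwargs):
            # bucket time into `seconds`-wide windows; a new window is a miss
            return time_aware(round(time.time() / seconds), *args, **kwargs)
        return newfunc
    return wrapper


calls = []


@lru_cache_time(seconds=1)
def add(a, b):
    calls.append(1)
    return a + b


add(1, 2)
add(1, 2)          # almost certainly the same window: served from cache
time.sleep(1.5)    # the 1 s window has definitely rolled over
add(1, 2)          # recomputed
print(len(calls))
```

The trade-off versus the django version is that the hash is recomputed on every call rather than lazily on cache lookup, which is a negligible cost for a round() of time.time().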