Break the function after a certain time in Python
Original URL: http://stackoverflow.com/questions/25027122/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverFlow
break the function after certain time
Asked by user2372074
In Python, for a toy example:
for x in range(0, 3):
    # call function A(x)
I want to continue the for loop if function A takes more than 5 seconds, skipping it so I won't get stuck or waste time.
By doing some searching, I realized subprocess or thread may help, but I have no idea how to implement it here. Any help would be great. Thanks
Accepted answer by TheSoundDefense
I think creating a new process may be overkill. If you're on Mac or a Unix-based system, you should be able to use signal.SIGALRM to forcibly time out functions that take too long. This will work on functions that are idling for network or other issues that you absolutely can't handle by modifying your function. I have an example of using it in this answer:
https://stackoverflow.com/a/24921763/3803152
Editing my answer in here, though I'm not sure I'm supposed to do that:
import signal

class TimeoutException(Exception):   # Custom exception class
    pass

def timeout_handler(signum, frame):  # Custom signal handler
    raise TimeoutException

# Change the behavior of SIGALRM
signal.signal(signal.SIGALRM, timeout_handler)

for i in range(3):
    # Start the timer. Once 5 seconds are over, a SIGALRM signal is sent.
    signal.alarm(5)
    # This try/except block ensures that
    # you'll catch TimeoutException when it's sent.
    try:
        A(i)  # Whatever your function that might hang
    except TimeoutException:
        continue  # continue the for loop if function A takes more than 5 seconds
    else:
        # Reset the alarm
        signal.alarm(0)
This basically sets a timer for 5 seconds, then tries to execute your code. If it fails to complete before time runs out, a SIGALRM is sent, which we catch and turn into a TimeoutException. That forces you to the except block, where your program can continue.
EDIT: whoops, TimeoutException is a class, not a function. Thanks, abarnert!
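As a reusable variation (my own sketch, not part of the answer above), the same alarm logic can be packaged in a context manager; the name time_limit is hypothetical, and it relies on the TimeoutException class defined above:

import signal
from contextlib import contextmanager

@contextmanager
def time_limit(seconds):
    def handler(signum, frame):
        raise TimeoutException
    old_handler = signal.signal(signal.SIGALRM, handler)  # install our handler
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)                             # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)  # restore the old handler

for i in range(3):
    try:
        with time_limit(5):
            A(i)  # whatever your function that might hang
    except TimeoutException:
        continue

Like signal.alarm itself, this only works on Unix-like systems and only in the main thread.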
Answered by ZekeDroid
The comments are correct in that you should check inside the function. Here is a potential solution. Note that an asynchronous function (using a thread, for example) is different from this solution. This one is synchronous, which means it will still run in series.
import time

def someFunction():
    start = time.time()
    while time.time() - start < 5:
        # do your normal function's work here, one chunk at a time
        pass
    return

for x in range(0, 3):
    someFunction()
Answered by abarnert
If you can break your work up and check every so often, that's almost always the best solution. But sometimes that's not possible; e.g., maybe you're reading a file off a slow file share that every once in a while just hangs for 30 seconds. To deal with that internally, you'd have to restructure your whole program around an async I/O loop.
If you don't need to be cross-platform, you can use signals on *nix (including Mac and Linux), APCs on Windows, etc. But if you need to be cross-platform, that doesn't work.
So, if you really need to do it concurrently, you can, and sometimes you have to. In that case, you probably want to use a process for this, not a thread. You can't really kill a thread safely, but you can kill a process, and it can be as safe as you want it to be. Also, if the thread is taking 5+ seconds because it's CPU-bound, you don't want to fight with it over the GIL.
There are two basic options here.
First, you can put the code in another script and run it with subprocess:
subprocess.check_call([sys.executable, 'other_script.py', arg, other_arg],
                      timeout=5)
Since this is going through normal child-process channels, the only communication you can use is some argv strings, a success/failure return value (actually a small integer, but that's not much better), and optionally a hunk of text going in and a chunk of text coming out.
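As a concrete, hedged sketch of that channel (other_script.py and its stdin/stdout behavior are hypothetical), subprocess.run can feed text in, capture text out, and enforce the timeout; on expiry it kills the child and raises subprocess.TimeoutExpired:

import subprocess
import sys

try:
    proc = subprocess.run(
        [sys.executable, 'other_script.py', 'some-arg'],  # argv strings in
        input='a hunk of text going in',
        capture_output=True, text=True, timeout=5,
    )
    print(proc.returncode)  # the small-integer success/failure value
    print(proc.stdout)      # the chunk of text coming out
except subprocess.TimeoutExpired:
    pass  # the child took too long and was killed; skip it and move on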
Alternatively, you can use multiprocessing to spawn a thread-like child process:
p = multiprocessing.Process(target=func, args=args)
p.start()
p.join(5)
if p.is_alive():
    p.terminate()
As you can see, this is a little more complicated, but it's better in a few ways:
- You can pass arbitrary Python objects (at least anything that can be pickled) rather than just strings.
- Instead of having to put the target code in a completely independent script, you can leave it as a function in the same script.
- It's more flexible; e.g., if you later need to, say, pass progress updates, it's very easy to add a queue in either or both directions (see the sketch after this list).
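For instance, here's a minimal sketch of that last point (my own illustration; the worker function and its job dict are hypothetical): a queue carries progress updates out of the child, and whatever arrived before the deadline survives even if the process is terminated:

import multiprocessing

def worker(job, progress):
    # Hypothetical worker: receives an arbitrary picklable object (a dict here)
    # and reports each finished step on the queue.
    for step in range(10):
        # ... do one slice of the work described by `job` ...
        progress.put(step)

if __name__ == '__main__':
    progress = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=({'some': 'job'}, progress))
    p.start()
    p.join(5)          # wait at most 5 seconds
    if p.is_alive():
        p.terminate()  # give up on whatever work remains
    while not progress.empty():  # drain the updates that made it out in time
        print(progress.get())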
The big problem with any kind of parallelism is sharing mutable data, e.g., having a background task update a global dictionary as part of its work (which your comments say you're trying to do). With threads, you can sort of get away with it, but race conditions can lead to corrupted data, so you have to be very careful with locking. With child processes, you can't get away with it at all. (Yes, you can use shared memory, as Sharing state between processes explains, but this is limited to simple types like numbers, fixed arrays, and types you know how to define as C structures, and it just gets you back to the same problems as threads.)
Ideally, you arrange things so you don't need to share any data while the process is running: you pass in a dict as a parameter and get a dict back as a result. This is usually pretty easy to arrange when you have a previously-synchronous function that you want to put in the background.
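One hedged way to get that dict-in, dict-out shape with a deadline (a minimal sketch; work and the sample dict are made up) is multiprocessing.Pool.apply_async, whose result object accepts a timeout on .get():

import multiprocessing

def work(d):
    # Hypothetical: take a dict as a parameter, return a new dict as the result.
    return {k: v * 2 for k, v in d.items()}

if __name__ == '__main__':
    with multiprocessing.Pool(processes=1) as pool:
        result = pool.apply_async(work, ({'a': 1, 'b': 2},))
        try:
            d = result.get(timeout=5)  # the dict comes back within 5 seconds...
        except multiprocessing.TimeoutError:
            d = {}                     # ...or we fall back to an empty one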
But what if, say, a partial result is better than no result? In that case, the simplest solution is to pass the results over a queue. You can do this with an explicit queue, as explained in Exchanging objects between processes, but there's an easier way.
If you can break the monolithic process into separate tasks, one for each value (or group of values) you wanted to stick in the dictionary, you can schedule them on a Pool or, even better, a concurrent.futures.Executor. (If you're on Python 2.x or 3.1, see the futures backport on PyPI.)
Let's say your slow function looked like this:
def spam():
    global d
    for meat in get_all_meats():
        count = get_meat_count(meat)
        d[meat] = d.get(meat, 0) + count
Instead, you'd do this:
import concurrent.futures

def spam_one(meat):
    count = get_meat_count(meat)
    return meat, count

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(spam_one, get_all_meats(), timeout=5)
    for (meat, count) in results:
        d[meat] = d.get(meat, 0) + count
As many results as you get within 5 seconds get added to the dict; if that isn't all of them, the rest are abandoned, and a TimeoutError is raised (which you can handle however you want: log it, run some quick fallback code, whatever).
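Concretely, a sketch of that handling (reusing the spam_one and get_all_meats pieces from above) might look like this:

import concurrent.futures

d = {}
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    try:
        for meat, count in executor.map(spam_one, get_all_meats(), timeout=5):
            d[meat] = d.get(meat, 0) + count
    except concurrent.futures.TimeoutError:
        # Partial results are already in d; log it and fall back as needed.
        print('timed out with %d partial results' % len(d))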
And if the tasks really are independent (as they are in my stupid little example, but of course they may not be in your real code, at least not without a major redesign), you can parallelize the work for free just by removing that max_workers=1. Then, if you run it on an 8-core machine, it'll kick off 8 workers and give them each 1/8th of the work to do, and things will get done faster. (Usually not 8x as fast, but often 3-6x as fast, which is still pretty nice.)
Answered by Mikel Pascual
This seems like a better idea (sorry, not sure of the Python names of things yet):
import signal

def signal_handler(signum, frame):
    raise Exception("Timeout!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(3)  # Three seconds
try:
    for x in range(0, 3):
        A(x)  # call function A(x)
except Exception:
    print("Timeout!")
signal.alarm(0)  # reset
Answered by Igor Kremin
Maybe someone will find this decorator useful, based on TheSoundDefense's answer:
import time
import signal

class TimeoutException(Exception):  # Custom exception class
    pass

def break_after(seconds=2):
    def timeout_handler(signum, frame):  # Custom signal handler
        raise TimeoutException

    def decorator(function):
        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(seconds)
            try:
                res = function(*args, **kwargs)
                signal.alarm(0)  # Clear alarm
                return res
            except TimeoutException:
                print('Oops, timeout: %s sec reached.' % seconds,
                      function.__name__, args, kwargs)
            return
        return wrapper
    return decorator
test:
@break_after(3)
def test(a, b, c):
    return time.sleep(10)
>>> test(1,2,3)
Oops, timeout: 3 sec reached. test (1, 2, 3) {}

