How to terminate a process using Python's multiprocessing

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/32053618/

Date: 2020-08-19 10:55:03  Source: igfitidea

How to terminate a process using Python's multiprocessing

Tags: python, multiprocessing, python-multiprocessing

Asked by Dan Littlejohn

I have some code that needs to run against several other systems that may hang or have problems outside my control. I would like to use Python's multiprocessing to spawn child processes that run independently of the main program, and then terminate them when they hang or have problems, but I am not sure of the best way to go about this.


When terminate is called it does kill the child process, but then it becomes a defunct zombie that is not released until the process object is gone. The example code below, where the loop never ends, manages to kill the child and allows a respawn when called again, but it does not seem like a good way of going about this (i.e. creating the multiprocessing.Process() in __init__() would be better).


Anyone have a suggestion?


import multiprocessing
import time

class Process(object):
    def __init__(self):
        self.thing = Thing()
        self.running_flag = multiprocessing.Value("i", 1)

    def run(self):
        self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
        self.process.start()
        print self.process.pid

    def pause_resume(self):
        self.running_flag.value = not self.running_flag.value

    def terminate(self):
        self.process.terminate()

class Thing(object):
    def __init__(self):
        self.count = 1

    def worker(self,running_flag):
        while True:
            if running_flag.value:
                self.do_work()

    def do_work(self):
        print "working {0} ...".format(self.count)
        self.count += 1
        time.sleep(1)

Answered by Pie 'Oh' Pah

You might run the child processes as daemons in the background.


process.daemon = True

Any errors and hangs (or an infinite loop) in a daemon process will not affect the main process, and it will only be terminated once the main process exits.
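A minimal sketch of the daemon pattern (the hang_forever function is illustrative, standing in for a child that never returns):

```python
import multiprocessing
import time

def hang_forever():
    # Stands in for a child that hangs or loops indefinitely.
    while True:
        time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=hang_forever)
    p.daemon = True      # must be set before start()
    p.start()
    print(p.daemon, p.is_alive())
    # No join() needed here: daemon children are terminated
    # automatically when the main process exits.
```

Note that a daemonic process is not allowed to create child processes of its own, which is one reason this approach only suits simple cases.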


This will work for simple problems, until you run into a lot of child daemon processes that keep consuming memory from the parent process without any explicit control.


The best way is to set up a Queue so that all the child processes can communicate with the parent process, letting us join them and clean up nicely. Here is some simple code that checks whether a child process is hanging (simulated with time.sleep(1000)) and sends a message to the queue for the main process to act on:


import multiprocessing as mp
import time
import Queue as queue  # Python 2 module name; in Python 3 this is simply "import queue"

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print "working {0} ...".format(count)
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print "hanging..."
                time.sleep(1000)

def watchdog(q):
    """
    Check the queue for updates and put a kill message on it
    when the child process hasn't sent anything for too long.
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print "[WATCHDOG]: Maybe WORKER is slacking"
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print "[MAIN]: starting process P1"
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
        if msg == "KILL WATCHDOG":
            print "[MAIN]: Terminating slacking WORKER"
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print "[MAIN]: WORKER is a goner"
                workr.join(timeout=1.0)
                print "[MAIN]: Joined WORKER successfully!"
                q.close()
                break # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()

Without terminating worker, an attempt to join()it from the main process would block forever, since workernever finishes.

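An alternative to the explicit kill message is join()'s own timeout parameter; a minimal sketch of that pattern (stuck_worker is illustrative):

```python
import multiprocessing
import time

def stuck_worker():
    time.sleep(1000)   # stands in for a hung external call

if __name__ == "__main__":
    p = multiprocessing.Process(target=stuck_worker)
    p.start()
    p.join(timeout=2.0)        # give up waiting after 2 seconds
    if p.is_alive():           # join timed out: the worker is stuck
        p.terminate()          # kill it; a second join() can now succeed
        p.join()
    print(p.exitcode)          # negative on Unix: killed by a signal
```

Unlike the watchdog approach, this only notices a hang at the single point where the parent waits, but it needs no extra process or queue.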

Answered by noxdafox

The way Python multiprocessing handles processes is a bit confusing.


From the multiprocessing guidelines:


Joining zombie processes

On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process's Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.


To avoid a process becoming a zombie, you need to call its join() method once you kill it.

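In code, the terminate-then-join pattern looks like this (the sleeping child is illustrative):

```python
import multiprocessing
import time

if __name__ == "__main__":
    p = multiprocessing.Process(target=time.sleep, args=(1000,))
    p.start()
    p.terminate()            # sends SIGTERM on Unix
    p.join()                 # reaps the child so no zombie is left behind
    print(p.exitcode)        # e.g. -15 (-SIGTERM) on Unix
    # multiprocessing.active_children() also joins any finished children
    print(multiprocessing.active_children())
```

As the guideline quoted above notes, calling multiprocessing.active_children() or Process.is_alive() has the side effect of joining finished children, but an explicit join() is clearer.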

If you want a simpler way to deal with hanging calls in your system, you can take a look at pebble.


Answered by Leo

(Not having enough reputation points to comment, hereby a full answer)


@PieOhPah: thank you for this very nice example.
Unfortunately there is just one little flaw that prevents the watchdog from killing the worker:


if msg == "KILL WATCHDOG":

it should be:


if msg == "KILL WORKER":

So the code becomes (with the print calls updated for Python 3):


import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print ("working {0} ...".format(count))
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print ("hanging...")
                time.sleep(1000)

def watchdog(q):
    """
    Check the queue for updates and put a kill message on it
    when the child process hasn't sent anything for too long.
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print ("[WATCHDOG]: Maybe WORKER is slacking")
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print ("[MAIN]: starting process P1")
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
#        if msg == "KILL WATCHDOG":
        if msg == "KILL WORKER":
            print ("[MAIN]: Terminating slacking WORKER")
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print ("[MAIN]: WORKER is a goner")
                workr.join(timeout=1.0)
                print ("[MAIN]: Joined WORKER successfully!")
                q.close()
                break # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()