Python:捕捉 Ctrl-C 命令。提示“真的要退出(y/n)”,如果没有则继续执行

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/18114560/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 09:56:39  来源:igfitidea点击:

Python: Catch Ctrl-C command. Prompt "really want to quit (y/n)", resume execution if no

pythonsignalscopy-pastesigint

提问by Colin M

I have a program that may have a lengthy execution. In the main module I have the following:

我有一个可能需要很长时间执行的程序。在主模块中,我有以下内容:

import signal
def run_program()
   ...time consuming execution...

def Exit_gracefully(signal, frame):
    ... log exiting information ...
    ... close any open files ...
    sys.exit(0)

if __name__ == '__main__':
    signal.signal(signal.SIGINT, Exit_gracefully)
    run_program()

This works fine, but I'd like the possibility to pause execution upon catching SIGINT, prompting the user if they would really like to quit, and resuming where I left off in run_program() if they decide they don't want to quit.

这工作正常,但我希望有可能在捕获 SIGINT 时暂停执行,提示用户他们是否真的想退出,如果他们决定不想退出,则在 run_program() 中恢复我离开的位置。

The only way I can think of doing this is running the program in a separate thread, keeping the main thread waiting on it and ready to catch SIGINT. If the user wants to quit the main thread can do cleanup and kill the child thread.

我能想到的唯一方法是在一个单独的线程中运行程序,让主线程等待它并准备好捕捉 SIGINT。如果用户想退出主线程可以做清理和杀死子线程。

Is there a simpler way?

有没有更简单的方法?

回答by Antti Haapala

The python signal handlers do not seem to be real signal handlers; that is they happen after the fact, in the normal flow and after the C handler has already returned. Thus you'd try to put your quit logic within the signal handler. As the signal handler runs in the main thread, it will block execution there too.

python 信号处理程序似乎不是真正的信号处理程序;也就是说,它们发生在事实之后,在正常流程中并且在 C 处理程序已经返回之后。因此,您会尝试将退出逻辑放在信号处理程序中。当信号处理程序在主线程中运行时,它也会在那里阻塞执行。

Something like this seems to work nicely.

像这样的东西似乎工作得很好。

import signal
import time
import sys

def run_program():
    while True:
        time.sleep(1)
        print("a")

def exit_gracefully(signum, frame):
    # restore the original signal handler as otherwise evil things will happen
    # in raw_input when CTRL+C is pressed, and our signal handler is not re-entrant
    signal.signal(signal.SIGINT, original_sigint)

    try:
        if raw_input("\nReally quit? (y/n)> ").lower().startswith('y'):
            sys.exit(1)

    except KeyboardInterrupt:
        print("Ok ok, quitting")
        sys.exit(1)

    # restore the exit gracefully handler here    
    signal.signal(signal.SIGINT, exit_gracefully)

if __name__ == '__main__':
    # store the original SIGINT handler
    original_sigint = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, exit_gracefully)
    run_program()

The code restores the original signal handler for the duration of raw_input; raw_inputitself is not re-entrable, and re-entering it will lead to RuntimeError: can't re-enter readlinebeing raised from time.sleepwhich is something we don't want as it is harder to catch than KeyboardInterrupt. Rather, we let 2 consecutive Ctrl-C's to raise KeyboardInterrupt.

该代码在raw_input;的持续时间内恢复原始信号处理程序。raw_input本身是不可重新进入的,重新进入它会导致RuntimeError: can't re-enter readline被提升,time.sleep这是我们不想要的,因为它比KeyboardInterrupt. 相反,我们让 2 个连续的 Ctrl-C 来提高KeyboardInterrupt.

回答by Marc

from https://gist.github.com/rtfpessoa/e3b1fe0bbfcd8ac853bf

来自https://gist.github.com/rtfpessoa/e3b1fe0bbfcd8ac853bf

#!/usr/bin/env python

import signal
import sys

def signal_handler(signal, frame):
  # your code here
  sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

Bye!

再见!

回答by Carson

when procedure end then do something

当程序结束然后做某事

suppose you just want to the procedure will do something after the task end

假设你只是想让程序在任务结束后做一些事情

import time

class TestTask:
    def __init__(self, msg: str):
        self.msg = msg

    def __enter__(self):
        print(f'Task Start!:{self.msg}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Task End!')

    @staticmethod
    def do_something():
        try:
            time.sleep(5)
        except:
            pass

with TestTask('Hello World') as task:
    task.do_something()

when the process leaves withthat will run __exit__even with KeyboardInterrupt happen that are same.

当进程离开时with__exit__即使出现相同的 KeyboardInterrupt也会运行。

if you don't like to see the error, add try ... except ...

如果您不想看到错误,请添加try ... except ...

@staticmethod
def do_something():
    try:
        time.sleep(5)
    except:
        pass

pause, continue, reset, and etc.

暂停、继续、重置等。

I don't have a perfect solution, but it may be useful to you.

我没有完美的解决方案,但它可能对您有用。

It's means divided your process to many subprocesses and save it that finished.it will not be executed again since you find it already done.

这意味着将您的流程划分为许多子流程并保存完成。因为您发现它已经完成,因此不会再次执行。

import time
from enum import Enum

class Action(Enum):
    EXIT = 0
    CONTINUE = 1
    RESET = 2

class TestTask:
    def __init__(self, msg: str):
        self.msg = msg

    def __enter__(self):
        print(f'Task Start!:{self.msg}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Task End!')

    def do_something(self):
        tuple_job = (self._foo, self._bar)  # implement by yourself
        list_job_state = [0] * len(tuple_job)
        dict_keep = {}  # If there is a need to communicate between jobs, and you don't want to use class members, you can use this method.
        while 1:
            try:
                for idx, cur_process in enumerate(tuple_job):
                    if not list_job_state[idx]:
                        cur_process(dict_keep)
                        list_job_state[idx] = True
                if all(list_job_state):
                    print('100%')
                    break
            except KeyboardInterrupt:
                print('KeyboardInterrupt. input action:')
                msg = '\n\t'.join([f"{action + ':':<10}{str(act_number)}" for act_number, action in
                                   enumerate([name for name in vars(Action) if not name.startswith('_')])
                                   ])
                case = Action(int(input(f'\t{msg}\n:')))
                if case == Action.EXIT:
                    break
                if case == Action.RESET:
                    list_job_state = [0] * len(tuple_job)

    @staticmethod
    def _foo(keep_dict: dict) -> bool:  # implement by yourself
        time.sleep(2)
        print('1%')
        print('2%')
        print('...')
        print('60%')
        keep_dict['status_1'] = 'status_1'
        return True

    @staticmethod
    def _bar(keep_dict: dict) -> bool:  # implement by yourself
        time.sleep(2)
        print('61%')
        print(keep_dict.get('status_1'))
        print('...')
        print('99%')
        return True

with TestTask('Hello World') as task:
    task.do_something()

console

安慰

input action number:2
Task Start!:Hello World
1%
2%
...
60%
KeyboardInterrupt. input action:
        EXIT:     0
        CONTINUE: 1
        RESET:    2
:1
61%
status_1
...
99%
100%
Task End!