Python RuntimeError: 在 async + apscheduler 的线程中没有当前的事件循环

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/46727787/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 17:47:40  来源:igfitidea点击:

RuntimeError: There is no current event loop in thread in async + apscheduler

pythonpython-3.xpython-asyncioaiohttpapscheduler

提问by Valera Shutylev

I have a async function and need to run in with apscheduller every N minutes. There is a python code below

我有一个异步功能,需要每 N 分钟运行一次 apscheduller。下面有一个python代码

URL_LIST = ['<url1>',
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # dictionary of start times for each url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a url, using specified ClientSession."""
    async with session.get(url) as response:
        resp = await response.read()
        print(resp)

if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

But when i tried to run it i have the next error info

但是当我尝试运行它时,我有下一个错误信息

Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.

Could you please help me with this? Python 3.6, APScheduler 3.3.1,

你能帮我解决这个问题吗?Python 3.6,APScheduler 3.3.1,

采纳答案by Alex Gr?nholm

Just pass fetch_allto scheduler.add_job()directly. The asyncio scheduler supports coroutine functions as job targets.

直接传过去就好fetch_allscheduler.add_job()。asyncio 调度器支持协程函数作为作业目标。

If the target callable is nota coroutine function, it will be run in a worker thread (due to historical reasons), hence the exception.

如果目标可调用不是协程函数,它将在工作线程中运行(由于历史原因),因此例外。

回答by prezha

In your def demo_async(urls), try to replace:

在您的 中def demo_async(urls),尝试替换:

loop = asyncio.get_event_loop()

with:

和:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

回答by radzak

The important thing that hasn't been mentioned is why the error occurs. For me personally, knowing why the error occurs is as important as solving the actual problem.

没有提到的重要事情是错误发生的原因。就我个人而言,了解错误发生的原因与解决实际问题一样重要。

Let's take a look at the implementation of the get_event_loopof BaseDefaultEventLoopPolicy:

我们来看看 of 的get_event_loop实现BaseDefaultEventLoopPolicy

class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    ...

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

You can see that the self.set_event_loop(self.new_event_loop())is only executed if all of the below conditions are met:

您可以看到self.set_event_loop(self.new_event_loop())只有在满足以下所有条件时才会执行:

  • self._local._loop is None- _local._loopis not set
  • not self._local._set_called- set_event_loophasn't been called yet
  • isinstance(threading.current_thread(), threading._MainThread)- current thread is the main one (this is not True in your case)
  • self._local._loop is None-_local._loop未设置
  • not self._local._set_called-set_event_loop还没有被调用
  • isinstance(threading.current_thread(), threading._MainThread)- 当前线程是主要线程(在您的情况下这不是真的)

Therefore the exception is raised, because no loop is set in the current thread:

因此引发异常,因为当前线程中没有设置循环:

if self._local._loop is None:
    raise RuntimeError('There is no current event loop in thread %r.'
                       % threading.current_thread().name)

回答by Asaf Pinhassi

Use asyncio.run()instead of directly using the event loop. It creates a new loop and closes it when finished.

使用asyncio.run()而不是直接使用事件循环。它创建一个新循环并在完成时关闭它。

This is how the 'run' looks like:

这是“运行”的样子:

if events._get_running_loop() is not None:
    raise RuntimeError(
        "asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()

回答by sodimel

Since this question continues to appear on the first page, I will write my problem and my answer here.

由于这个问题继续出现在第一页,我将在这里写下我的问题和我的答案。

I had a RuntimeError: There is no current event loop in thread 'Thread-X'.when using flask-socketioand Bleak.

我有一个RuntimeError: There is no current event loop in thread 'Thread-X'.使用flask-socketioBleak 的时候



Edit:well, I refactored my file and made a class.

编辑:好吧,我重构了我的文件并创建了一个类。

I initialized the loop in the constructor, and now everything is working fine:

我在构造函数中初始化了循环,现在一切正常:

class BLE:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    # function example, improvement of
    # https://github.com/hbldh/bleak/blob/master/examples/discover.py :
    def list_bluetooth_low_energy(self) -> list:
        async def run() -> list:
            BLElist = []
            devices = await bleak.discover()
            for d in devices:
                BLElist.append(d.name)
            return 'success', BLElist
        return self.loop.run_until_complete(run())

Usage:

用法:

ble = path.to.lib.BLE()
list = ble.list_bluetooth_low_energy()


Original answer:

原答案:

The solution was stupid. I did not pay attention to what I did, but I moved some importout of a function, like this:

解决方案是愚蠢的。我没有注意我做了什么,但我import从一个函数中移出了一些,如下所示:

import asyncio, platform
from bleak import discover

def listBLE() -> dict:
    async def run() -> dict:
        # my code that keep throwing exceptions.

    loop = asyncio.get_event_loop()
    ble_list = loop.run_until_complete(run())
    return ble_list

So I thought that I needed to change something in my code, and I created a new event loop using this piece of code just before the line with get_event_loop():

因此,我认为我需要更改代码中的某些内容,并且在以下行之前使用这段代码创建了一个新的事件循环get_event_loop()

loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop()

At this moment I was pretty happy, since I had a loop running.

这一刻我很高兴,因为我有一个循环运行。

But not responding. And my code relied on a timeout to return some values, so it was pretty bad for my app.

但没有回应。而且我的代码依赖于超时来返回一些值,所以这对我的应用程序来说非常糟糕。

It took me nearly two hours to figure out that the problem was the import, and here is my (working) code:

我花了将近两个小时才发现问题是import,这是我的(工作)代码:

def list() -> dict:
    import asyncio, platform
    from bleak import discover

    async def run() -> dict:
        # my code running perfectly

    loop = asyncio.get_event_loop()
    ble_list  = loop.run_until_complete(run())
    return ble_list