C++: Can I use std::async without waiting for the future limitation?

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must follow the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow, original question: http://stackoverflow.com/questions/21531096/


Can I use std::async without waiting for the future limitation?

c++ multithreading c++11 asynchronous stdasync

Asked by Roee Gavirel

High level
I want to call some functions with no return value in async mode, without waiting for them to finish. If I use std::async, the future object doesn't get destroyed until the task is over, which makes the call effectively synchronous in my case.


Example


void sendMail(const std::string& address, const std::string& message)
{
    //sending the e-mail which takes some time...
}

myResponseType processRequest(args...)
{
    //Do some processing and evaluate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    //returning the response ASAP to the client
    return myResponseType;

} //<-- I'm stuck here until the async call finishes, so that f can be destructed,
  //    gaining no benefit from the async call.

My questions are


  1. Is there a way to overcome this limitation?
  2. If the answer to (1) is no, should I implement a single thread that takes ownership of those "zombie" futures and waits on them?
  3. If both (1) and (2) are no, is there any option other than building my own thread pool?

Note:
I would rather not use the thread+detach option (suggested by @galop1n), since creating a new thread has an overhead I wish to avoid, while std::async (at least on MSVC) uses an internal thread pool.


Thanks.


Answered by Jonathan Wakely

You can move the future into a global object, so when the local future's destructor runs it doesn't have to wait for the asynchronous thread to complete.


std::vector<std::future<void>> pending_futures;

myResponseType processRequest(args...)
{
    //Do some processing and evaluate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    // transfer the future's shared state to a longer-lived future
    pending_futures.push_back(std::move(f));

    //returning the response ASAP to the client
    return myResponseType;

}

N.B. This is not safe if the asynchronous thread refers to any local variables in the processRequest function.

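One practical wrinkle with this approach is that pending_futures only ever grows. Below is a minimal, hedged sketch (the helper name prune_pending_futures is mine, not part of the answer) of dropping futures whose tasks have already finished; it assumes only one thread touches pending_futures, otherwise the vector needs a mutex of its own:

#include <algorithm>
#include <chrono>

// Hypothetical helper: erase futures whose tasks are already done, so the
// global vector does not grow without bound. A zero-timeout wait_for only
// polls the task's status and never blocks.
void prune_pending_futures()
{
    pending_futures.erase(
        std::remove_if(pending_futures.begin(), pending_futures.end(),
            [](const std::future<void>& f) {
                return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
            }),
        pending_futures.end());
}

Calling this at the start of processRequest, for example, keeps the vector small without ever blocking on an unfinished task.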

"While using std::async (at least on MSVC) is using an inner thread pool."


That's actually non-conforming: the standard explicitly says tasks run with std::launch::async must run as if in a new thread, so any thread-local variables must not persist from one task to another. It doesn't usually matter though.

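A small self-contained sketch of why that difference is observable (the bump function and the printed values are my own illustration, not from the answer): on a conforming implementation each std::launch::async task starts with fresh thread-local state, so both lines print 1, whereas a pool that reuses threads could print 1 and then 2.

#include <future>
#include <iostream>

thread_local int counter = 0; // fresh in every new thread

int bump() { return ++counter; }

int main() {
    // Each task must run "as if in a new thread", so the thread_local
    // state of the first task must not leak into the second one.
    std::cout << std::async(std::launch::async, bump).get() << "\n";
    std::cout << std::async(std::launch::async, bump).get() << "\n";
}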

Answered by galop1n

Why not just start a thread and detach it, if you do not care about joining?


std::thread{ sendMail, address, message }.detach();

std::async is bound to the lifetime of the std::future it returns, and there is no alternative to that.


Putting the std::future into a waiting queue read by another thread would require the same safety mechanisms as a pool receiving new tasks, such as a mutex around the container.


Your best option, then, is a thread pool consuming tasks pushed directly into a thread-safe queue, and it will not depend on a specific implementation.


Below is a thread pool implementation taking any callable and its arguments. The threads poll the queue; a better implementation should use condition variables (coliru):


#include <iostream>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <functional>
#include <string>

struct ThreadPool {
    struct Task {
        virtual void Run() const = 0;
        virtual ~Task() {};
    };   

    template < typename task_, typename... args_ >
    struct RealTask : public Task {
        RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
        void Run() const override {
            fun_();
        }
    private:
        decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
    };

    template < typename task_, typename... args_ >
    void AddTask( task_&& task, args_&&... args ) {
        auto lock = std::unique_lock<std::mutex>{mtx_};
        using FinalTask = RealTask<task_, args_... >;
        q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
    }

    ThreadPool() {
        // Spin up the workers; each one polls the queue until told to stop.
        for( auto & t : pool_ )
            t = std::thread( [=] {
                while ( true ) {
                    std::unique_ptr<Task> task;
                    {
                        auto lock = std::unique_lock<std::mutex>{mtx_};
                        if ( q_.empty() && stop_ ) 
                            break;    // queue drained and shutdown requested
                        if ( q_.empty() )
                            continue; // nothing to do yet: poll again
                        task = std::move(q_.front());
                        q_.pop();
                    }
                    if (task)
                        task->Run();  // run the task outside the lock
                }
            } );
    }
    ~ThreadPool() {
        {
            auto lock = std::unique_lock<std::mutex>{mtx_};
            stop_ = true;
        }
        for( auto & t : pool_ )
            t.join();                 // workers finish the remaining tasks first
    }
private:
    std::queue<std::unique_ptr<Task>> q_;
    std::thread pool_[8]; 
    std::mutex mtx_;
    volatile bool stop_ {};
};

void foo( int a, int b ) {
    std::cout << a << "." << b;
}
void bar( std::string const & s) {
    std::cout << s;
}

int main() {
    ThreadPool pool;
    for( int i{}; i!=42; ++i ) {
        pool.AddTask( foo, 3, 14 );    
        pool.AddTask( bar, " - " );    
    }
}
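Since the answer notes that a better implementation should use condition variables, here is a minimal hedged sketch of that variant (the names CvThreadPool and Post are mine, and it stores type-erased std::function<void()> jobs instead of the Task hierarchy above):

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

struct CvThreadPool {
    explicit CvThreadPool(std::size_t n = 4) {
        for (std::size_t i = 0; i != n; ++i)
            workers_.emplace_back([this] {
                while (true) {
                    std::function<void()> job;
                    {
                        std::unique_lock<std::mutex> lock{mtx_};
                        // Sleep until there is work or the pool is shutting down.
                        cv_.wait(lock, [this] { return stop_ || !jobs_.empty(); });
                        if (stop_ && jobs_.empty())
                            return;                 // drained and told to stop
                        job = std::move(jobs_.front());
                        jobs_.pop();
                    }
                    job();                          // run outside the lock
                }
            });
    }

    ~CvThreadPool() {
        {
            std::lock_guard<std::mutex> lock{mtx_};
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_)
            t.join();
    }

    template <typename F, typename... Args>
    void Post(F&& f, Args&&... args) {
        {
            std::lock_guard<std::mutex> lock{mtx_};
            jobs_.push(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        }
        cv_.notify_one();                           // wake exactly one worker
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> jobs_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop_ = false;
};

int main() {
    CvThreadPool pool;
    for (int i = 0; i != 4; ++i)
        pool.Post([](int x) { std::cout << x << " "; }, i);
}   // the destructor drains the queue and joins the workers

The destructor sets the stop flag, wakes every worker, and joins them; because a worker only exits once the flag is set and the queue is empty, jobs already queued are still drained on shutdown.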

Answered by galop1n

Rather than moving the future into a global object (and manually managing deletion of unused futures), you can actually move it into the local scope of the asynchronously called function.


"Let the async function take its own future", so to speak.


I have come up with this template wrapper which works for me (tested on Windows):


#include <future>

template<class Function, class... Args>
void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
                   std::future<void>&& is_valid, std::promise<void>&& is_moved) {
    is_valid.wait(); // Wait until the return value of std::async is written to "future"
    auto our_future = std::move(future); // Move "future" to a local variable
    is_moved.set_value(); // Only now we can leave void_async in the main thread

    // This is also used by std::async so that member function pointers work transparently
    auto functor = std::bind(f, std::forward<Args>(args)...);
    functor();
}

template<class Function, class... Args> // This is what you call instead of std::async
void void_async(Function&& f, Args&&... args) {
    std::future<void> future; // This is for std::async return value
    // This is for our synchronization of moving "future" between threads
    std::promise<void> valid;
    std::promise<void> is_moved;
    auto valid_future = valid.get_future();
    auto moved_future = is_moved.get_future();

    // Here we pass "future" as a reference, so that async_wrapper
    // can later work with std::async's return value
    future = std::async(
        async_wrapper<Function, Args...>,
        std::forward<Function>(f), std::forward<Args>(args)...,
        std::ref(future), std::move(valid_future), std::move(is_moved)
    );
    valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
    moved_future.wait(); // Wait for "future" to actually be moved
}
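For context, a hedged usage sketch tying this back to the question (sendMail, address, and message as in the original example):

// Fire-and-forget: void_async returns as soon as the wrapper thread has
// taken ownership of its own future; sendMail then runs in the background.
void_async(sendMail, address, message);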

I am a little surprised it works, because I thought that the moved future's destructor would block until we leave async_wrapper. It should wait for async_wrapper to return, but it is waiting inside that very function. Logically, it should be a deadlock, but it isn't.


I also tried to add a line at the end of async_wrapper to manually empty the future object:


our_future = std::future<void>();

This does not block either.


Answered by hanshenrik

I have no idea what I'm doing, but this seems to work:


#include <chrono>
#include <mutex>
#include <thread>
#include <type_traits>
#include <vector>

// :( http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3451.pdf
template<typename T>
void noget(T&& in)
{
    using F = typename std::decay<T>::type; // decay so lvalue futures work too
    static std::mutex vmut;
    static std::vector<F> vec;
    static std::thread getter;
    static std::mutex single_getter;
    if (single_getter.try_lock())
    {
        getter = std::thread([&]()->void
        {
            size_t size;
            for(;;)
            {
                do
                {
                    vmut.lock();
                    size=vec.size();
                    if(size>0)
                    {
                        F target=std::move(vec[size-1]);
                        vec.pop_back();
                        vmut.unlock();
                        // cerr << "getting!" << endl;
                        target.get();
                    }
                    else
                    {
                        vmut.unlock();
                    }
                }while(size>0);
                // ˉ\_(ツ)_/ˉ
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        });
        getter.detach();
    }
    vmut.lock();
    vec.push_back(std::move(in));
    vmut.unlock();
}

It creates a dedicated getter thread for each type of future you throw at it (e.g. if you give it a future<int> and a future<string>, you'll have 2 threads; if you give it 100x future<int>, you'll still only have 2 threads), and when there's a future you don't want to deal with, just do noget(fut); - noget(std::async([]()->void{...})); also works just fine, with no blocking, it seems. Warning: do not try to get the value from a future after using noget() on it. That's probably UB and asking for trouble.

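For completeness, a hedged sketch of how this would plug into the question's example (sendMail, address, and message as defined there):

// Hand the future off to the background getter thread and forget about it;
// per the warning above, never call get() on it afterwards.
noget(std::async(std::launch::async, sendMail, address, message));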