C++ boost asio async_write: how to avoid interleaving async_write calls?

Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must follow the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/7754695/


boost asio async_write: how to avoid interleaving async_write calls?

c++, asynchronous, boost-asio

Asked by TheSquad

Here's my implementation:

  • Client A sends a message for Client B
  • The server processes the message by async_read-ing the right amount of data, then waits for new data from Client A (in order not to block Client A)
  • Afterwards the server processes the information (probably doing a MySQL query) and then sends the message to Client B with async_write.

The problem is that if Client A sends messages really fast, the async_write calls will interleave before the previous async_write handler is called.
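
To make the failure mode concrete, here is a minimal sketch (illustrative only; socket, msg1, msg2 and the handlers are assumed names, not part of the question) of what happens when a second composed write starts before the first one's handler has run:

// Anti-pattern: two composed writes in flight on the same socket.
// async_write may issue several underlying async_write_some calls,
// so the bytes of msg1 and msg2 can end up interleaved on the wire.
boost::asio::async_write(socket, boost::asio::buffer(msg1), handler1);
boost::asio::async_write(socket, boost::asio::buffer(msg2), handler2); // starts before handler1 fires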

Is there a simple way to avoid this problem?

EDIT 1: If a Client C sends a message to Client B just after Client A, the same issue should appear...

EDIT 2: Would this work? It seems to block somewhere, but I don't know where...

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <queue>
#include <string>

typedef boost::asio::ssl::stream<boost::asio::ip::tcp::socket> ssl_socket;

namespace structure {
  class User {
  public:
    User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) :
      m_socket(io_service, context), m_strand(io_service), is_writing(false) {}

    ssl_socket& getSocket() {
      return m_socket;
    }

    boost::asio::strand& getStrand() {
      return m_strand;
    }

    // Safe to call from any thread: the actual queue access happens
    // inside the strand.
    void push(std::string str) {
      m_strand.post(boost::bind(&structure::User::strand_push, this, str));
    }

    void strand_push(std::string str) {
      std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;
      m_queue.push(str);
      if (!is_writing) {
        std::cout << "going to write" << std::endl;
        write();
      } else {
        std::cout << "already writing" << std::endl;
      }
    }

    void write() {
      std::cout << "writing" << std::endl;
      is_writing = true;
      // Reference the queued element directly; it stays alive until sent()
      // pops it. Copying it into a local std::string here would hand
      // async_write a buffer that is destroyed before the write completes.
      const std::string& str = m_queue.front();
      boost::asio::async_write(m_socket,
                               boost::asio::buffer(str.c_str(), str.size()),
                               // run the completion inside the strand so the
                               // queue is never touched concurrently
                               m_strand.wrap(boost::bind(&structure::User::sent, this)));
    }

    void sent() {
      std::cout << "sent" << std::endl;
      m_queue.pop();
      if (!m_queue.empty()) {
        write();
        return;
      }
      is_writing = false;
      std::cout << "done sent" << std::endl;
    }

  private:
    ssl_socket              m_socket;
    boost::asio::strand     m_strand;
    std::queue<std::string> m_queue;
    bool                    is_writing;
  };
}

Answered by Sam Miller

Is there a simple way to avoid this problem?

Yes, maintain an outgoing queue for each client. Inspect the queue size in the async_write completion handler; if it is non-zero, start another async_write operation. Here is a sample:

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <deque>
#include <iostream>
#include <string>

class Connection
{
public:
    Connection(
            boost::asio::io_service& io_service
            ) :
        _io_service( io_service ),
        _strand( _io_service ),
        _socket( _io_service ),
        _outbox()
    {

    }

    void write( 
            const std::string& message
            )
    {
        _strand.post(
                boost::bind(
                    &Connection::writeImpl,
                    this,
                    message
                    )
                );
    }

private:
    void writeImpl(
            const std::string& message
            )
    {
        _outbox.push_back( message );
        if ( _outbox.size() > 1 ) {
            // outstanding async_write
            return;
        }

        this->write();
    }

    void write()
    {
        const std::string& message = _outbox[0];
        boost::asio::async_write(
                _socket,
                boost::asio::buffer( message.c_str(), message.size() ),
                _strand.wrap(
                    boost::bind(
                        &Connection::writeHandler,
                        this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred
                        )
                    )
                );
    }

    void writeHandler(
            const boost::system::error_code& error,
            const size_t bytesTransferred
            )
    {
        _outbox.pop_front();

        if ( error ) {
            std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
            return;
        }

        if ( !_outbox.empty() ) {
            // more messages to send
            this->write();
        }
    }


private:
    typedef std::deque<std::string> Outbox;

private:
    boost::asio::io_service& _io_service;
    boost::asio::io_service::strand _strand;
    boost::asio::ip::tcp::socket _socket;
    Outbox _outbox;
};

int
main()
{
    boost::asio::io_service io_service;
    Connection foo( io_service );
    // The sample only demonstrates the queuing structure: the socket is
    // never connected and io_service.run() is never called here.
}

Some key points:

  • the boost::asio::io_service::strand protects access to Connection::_outbox
  • a handler is dispatched through the strand from Connection::write() since it is public and may be called from any thread

It wasn't obvious to me whether you were using similar practices in the example in your question, since all of its methods are public.
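
As a quick usage sketch (not part of the original answer; the socket would still have to be connected before anything reaches the wire), writes issued back to back are serialized through the strand and the outbox:

// Hypothetical usage of the Connection class above.
boost::asio::io_service io_service;
Connection conn( io_service );
conn.write( "first message\n" );   // starts the async_write
conn.write( "second message\n" );  // lands in _outbox, sent after the first completes
io_service.run();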

Answered by rustyx

Just trying to improve Sam's great answer. The improvement points are:

  • async_write tries hard to send every single byte from the buffer(s) before completing, which means you should supply all the input data that you have to the write operation; otherwise the framing overhead may increase due to TCP packets being smaller than they could have been.

  • asio::streambuf, while being very convenient to use, is not zero-copy. The example below demonstrates a zero-copy approach: keep the input data chunks where they are and use a scatter/gather overload of async_write that takes in a sequence of input buffers (which are just pointers to the actual input data); see the sketch after this list.
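
A minimal sketch of that scatter/gather overload (illustrative only; socket and handler are assumed names, and the strings must outlive the operation, as they do in the double-buffer scheme below):

std::vector<std::string> chunks = { "header", "body", "trailer" };
std::vector<boost::asio::const_buffer> bufs;
for (const auto& c : chunks)
    bufs.push_back(boost::asio::buffer(c));  // pointers into chunks, no copies
// One composed write sends all three chunks back to back.
boost::asio::async_write(socket, bufs, handler);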

Full source code:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

using namespace std::chrono_literals;
using boost::asio::ip::tcp;

class Server
{
  class Connection : public std::enable_shared_from_this<Connection>
  {
    friend class Server;
    void ProcessCommand(const std::string& cmd) {
      if (cmd == "stop") {
        server_.Stop();
        return;
      }
      if (cmd == "") {
        Close();
        return;
      }
      std::thread t([this, self = shared_from_this(), cmd] {
        for (int i = 0; i < 30; ++i) {
          Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n");
        }
        server_.io_service_.post([this, self] {
          DoReadCmd();
        });
      });
      t.detach();
    }

    void DoReadCmd() {
      read_timer_.expires_from_now(server_.read_timeout_);
      read_timer_.async_wait([this](boost::system::error_code ec) {
        if (!ec) {
          std::cout << "Read timeout\n";
          Shutdown();
        }
      });
      boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) {
        read_timer_.cancel();
        if (!ec) {
          const char* p = boost::asio::buffer_cast<const char*>(buf_in_.data());
          std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1));
          buf_in_.consume(bytes_read);
          ProcessCommand(cmd);
        }
        else {
          Close();
        }
      });
    }

    void DoWrite() {
      active_buffer_ ^= 1; // switch buffers
      for (const auto& data : buffers_[active_buffer_]) {
        buffer_seq_.push_back(boost::asio::buffer(data));
      }
      write_timer_.expires_from_now(server_.write_timeout_);
      write_timer_.async_wait([this](boost::system::error_code ec) {
        if (!ec) {
          std::cout << "Write timeout\n";
          Shutdown();
        }
      });
      boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) {
        write_timer_.cancel();
        std::lock_guard<std::mutex> lock(buffers_mtx_);
        buffers_[active_buffer_].clear();
        buffer_seq_.clear();
        if (!ec) {
          std::cout << "Wrote " << bytes_transferred << " bytes\n";
          if (!buffers_[active_buffer_ ^ 1].empty()) // have more work
            DoWrite();
        }
        else {
          Close();
        }
      });
    }
    bool Writing() const { return !buffer_seq_.empty(); }

    Server& server_;
    boost::asio::streambuf buf_in_;
    std::mutex buffers_mtx_;
    std::vector<std::string> buffers_[2]; // a double buffer
    std::vector<boost::asio::const_buffer> buffer_seq_;
    int active_buffer_ = 0;
    bool closing_ = false;
    bool closed_ = false;
    boost::asio::deadline_timer read_timer_, write_timer_;
    tcp::socket socket_;
  public:
    Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_) {
    }

    void Start() {
      socket_.set_option(tcp::no_delay(true));
      DoReadCmd();
    }

    void Close() {
      closing_ = true;
      if (!Writing())
        Shutdown();
    }

    void Shutdown() {
      if (!closed_) {
        closing_ = closed_ = true;
        boost::system::error_code ec;
        socket_.shutdown(tcp::socket::shutdown_both, ec);
        socket_.close();
        server_.active_connections_.erase(shared_from_this());
      }
    }

    void Write(std::string&& data) {
      std::lock_guard<std::mutex> lock(buffers_mtx_);
      buffers_[active_buffer_ ^ 1].push_back(std::move(data)); // move input data to the inactive buffer
      if (!Writing())
        DoWrite();
    }

  };

  void DoAccept() {
    if (acceptor_.is_open()) {
      auto session = std::make_shared<Connection>(*this);
      acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) {
        if (!ec) {
          active_connections_.insert(session);
          session->Start();
        }
        DoAccept();
      });
    }
  }

  boost::asio::io_service io_service_;
  tcp::acceptor acceptor_;
  std::unordered_set<std::shared_ptr<Connection>> active_connections_;
  const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30);
  const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30);

public:
  Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { }

  void Run() {
    std::cout << "Listening on " << acceptor_.local_endpoint() << "\n";
    DoAccept();
    io_service_.run();
  }

  void Stop() {
    acceptor_.close();
    {
      std::vector<std::shared_ptr<Connection>> sessionsToClose;
      copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose));
      for (auto& s : sessionsToClose)
        s->Shutdown();
    }
    active_connections_.clear();
    io_service_.stop();
  }

};

int main() {
  try {
    Server srv(8888);
    srv.Run();
  }
  catch (const std::exception& e) {
    std::cerr << "Error: " << e.what() << "\n";
  }
}
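
To try it out, connect with any TCP client (for example netcat on port 8888): typing a name produces thirty "Hello" replies, an empty line closes the connection, and "stop" shuts the whole server down.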