Linux C++ 套接字协议缓冲区

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/9496101/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-06 04:54:15  来源:igfitidea点击:

Protocol Buffer over socket in C++

c++linux

提问by punith

I am trying to explore Protocol Buffer (PB) in Linux platform and my coding language is C++. I found examples in the protocol buffer online docs but nothing specific to socket send and receive (Or I have missed it completely :) ). So I decided to add the message Length before the actual message and send it across socket. I would appreciate if anyone can suggest a better solution than what I am planning to do and also is there anything ready made in PB for creating such packets.

我正在尝试在 Linux 平台上探索协议缓冲区 (PB),我的编码语言是 C++。我在协议缓冲区在线文档中找到了示例,但没有任何特定于套接字发送和接收的内容(或者我完全错过了 :))。所以我决定在实际消息之前添加消息长度并通过套接字发送它。如果有人能提出比我计划做的更好的解决方案,我将不胜感激,并且在 PB 中是否有任何准备好用于创建此类数据包的内容。

But I still end up with a problem at server side where I have to decode the packet. Say if the client sends a packet of 10 byte in which first 4 byte is the length of the packet; But it is impossible to know the length before decoding the packet. So even if i read the first 4 byte how do i deduce the the value with half read packet using Protocol Buffer.

但我仍然在服务器端遇到问题,我必须解码数据包。假设客户端发送一个 10 字节的数据包,其中前 4 个字节是数据包的长度;但是在解码数据包之前是不可能知道长度的。因此,即使我读取了前 4 个字节,我如何使用协议缓冲区推导出具有半读数据包的值。

采纳答案by punith

At last I could get it working . I am posting the code here so that one can review and comment on it as well as if some one wants to implement it in c++, this piece of code can help. Its a shabby code my intention was to get Protobuf working in length prefixed manner. I have taken the code of client server from some site which I don't remember and I have modified it to accommodate protobuf. Here the server first peeks into the socket and gets the length of the total packet and then actual socket read is done to read the entire packet. There can be zillion ways to do it but for quick solution I did it in this manner. But I need to find a better way to avoid 2 recv per packets, but in my condition all the messages are of different size, so this is the only way I guess.

最后我可以让它工作。我在这里发布代码,以便人们可以对其进行和评论,如果有人想用 C++ 实现它,这段代码可以提供帮助。这是一个破旧的代码,我的目的是让 Protobuf 以长度前缀的方式工作。我从某个我不记得的站点上获取了客户端服务器的代码,并且我对其进行了修改以适应 protobuf。这里服务器首先查看套接字并获取总数据包的长度,然后进行实际套接字读取以读取整个数据包。可以有无数种方法来做到这一点,但为了快速解决,我以这种方式做到了。但是我需要找到一种更好的方法来避免每个数据包 2 recv,但在我的情况下,所有消息的大小都不同,所以这是我猜的唯一方法。

Proto file

原型文件

  message log_packet {
  required fixed64 log_time =1;
  required fixed32 log_micro_sec =2;
  required fixed32 sequence_no =3;
  required fixed32 shm_app_id =4;
  required string packet_id =5;
  required string log_level=6;
  required string log_msg=7;
  }

Protocol buffer Client Code

协议缓冲区客户端代码

#include <unistd.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>


using namespace google::protobuf::io;

using namespace std;
int main(int argv, char** argc){

/* Coded output stram */

log_packet payload ;

payload.set_log_time(10);
payload.set_log_micro_sec(10);
payload.set_sequence_no(1);
payload.set_shm_app_id(101);
payload.set_packet_id("TST");
payload.set_log_level("DEBUG");
payload.set_log_msg("What shall we say then");

cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
int siz = payload.ByteSize()+4;
char *pkt = new char [siz];
google::protobuf::io::ArrayOutputStream aos(pkt,siz);
CodedOutputStream *coded_output = new CodedOutputStream(&aos);
coded_output->WriteVarint32(payload.ByteSize());
payload.SerializeToCodedStream(coded_output);

        int host_port= 1101;
        char* host_name="127.0.0.1";

        struct sockaddr_in my_addr;

        char buffer[1024];
        int bytecount;
        int buffer_len=0;

        int hsock;
        int * p_int;
        int err;

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
                printf("Error initializing socket %d\n",errno);
                goto FINISH;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
                (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
                printf("Error setting options %d\n",errno);
                free(p_int);
                goto FINISH;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = inet_addr(host_name);
        if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
                if((err = errno) != EINPROGRESS){
                        fprintf(stderr, "Error connecting socket %d\n", errno);
                        goto FINISH;
                }
        }




       for (int i =0;i<10000;i++){
          for (int j = 0 ;j<10;j++) {

                if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {
                        fprintf(stderr, "Error sending data %d\n", errno);
                        goto FINISH;
                }
                printf("Sent bytes %d\n", bytecount);
                usleep(1);
         }
        }
        delete pkt;

FINISH:
        close(hsock);

}

Protocol buffer Server Code

协议缓冲区服务器代码

#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>

using namespace std;
using namespace google::protobuf::io;



void* SocketHandler(void*);

int main(int argv, char** argc){

        int host_port= 1101;

        struct sockaddr_in my_addr;

        int hsock;
        int * p_int ;
        int err;

        socklen_t addr_size = 0;
        int* csock;
        sockaddr_in sadr;
        pthread_t thread_id=0;

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
                printf("Error initializing socket %d\n", errno);
                goto FINISH;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
                (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
                printf("Error setting options %d\n", errno);
                free(p_int);
                goto FINISH;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = INADDR_ANY ;

        if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
                fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno);
                goto FINISH;
        }
        if(listen( hsock, 10) == -1 ){
                fprintf(stderr, "Error listening %d\n",errno);
                goto FINISH;
        }

        //Now lets do the server stuff

        addr_size = sizeof(sockaddr_in);

        while(true){
                printf("waiting for a connection\n");
                csock = (int*)malloc(sizeof(int));
                if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){
                        printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr));
                        pthread_create(&thread_id,0,&SocketHandler, (void*)csock );
                        pthread_detach(thread_id);
                }
                else{
                        fprintf(stderr, "Error accepting %d\n", errno);
                }
        }

FINISH:
;//oops
}

google::protobuf::uint32 readHdr(char *buf)
{
  google::protobuf::uint32 size;
  google::protobuf::io::ArrayInputStream ais(buf,4);
  CodedInputStream coded_input(&ais);
  coded_input.ReadVarint32(&size);//Decode the HDR and get the size
  cout<<"size of payload is "<<size<<endl;
  return size;
}

void readBody(int csock,google::protobuf::uint32 siz)
{
  int bytecount;
  log_packet payload;
  char buffer [siz+4];//size of the payload and hdr
  //Read the entire buffer including the hdr
  if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){
                fprintf(stderr, "Error receiving data %d\n", errno);
        }
  cout<<"Second read byte count is "<<bytecount<<endl;
  //Assign ArrayInputStream with enough memory 
  google::protobuf::io::ArrayInputStream ais(buffer,siz+4);
  CodedInputStream coded_input(&ais);
  //Read an unsigned integer with Varint encoding, truncating to 32 bits.
  coded_input.ReadVarint32(&siz);
  //After the message's length is read, PushLimit() is used to prevent the CodedInputStream 
  //from reading beyond that length.Limits are used when parsing length-delimited 
  //embedded messages
  google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz);
  //De-Serialize
  payload.ParseFromCodedStream(&coded_input);
  //Once the embedded message has been parsed, PopLimit() is called to undo the limit
  coded_input.PopLimit(msgLimit);
  //Print the message
  cout<<"Message is "<<payload.DebugString();

}

void* SocketHandler(void* lp){
    int *csock = (int*)lp;

        char buffer[4];
        int bytecount=0;
        string output,pl;
        log_packet logp;

        memset(buffer, '##代码##', 4);

        while (1) {
        //Peek into the socket and get the packet size
        if((bytecount = recv(*csock,
                         buffer,
                                 4, MSG_PEEK))== -1){
                fprintf(stderr, "Error receiving data %d\n", errno);
        }else if (bytecount == 0)
                break;
        cout<<"First read byte count is "<<bytecount<<endl;
        readBody(*csock,readHdr(buffer));
        }

FINISH:
        free(csock);
    return 0;
}

回答by Tamás Szelei

Unfortunately, protobuf does not provide a way for "packaging" (delimiting) your messages:

不幸的是,protobuf 没有提供“打包”(定界)你的消息的方法:

If you want to write multiple messages to a single file or stream, it is up to you to keep track of where one message ends and the next begins. The Protocol Buffer wire format is not self-delimiting, so protocol buffer parsers cannot determine where a message ends on their own. The easiest way to solve this problem is to write the size of each message before you write the message itself.

如果您想将多条消息写入单个文件或流,则由您来跟踪一条消息的结束位置和下一条消息的开始位置。Protocol Buffer 有线格式不是自定界的,因此 Protocol Buffer 解析器无法自行确定消息的结束位置。解决这个问题最简单的方法是在写消息本身之前写下每条消息的大小。

(from their documentation)

(来自他们的文档

So, they basically recommend the same solution you arrived at.

因此,他们基本上推荐了与您获得的相同的解决方案。