javascript Node.js TCP 服务器传入缓冲区

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/5955498/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-25 18:56:34  来源:igfitidea点击:

Node.js TCP server incoming buffer

javascripttcpnode.js

提问by user747454

I have two node processes that speak to each other. I will call them [Node Server]and [Node Sender]. [Node Sender]continually processes information and writes a message over a TCP connection to [Node Server]. [Node Server]then writes back a status message.

我有两个相互通信的节点进程。我将它们称为[Node Server][Node Sender][Node Sender]不断处理信息,并通过 TCP 连接将消息写入[Node Server][节点服务器]然后写回状态消息。

Example of [Node Sender]:

[节点发送者]示例:

var message = "Test Message";
[Node Sender].connection.write(message);

Example of [Node Server]:

【节点服务器】示例:

[Node Server].socket.on("data", function(p_data) {
    this.write("OK");

    // Do some work with p_data
}

This works without issue, p_dataalways contains "Test Message"when sent at anything above 5 milliseconds. However, if I speed [Node Sender]to write every millisecond, p_dataoccasionally ends up with something like "Test MessageTest MessageTes".

这可以正常工作,p_data在以 5 毫秒以上的任何时间发送时始终包含“测试消息”。但是,如果我加速[Node Sender]每毫秒写入一次,p_data偶尔会以类似"Test MessageTest MessageTes" 的方式结束

I understand that the buffer in [Node Sender]is probably filling faster than the write command is sending it. Is there a way to force a one-to-one ratio in sending messages while still remaining asynchronous?

我知道[Node Sender]中的缓冲区填充速度可能比写入命令发送它的速度快。有没有办法强制一对一的消息发送同时保持异步?

I can certainly just add a terminator to my message and just fill a buffer in [Node Server], but I wanted to make sure there wasn't something obvious I was missing.

我当然可以在我的消息中添加一个终止符并在[Node Server] 中填充一个缓冲区,但我想确保没有明显我遗漏的东西。

回答by Rob Raisch

No, you're not missing anything and yes, you do need to add some form of termination to your messages.

不,您没有遗漏任何东西,是的,您确实需要在消息中添加某种形式的终止。

There are two basic problems here:

这里有两个基本问题:

  1. The TCP protocol is stream-oriented, not message-oriented; it has no intrinsic knowledge of what might constitute a "message".

  2. The data event fired by the node.js net library indicates that somedata has arrived but without any idea of what a message might contain, it cannot indicate that it has received any specific data.

  1. TCP 协议是面向流的,而不是面向消息的;它对可能构成"消息"的内容没有内在知识。

  2. node.js 网络库触发的数据事件表明一些数据已经到达,但不知道消息可能包含什么,它不能表明它已收到任何特定数据。

So by sending your messages faster than Node can process them, the socket recv buffer fills with multiple "messages".

因此,通过以比 Node 处理它们更快的速度发送您的消息,套接字接收缓冲区会填充多个“消息”。

A typical solution to this problem is to add line-termination, as can be found in https://github.com/baudehlo/Haraka/blob/master/connection.jsat lines 32-34:

此问题的典型解决方案是添加行终止,如 https://github.com/baudehlo/Haraka/blob/master/connection.js第 32-34 行所示:

self.client.on('data', function (data) {
    self.process_data(data);
});

and lines 110-134:

和第 110-134 行:

Connection.prototype.process_data = function (data) {
  if (this.disconnected) {
    logger.logwarn("data after disconnect from " + this.remote_ip);
    return;
  }

  this.current_data += data;
  this._process_data();
};

Connection.prototype._process_data = function() {
  var results;
  while (results = line_regexp.exec(this.current_data)) {
    var this_line = results[1];
    if (this.state === 'pause') {
        this.early_talker = 1;
        var self = this;
        // If you talk early, we're going to give you a delay
        setTimeout(function() { self._process_data() }, this.early_talker_delay);
        break;
    }
    this.current_data = this.current_data.slice(this_line.length);
    this.process_line(this_line);
  }
};

回答by jeremyko

You need to accumulate incoming buffer data to get your complete message. please refer to below example. this server expects data with 4 byte header and message body. header is unsigned int which means total length of body and the body is string data with delimiter '|'. please note that it is possible that this 'header and message' is not being received at a time. so we need to accumulate incoming data until we get a full length of data. And it is also possible that multiple 'header and message' is being received at a time. The point is that we need to data accumulation.

您需要累积传入的缓冲区数据才能获得完整的消息。请参考下面的例子。该服务器需要具有 4 字节标头和消息正文的数据。标头是 unsigned int,这意味着主体的总长度,主体是带有分隔符“|”的字符串数据。请注意,有可能一次未收到此“标题和消息”。所以我们需要积累传入的数据,直到我们得到完整长度的数据。并且一次接收多个“标题和消息”也是可能的。关键是我们需要数据积累。

var SERVER_PORT = 8124;
var TCP_DELIMITER = '|';
var packetHeaderLen = 4; // 32 bit integer --> 4

var server = net.createServer( function(c) {
    var accumulatingBuffer = new Buffer(0); 
    var totalPacketLen   = -1; 
    var accumulatingLen  =  0;
    var recvedThisTimeLen=  0;
    var remoteAddress = c.remoteAddress;
    var address= c.address();
    var remotePort= c.remotePort;
    var remoteIpPort = remoteAddress +":"+ remotePort;

    console.log('-------------------------------'+remoteAddress);
    console.log('remoteIpPort='+ remoteIpPort); 

    c.on('data', function(data) {
        console.log('received data length :' + data.length ); 
        console.log('data='+ data); 

        recvedThisTimeLen = data.length;
        console.log('recvedThisTimeLen='+ recvedThisTimeLen);

        //accumulate incoming data
        var tmpBuffer = new Buffer( accumulatingLen + recvedThisTimeLen );
        accumulatingBuffer.copy(tmpBuffer);
        data.copy ( tmpBuffer, accumulatingLen  ); // offset for accumulating
        accumulatingBuffer = tmpBuffer; 
        tmpBuffer = null;
        accumulatingLen += recvedThisTimeLen ;
        console.log('accumulatingBuffer = ' + accumulatingBuffer  ); 
        console.log('accumulatingLen    =' + accumulatingLen );

        if( recvedThisTimeLen < packetHeaderLen ) {
            console.log('need to get more data(less than header-length received) -> wait..');
            return;
        } else if( recvedThisTimeLen == packetHeaderLen ) {
            console.log('need to get more data(only header-info is available) -> wait..');
            return;
        } else {
            console.log('before-totalPacketLen=' + totalPacketLen ); 
            //a packet info is available..
            if( totalPacketLen < 0 ) {
                totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; 
                console.log('totalPacketLen=' + totalPacketLen );
            }
        }    

        //while=> 
        //in case of the accumulatingBuffer has multiple 'header and message'.
        while( accumulatingLen >= totalPacketLen + packetHeaderLen ) {
            console.log( 'accumulatingBuffer= ' + accumulatingBuffer );

            var aPacketBufExceptHeader = new Buffer( totalPacketLen  ); // a whole packet is available...
            console.log( 'aPacketBufExceptHeader len= ' + aPacketBufExceptHeader.length );
            accumulatingBuffer.copy( aPacketBufExceptHeader, 0, packetHeaderLen, accumulatingBuffer.length); // 

            ////////////////////////////////////////////////////////////////////
            //process one packet data
            var stringData = aPacketBufExceptHeader.toString();
            var usage = stringData.substring(0,stringData.indexOf(TCP_DELIMITER));
            console.log("usage: " + usage);
            //call handler
            (serverFunctions [usage])(c, remoteIpPort, stringData.substring(1+stringData.indexOf(TCP_DELIMITER)));
            ////////////////////////////////////////////////////////////////////

            //rebuild buffer
            var newBufRebuild = new Buffer( accumulatingBuffer.length );
            newBufRebuild.fill();
            accumulatingBuffer.copy( newBufRebuild, 0, totalPacketLen + packetHeaderLen, accumulatingBuffer.length  );

            //init      
            accumulatingLen -= (totalPacketLen +4) ;
            accumulatingBuffer = newBufRebuild;
            newBufRebuild = null;
            totalPacketLen = -1;
            console.log( 'Init: accumulatingBuffer= ' + accumulatingBuffer );   
            console.log( '      accumulatingLen   = ' + accumulatingLen );  

            if( accumulatingLen <= packetHeaderLen ) {
                return;
            } else {
                totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; 
                console.log('totalPacketLen=' + totalPacketLen );
            }    
        }  
    }); 

    ...
});

please refer to below for whole example.

请参阅下面的完整示例。

https://github.com/jeremyko/nodeChatServer

https://github.com/jeremyko/nodeChatServer

Hope this help.

希望这有帮助。