Note: this page is a translated copy of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). Original Stack Overflow question: http://stackoverflow.com/questions/2340730/

Are there C++ equivalents for the Protocol Buffers delimited I/O functions in Java?

Tags: java, c++, serialization, protocol-buffers

Asked by tzaman

I'm trying to read / write multiple Protocol Buffers messages from files, in both C++ and Java. Google suggests writing length prefixes before the messages, but there's no way to do that by default (that I could see).

However, the Java API in version 2.1.0 received a set of "Delimited" I/O functions which apparently do that job:

parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo

Are there C++ equivalents? And if not, what's the wire format for the size prefixes the Java API attaches, so I can parse those messages in C++?

Update:

These now exist in google/protobuf/util/delimited_message_util.h as of v3.3.0.

Accepted answer by Kenton Varda

I'm a bit late to the party here, but the implementations below include some optimizations missing from the other answers and will not fail after 64MB of input (though they still enforce the 64MB limit on each individual message, just not on the whole stream).

(I am the author of the C++ and Java protobuf libraries, but I no longer work for Google. Sorry that this code never made it into the official lib. This is what it would look like if it had.)

bool writeDelimitedTo(
    const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
  // We create a new coded stream for each message.  Don't worry, this is fast.
  google::protobuf::io::CodedOutputStream output(rawOutput);

  // Write the size.
  const int size = message.ByteSize();
  output.WriteVarint32(size);

  uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
  if (buffer != NULL) {
    // Optimization:  The message fits in one buffer, so use the faster
    // direct-to-array serialization path.
    message.SerializeWithCachedSizesToArray(buffer);
  } else {
    // Slightly-slower path when the message is multiple buffers.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) return false;
  }

  return true;
}

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don't worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;

  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);

  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;

  // Release the limit.
  input.PopLimit(limit);

  return true;
}

Answer by Jan

You can use getline for reading a string from a stream, using the specified delimiter:

istream& getline ( istream& is, string& str, char delim );

(defined in the <string> header)
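
As a small illustration of that call, here is a minimal sketch that splits a buffer on a delimiter character with std::getline. The helper names SplitOnDelimiter and DemoSplit are hypothetical. One thing to keep in mind: a character delimiter only works when a record can never contain that character. Serialized protobuf messages are arbitrary binary data, so the second check shows a record being cut in two.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: splits `data` into records separated by `delim`.
std::vector<std::string> SplitOnDelimiter(const std::string& data, char delim) {
  std::istringstream in(data);
  std::vector<std::string> records;
  std::string record;
  // std::getline extracts characters up to the next `delim` and discards it.
  while (std::getline(in, record, delim)) {
    records.push_back(record);
  }
  return records;
}

// Hypothetical demo of both behaviours.
void DemoSplit() {
  // Text records that never contain the delimiter split as expected.
  assert(SplitOnDelimiter("ab;cd;ef", ';').size() == 3);
  // A single binary record that happens to contain 0x3B (';') is cut in two.
  const std::string binary("\x08\x3B\x10\x01", 4);
  assert(SplitOnDelimiter(binary, ';').size() == 2);
}
```

A length prefix, as used in the other answers, avoids this because the reader never has to inspect the payload bytes to find the record boundary.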

Answer by tzaman

Okay, so I haven't been able to find top-level C++ functions implementing what I need, but some spelunking through the Java API reference turned up the following, inside the MessageLite interface:

void writeDelimitedTo(OutputStream output)
/*  Like writeTo(OutputStream), but writes the size of 
    the message as a varint before writing the data.   */

So the Java size prefix is a (Protocol Buffers) varint!
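
To make that wire format concrete, here is a minimal standard-library sketch of the base-128 varint encoding used for the size prefix. The function names AppendVarint32 and ParseVarint32 are hypothetical and are not part of the protobuf API; the library's own CodedOutputStream and CodedInputStream do this work for you.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Appends `value` to `out` as a base-128 varint: 7 payload bits per byte,
// least-significant group first, high bit set on every byte except the last.
void AppendVarint32(uint32_t value, std::string* out) {
  assert(out != nullptr);
  while (value >= 0x80) {
    out->push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out->push_back(static_cast<char>(value));
}

// Decodes a varint starting at data[*pos]. On success stores the result in
// *value, advances *pos past the varint, and returns true. On failure leaves
// *pos and *value untouched and returns false.
bool ParseVarint32(const std::string& data, std::size_t* pos, uint32_t* value) {
  assert(pos != nullptr && value != nullptr);
  std::size_t p = *pos;
  uint32_t result = 0;
  for (int shift = 0; shift < 35; shift += 7) {
    if (p >= data.size()) return false;  // ran out of input mid-varint
    const uint8_t byte = static_cast<uint8_t>(data[p++]);
    result |= static_cast<uint32_t>(byte & 0x7F) << shift;
    if ((byte & 0x80) == 0) {
      *pos = p;
      *value = result;
      return true;
    }
  }
  return false;  // more than 5 bytes: not a valid 32-bit varint
}
```

For example, a 300-byte message gets the two-byte prefix 0xAC 0x02, while any message shorter than 128 bytes gets a single byte equal to its length.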

Armed with that information, I went digging through the C++ API and found the CodedStream header, which has these:

bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)

Using those, I should be able to roll my own C++ functions that do the job.
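
As a rough idea of what such hand-rolled functions look like, here is a sketch that frames records on plain std::ostream / std::istream objects. It deliberately uses only the standard library: the names WriteDelimited and ReadDelimited are hypothetical, and the std::string payload stands in for the bytes a message's SerializeToString() would produce. A version built on CodedOutputStream::WriteVarint32 and CodedInputStream::ReadVarint32 would follow the same two steps.

```cpp
#include <cassert>
#include <cstdint>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

// Writes `payload` preceded by its length encoded as a varint.
bool WriteDelimited(const std::string& payload, std::ostream* out) {
  assert(out != nullptr);
  uint32_t size = static_cast<uint32_t>(payload.size());
  while (size >= 0x80) {
    out->put(static_cast<char>((size & 0x7F) | 0x80));
    size >>= 7;
  }
  out->put(static_cast<char>(size));
  out->write(payload.data(), static_cast<std::streamsize>(payload.size()));
  return out->good();
}

// Reads one varint length and that many payload bytes. Returns false at end
// of input or when the record is malformed or truncated.
bool ReadDelimited(std::istream* in, std::string* payload) {
  assert(in != nullptr && payload != nullptr);
  uint32_t size = 0;
  bool done = false;
  for (int shift = 0; shift < 35 && !done; shift += 7) {
    const int c = in->get();
    if (c == std::char_traits<char>::eof()) return false;
    size |= static_cast<uint32_t>(c & 0x7F) << shift;
    done = (c & 0x80) == 0;  // high bit clear marks the last prefix byte
  }
  if (!done) return false;  // prefix longer than 5 bytes
  payload->resize(size);
  if (size == 0) return true;
  in->read(&(*payload)[0], static_cast<std::streamsize>(size));
  return in->gcount() == static_cast<std::streamsize>(size);
}
```

Writing several records to a binary std::stringstream or std::ofstream and then calling ReadDelimited in a loop until it returns false recovers them in order. A production reader should also cap the size it is willing to allocate.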

They should really add this to the main Message API though; it's missing functionality considering Java has it, and so does Marc Gravell's excellent protobuf-net C# port (via SerializeWithLengthPrefix and DeserializeWithLengthPrefix).

Answer by Yukiko

I solved the same problem using CodedOutputStream/ArrayOutputStream to write the message (with the size) and CodedInputStream/ArrayInputStream to read the message (with the size).

For example, the following pseudo-code writes the message size followed by the message:

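const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;  // stands in for an instance of your generated message type

google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);

// Fixed 4-byte little-endian size prefix. Note that this is not the varint
// prefix that Java's writeDelimitedTo produces.
codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);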

When writing you should also check that your buffer is large enough to fit the message (including the size). And when reading, you should check that your buffer contains a whole message (including the size).
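
Those two checks can be sketched with the standard library alone. The code below assumes the same framing as the pseudo-code above, a fixed 4-byte little-endian size followed by the payload, which is a different format from the varint prefix Java's writeDelimitedTo writes. The names kPrefixSize, WriteFixedPrefixed and ReadFixedPrefixed are hypothetical, and the std::string payload stands in for serialized message bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

const std::size_t kPrefixSize = 4;  // fixed-width little-endian length

// Writes a 4-byte little-endian length and then `payload` into `buffer`.
// Returns the number of bytes written, or 0 if the buffer is too small.
std::size_t WriteFixedPrefixed(const std::string& payload, uint8_t* buffer,
                               std::size_t capacity) {
  assert(buffer != nullptr);
  if (capacity < kPrefixSize || payload.size() > capacity - kPrefixSize) {
    return 0;  // prefix plus payload would overflow the buffer
  }
  const uint32_t size = static_cast<uint32_t>(payload.size());
  for (std::size_t i = 0; i < kPrefixSize; ++i) {
    buffer[i] = static_cast<uint8_t>(size >> (8 * i));
  }
  std::memcpy(buffer + kPrefixSize, payload.data(), payload.size());
  return kPrefixSize + payload.size();
}

// Extracts one payload from the first `length` bytes of `buffer`. Returns
// false if the buffer does not hold a complete record (prefix plus payload).
bool ReadFixedPrefixed(const uint8_t* buffer, std::size_t length,
                       std::string* payload) {
  assert(buffer != nullptr && payload != nullptr);
  if (length < kPrefixSize) return false;
  uint32_t size = 0;
  for (std::size_t i = 0; i < kPrefixSize; ++i) {
    size |= static_cast<uint32_t>(buffer[i]) << (8 * i);
  }
  if (size > length - kPrefixSize) return false;  // payload not fully present
  payload->assign(reinterpret_cast<const char*>(buffer + kPrefixSize), size);
  return true;
}
```

With a 256-byte buffer, as in the pseudo-code, the largest payload that fits is 252 bytes.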

It definitely would be handy if they added convenience methods to the C++ API similar to those provided by the Java API.

Answer by Kim Laurio

I was also looking for a solution for this. Here's the core of our solution, assuming some Java code wrote many MyRecord messages with writeDelimitedTo into a file. Open the file and loop, doing:

uint32_t bytes;  // size prefix of the next message
if(someCodedInputStream->ReadVarint32(&bytes)) {
  CodedInputStream::Limit msgLimit = someCodedInputStream->PushLimit(bytes);
  if(myRecord->ParseFromCodedStream(someCodedInputStream)) {
    //do your stuff with the parsed MyRecord instance
  } else {
    //handle parse error
  }
  someCodedInputStream->PopLimit(msgLimit);
} else {
  //maybe end of file
}

Hope it helps.

Answer by jaybny

Here you go:

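#include <cassert>
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>

#include <google/protobuf/message.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>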

using namespace google::protobuf::io;

class FASWriter 
{
    std::ofstream mFs;
    OstreamOutputStream *_OstreamOutputStream;
    CodedOutputStream *_CodedOutputStream;
public:
    FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
    {
        assert(mFs.good());

        _OstreamOutputStream = new OstreamOutputStream(&mFs);
        _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
    }

    inline void operator()(const ::google::protobuf::Message &msg)
    {
        _CodedOutputStream->WriteVarint32(msg.ByteSize());

        if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
            std::cout << "SerializeToCodedStream error " << std::endl;
    }

    ~FASWriter()
    {
        delete _CodedOutputStream;
        delete _OstreamOutputStream;
        mFs.close();
    }
};

class FASReader
{
    std::ifstream mFs;

    IstreamInputStream *_IstreamInputStream;
    CodedInputStream *_CodedInputStream;
public:
    FASReader(const std::string &file) : mFs(file,std::ios::in | std::ios::binary)
    {
        assert(mFs.good());

        _IstreamInputStream = new IstreamInputStream(&mFs);
        _CodedInputStream = new CodedInputStream(_IstreamInputStream);      
    }

    template<class T>
    bool ReadNext()
    {
        T msg;
        uint32_t size;  // portable replacement for MSVC's unsigned __int32

        bool ret = _CodedInputStream->ReadVarint32(&size);
        if ( ret )
        {
            CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
            ret = msg.ParseFromCodedStream(_CodedInputStream);
            // Always pop the limit, even when parsing fails.
            _CodedInputStream->PopLimit(msgLimit);
            if ( ret )
                std::cout << "FASReader ReadNext: " << msg.DebugString() << std::endl;
        }

        return ret;
    }

    ~FASReader()
    {
        delete _CodedInputStream;
        delete _IstreamInputStream;
        mFs.close();
    }
};

Answer by Fulkerson

IstreamInputStream is very fragile in the face of EOFs and other errors that easily occur when it is used together with std::istream. After such an error the protobuf streams are permanently damaged and any already-used buffer data is destroyed. There is proper support for reading from traditional streams in protobuf.

Implement google::protobuf::io::CopyingInputStream and use it together with CopyingInputStreamAdaptor. Do the same for the output variants.

In practice a parsing call ends up in google::protobuf::io::CopyingInputStream::Read(void* buffer, int size), where a buffer is given. The only thing left to do is to read into it somehow.

Here's an example for use with Asio synchronous streams (SyncReadStream/SyncWriteStream):

#include <boost/asio.hpp>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
    public:
        AsioInputStream(SyncReadStream& sock);
        int Read(void* buffer, int size);
    private:
        SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if(!ec) {
        return bytes_read;
    } else if (ec == boost::asio::error::eof) {
        return 0;
    } else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
    public:
        AsioOutputStream(SyncWriteStream& sock);
        bool Write(const void* buffer, int size);
    private:
        SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{   
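    boost::system::error_code ec;
    // Write() must consume the whole buffer, so use boost::asio::write, which
    // loops until every byte is sent, rather than a single write_some call.
    boost::asio::write(m_Socket, boost::asio::buffer(buffer, size), ec);
    return !ec;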
}

Usage:

AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is an instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);

Message protoMessage; // Stands in for an instance of your generated message type
uint32_t msg_size;

/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
    // Handle error
}

/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!protoMessage.ParseFromCodedStream(&cis)) {
    // Handle error
}

/* Remove limit */
cis.PopLimit(msg_limit);

Answer by gp-coder

Working with an Objective-C version of Protocol Buffers, I ran into this exact issue. When sending from the iOS client to a Java-based server that uses parseDelimitedFrom, which expects the length as a prefix (a varint, which is a single byte for messages shorter than 128 bytes), I needed to call writeRawByte on the CodedOutputStream first. Posting here to hopefully help others who run into this issue. While working through it, one would think that Google's protobufs would come with a simple flag that does this for you...

    Request* request = [rBuild build];

    [self sendMessage:request];
} 


- (void) sendMessage:(Request *) request {

    //** get length
    NSData* n = [request data];
    uint8_t len = [n length];

    PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
    //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
    [os writeRawByte:len];
    [request writeToCodedOutputStream:os];
    [os flush];
}
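
One caveat about the snippet above, sketched here in C++ rather than Objective-C: the prefix that parseDelimitedFrom reads is a varint, so a single raw byte matches it only while the message is shorter than 128 bytes, and a uint8_t length also wraps around at 256. The helper names below are hypothetical; this is only a way to check when the one-byte shortcut is safe.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Number of bytes the varint encoding of `value` occupies (1 to 5).
int VarintSize32(uint32_t value) {
  int bytes = 1;
  while (value >= 0x80) {
    value >>= 7;
    ++bytes;
  }
  assert(bytes <= 5);  // a 32-bit value never needs more than 5 bytes
  return bytes;
}

// True when writing the length as one raw byte produces exactly the bytes a
// varint length prefix would, i.e. for message sizes 0 through 127.
bool RawBytePrefixMatchesVarint(std::size_t message_size) {
  return message_size <= 0xFFFFFFFFu &&
         VarintSize32(static_cast<uint32_t>(message_size)) == 1;
}
```

For larger requests, writing the length with the stream's varint method instead of writeRawByte keeps the prefix compatible at any size.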

Answer by fireboot

I ran into the same issue in both C++ and Python.

For the C++ version, I used a mix of the code Kenton Varda posted in this thread and the code from the pull request he sent to the protobuf team (because the version posted here doesn't handle EOF, while the one he sent to GitHub does).

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>


bool writeDelimitedTo(const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
    // We create a new coded stream for each message.  Don't worry, this is fast.
    google::protobuf::io::CodedOutputStream output(rawOutput);

    // Write the size.
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL)
    {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
    }

    else
    {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError())
            return false;
    }

    return true;
}

bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
    // We create a new coded stream for each message.  Don't worry, this is fast,
    // and it makes sure the 64MB total size limit is imposed per-message rather
    // than on the whole stream.  (See the CodedInputStream interface for more
    // info on this limit.)
    google::protobuf::io::CodedInputStream input(rawInput);
    const int start = input.CurrentPosition();
    if (clean_eof)
        *clean_eof = false;


    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size))
    {
        if (clean_eof)
            *clean_eof = input.CurrentPosition() == start;
        return false;
    }
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);

    // Parse the message.
    if (!message->MergeFromCodedStream(&input)) return false;
    if (!input.ConsumedEntireMessage()) return false;

    // Release the limit.
    input.PopLimit(limit);

    return true;
}
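
The idea behind the clean_eof flag can be shown without protobuf at all. The sketch below is a standard-library stand-in, not the library's implementation: ReadStatus, ReadRecord and max_size are hypothetical names, and the std::string payload stands in for the message bytes. It reports a clean end of stream only when EOF arrives before the first prefix byte, treats EOF anywhere else as an error, and applies a per-record size cap in the spirit of the per-message limit mentioned in the comments above.

```cpp
#include <cassert>
#include <cstdint>
#include <istream>
#include <sstream>
#include <string>

enum class ReadStatus { kOk, kCleanEof, kError };

// Reads one varint-prefixed record of at most `max_size` payload bytes.
ReadStatus ReadRecord(std::istream* in, uint32_t max_size,
                      std::string* payload) {
  assert(in != nullptr && payload != nullptr);
  uint32_t size = 0;
  bool prefix_done = false;
  for (int shift = 0; shift < 35 && !prefix_done; shift += 7) {
    const int c = in->get();
    if (c == std::char_traits<char>::eof()) {
      // EOF before the first prefix byte is a normal end of stream; EOF in
      // the middle of a prefix means the input was cut short.
      return shift == 0 ? ReadStatus::kCleanEof : ReadStatus::kError;
    }
    size |= static_cast<uint32_t>(c & 0x7F) << shift;
    prefix_done = (c & 0x80) == 0;
  }
  if (!prefix_done || size > max_size) return ReadStatus::kError;
  payload->resize(size);
  if (size > 0) {
    in->read(&(*payload)[0], static_cast<std::streamsize>(size));
    if (in->gcount() != static_cast<std::streamsize>(size)) {
      return ReadStatus::kError;  // payload truncated
    }
  }
  return ReadStatus::kOk;
}
```

A read loop can then stop quietly on kCleanEof and report a corrupt or truncated file on kError.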

And here is my Python 2 implementation:

from google.protobuf.internal import encoder
from google.protobuf.internal import decoder

# I had to implement this because the tools in google.protobuf.internal.decoder
# read from a buffer, not from a file-like object
def readRawVarint32(stream):
    mask = 0x80 # (1 << 7)
    raw_varint32 = []
    while 1:
        b = stream.read(1)
        #eof
        if b == "":
            break
        raw_varint32.append(b)
        if not (ord(b) & mask):
            #we found a byte starting with a 0, which means it's the last byte of this varint
            break
    return raw_varint32

def writeDelimitedTo(message, stream):
    message_str = message.SerializeToString()
    delimiter = encoder._VarintBytes(len(message_str))
    stream.write(delimiter + message_str)

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version 
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
    raw_varint32 = readRawVarint32(stream)

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message.ParseFromString(data)

        return message
    else:
        return None

It might not be the best looking code and I'm sure it can be refactored a fair bit, but at least that should show you one way to do it.

Now the big problem: It's SLOW.

Even when using the C++ implementation of python-protobuf, it's one order of magnitude slower than pure C++. I have a benchmark where I read 10M protobuf messages of ~30 bytes each from a file. It takes ~0.9s in C++ and 35s in Python.

One way to make it a bit faster would be to re-implement the varint decoder so that it reads from a file and decodes in one go, instead of reading from a file and then decoding as this code currently does (profiling shows that a significant amount of time is spent in the varint encoder/decoder). Needless to say, that alone is not enough to close the gap between the Python version and the C++ version.

Any idea to make it faster is very welcome :)

Answer by ciphersimian

Since I'm not allowed to write this as a comment on Kenton Varda's answer above: I believe there is a bug in the code he posted (as well as in other answers which have been provided). The following code:

...
google::protobuf::io::CodedInputStream input(rawInput);

// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...

sets an incorrect limit because it does not take into account the size of the varint32 which has already been read from input. This can result in data loss/corruption as additional bytes are read from the stream which may be part of the next message. The usual way of handling this correctly is to delete the CodedInputStream used to read the size and create a new one for reading the payload:

...
uint32_t size;
{
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  if (!input.ReadVarint32(&size)) return false;
}

google::protobuf::io::CodedInputStream input(rawInput);

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...