Java 网络:事件 Socket/InputStream

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/5969827/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-30 13:45:19  来源:igfitidea点击:

Java networking: evented Socket/InputStream

javaeventssocketsnetworking

提问by slezica

I'm implementing an event-oriented layer over Java's Sockets, and I was wondering if there was a way to determine if there is data pending to read.

我正在 Java 的 Sockets 上实现一个面向事件的层,我想知道是否有办法确定是否有待读取的数据。

My normal approach would be to read from the socket into a buffer, and call the provided callbacks when the buffer is filled over a given amount of bytes (which could be 0, if the callback needs to be fired every time anything arrives), but I suspect Java is already doing the buffering for me.

我的正常方法是从套接字读取到缓冲区,并在缓冲区填充超过给定字节数时调用提供的回调(如果每次到达时都需要触发回调,则该回调可能为 0),但是我怀疑 Java 已经在为我做缓冲了。

Is the available()method of InputStream reliable for this? Should I just read()and do my own buffering on top of the Socket? Or is there another way?

available()InputStream的方法是否可靠?我应该只是read()在 Socket 上做我自己的缓冲吗?或者还有其他方法吗?

回答by Sorrow

Shortly put, no. available()is not reliable (at least it was not for me). I recommend using java.nio.channels.SocketChannelconnected with Selectorand SelectionKey. This solution is somewhat event-based, but is more complicated than just plain sockets.

简而言之,没有。available()不可靠(至少不适合我)。我建议使用java.nio.channels.SocketChannel连接SelectorSelectionKey。这个解决方案有点基于事件,但比简单的套接字更复杂。

For clients:

对于客户:

  1. Construct socket channel (socket), open a selector (selector = Selector.open();).
  2. Use non-blocking socket.configureBlocking(false);
  3. Register selector for connections socket.register(selector, SelectionKey.OP_CONNECT);
  4. Connect socket.connect(new InetSocketAddress(host, port));
  5. See if there is anything new selector.select();
  6. If the "new" refers to successful connection, register the selector for OP_READ; if the "new" refers to data available, just read from the socket.
  1. 构造套接字通道 ( socket),打开选择器 ( selector = Selector.open();)。
  2. 使用非阻塞 socket.configureBlocking(false);
  3. 注册连接选择器 socket.register(selector, SelectionKey.OP_CONNECT);
  4. 连接 socket.connect(new InetSocketAddress(host, port));
  5. 看看有没有新的 selector.select();
  6. 如果“new”是指成功连接,请注册选择器OP_READ;如果“新”指的是可用数据,则只需从套接字读取。

However, in order to have it asynchronous you would need to set up a separate thread (despite the socket being created as non-blocked, the thread will block anyway) that checks whether something has arrived or not.

但是,为了使其异步,您需要设置一个单独的线程(尽管创建的套接字是非阻塞的,但无论如何该线程都会阻塞)来检查是否有东西到达。

For servers, there is ServerSocketChanneland you use OP_ACCEPTfor it.

对于服务器,有ServerSocketChannel并且您可以使用OP_ACCEPT它。

For reference, this is my code (client), should give you a hint:

作为参考,这是我的代码(客户端),应该给你一个提示:

 private Thread readingThread = new ListeningThread();

 /**
  * Listening thread - reads messages in a separate thread so the application does not get blocked.
  */
 private class ListeningThread extends Thread {
  public void run() {
   running = true;
   try {
    while(!close) listen();
    messenger.close();
   }
   catch(ConnectException ce) {
    doNotifyConnectionFailed(ce);
   }
   catch(Exception e) {
//    e.printStackTrace();
    messenger.close();
   }
   running = false;
  }
 }

 /**
  * Connects to host and port.
  * @param host Host to connect to.
  * @param port Port of the host machine to connect to.
  */
 public void connect(String host, int port) {
  try {
   SocketChannel socket = SocketChannel.open();
   socket.configureBlocking(false);
   socket.register(this.selector, SelectionKey.OP_CONNECT);
   socket.connect(new InetSocketAddress(host, port));
  }
  catch(IOException e) {
   this.doNotifyConnectionFailed(e);
  }
 }

 /**
  * Waits for an event to happen, processes it and then returns.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   iter.remove();
   // check validity
   if(key.isValid()) {
    // if connectable...
    if(key.isConnectable()) {
     // ...establish connection, make messenger, and notify everyone
     SocketChannel client = (SocketChannel)key.channel();
     // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast
     if(client!=null && client.finishConnect()) {
      client.register(this.selector, SelectionKey.OP_READ);
     }
    }
    // if readable, tell messenger to read bytes
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) {
     // read message here
    }
   }
  }
 }

 /**
  * Starts the client.
  */
 public void start() {
  // start a reading thread
  if(!this.running) {
   this.readingThread = new ListeningThread();
   this.readingThread.start();
  }
 }

 /**
  * Tells the client to close at nearest possible moment.
  */
 public void close() {
  this.close = true;
 }

And for server:

对于服务器:

 /**
  * Constructs a server.
  * @param port Port to listen to.
  * @param protocol Protocol of messages.
  * @throws IOException when something goes wrong.
  */
 public ChannelMessageServer(int port) throws IOException {
  this.server = ServerSocketChannel.open();
  this.server.configureBlocking(false);
  this.server.socket().bind(new InetSocketAddress(port));
  this.server.register(this.selector, SelectionKey.OP_ACCEPT);
 }

 /**
  * Waits for event, then exits.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   // do something with the connected socket
   iter.remove();
   if(key.isValid()) this.process(key);
  }
 }

 /**
  * Processes a selection key.
  * @param key SelectionKey.
  * @throws IOException when something is wrong.
  */
 protected void process(SelectionKey key) throws IOException {
  // if incoming connection
  if(key.isAcceptable()) {
   // get client
   SocketChannel client = (((ServerSocketChannel)key.channel()).accept());
    try {
     client.configureBlocking(false);
     client.register(this.selector, SelectionKey.OP_READ);
    }
    catch(Exception e) {
     // catch
    }
  }
  // if readable, tell messenger to read
  else if(key.isReadable()) {
  // read
  }
 }

Hope this helps.

希望这可以帮助。

回答by Peter Lawrey

available() will only tell you if you can read data without going to the OS. Its not very useful here.

available() 只会告诉您是否可以在不访问操作系统的情况下读取数据。它在这里不是很有用。

You can either do a blocking, or non-blocking read as you prefer. A non-blocking read just returns when there is no data to read so that may be what you want.

您可以根据自己的喜好进行阻塞或非阻塞读取。当没有要读取的数据时,非阻塞读取才会返回,因此这可能是您想要的。