java 线程中断未结束对输入流读取的阻塞调用

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/3843363/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-30 03:38:08  来源:igfitidea点击:

Thread interrupt not ending blocking call on input stream read

javamultithreadingnonblockingchannelrxtx

提问by JDS

I'm using RXTX to read data from a serial port. The reading is done within a thread spawned in the following manner:

我正在使用 RXTX 从串行端口读取数据。读取是在以下列方式产生的线程内完成的:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();

The SerialReader class implements Runnable and just loops indefinitely, reading from the port and constructing the data into useful packages before sending it off to other applications. However, I've reduced it down to the following simplicity:

SerialReader 类实现 Runnable 并且只是无限循环,从端口读取并将数据构造成有用的包,然后再将其发送到其他应用程序。但是,我已将其简化为以下简单性:

public void run() {
  ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
  ByteBuffer buffer = ByteBuffer.allocate(100);
  while (true) {
    try {
      byteChan.read(buffer);
    } catch (Exception e) {
      System.out.println(e);
    }
  }
}

When a user clicks a stop button, the following functionality fires that should in theory close the input stream and break out of the blocking byteChan.read(buffer) call. The code is as follows:

当用户单击停止按钮时,将触发以下功能,理论上应该关闭输入流并打破阻塞的 byteChan.read(buffer) 调用。代码如下:

public void stop() {
  t.interrupt();
  serial.close();
}

However, when I run this code, I never get a ClosedByInterruptException, which SHOULD fire once the input stream closes. Furthermore, the execution blocks on the call to serial.close() -- because the underlying input stream is still blocking on the read call. I've tried replacing the interrupt call with byteChan.close(), which should then cause an AsynchronousCloseException, however, I'm getting the same results.

但是,当我运行此代码时,我永远不会收到 ClosedByInterruptException,它应该在输入流关闭后触发。此外,执行会阻塞对 serial.close() 的调用——因为底层输入流仍然阻塞在读取调用上。我已经尝试用 byteChan.close() 替换中断调用,这应该会导致 AsynchronousCloseException,但是,我得到了相同的结果。

Any help on what I'm missing would be greatly appreciated.

对我所缺少的任何帮助将不胜感激。

采纳答案by JDS

The RXTX SerialInputStream (what is returned by the serial.getInputStream() call) supports a timeout scheme that ended up solving all my problems. Adding the following before creating the new SerialReader object causes the reads to no longer block indefinitely:

RXTX SerialInputStream(serial.getInputStream() 调用返回的内容)支持超时方案,最终解决了我的所有问题。在创建新的 SerialReader 对象之前添加以下内容会导致读取不再无限期地阻塞:

serial.enableReceiveTimeout(1000);

Within the SerialReader object, I had to change a few things around to read directly from the InputStream instead of creating the ReadableByteChannel, but now, I can stop and restart the reader without issue.

在 SerialReader 对象中,我必须更改一些内容才能直接从 InputStream 读取,而不是创建 ReadableByteChannel,但是现在,我可以毫无问题地停止并重新启动读取器。

回答by erickson

You can't make a stream that doesn't support interruptible I/O into an InterruptibleChannelsimply by wrapping it (and, anyway, ReadableByteChanneldoesn't extend InterruptibleChannel).

您不能通过InterruptibleChannel简单地将不支持可中断 I/O 的流包装(并且,无论如何,ReadableByteChannel不扩展InterruptibleChannel)。

You have to look at the contract of the underlying InputStream. What does SerialPort.getInputStream()say about the interruptibility of its result? If it doesn't say anything, you should assume that it ignores interrupts.

您必须查看底层证券的合约InputStreamSerialPort.getInputStream()关于其结果的可中断性怎么说?如果它什么也没说,你应该假设它忽略了中断。

For any I/O that doesn't explicitly support interruptibility, the only option is generally closing the stream from another thread. This may immediately raise an IOException(though it might not be an AsynchronousCloseException) in the thread blocked on a call to the stream.

对于任何不明确支持可中断性的 I/O,唯一的选择通常是从另一个线程关闭流。这可能会立即在调用流时阻塞的线程中引发一个IOException(尽管它可能不是AsynchronousCloseException)。

However, even this is extremely dependent on the implementation of the InputStream—and the underlying OS can be a factor too.

然而,即使这也极其依赖于实现InputStream——底层操作系统也可能是一个因素。



Note the source code comment on the ReadableByteChannelImplclass returned by newChannel():

请注意由ReadableByteChannelImpl返回的类的源代码注释newChannel()

  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ?

回答by Ray Tayek

i am using the code below to shutdown rxtx. i run tests that start them up and shut them down and the seems to work ok. my reader looks like:

我正在使用下面的代码来关闭 rxtx。我运行测试来启动和关闭它们,似乎工作正常。我的读者看起来像:

private void addPartsToQueue(final InputStream inputStream) {
    byte[] buffer = new byte[1024];
    int len = -1;
    boolean first = true;
    // the read can throw
    try {
        while ((len = inputStream.read(buffer)) > -1) {
            if (len > 0) {
                if (first) {
                    first = false;
                    t0 = System.currentTimeMillis();
                } else
                    t1 = System.currentTimeMillis();
                final String part = new String(new String(buffer, 0, len));
                queue.add(part);
                //System.out.println(part + " " + (t1 - t0));
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                //System.out.println(Thread.currentThread().getName() + " interrupted " + e);
                break;
            }
        }
    } catch (IOException e) {
        System.err.println(Thread.currentThread().getName() + " " + e);
        //if(interruSystem.err.println(e);
        e.printStackTrace();
    }
    //System.out.println(Thread.currentThread().getName() + " is ending.");
}

thanks

谢谢

public void shutdown(final Device device) {
    shutdown(serialReaderThread);
    shutdown(messageAssemblerThread);
    serialPort.close();
    if (device != null)
        device.setSerialPort(null);
}

public static void shutdown(final Thread thread) {
    if (thread != null) {
        //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        thread.interrupt();
        //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
        }
        //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            thread.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " join interruped");
        }
        //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
    }