Original question: http://stackoverflow.com/questions/10716057/
Notice: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me) on Stack Overflow.
java.nio Selectors and SocketChannel for continuous streaming
Asked by Robert Brooks
I am currently using java.nio.channels.Selector and SocketChannel in an application that opens one-to-many connections for continuous streaming to a server. My application has three threads: StreamWriteWorker, which performs write operations on the SocketChannel; StreamReadWorker, which reads bytes from the buffer and parses the content; and StreamTaskDispatcher, which runs the Selector's selection for readyOps and dispatches new runnables to the worker threads.
Problem: the Selector's selection method returns a value > 0 (valid readyOps) only on the first invocation. I am able to perform a write and send data on all ready channels that one time, but every subsequent invocation of the selection method returns 0.
Question: Do I need to invoke close on the SocketChannel after every read/write (I hope not!)? If not, what could cause the SocketChannels to be unavailable for any further read/write operations?
I am sorry I cannot post the code, but I hope I have explained the problem clearly enough for someone to help. I have searched for answers and I see that you cannot reuse a SocketChannel connection after it is closed, but my channel should not be closed; the server never receives an EOF on the stream.
I made some progress and figured out that the write operation was not occurring on the server app due to a JSON parsing error. So now the SocketChannel in my client code becomes ready for another write operation after it processes a read operation. I guess this is the TCP nature of SocketChannels. However, the SocketChannel does not become available for another read operation on the server side. Is this normal behavior for SocketChannels? Do I need to close the connection on the client side after the read operation and establish a new connection?
Here is a code sample of what I am trying to do:
package org.stream.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.RandomStringUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;
public class ClientServerTest {
    private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
    private ExecutorService executor = Executors.newFixedThreadPool(1);
    private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

    private class StreamWriteTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int results = 0;
            while (buffer.hasRemaining()) {
                try {
                    results = sc.write(buffer);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(data);
                    selector.wakeup();
                    return;
                }
            }
            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
            selector.wakeup();
        }
    }

    private class StreamReadTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        private boolean checkUUID(byte[] data) {
            return uuidToSize.containsKey(new String(data));
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            byte[] data = (byte[]) key.attachment();
            if (data != null) {
                buffer.put(data);
            }
            int count = 0;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if (count == 0) {
                buffer.flip();
                data = new byte[buffer.limit()];
                buffer.get(data);
                if (checkUUID(data)) {
                    key.interestOps(SelectionKey.OP_READ);
                    key.attach(data);
                } else {
                    System.out.println("Client Read - uuid ~~~~ " + new String(data));
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(null);
                }
            }
            if (count == -1) {
                try {
                    sc.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            selector.wakeup();
        }
    }

    private class ClientWorker implements Runnable {
        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                SocketChannel sc = SocketChannel.open();
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress("127.0.0.1", 9001));
                sc.register(selector, SelectionKey.OP_CONNECT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                while (selector.isOpen()) {
                    int count = selector.select(10);
                    if (count == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isConnectable()) {
                            sc = (SocketChannel) key.channel();
                            if (!sc.finishConnect()) {
                                continue;
                            }
                            sc.register(selector, SelectionKey.OP_WRITE);
                        }
                        if (key.isReadable()) {
                            key.interestOps(0);
                            executor.execute(new StreamReadTask(buffer, key, selector));
                        }
                        if (key.isWritable()) {
                            key.interestOps(0);
                            if (key.attachment() == null) {
                                key.attach(dataQueue.take());
                            }
                            executor.execute(new StreamWriteTask(buffer, key, selector));
                        }
                    }
                }
            } catch (IOException ex) {
                // Handle Exception
            } catch (InterruptedException ex) {
            }
        }
    }

    private class ServerWorker implements Runnable {
        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ServerSocket socket = ssc.socket();
                socket.bind(new InetSocketAddress(9001));
                ssc.configureBlocking(false);
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                DataHandler handler = new DataHandler();
                while (selector.isOpen()) {
                    int count = selector.select(10);
                    if (count == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            ssc = (ServerSocketChannel) key.channel();
                            SocketChannel sc = ssc.accept();
                            sc.configureBlocking(false);
                            sc.register(selector, SelectionKey.OP_READ);
                        }
                        if (key.isReadable()) {
                            handler.readSocket(buffer, key);
                        }
                        if (key.isWritable()) {
                            handler.writeToSocket(buffer, key);
                        }
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    private class DataHandler {
        private JsonObject parseData(StringBuilder builder) {
            if (!builder.toString().endsWith("}")) {
                return null;
            }
            JsonParser parser = new JsonParser();
            JsonObject obj = (JsonObject) parser.parse(builder.toString());
            return obj;
        }

        private void readSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            int count = Integer.MAX_VALUE;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (count == 0) {
                buffer.flip();
                StringBuilder builder = key.attachment() instanceof StringBuilder
                        ? (StringBuilder) key.attachment()
                        : new StringBuilder();
                Charset charset = Charset.forName("UTF-8");
                CharsetDecoder decoder = charset.newDecoder();
                decoder.onMalformedInput(CodingErrorAction.IGNORE);
                System.out.println(buffer);
                CharBuffer charBuffer = decoder.decode(buffer);
                String content = charBuffer.toString();
                charBuffer = null;
                builder.append(content);
                System.out.println(content);
                JsonObject obj = parseData(builder);
                if (obj == null) {
                    key.attach(builder);
                    key.interestOps(SelectionKey.OP_READ);
                } else {
                    System.out.println("data ~~~~~~~ " + builder.toString());
                    JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
                    key.attach(uuid.toString().getBytes());
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }
            if (count == -1) {
                key.attach(null);
                sc.close();
            }
        }

        private void writeToSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int writeAttempts = 0;
            while (buffer.hasRemaining()) {
                int results = sc.write(buffer);
                writeAttempts++;
                System.out.println("Write Attempt #" + writeAttempts);
                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.attach(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    // Return here: a break would fall through to the lines below,
                    // which switch to OP_READ and discard the unsent bytes.
                    return;
                }
            }
            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
        }
    }

    public ClientServerTest() {
        for (int index = 0; index < 1000; index++) {
            JsonObject obj = new JsonObject();
            String uuid = UUID.randomUUID().toString();
            uuidToSize.put(uuid, uuid.length());
            obj.addProperty("uuid", uuid);
            String data = RandomStringUtils.randomAlphanumeric(10000);
            obj.addProperty("event", data);
            dataQueue.add(obj.toString().getBytes());
        }
        Thread serverWorker = new Thread(new ServerWorker());
        serverWorker.start();
        Thread clientWorker = new Thread(new ClientWorker());
        clientWorker.start();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ClientServerTest test = new ClientServerTest();
        for (;;) {
        }
    }
}
Accepted answer by user207421
The correct way to process OP_CONNECT is to attempt finishConnect() once, and if it succeeds deregister OP_CONNECT and register OP_READ or OP_WRITE, probably the latter as you are a client. Looping and sleeping in non-blocking mode doesn't make sense. If finishConnect() returns false, OP_CONNECT will fire again.

Your processing of !key.isAcceptable(), !key.isReadable(), and !key.isWritable() makes absolutely zero sense whatsoever. If the key is acceptable, call accept(). If it's readable, call read(). If it's writeable, call write(). It's as simple as that.

You need to be aware that channels are almost always writeable, except for the brief periods when their socket send buffer is full. So only register for OP_WRITE when you have something to write, or better still after you've tried a write and got a zero return; then when OP_WRITE fires, retry the write and deregister OP_WRITE unless you got another zero.

You are being far too economical with your ByteBuffer. In practice you need one per channel. You can save it as the key attachment so you can get it back when you need it. Otherwise you don't have any way of accumulating partial reads, which are certain to happen, or any way of retrying writes either.
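
The advice on OP_CONNECT and OP_WRITE can be sketched roughly as follows. This is only an illustration, not code from the question or the answer: the class SelectorHandlers and its methods are hypothetical names, the handlers are assumed to run on the selector thread, and the key attachment is assumed to be that channel's own ByteBuffer holding the unsent bytes. After connecting, the sketch registers OP_READ and adds OP_WRITE only when a write returns zero; registering OP_WRITE straight away, as the answer suggests for a client, would also work.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

final class SelectorHandlers {

    /** Interest set with OP_WRITE present only while unsent data remains. */
    static int nextInterestOps(int currentOps, boolean hasPending) {
        return hasPending
                ? currentOps | SelectionKey.OP_WRITE
                : currentOps & ~SelectionKey.OP_WRITE;
    }

    /** Call when key.isConnectable(): try finishConnect() once, no looping or sleeping. */
    static void onConnectable(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.finishConnect()) {
            // Connected: replace OP_CONNECT with OP_READ. Writes are attempted
            // directly, and OP_WRITE is added only if one of them returns zero.
            key.interestOps(SelectionKey.OP_READ);
        }
        // If finishConnect() returned false, OP_CONNECT stays registered and fires again.
    }

    /**
     * Call to send data, and again when key.isWritable().
     * Assumption: the attachment is this channel's own ByteBuffer, already
     * flipped so that its remaining bytes are the unsent data.
     */
    static void writePending(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer pending = (ByteBuffer) key.attachment();
        // Write until the buffer is drained or write() returns zero (send buffer full).
        while (pending.hasRemaining() && channel.write(pending) > 0) {
            // keep writing
        }
        // Stay registered for OP_WRITE only if bytes are still waiting.
        key.interestOps(nextInterestOps(key.interestOps(), pending.hasRemaining()));
    }

    public static void main(String[] args) {
        int readOnly = SelectionKey.OP_READ;
        int withWrite = nextInterestOps(readOnly, true);
        System.out.println("OP_WRITE set while data is pending: "
                + ((withWrite & SelectionKey.OP_WRITE) != 0));
        System.out.println("OP_WRITE cleared once drained: "
                + (nextInterestOps(withWrite, false) == readOnly));
    }
}
```

Because the unsent bytes stay in the per-channel buffer, a retry after OP_WRITE fires simply calls writePending again; nothing has to be copied into a byte[] and reattached.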
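
To illustrate the point about keeping one ByteBuffer per channel as the key attachment, here is a minimal sketch of accumulating partial reads. It rests on assumptions that are not in the original post: ChannelState is a hypothetical class, and messages are assumed to be newline-terminated (the question's code instead checks whether the text ends with "}"). The sketch also does not handle a single message larger than the buffer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ChannelState {
    // One buffer per channel, so a partial message survives until the next read.
    private final ByteBuffer in = ByteBuffer.allocate(64 * 1024);

    /** Stand-in for channel.read(in), so the accumulation can be shown without a socket. */
    void append(byte[] bytes) {
        in.put(bytes);
    }

    /** Returns every complete line received so far and keeps the unfinished tail. */
    List<String> takeCompleteLines() {
        List<String> lines = new ArrayList<>();
        in.flip();                                // switch to reading what has accumulated
        int lineStart = in.position();
        for (int i = lineStart; i < in.limit(); i++) {
            if (in.get(i) == '\n') {              // absolute get, does not move the position
                byte[] line = new byte[i - lineStart];
                in.get(line);                     // relative get, consumes the line
                in.get();                         // consume the '\n' itself
                lines.add(new String(line, StandardCharsets.UTF_8));
                lineStart = i + 1;
            }
        }
        in.compact();                             // keep the incomplete tail for the next read
        return lines;
    }

    /** Call when key.isReadable(); assumes a ChannelState was attached at registration. */
    static List<String> onReadable(SelectionKey key) throws IOException {
        ChannelState state = (ChannelState) key.attachment();
        int n = ((ReadableByteChannel) key.channel()).read(state.in);
        if (n == -1) {
            key.channel().close();                // peer closed; closing also cancels the key
            return Collections.emptyList();
        }
        return state.takeCompleteLines();
    }

    public static void main(String[] args) {
        ChannelState state = new ChannelState();
        state.append("{\"uuid\":".getBytes(StandardCharsets.UTF_8));
        System.out.println(state.takeCompleteLines()); // only a fragment has arrived
        state.append("\"abc\"}\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(state.takeCompleteLines()); // the message is now complete
    }
}
```

In a selector loop the state would typically be attached when the channel is registered, for example sc.register(selector, SelectionKey.OP_READ, new ChannelState()), so each connection keeps its own partially received data between selections.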