How to get a server response with a Netty client (Java)
Original question: http://stackoverflow.com/questions/23128232/
Note: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors on StackOverflow.
Asked by Moses
I want to write a Netty-based client. It should have a method public String send(String msg); that returns the response from the server, or some kind of future; it doesn't matter which. It should also be multithreaded. Like this:
public class Client {
    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
    }

    private Channel channel;

    public Client() throws InterruptedException {
        EventLoopGroup loopGroup = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder()).
                        addLast(new StringEncoder()).
                        addLast(new ClientHandler());
            }
        });
        channel = b.connect("localhost", 9091).sync().channel();
    }

    public String sendMessage(String msg) {
        channel.writeAndFlush(msg);
        return ??????????;
    }
}
I don't understand how to retrieve the response from the server after I invoke writeAndFlush(). What should I do?
I am using Netty 4.0.18.Final.
Answered by Teots
Calling channel.writeAndFlush(msg); already returns a ChannelFuture. To handle the result of this method call, you could add a listener to the future like this:
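ChannelFuture future = channel.writeAndFlush(msg);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        // Runs once the write has completed; check future.isSuccess() for the outcome.
        // ...
    }
});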
(This is taken from the Netty documentation; see: Netty doc.)
Answered by Ferrybig
Returning a Future<String> from the method is simple. We are going to implement the following method signature:
public Future<String> sendMessage(String msg) {
This is relatively easy to do once you are familiar with asynchronous programming constructs. To solve the design problem, we take the following steps:
1. When a message is written, add a Promise<String> to an ArrayBlockingQueue<Promise>. This serves as a list of the messages that have recently been sent, and allows us to change the result that our Future<String> objects return.
2. When a message arrives back in the handler, resolve it against the head of the Queue. This allows us to get the correct future to change.
3. Update the state of the Promise<String>. We call promise.setSuccess() to finally set the state on the object; this propagates back to the future object.
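The steps above can be tried out in isolation, without Netty. The following is only a minimal sketch under stated assumptions: the class PendingReplies and its method names are hypothetical, the JDK's CompletableFuture stands in for Netty's Promise<String>, and the server is assumed to send exactly one response per request, in request order (FIFO matching breaks otherwise).

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical, Netty-free stand-in for the handler: it models only the queue logic.
class PendingReplies {
    // Promises of requests that are still waiting for a response, oldest first.
    private final BlockingQueue<CompletableFuture<String>> pending = new ArrayBlockingQueue<>(16);

    // Step 1: register a promise for the request that is about to be written.
    synchronized CompletableFuture<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (!pending.offer(promise)) {
            // Queue full: reject the request instead of blocking the caller.
            promise.completeExceptionally(new IllegalStateException("too many requests in flight"));
        }
        return promise;
    }

    // Steps 2 and 3: a response arrived, so complete the oldest waiting promise.
    synchronized void onResponse(String msg) {
        CompletableFuture<String> head = pending.poll();
        if (head != null) {
            head.complete(msg);
        }
    }

    // Connection lost: fail every promise that is still waiting.
    synchronized void onDisconnect() {
        CompletableFuture<String> promise;
        while ((promise = pending.poll()) != null) {
            promise.completeExceptionally(new IOException("Connection lost"));
        }
    }
}
```

If register() is called twice and onResponse("one") once, only the first future is completed; the second stays pending until another response arrives or onDisconnect() fails it.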
Example code
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    // Written on the event loop, read by caller threads in sendMessage().
    private volatile ChannelHandlerContext ctx;
    // Promises of requests that are still waiting for a response, oldest first.
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        synchronized (this) {
            // Fail every request that can no longer receive a response.
            Promise<String> prom;
            while ((prom = messageList.poll()) != null) {
                prom.setFailure(new IOException("Connection lost"));
            }
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if (ctx == null) {
            throw new IllegalStateException();
        }
        return sendMessage(message, ctx.executor().<String>newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized (this) {
            if (messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException());
            } else if (messageList.offer(prom)) {
                // Connection open and message accepted.
                // Assumption: the listener argument was missing in the original; closing on a
                // failed write lets channelInactive() fail the queued promises.
                ctx.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                // Connection open and message rejected
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        synchronized (this) {
            if (messageList != null) {
                // Guard against a response that arrives with no request waiting.
                Promise<String> prom = messageList.poll();
                if (prom != null) {
                    prom.setSuccess(msg);
                }
            }
        }
    }
}
Documentation breakdown
private volatile ChannelHandlerContext ctx;
Stores our reference to the ChannelHandlerContext; we use it to create promises.
private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);
We keep the promises of past messages in this queue so we can change the result of the corresponding future.
public void channelActive(ChannelHandlerContext ctx)
Called by Netty when the connection becomes active. We initialize our variables here.
public void channelInactive(ChannelHandlerContext ctx)
Called by Netty when the connection becomes inactive, either due to an error or a normal connection close.
protected void channelRead0(ChannelHandlerContext ctx, String msg)
Called by Netty when a new message arrives (this method is named channelRead0 in Netty 4.x; messageReceived is the name used in the Netty 5 alpha releases). Here we pick out the head of the queue and call setSuccess on it.
Warning
When using futures, there is one thing you need to look out for: do not call get() from one of the Netty threads if the future isn't done yet. Failing to follow this simple rule will result in either a deadlock or a BlockingOperationException.
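One way to respect this rule is to react to the response with a callback instead of blocking. The sketch below illustrates the idea with JDK classes only, so it is an analogy rather than Netty code: a single-threaded executor stands in for a Netty event loop, CompletableFuture stands in for the future returned by sendMessage, and NonBlockingConsumer and lengthOf are hypothetical names. With Netty's own Future type, addListener plays the role that thenApply plays here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NonBlockingConsumer {
    // Derives a result from the response with a callback; never blocks the calling thread.
    static CompletableFuture<Integer> lengthOf(CompletableFuture<String> response) {
        return response.thenApply(String::length);
    }

    public static void main(String[] args) throws Exception {
        // Assumption for this sketch: one thread plays the part of the event loop.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        CompletableFuture<String> response = new CompletableFuture<>();
        try {
            // Work running on the loop registers a callback and returns immediately.
            // Calling response.get() inside this task would block the only thread
            // that is able to complete the response.
            CompletableFuture<Integer> length = CompletableFuture
                    .supplyAsync(() -> lengthOf(response), loop)
                    .thenCompose(f -> f);
            // Later, the same loop thread delivers the response.
            loop.submit(() -> response.complete("pong"));
            // Blocking is acceptable here because main is not the loop thread.
            System.out.println(length.get(1, TimeUnit.SECONDS));
        } finally {
            loop.shutdown();
        }
    }
}
```

Blocking with get() remains acceptable on application threads, such as the main thread in this sketch; the restriction applies only to the threads that deliver the responses.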
Answered by wu hardy
You can find a sample in the Netty project. We can save the result in a custom field of the last handler. In the following code, handler.getFactorial() returns the value we want.
Refer to http://www.lookatsrc.com/source/io/netty/example/factorial/FactorialClient.java?a=io.netty:netty-all
FactorialClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

public final class FactorialClient {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
    static final int COUNT = Integer.parseInt(System.getProperty("count", "1000"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    // FactorialClientInitializer is part of the same Netty example; not shown here.
                    .handler(new FactorialClientInitializer(sslCtx));
            // Make a new connection.
            ChannelFuture f = b.connect(HOST, PORT).sync();
            // Get the handler instance to retrieve the answer.
            FactorialClientHandler handler =
                    (FactorialClientHandler) f.channel().pipeline().last();
            // Print out the answer. getFactorial() blocks until the handler offers it.
            System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial());
        } finally {
            group.shutdownGracefully();
        }
    }
}
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {
    private ChannelHandlerContext ctx;
    private int receivedMessages;
    private int next = 1;
    // Hands the final answer from the event loop thread to the caller of getFactorial().
    final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();

    public BigInteger getFactorial() {
        boolean interrupted = false;
        try {
            // Keep waiting across interrupts, then restore the interrupt flag.
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException ignore) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        sendNumbers();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
        receivedMessages++;
        if (receivedMessages == FactorialClient.COUNT) {
            // Offer the answer after closing the connection.
            ctx.channel().close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    boolean offered = answer.offer(msg);
                    assert offered;
                }
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private void sendNumbers() {
        // Do not send more than 4096 numbers.
        ChannelFuture future = null;
        for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
            future = ctx.write(Integer.valueOf(next));
            next++;
        }
        if (next <= FactorialClient.COUNT) {
            // More numbers remain: send the next batch once this one has been written.
            assert future != null;
            future.addListener(numberSender);
        }
        ctx.flush();
    }

    private final ChannelFutureListener numberSender = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                sendNumbers();
            } else {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    };
}