Original question: http://stackoverflow.com/questions/9745727/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
Asynchronous HTTP client with Netty
Asked by Nitzan Tomer
I'm new to Netty and still struggling to find my way. I want to create an HTTP client that works asynchronously. The Netty HTTP examples only show how to wait for IO operations, not how to use addListener, so I've been trying to figure this out for the last few days.
I'm trying to create a request class that handles all of the different states of a request: connecting, sending the data, handling the response, and then closing the connection. To do that, my class extends SimpleChannelUpstreamHandler and implements ChannelFutureListener. I use a ChannelPipelineFactory which adds this instance of the class (as a SimpleChannelUpstreamHandler) to the pipeline as a handler.
The connection is created like this:
this.state = State.Connecting;
this.clientBootstrap.connect(this.address).addListener(this);
Then the operationComplete method:
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    State oldState = this.state;
    if (!future.isSuccess()) {
        this.status = Status.Failed;
        future.getChannel().disconnect().addListener(this);
    }
    else if (future.isCancelled()) {
        this.status = Status.Canceled;
        future.getChannel().disconnect().addListener(this);
    }
    else switch (this.state) {
        case Connecting:
            this.state = State.Sending;
            Channel channel = future.getChannel();
            channel.write(this.createRequest()).addListener(this);
            break;
        case Sending:
            this.state = State.Disconnecting;
            future.getChannel().disconnect().addListener(this);
            break;
        case Disconnecting:
            this.state = State.Closing;
            future.getChannel().close().addListener(this);
            break;
        case Closing:
            this.state = State.Finished;
            break;
    }
    System.out.println("request operationComplete start state: " + oldState + ", end state: " + this.state + ", status: " + this.status);
}

private HttpRequest createRequest() {
    String url = this.url.toString();
    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
    request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
    return request;
}
The class also overrides the messageReceived method:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    System.out.println("messageReceived");
    HttpResponse response = (HttpResponse) e.getMessage();
    ChannelBuffer content = response.getContent();
    if (content.readable()) {
        System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
    }
}
The problem is that I get this output:
request operationComplete start state: Connecting, end state: Sending, status: Unknown
request operationComplete start state: Sending, end state: Disconnecting, status: Unknown
request operationComplete start state: Closing, end state: Finished, status: Unknown
request operationComplete start state: Disconnecting, end state: Finished, status: Unknown
As you can see, messageReceived is not being executed for some reason, even though the pipeline factory adds the instance of this class to the pipeline.
Any ideas what I'm missing here? Thanks.
Edit
I finally managed to get this working thanks to the help of @JestanNirojan. In case anyone is interested, here is the solution:
public class ClientRequest extends SimpleChannelUpstreamHandler {

    ....

    public void connect() {
        this.state = State.Connecting;
        System.out.println(this.state);
        this.clientBootstrap.connect(this.address);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Sending;
        System.out.println(this.state);
        ctx.getChannel().write(this.createRequest());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HttpResponse response = (HttpResponse) e.getMessage();
        ChannelBuffer content = response.getContent();
        if (content.readable()) {
            System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
        }
        this.state = State.Disconnecting;
        System.out.println(this.state);
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Closing;
        System.out.println(this.state);
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Finished;
        System.out.println(this.state);
    }

    private HttpRequest createRequest() {
        String url = this.url.toString();
        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
        request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
        return request;
    }
}
Accepted answer by Jestan Nirojan
You are using a ChannelFutureListener to drive every operation on the channel (which is bad), and the future listener runs as soon as each of those channel operations completes.
The problem is that, after the message is sent, the channel is disconnected immediately, so the handler cannot receive the response message, which arrives later.
........
case Sending:
    this.state = State.Disconnecting;
    future.getChannel().disconnect().addListener(this);
    break;
........
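The timing issue can be reproduced without Netty. The following is a minimal JDK-only sketch, not Netty code: `ToyChannel`, `run`, and the use of `CompletableFuture` as a stand-in for `ChannelFuture` are all assumptions made for illustration. It only models the ordering described above, where the write future completes before any response exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WriteListenerDemo {

    /** Hypothetical stand-in for a channel: delivers a response only while connected. */
    static final class ToyChannel {
        boolean connected = true;
        final List<String> received = new ArrayList<>();

        // Completes as soon as the request is "flushed", before any response exists.
        CompletableFuture<Void> write(String request) {
            return CompletableFuture.completedFuture(null);
        }

        void disconnect() {
            connected = false;
        }

        // Called later by the simulated network; dropped if already disconnected.
        void deliverResponse(String response) {
            if (connected) {
                received.add(response);
            }
        }
    }

    static List<String> run(boolean disconnectOnWriteComplete) {
        ToyChannel ch = new ToyChannel();
        CompletableFuture<Void> writeDone = ch.write("GET / HTTP/1.1");
        if (disconnectOnWriteComplete) {
            // Mirrors the "case Sending" branch: disconnect when the write completes.
            writeDone.thenRun(ch::disconnect);
        }
        // The response arrives only after the write has completed.
        ch.deliverResponse("HTTP/1.1 200 OK");
        return ch.received;
    }

    public static void main(String[] args) {
        System.out.println("disconnect in write listener: " + run(true));
        System.out.println("wait for the response event:  " + run(false));
    }
}
```

With the disconnect attached to the write future, the received list stays empty; without it, the response is delivered.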
You should not block the channel future thread at all. The best approach is to extend SimpleChannelUpstreamHandler and override its
channelConnected(..) {}
messageReceived(..) {}
channelDisconnected(..) {}
methods and react to those events. You can keep the state in that handler too.
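As a rough illustration of keeping the state in the handler, here is a JDK-only sketch that drives a handler through the event sequence by hand. `RequestHandler`, `moveTo`, and `simulate` are hypothetical names, and the callbacks only borrow the names of the Netty methods; in a real client Netty would invoke the overridden SimpleChannelUpstreamHandler methods instead.

```java
import java.util.ArrayList;
import java.util.List;

public class RequestStateDemo {

    enum State { Connecting, Sending, Disconnecting, Closing, Finished }

    /** Hypothetical handler: one callback per channel event, state kept in a field. */
    static final class RequestHandler {
        State state = State.Connecting;
        final List<State> history = new ArrayList<>();

        RequestHandler() {
            history.add(state);
        }

        private void moveTo(State next) {
            state = next;
            history.add(next);
        }

        // The request would be written here, once the connection is up.
        void channelConnected() { moveTo(State.Sending); }

        // The response has arrived, so it is now safe to start disconnecting.
        void messageReceived(String response) { moveTo(State.Disconnecting); }

        void channelDisconnected() { moveTo(State.Closing); }

        void channelClosed() { moveTo(State.Finished); }
    }

    // Fires the events in the order a successful request would produce them.
    static List<State> simulate() {
        RequestHandler handler = new RequestHandler();
        handler.channelConnected();
        handler.messageReceived("HTTP/1.1 200 OK");
        handler.channelDisconnected();
        handler.channelClosed();
        return handler.history;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

Each transition is triggered by the event that actually happened, so the handler never moves to Disconnecting before a response has been seen.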