Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and author information, and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/34634750/

Netty: getting "An exceptionCaught() event was fired, and it reached at the tail of the pipeline" on TextWebsocketEncoder

Tags: java, exception, netty

Asked by user63898

I am trying to do a simple WebSocket decode and then encode, but I get this exception when the message passes the TextWebsocketDecoder handler:

io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
    at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:73)
    at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:59)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:112)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.channelRead(WebSocketServerProtocolHandler.java:147)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:112)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)

What I have is a simple initializer, which works fine until TextWebsocketEncoder:

public class ServerInitializer extends ChannelInitializer<Channel> {
    private final ChannelGroup group;

    public ServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler(group));
        pipeline.addLast("textWebsocketDecoder",new TextWebsocketDecoder());
        pipeline.addLast("textWebsocketEncoder",new TextWebsocketEncoder());
    }
}

TextWebSocketFrameHandler

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            ctx.pipeline().remove(HttpRequestHandler.class);
            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
            group.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        ctx.fireChannelRead(msg);
        //group.writeAndFlush(msg.retain());
    }
}

And these are the TextWebsocketDecoder and TextWebsocketEncoder:

TextWebsocketDecoder:

public class TextWebsocketDecoder extends MessageToMessageDecoder<TextWebSocketFrame>
{

    @Override
    protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame frame, List<Object> out) throws Exception
    {
        String json = frame.text(); 
        JSONObject jsonObject = new JSONObject(json);
        int type = jsonObject.getInt("type");
        JSONArray msgJsonArray = jsonObject.getJSONArray("msg");
        String user = msgJsonArray.getString(0);
        String pass = msgJsonArray.getString(1);
        String connectionkey = msgJsonArray.getString(2);
        int timestamp = jsonObject.getInt("timestamp");

        JSONObject responseJson = new JSONObject();
        responseJson.put("type",Config.LOGIN_SUCCESS);
        responseJson.put("connectionkey",connectionkey);

        out.add(responseJson); // After this I get the exception!
    }
}

TextWebsocketEncoder:

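import java.util.List;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.json.JSONObject; // assumption: the question does not name its JSON library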

public class TextWebsocketEncoder extends MessageToMessageEncoder<JSONObject>
{

    @Override
    protected void encode(ChannelHandlerContext arg0, JSONObject arg1, List<Object> out) throws Exception {
        String json = arg1.toString();
        out.add(new TextWebSocketFrame(json));          
    } 

}

Accepted answer by Ferrybig

The exception

Inside your TextWebSocketFrameHandler, you call ctx.fireChannelRead(msg);, which passes the message one step up the chain; however, MessageToMessageDecoder isn't prepared to deal with this. To explain the problem, I need to explain how MessageToMessageDecoder works.

MessageToMessageDecoder works by catching every message from upstream and passing it to your custom code. Your custom code does the work, and the MessageToMessageDecoder takes care of releasing the resource you passed in.

Since you are passing the same reference on to the next handler, you are effectively releasing the WebSocketFrame multiple times, which causes the exception. MessageToMessageDecoder even warns you about this in its javadoc.

To solve the problem, we follow the instruction in the javadoc and change our channelRead0 to the following:

@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
     msg.retain(); // ferrybig: fixed bug http://stackoverflow.com/q/34634750/1542723
     ctx.fireChannelRead(msg);
     //group.writeAndFlush(msg.retain());
}
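
The effect of the extra retain() can be sketched with a toy model in plain Java. This is only an illustration under stated assumptions, not Netty's implementation: the Frame class, the decoder and frameHandler methods, and the exception type are all hypothetical stand-ins for the reference-counted frame, the MessageToMessageDecoder, and the auto-releasing SimpleChannelInboundHandler.

```java
/** Toy model of reference counting; NOT Netty's implementation. */
class RefCountDemo {

    /** Hypothetical stand-in for a reference-counted frame. */
    static final class Frame {
        private int refCnt = 1; // a freshly created object starts with one reference

        Frame retain() {
            refCnt++;
            return this;
        }

        void release() {
            // Releasing an object whose count is already 0 is an error.
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0, decrement: 1");
            }
            refCnt--;
        }

        int refCnt() {
            return refCnt;
        }
    }

    /** Models the decoder: it consumes the frame and releases it afterwards. */
    static void decoder(Frame frame) {
        frame.release();
    }

    /**
     * Models the auto-releasing frame handler: it forwards the frame to the
     * decoder, and the frame is released once more when the method returns.
     */
    static void frameHandler(Frame frame, boolean retainBeforeForwarding) {
        try {
            if (retainBeforeForwarding) {
                frame.retain(); // the fix: one extra reference for the next handler
            }
            decoder(frame);
        } finally {
            frame.release(); // models the automatic release by the base class
        }
    }

    public static void main(String[] args) {
        Frame fixed = new Frame();
        frameHandler(fixed, true);
        System.out.println("with retain, final refCnt = " + fixed.refCnt());

        try {
            frameHandler(new Frame(), false);
        } catch (IllegalStateException e) {
            System.out.println("without retain: " + e.getMessage());
        }
    }
}
```

Without the retain, the decoder's release drops the count to zero and the handler's own release then fails, which mirrors the IllegalReferenceCountException in the stack trace.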

The problem of nothing being sent back

In your comments, you stated that the code doesn't send anything back. This is expected, as your pipeline only consumes data and passes it up the chain. Fixing this requires some rework of your pipeline.

  1. We need to swap the order of the json-webframe decoder and encoder:

    pipeline.addLast("textWebsocketEncoder", new TextWebsocketEncoder());
    pipeline.addLast("textWebsocketDecoder", new TextWebsocketDecoder());
    

    This is because your decoder generates the output that is sent back ↑ the chain of handlers; the encoder will not see this output if it is placed after the decoder in the pipeline. (Following the Netty naming conventions, your decoder shouldn't really be called a decoder.)

  2. We need to change your decoder so that it actually sends the generated data back ↑ the chain instead of ↓ into the void, where no handler exists.

    To make these changes, we are going to let TextWebsocketDecoder extend ChannelInboundHandlerAdapter instead of MessageToMessageDecoder<TextWebSocketFrame>, since we are handling messages instead of passing them to another handler.

    We change the signature of the decode method to channelRead(ChannelHandlerContext ctx, Object msg) and add some boilerplate code:

    public void channelRead(ChannelHandlerContext ctx, Object msg) /* throws Exception */ {
        TextWebSocketFrame frame = (TextWebSocketFrame) msg;
        try {
            /* Remaining code; follow the further steps or see the end result */
        } finally {
            frame.release();
        }
    }
    
  3. We adapt our code to pass the result up the pipeline instead of down:

    public void channelRead(ChannelHandlerContext ctx, Object msg) /* throws Exception */ {
        TextWebSocketFrame frame = (TextWebSocketFrame) msg;
        try {

            String json = frame.text();
            JSONObject jsonObject = new JSONObject(json);
            int type = jsonObject.getInt("type");
            JSONArray msgJsonArray = jsonObject.getJSONArray("msg");
            String user = msgJsonArray.getString(0);
            String pass = msgJsonArray.getString(1);
            String connectionkey = msgJsonArray.getString(2);
            int timestamp = jsonObject.getInt("timestamp");

            JSONObject responseJson = new JSONObject();
            responseJson.put("type", Config.LOGIN_SUCCESS);
            responseJson.put("connectionkey", connectionkey);

            // Send the response back through the handlers before this one.
            ctx.writeAndFlush(responseJson);
        } finally {
            frame.release();
        }
    }
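
Why the order in step 1 matters can be sketched with a toy model in plain Java. It rests on one fact about Netty: a write issued through a handler's own context travels back toward the start of the pipeline, so only handlers added before that handler can see it. Everything else here is hypothetical: the writePath helper and the handler names are illustration only, and the model lists every earlier handler, whereas in Netty only the outbound handlers among them would actually process the write.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of handler ordering; NOT Netty's implementation. */
class PipelineOrderDemo {

    /**
     * Returns the handlers a write can pass through when it is issued from
     * the context of the handler named `from`: the handlers registered
     * before `from`, visited from nearest to farthest.
     */
    static List<String> writePath(List<String> pipeline, String from) {
        int start = pipeline.indexOf(from);
        if (start < 0) {
            throw new IllegalArgumentException("unknown handler: " + from);
        }
        List<String> visited = new ArrayList<>();
        for (int i = start - 1; i >= 0; i--) {
            visited.add(pipeline.get(i));
        }
        return visited;
    }

    public static void main(String[] args) {
        // Order from the question: decoder first, encoder last.
        List<String> original = Arrays.asList("protocolHandler", "frameHandler", "decoder", "encoder");
        // Order after the swap: encoder first, decoder last.
        List<String> swapped = Arrays.asList("protocolHandler", "frameHandler", "encoder", "decoder");

        System.out.println("original: " + writePath(original, "decoder"));
        System.out.println("swapped:  " + writePath(swapped, "decoder"));
    }
}
```

In the original order the encoder never appears on the write path from the decoder, so the JSONObject would never be turned into a TextWebSocketFrame; after the swap the encoder is the first handler the write reaches.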
    

Note that you may be tempted to remove the retain() call we added earlier to fix the exception, but doing so will trigger undefined behavior given the asynchronous nature of Netty.

Answer by Prim

You use SimpleChannelInboundHandler, which automatically releases the received data according to its documentation.

So when you call ctx.fireChannelRead(msg); to pass msg to the other handlers in the pipeline, there is a problem, because msg will be released.

To fix this, you can use ChannelInboundHandlerAdapter, or you can disable the auto-release behavior of SimpleChannelInboundHandler by calling the appropriate constructor, or you can call ReferenceCountUtil.retain(msg); before firing the message further up the pipeline.
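
The constructor-based option can be sketched with a toy model in plain Java. Netty's SimpleChannelInboundHandler offers a constructor that takes a boolean autoRelease flag; the classes below (Message, Next, BaseHandler, ForwardingHandler) are hypothetical stand-ins written for illustration, not Netty's code.

```java
/** Toy model of an auto-releasing handler base class; NOT Netty's code. */
class AutoReleaseDemo {

    /** Hypothetical reference-counted message. */
    static final class Message {
        int refCnt = 1;

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    /** Hypothetical next handler: takes ownership of the message. */
    interface Next {
        void accept(Message msg);
    }

    /** Base class whose auto-release behavior is chosen in the constructor. */
    abstract static class BaseHandler {
        private final boolean autoRelease;

        BaseHandler(boolean autoRelease) {
            this.autoRelease = autoRelease;
        }

        abstract void handle(Message msg, Next next);

        final void read(Message msg, Next next) {
            try {
                handle(msg, next);
            } finally {
                if (autoRelease) {
                    msg.release(); // released here unless the flag is off
                }
            }
        }
    }

    /** Forwards every message unchanged; safe only with autoRelease off. */
    static final class ForwardingHandler extends BaseHandler {
        ForwardingHandler(boolean autoRelease) {
            super(autoRelease);
        }

        @Override
        void handle(Message msg, Next next) {
            next.accept(msg);
        }
    }

    public static void main(String[] args) {
        Message msg = new Message();
        // The next handler releases the message exactly once.
        new ForwardingHandler(false).read(msg, Message::release);
        System.out.println("autoRelease off, final refCnt = " + msg.refCnt);
    }
}
```

With the flag off, the forwarding handler no longer releases the message itself, so the single release by the next handler is the only one.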

See documentation of SimpleChannelInboundHandler here: http://netty.io/4.0/api/io/netty/channel/SimpleChannelInboundHandler.html

Also read about reference-counted objects here (a new concept in Netty 4): http://netty.io/wiki/reference-counted-objects.html