上次用 netty 写了一个玩具 http server,发现了一个问题:为啥 channelRead0 方法会被调用两次?这里我们来研究一下。我们在收到 http 请求的时候,需要经过的一个必要过程就是编解码,而这里我们用的是 io.netty.handler.codec.http.HttpServerCodec
// Register the combined HTTP request decoder / response encoder on the channel pipeline.
pipeline.addLast(new HttpServerCodec());
它继承了 io.netty.channel.CombinedChannelDuplexHandler
/**
 * Creates the codec: sets up an empty queue and hands a request decoder and a
 * response encoder to {@code init(...)} of the parent class.
 *
 * @param config decoder configuration forwarded to {@code HttpServerRequestDecoder}
 */
public HttpServerCodec(HttpDecoderConfig config) {
    // Diamond operator instead of the raw type emitted by the decompiler.
    this.queue = new ArrayDeque<>();
    this.init(new HttpServerRequestDecoder(config), new HttpServerResponseEncoder());
}
在初始化的时候调用了 io.netty.channel.CombinedChannelDuplexHandler#init,
传入的参数是一个请求解码器和一个响应编码器:
/**
 * Creates the codec: sets up an empty queue and hands a request decoder and a
 * response encoder to {@code init(...)} of the parent class.
 *
 * @param config decoder configuration forwarded to {@code HttpServerRequestDecoder}
 */
public HttpServerCodec(HttpDecoderConfig config) {
    // Diamond operator instead of the raw type emitted by the decompiler.
    this.queue = new ArrayDeque<>();
    this.init(new HttpServerRequestDecoder(config), new HttpServerResponseEncoder());
}
这个 HttpServerRequestDecoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private final class HttpServerRequestDecoder extends HttpRequestDecoder { HttpServerRequestDecoder(HttpDecoderConfig config) { super (config); } protected void decode (ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { int oldSize = out.size(); super .decode(ctx, buffer, out); int size = out.size(); for (int i = oldSize; i < size; ++i) { Object obj = out.get(i); if (obj instanceof HttpRequest) { HttpServerCodec.this .queue.add(((HttpRequest)obj).method()); } } }
这个decode方法就会调用到父类的decode方法 io.netty.handler.codec.http.HttpObjectDecoder#decode
/**
 * Decodes bytes from {@code buffer} into HTTP objects and appends them to {@code out},
 * driven by the state held in {@code currentState}.
 *
 * <p>Several cases intentionally fall through to the next state so that one call can
 * progress through multiple parsing stages. When the headers are complete and there
 * is no body to read, the message is added to {@code out} immediately followed by
 * {@code LastHttpContent.EMPTY_LAST_CONTENT}, so a single request yields two objects.
 *
 * @param ctx    the handler context (not used directly in this method)
 * @param buffer the inbound bytes to decode
 * @param out    the list that receives decoded objects
 * @throws Exception declared by the signature; parsing failures seen here are caught
 *                   and reported as invalid message / invalid chunk objects in {@code out}
 */
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
    if (this.resetRequested.get()) {
        this.resetNow();
    }

    int toRead;
    ByteBuf line;
    switch (this.currentState) {
        case SKIP_CONTROL_CHARS:
        case READ_INITIAL:
            try {
                line = this.lineParser.parse(buffer);
                if (line == null) {
                    // Initial line not complete yet; wait for more bytes.
                    return;
                }

                String[] initialLine = this.splitInitialLine(line);
                assert initialLine.length == 3 : "initialLine::length must be 3";

                this.message = this.createMessage(initialLine);
                this.currentState = HttpObjectDecoder.State.READ_HEADER;
            } catch (Exception var9) {
                out.add(this.invalidMessage(this.message, buffer, var9));
                return;
            }
            // fall through
        case READ_HEADER:
            try {
                State nextState = this.readHeaders(buffer);
                if (nextState == null) {
                    // Headers not complete yet; wait for more bytes.
                    return;
                }

                this.currentState = nextState;
                switch (nextState) {
                    case SKIP_CONTROL_CHARS:
                        // No content expected: emit the message plus the empty last-content marker.
                        this.addCurrentMessage(out);
                        out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                        this.resetNow();
                        return;
                    case READ_CHUNK_SIZE:
                        if (!this.chunkedSupported) {
                            throw new IllegalArgumentException("Chunked messages not supported");
                        }

                        this.addCurrentMessage(out);
                        return;
                    default:
                        if (this.contentLength == 0L || this.contentLength == -1L && this.isDecodingRequest()) {
                            // Nothing to read as a body: emit the message and the
                            // empty last-content marker in the same pass.
                            this.addCurrentMessage(out);
                            out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                            this.resetNow();
                            return;
                        }

                        assert nextState == HttpObjectDecoder.State.READ_FIXED_LENGTH_CONTENT
                                || nextState == HttpObjectDecoder.State.READ_VARIABLE_LENGTH_CONTENT;

                        this.addCurrentMessage(out);
                        if (nextState == HttpObjectDecoder.State.READ_FIXED_LENGTH_CONTENT) {
                            this.chunkSize = this.contentLength;
                        }

                        return;
                }
            } catch (Exception var10) {
                out.add(this.invalidMessage(this.message, buffer, var10));
                return;
            }
        case READ_CHUNK_SIZE:
            try {
                line = this.lineParser.parse(buffer);
                if (line == null) {
                    return;
                }

                toRead = getChunkSize(line.array(), line.arrayOffset() + line.readerIndex(), line.readableBytes());
                this.chunkSize = (long) toRead;
                if (toRead == 0) {
                    this.currentState = HttpObjectDecoder.State.READ_CHUNK_FOOTER;
                    return;
                }

                this.currentState = HttpObjectDecoder.State.READ_CHUNKED_CONTENT;
            } catch (Exception var8) {
                out.add(this.invalidChunk(buffer, var8));
                return;
            }
            // fall through
        case READ_CHUNKED_CONTENT:
            assert this.chunkSize <= 2147483647L;

            toRead = Math.min((int) this.chunkSize, this.maxChunkSize);
            if (!this.allowPartialChunks && buffer.readableBytes() < toRead) {
                return;
            }

            toRead = Math.min(toRead, buffer.readableBytes());
            if (toRead == 0) {
                return;
            }

            HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
            this.chunkSize -= (long) toRead;
            out.add(chunk);
            if (this.chunkSize != 0L) {
                return;
            }

            this.currentState = HttpObjectDecoder.State.READ_CHUNK_DELIMITER;
            // fall through
        case READ_CHUNK_DELIMITER: {
            // The decompiled listing merged these two indices into one variable, which
            // made the loop condition always false; they must be distinct locals.
            int writerIdx = buffer.writerIndex();
            int readerIdx = buffer.readerIndex();

            while (writerIdx > readerIdx) {
                byte next = buffer.getByte(readerIdx++);
                if (next == 10) { // 10 is '\n' (line feed)
                    this.currentState = HttpObjectDecoder.State.READ_CHUNK_SIZE;
                    break;
                }
            }

            buffer.readerIndex(readerIdx);
            return;
        }
        case READ_VARIABLE_LENGTH_CONTENT:
            toRead = Math.min(buffer.readableBytes(), this.maxChunkSize);
            if (toRead > 0) {
                ByteBuf content = buffer.readRetainedSlice(toRead);
                out.add(new DefaultHttpContent(content));
            }

            return;
        case READ_FIXED_LENGTH_CONTENT:
            toRead = buffer.readableBytes();
            if (toRead == 0) {
                return;
            }

            toRead = Math.min(toRead, this.maxChunkSize);
            if ((long) toRead > this.chunkSize) {
                toRead = (int) this.chunkSize;
            }

            ByteBuf content = buffer.readRetainedSlice(toRead);
            this.chunkSize -= (long) toRead;
            if (this.chunkSize == 0L) {
                // All announced bytes have been read: this slice is the last content.
                out.add(new DefaultLastHttpContent(content, this.trailersFactory));
                this.resetNow();
            } else {
                out.add(new DefaultHttpContent(content));
            }

            return;
        case READ_CHUNK_FOOTER:
            try {
                LastHttpContent trailer = this.readTrailingHeaders(buffer);
                if (trailer == null) {
                    return;
                }

                out.add(trailer);
                this.resetNow();
                return;
            } catch (Exception var7) {
                out.add(this.invalidChunk(buffer, var7));
                return;
            }
        case BAD_MESSAGE:
            // Discard everything that is readable.
            buffer.skipBytes(buffer.readableBytes());
            break;
        case UPGRADED:
            toRead = buffer.readableBytes();
            if (toRead > 0) {
                out.add(buffer.readBytes(toRead));
            }
    }
}
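这个方法里会做实际的转换,也就是把 byte 转换成 message(或者说 string)。而在请求头解析完成、并且没有请求体需要读取的情况下(contentLength 为 0,或者解码请求时 contentLength 为 -1),通过 addCurrentMessage 把消息加入 out 之后,还会在最后再加一个 io.netty.handler.codec.http.LastHttpContent#EMPTY_LAST_CONTENT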
这样其实我的 message 就会变成两条,这也就是为啥 channelRead0 会被调用两次。这样做的目的是让我的业务代码可以分辨请求是否已经读完,并且可以在读完以后有一些其他的操作空间。可以看到第一次调用 channelRead0 时的 msg 是 HttpRequest 对象,第二次就是 LastHttpContent.EMPTY_LAST_CONTENT