用netty实现一个简单的http server-深入理解下

上次用netty写的一个玩具http server,发现了一个问题,为啥channelRead0方法会被调用两次,这里我们来研究下
我们在收到http请求的时候需要经过的一个必要的过程就是编解码,而这里我们用的是 io.netty.handler.codec.http.HttpServerCodec

1
pipeline.addLast(new HttpServerCodec());

它继承了 io.netty.channel.CombinedChannelDuplexHandler

1
2
3
4
public HttpServerCodec(HttpDecoderConfig config) {
this.queue = new ArrayDeque();
this.init(new HttpServerRequestDecoder(config), new HttpServerResponseEncoder());
}

在初始化的时候调用了 io.netty.channel.CombinedChannelDuplexHandler#init
传入参数

1
2
3
4
public HttpServerCodec(HttpDecoderConfig config) {
this.queue = new ArrayDeque();
this.init(new HttpServerRequestDecoder(config), new HttpServerResponseEncoder());
}

这个 HttpServerRequestDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private final class HttpServerRequestDecoder extends HttpRequestDecoder {
HttpServerRequestDecoder(HttpDecoderConfig config) {
super(config);
}

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
int oldSize = out.size();
super.decode(ctx, buffer, out);
int size = out.size();

for(int i = oldSize; i < size; ++i) {
Object obj = out.get(i);
if (obj instanceof HttpRequest) {
HttpServerCodec.this.queue.add(((HttpRequest)obj).method());
}
}

}

这个decode方法就会调用到父类的decode方法 io.netty.handler.codec.http.HttpObjectDecoder#decode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
if (this.resetRequested.get()) {
this.resetNow();
}

int toRead;
int toRead;
ByteBuf line;
switch (this.currentState) {
case SKIP_CONTROL_CHARS:
case READ_INITIAL:
try {
line = this.lineParser.parse(buffer);
if (line == null) {
return;
}

String[] initialLine = this.splitInitialLine(line);

assert initialLine.length == 3 : "initialLine::length must be 3";

this.message = this.createMessage(initialLine);
this.currentState = HttpObjectDecoder.State.READ_HEADER;
} catch (Exception var9) {
out.add(this.invalidMessage(this.message, buffer, var9));
return;
}
case READ_HEADER:
try {
State nextState = this.readHeaders(buffer);
if (nextState == null) {
return;
}

this.currentState = nextState;
switch (nextState) {
case SKIP_CONTROL_CHARS:
this.addCurrentMessage(out);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
this.resetNow();
return;
case READ_CHUNK_SIZE:
if (!this.chunkedSupported) {
throw new IllegalArgumentException("Chunked messages not supported");
}

this.addCurrentMessage(out);
return;
default:
if (this.contentLength == 0L || this.contentLength == -1L && this.isDecodingRequest()) {
this.addCurrentMessage(out);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
this.resetNow();
return;
}

assert nextState == HttpObjectDecoder.State.READ_FIXED_LENGTH_CONTENT || nextState == HttpObjectDecoder.State.READ_VARIABLE_LENGTH_CONTENT;

this.addCurrentMessage(out);
if (nextState == HttpObjectDecoder.State.READ_FIXED_LENGTH_CONTENT) {
this.chunkSize = this.contentLength;
}

return;
}
} catch (Exception var10) {
out.add(this.invalidMessage(this.message, buffer, var10));
return;
}
case READ_CHUNK_SIZE:
try {
line = this.lineParser.parse(buffer);
if (line == null) {
return;
}

toRead = getChunkSize(line.array(), line.arrayOffset() + line.readerIndex(), line.readableBytes());
this.chunkSize = (long)toRead;
if (toRead == 0) {
this.currentState = HttpObjectDecoder.State.READ_CHUNK_FOOTER;
return;
}

this.currentState = HttpObjectDecoder.State.READ_CHUNKED_CONTENT;
} catch (Exception var8) {
out.add(this.invalidChunk(buffer, var8));
return;
}
case READ_CHUNKED_CONTENT:
assert this.chunkSize <= 2147483647L;

toRead = Math.min((int)this.chunkSize, this.maxChunkSize);
if (!this.allowPartialChunks && buffer.readableBytes() < toRead) {
return;
}

toRead = Math.min(toRead, buffer.readableBytes());
if (toRead == 0) {
return;
}

HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
this.chunkSize -= (long)toRead;
out.add(chunk);
if (this.chunkSize != 0L) {
return;
}

this.currentState = HttpObjectDecoder.State.READ_CHUNK_DELIMITER;
case READ_CHUNK_DELIMITER:
toRead = buffer.writerIndex();
toRead = buffer.readerIndex();

while(toRead > toRead) {
byte next = buffer.getByte(toRead++);
if (next == 10) {
this.currentState = HttpObjectDecoder.State.READ_CHUNK_SIZE;
break;
}
}

buffer.readerIndex(toRead);
return;
case READ_VARIABLE_LENGTH_CONTENT:
toRead = Math.min(buffer.readableBytes(), this.maxChunkSize);
if (toRead > 0) {
ByteBuf content = buffer.readRetainedSlice(toRead);
out.add(new DefaultHttpContent(content));
}

return;
case READ_FIXED_LENGTH_CONTENT:
toRead = buffer.readableBytes();
if (toRead == 0) {
return;
}

toRead = Math.min(toRead, this.maxChunkSize);
if ((long)toRead > this.chunkSize) {
toRead = (int)this.chunkSize;
}

ByteBuf content = buffer.readRetainedSlice(toRead);
this.chunkSize -= (long)toRead;
if (this.chunkSize == 0L) {
out.add(new DefaultLastHttpContent(content, this.trailersFactory));
this.resetNow();
} else {
out.add(new DefaultHttpContent(content));
}

return;
case READ_CHUNK_FOOTER:
try {
LastHttpContent trailer = this.readTrailingHeaders(buffer);
if (trailer == null) {
return;
}

out.add(trailer);
this.resetNow();
return;
} catch (Exception var7) {
out.add(this.invalidChunk(buffer, var7));
return;
}
case BAD_MESSAGE:
buffer.skipBytes(buffer.readableBytes());
break;
case UPGRADED:
toRead = buffer.readableBytes();
if (toRead > 0) {
out.add(buffer.readBytes(toRead));
}
}

}

这个方法里会做实际的byte转换成message或者说string的转换
而在我们把消息转换完成,

会在最后加一个 io.netty.handler.codec.http.LastHttpContent#EMPTY_LAST_CONTENT
这样其实我的message就会变成两条,也就是为啥channelRead0会被调用两次,这样的目的也是让我的业务代码可以分辨是否请求已经读完,并且可以在读完以后有一些其他操作空间
可以看到第一次调用channelRead0的msg

第二次就是