Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

Last time I built a toy HTTP server with Netty and noticed something odd: why does the channelRead0 method get called twice? Let's dig into it here.
Every incoming HTTP request has to pass through a codec step, and here we use io.netty.handler.codec.http.HttpServerCodec:

pipeline.addLast(new HttpServerCodec());

It extends io.netty.channel.CombinedChannelDuplexHandler:

public HttpServerCodec(HttpDecoderConfig config) {
    this.queue = new ArrayDeque();
    this.init(new HttpServerRequestDecoder(config), new HttpServerResponseEncoder());
}

During construction it calls io.netty.channel.CombinedChannelDuplexHandler#init, passing a new HttpServerRequestDecoder as the inbound handler and a new HttpServerResponseEncoder as the outbound handler.


Here is that HttpServerRequestDecoder:

private final class HttpServerRequestDecoder extends HttpRequestDecoder {
    HttpServerRequestDecoder(HttpDecoderConfig config) {
        super(config);
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        int oldSize = out.size();
        super.decode(ctx, buffer, out);
        int size = out.size();

        for (int i = oldSize; i < size; ++i) {
            Object obj = out.get(i);
            if (obj instanceof HttpRequest) {
                HttpServerCodec.this.queue.add(((HttpRequest) obj).method());
            }
        }
    }
}

Its decode method in turn calls the parent class's decode, io.netty.handler.codec.http.HttpObjectDecoder#decode:

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
    if (this.resetRequested.get()) {
        this.resetNow();
    }

    int toRead;
    ByteBuf line;
    switch (this.currentState) {
    case SKIP_CONTROL_CHARS:
    case READ_INITIAL:
        try {
            line = this.lineParser.parse(buffer);
            if (line == null) {
                return;
            }

            String[] initialLine = this.splitInitialLine(line);

            assert initialLine.length == 3 : "initialLine::length must be 3";

            this.message = this.createMessage(initialLine);
            this.currentState = HttpObjectDecoder.State.READ_HEADER;
        } catch (Exception var9) {
            out.add(this.invalidMessage(this.message, buffer, var9));
            return;
        }
    case READ_HEADER:
        try {
            State nextState = this.readHeaders(buffer);
            if (nextState == null) {
                return;
            }

            this.currentState = nextState;
            switch (nextState) {
            case SKIP_CONTROL_CHARS:
                this.addCurrentMessage(out);
                out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                this.resetNow();
                return;
            case READ_CHUNK_SIZE:
                if (!this.chunkedSupported) {
                    throw new IllegalArgumentException("Chunked messages not supported");
                }

                this.addCurrentMessage(out);
                return;
            default:
                if (this.contentLength == 0L || this.contentLength == -1L && this.isDecodingRequest()) {
                    // bodyless requests (e.g. a plain GET) land here: the empty last
                    // content becomes the second message channelRead0 will see
                    this.addCurrentMessage(out);
                    out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                    this.resetNow();
                    return;
                }

                assert nextState == HttpObjectDecoder.State.READ_FIXED_LENGTH_CONTENT || nextState == HttpObjectDecoder.State.READ_VARIABLE_LENGTH_CONTENT;

                this.addCurrentMessage(out);
                if (nextState == HttpObjectDecoder.State.READ_FIXED_LENGTH_CONTENT) {
                    this.chunkSize = this.contentLength;
                }

                return;
            }
        } catch (Exception var10) {
            out.add(this.invalidMessage(this.message, buffer, var10));
            return;
        }
    case READ_CHUNK_SIZE:
        try {
            line = this.lineParser.parse(buffer);
            if (line == null) {
                return;
            }

            toRead = getChunkSize(line.array(), line.arrayOffset() + line.readerIndex(), line.readableBytes());
            this.chunkSize = (long)toRead;
            if (toRead == 0) {
                this.currentState = HttpObjectDecoder.State.READ_CHUNK_FOOTER;
                return;
            }

            this.currentState = HttpObjectDecoder.State.READ_CHUNKED_CONTENT;
        } catch (Exception var8) {
            out.add(this.invalidChunk(buffer, var8));
            return;
        }
    case READ_CHUNKED_CONTENT: {
        assert this.chunkSize <= 2147483647L;

        toRead = Math.min((int)this.chunkSize, this.maxChunkSize);
        if (!this.allowPartialChunks && buffer.readableBytes() < toRead) {
            return;
        }

        toRead = Math.min(toRead, buffer.readableBytes());
        if (toRead == 0) {
            return;
        }

        HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
        this.chunkSize -= (long)toRead;
        out.add(chunk);
        if (this.chunkSize != 0L) {
            return;
        }

        this.currentState = HttpObjectDecoder.State.READ_CHUNK_DELIMITER;
    }
    case READ_CHUNK_DELIMITER: {
        // the decompiler had collapsed these two locals into one reused variable;
        // restored here as a write index and a read index
        int wIdx = buffer.writerIndex();
        int rIdx = buffer.readerIndex();

        while (wIdx > rIdx) {
            byte next = buffer.getByte(rIdx++);
            if (next == 10) {
                this.currentState = HttpObjectDecoder.State.READ_CHUNK_SIZE;
                break;
            }
        }

        buffer.readerIndex(rIdx);
        return;
    }
    case READ_VARIABLE_LENGTH_CONTENT: {
        toRead = Math.min(buffer.readableBytes(), this.maxChunkSize);
        if (toRead > 0) {
            ByteBuf content = buffer.readRetainedSlice(toRead);
            out.add(new DefaultHttpContent(content));
        }

        return;
    }
    case READ_FIXED_LENGTH_CONTENT: {
        toRead = buffer.readableBytes();
        if (toRead == 0) {
            return;
        }

        toRead = Math.min(toRead, this.maxChunkSize);
        if ((long)toRead > this.chunkSize) {
            toRead = (int)this.chunkSize;
        }

        ByteBuf content = buffer.readRetainedSlice(toRead);
        this.chunkSize -= (long)toRead;
        if (this.chunkSize == 0L) {
            out.add(new DefaultLastHttpContent(content, this.trailersFactory));
            this.resetNow();
        } else {
            out.add(new DefaultHttpContent(content));
        }

        return;
    }
    case READ_CHUNK_FOOTER:
        try {
            LastHttpContent trailer = this.readTrailingHeaders(buffer);
            if (trailer == null) {
                return;
            }

            out.add(trailer);
            this.resetNow();
            return;
        } catch (Exception var7) {
            out.add(this.invalidChunk(buffer, var7));
            return;
        }
    case BAD_MESSAGE:
        buffer.skipBytes(buffer.readableBytes());
        break;
    case UPGRADED:
        toRead = buffer.readableBytes();
        if (toRead > 0) {
            out.add(buffer.readBytes(toRead));
        }
    }
}

This method performs the actual conversion of raw bytes into a message (or, loosely, a string). Once the message itself has been fully decoded, an io.netty.handler.codec.http.LastHttpContent#EMPTY_LAST_CONTENT is appended after it, as in the READ_HEADER branch above. So one request actually becomes two messages, which is exactly why channelRead0 gets called twice. The point of the design is to let the business code tell when a request has been read to the end, and to leave room for extra work at that point.
The msg on the first channelRead0 call is the decoded HttpRequest; on the second call it is the EMPTY_LAST_CONTENT.
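As a minimal sketch of how a handler can tell the two apart (assuming a SimpleChannelInboundHandler<HttpObject>, like the one in the server example below):

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
    if (msg instanceof HttpRequest) {
        // first invocation: the decoded request line and headers
    }
    // for requests that carry a body, HttpContent chunks arrive in between
    if (msg instanceof LastHttpContent) {
        // second invocation: EMPTY_LAST_CONTENT marks the end of the request
    }
}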

Netty is one of the best-known Java networking frameworks, because it builds a high-level abstraction over the various network types, blocking and non-blocking alike; it's well worth studying. Let's put it into practice with a simple HTTP server.
The main server code:

public static void main(String[] args) throws InterruptedException {

    // boss group: accepts incoming connections
    EventLoopGroup boss = new NioEventLoopGroup();
    // worker group: handles I/O on accepted connections
    EventLoopGroup work = new NioEventLoopGroup();
    try {
        // server-side bootstrap
        ServerBootstrap bootstrap = new ServerBootstrap();
        // attach the two event-loop groups
        bootstrap.group(boss, work)
                // max queue length for pending TCP connections
                .option(ChannelOption.SO_BACKLOG, 1024)
                // logging handler
                .handler(new LoggingHandler(LogLevel.DEBUG))
                // channel type
                .channel(NioServerSocketChannel.class)
                // the actual business handlers
                .childHandler(new HttpServerInitializer());

        // bind the port
        ChannelFuture f = bootstrap.bind(new InetSocketAddress(8082)).sync();
        System.out.println("server start up on port 8082");
        f.channel().closeFuture().sync();
    } finally {
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }
}

Now let's look at HttpServerInitializer:

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // add handlers to this channel's pipeline;
        // the first three are provided by Netty
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpContentCompressor((CompressionOptions[]) null));
        pipeline.addLast(new HttpServerExpectContinueHandler());
        // and this one is our own logic
        pipeline.addLast(new SimpleHttpServerHandler());
    }
}

The handler itself is quite brief: we answer every request with the same "Hello World":

public class SimpleHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    private static final byte[] CONTENT = { 'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd' };

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // flush the buffered response
        ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception {
        // content has been read from the channel
        System.out.println("channel read");
        if (msg instanceof HttpRequest) {
            // whether msg is an HttpRequest depends on the Netty codec added earlier
            HttpRequest httpRequest = (HttpRequest) msg;
            boolean keepAlive = HttpUtil.isKeepAlive(httpRequest);

            // wrap the http response
            FullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(), OK, Unpooled.wrappedBuffer(CONTENT));

            // set the response headers
            response.headers()
                    .set(CONTENT_TYPE, TEXT_PLAIN)
                    .setInt(CONTENT_LENGTH, response.content().readableBytes());

            // decide whether to keep the connection alive
            if (keepAlive) {
                if (!httpRequest.protocolVersion().isKeepAliveDefault()) {
                    response.headers().set(CONNECTION, KEEP_ALIVE);
                }
            } else {
                response.headers().set(CONNECTION, CLOSE);
            }

            // write the response, obtaining a ChannelFuture
            ChannelFuture future = channelHandlerContext.write(response);

            // if not keep-alive, close the channel once the write completes
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

That's the whole simple example. One thing to note: I put a print at the entry of channelRead0, and it prints twice when a single request is received. Treat it as a little puzzle; we'll analyze it next time.

I recently stumbled over a small problem that I found rather interesting. Let's look at the code:

public static void main(String[] args) {
    Integer oneHundredYear = 36500;
    System.out.println(oneHundredYear * 24 * 60 * 60 * 1000);
}

We define a variable representing 100 years as a number of days, then work out the corresponding length of time in milliseconds.
First, look at the value it prints:

1094004736

If timestamps aren't second nature to you, nothing may look wrong at first. Yet something is clearly off: we multiplied by 1000, but the value ends in 736.
Let's set that aside for now.

public static void main(String[] args) {
    Integer oneHundredYear = 36500;
    System.out.println(oneHundredYear * 24 * 60 * 60 * 1000);
    System.out.println(System.currentTimeMillis() + oneHundredYear * 24 * 60 * 60 * 1000);
}

And the output:

1094004736
1726195843465

Looking up that second timestamp, by rights it should be August 31, 2124.

But that's not right: it's only about 12 days out (1,094,004,736 ms ≈ 12.7 days), worlds away from 100 years. Why? An experienced eye will spot immediately that it's a type-conversion problem. But suppose I change it like this:

public static void main(String[] args) {
    Integer oneHundredYear = 36500;
    System.out.println(oneHundredYear * 24 * 60 * 60 * 1000L);
    System.out.println(System.currentTimeMillis() + oneHundredYear * 24 * 60 * 60 * 1000L);
}

Or like this:

public static void main(String[] args) {
    Integer oneHundredYear = 36500;
    System.out.println(Long.valueOf(oneHundredYear * 24 * 60 * 60 * 1000));
    System.out.println(System.currentTimeMillis() + Long.valueOf(oneHundredYear * 24 * 60 * 60 * 1000));
}

You'll find the result is exactly the same: still wrong.
In short, it comes down to when the type conversion happens; one look at this example makes it clear:

public static void main(String[] args) {
    Integer oneHundredYear = 36500;
    System.out.println(oneHundredYear * 24);
    System.out.println(oneHundredYear * 24 * 60);
    System.out.println(oneHundredYear * 24 * 60 * 60);
    System.out.println(oneHundredYear * 24 * 60 * 60 * 1000);
}

The results look like this:

876000
52560000
-1141367296
1094004736

By the time we multiply by the second 60 the value has already gone negative, because it has exceeded the range of int.
So why doesn't converting to long, as in the examples above, help either?
It comes down to how the compiler performs the conversion. When oneHundredYear is first multiplied by 24, it is unboxed and the operation is an ordinary int multiplication, with no overflow check, and the same goes for the following steps. The widening to long only happens at the very end: at the final multiplication by the explicitly declared 1000L in the first variant, or when the finished int result is boxed by Long.valueOf in the second. By then the overflow has already happened.
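Incidentally, this also explains the 736 tail we set aside earlier. A quick check (a small sketch; the constants are just the arithmetic spelled out):

long exact = 36500L * 24 * 60 * 60 * 1000;  // 3_153_600_000_000, the true product
System.out.println(exact % (1L << 32));     // 1094004736: the low 32 bits
System.out.println((int) exact);            // 1094004736, same as the overflowed int math

The true product is 3,153,600,000,000 = 734 × 2^32 + 1,094,004,736, so 32-bit arithmetic keeps exactly the remainder 1,094,004,736, tail digits 736 included.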
Looking at the decompiled output, you can see the multiplications are carried out as int operations first, and the type is only converted once a long operand is encountered.
So in theory we just need to declare a long, or cast to long, at or before the second 60. It's a very basic point, but when writing code, intuition sometimes says that tacking an L on somewhere is enough, and it turns out not to be that simple.
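A minimal sketch of the fix, then: widen to long before the first multiplication, so the whole chain runs in 64-bit arithmetic:

public static void main(String[] args) {
    Integer oneHundredYear = 36500;
    // widen first; every subsequent multiplication is then done on longs
    System.out.println(oneHundredYear.longValue() * 24 * 60 * 60 * 1000);
    System.out.println(System.currentTimeMillis() + oneHundredYear.longValue() * 24 * 60 * 60 * 1000);
}

This prints 3153600000000, and the second value now really does land about 100 years out.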

Many tech stacks grow more convenient onboarding paths, or ship quick-start demos, as they evolve. This time I want to use Apollo as a contrast with how some vector databases are deployed (yes, I mean Milvus).
For a production deployment of Apollo I would absolutely not recommend this approach, but for an experiment, or for studying the source, it's very handy, provided you have a Docker environment.
First download the docker-compose configuration file (the project provides one for x86 and another for Apple Silicon machines such as the M1), then download the sql folder as well.
The directory structure looks roughly like this:

Then, in that directory, run
sudo docker-compose -f docker-compose-arm64.yml up -d
On non-M1 machines, plain
sudo docker-compose up -d
is enough, since docker-compose looks for docker-compose.yml by default.
Then take a look at the logs:

==== starting service ====
Service logging file is ./service/apollo-service.log
Application is running as root (UID 0). This is considered insecure.
Started [66]
Waiting for config service startup...
Config service started. You may visit http://localhost:8080 for service status now!
Waiting for admin service startup.
Admin service started
==== starting portal ====
Portal logging file is ./portal/apollo-portal.log
Application is running as root (UID 0). This is considered insecure.
Started [211]
Waiting for portal startup...
Portal started. You can visit http://localhost:8070 now!

That means startup succeeded. You can then open the second address, http://localhost:8070, to reach the portal; the default username is apollo and the password is admin.
To consume the config from an application, a few pieces of setup are needed.
The first is to create apollo-env.properties under the resources directory,
holding the meta server address; in my case:

dev.meta=http://127.0.0.1:8080
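Each environment gets a key of the form {env}.meta in this file, matching the active profile; for instance (the production address below is hypothetical):

dev.meta=http://127.0.0.1:8080
pro.meta=http://apollo-meta.prod.example.com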

This is the meta address used when spring.profiles.active is dev.
The second step is to add the pom dependency:

<dependency>
    <groupId>com.ctrip.framework.apollo</groupId>
    <artifactId>apollo-client</artifactId>
    <version>2.0.1</version>
</dependency>

The last step is to add an annotation on the Spring Boot application class:

@EnableApolloConfig

After that, configuration values can be read directly with the @Value annotation.
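A minimal sketch (this controller is hypothetical, though its names line up with the auto-update log shown further down):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    // bound to the "demo" key in Apollo; apollo-client refreshes it on publish
    @Value("${demo}")
    private String demo;

    @GetMapping("/demo")
    public String demo() {
        return demo;
    }
}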

Request the endpoint and you can see the corresponding value.

Then we can change the value in the portal and publish it.

By default the client also logs the change:

2024-08-24 19:38:30.462  INFO 57920 --- [Apollo-Config-1] c.f.a.s.p.AutoUpdateConfigChangeListener : Auto update apollo changed value successfully, new value: apolloDemoValue, key: demo, beanName: demoController, field: com.nicksxs.spbdemo.controller.DemoController.demo

and subsequent requests pick up the latest value.

Later Java versions added virtual threads, similar in spirit to the coroutines of PHP and Go: on top of the operating system's threads they simulate a layer of finer-grained sub-threads. Because this cuts down on OS thread context switches, it can bring a sizable performance gain in common business scenarios, though it's no silver bullet and won't cure everything.
First install JDK 21; use /usr/libexec/java_home to switch JAVA_HOME
and set it up on the PATH:

export JAVA_HOME=$(/usr/libexec/java_home -v 21)

Let's first try the platform-thread version:

long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(200);
for (int i = 0; i < 100000; i++) {
    executor.submit(() -> {
        try {
            // sleep 10ms, standing in for 10ms of business work
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {

        }
    });
}
executor.close();
System.out.printf("totalMillis:%dms\n", System.currentTimeMillis() - start);


It took 5897ms. Now the virtual-thread version:

long start = System.currentTimeMillis();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
// note: 10,000 tasks here, vs 100,000 in the thread-pool version above
for (int i = 0; i < 10000; i++) {
    executor.submit(() -> {
        // simulate business work
        try {
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {

        }
    });
}
executor.close();
System.out.printf("totalMillis:%dms\n", System.currentTimeMillis() - start);


It took 154ms. That's a great deal faster, even allowing for the smaller task count: a fixed pool of 200 threads would need at least 10000 / 200 × 10ms = 500ms for the same 10,000 tasks.
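Beyond the executor, JDK 21 can also spawn virtual threads directly through the builder API; a small sketch:

// start a single virtual thread and wait for it
Thread vt = Thread.ofVirtual().name("vt-demo").start(() ->
        System.out.println("running on " + Thread.currentThread()));
vt.join(); // the snippet assumes the caller handles InterruptedException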
And the heart of the virtual-thread implementation lies in its scheduling:

static Thread newVirtualThread(Executor scheduler,
                               String name,
                               int characteristics,
                               Runnable task) {
    if (ContinuationSupport.isSupported()) {
        return new VirtualThread(scheduler, name, characteristics, task);
    } else {
        if (scheduler != null)
            throw new UnsupportedOperationException();
        return new BoundVirtualThread(name, characteristics, task);
    }
}

When a virtual thread is created, a scheduler has to be settled on; initially the scheduler argument is null:

VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
    super(name, characteristics, /*bound*/ false);
    Objects.requireNonNull(task);

    // choose scheduler if not specified
    if (scheduler == null) {
        Thread parent = Thread.currentThread();
        if (parent instanceof VirtualThread vparent) {
            scheduler = vparent.scheduler;
        } else {
            scheduler = DEFAULT_SCHEDULER;
        }
    }

    this.scheduler = scheduler;
    this.cont = new VThreadContinuation(this, task);
    this.runContinuation = this::runContinuation;
}

And this default scheduler is:

private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();

so what gets created is a ForkJoinPool:

private static ForkJoinPool createDefaultScheduler() {
    ForkJoinWorkerThreadFactory factory = pool -> {
        PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
        return AccessController.doPrivileged(pa);
    };
    PrivilegedAction<ForkJoinPool> pa = () -> {
        int parallelism, maxPoolSize, minRunnable;
        String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
        String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
        String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
        if (parallelismValue != null) {
            parallelism = Integer.parseInt(parallelismValue);
        } else {
            parallelism = Runtime.getRuntime().availableProcessors();
        }
        if (maxPoolSizeValue != null) {
            maxPoolSize = Integer.parseInt(maxPoolSizeValue);
            parallelism = Integer.min(parallelism, maxPoolSize);
        } else {
            maxPoolSize = Integer.max(parallelism, 256);
        }
        if (minRunnableValue != null) {
            minRunnable = Integer.parseInt(minRunnableValue);
        } else {
            minRunnable = Integer.max(parallelism / 2, 1);
        }
        Thread.UncaughtExceptionHandler handler = (t, e) -> { };
        boolean asyncMode = true; // FIFO
        return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
    };
    return AccessController.doPrivileged(pa);
}

As you can see, all the parameters come from system properties, falling back to the machine's CPU count to decide the parallelism. The main idea is: since OS threads are expensive, simulate a finer-grained unit inside them and do the scheduling within the thread, thereby cutting the cost of OS-level context switches. Plenty of details remain to dig into; feel free to leave a comment to discuss.
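For instance, the defaults can be overridden through those same properties, provided this runs before the first virtual thread is created (a sketch; the property names come straight from createDefaultScheduler above):

// must run before any virtual thread starts, otherwise the defaults are already baked in
System.setProperty("jdk.virtualThreadScheduler.parallelism", "8");
System.setProperty("jdk.virtualThreadScheduler.maxPoolSize", "256");
System.setProperty("jdk.virtualThreadScheduler.minRunnable", "4");

The same values can equally be passed on the command line with -D flags.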
