体验下Java 21的虚拟线程-协程

Java在后续版本中添加了虚拟线程,也是类似于php跟go的协程,对应操作系统的线程是在线程基础上模拟了一层子线程的逻辑,因为减少了操作系统的线程上下文切换开销,能够在常规业务场景带了比较大的性能提升,但也并非银弹,不能包治百病
首先安装下jdk 21 版本,需要用 /usr/libexec/java_home 切换下JAVA_HOME
然后在PATH中设置好

1
export JAVA_HOME=$(/usr/libexec/java_home -v 21)

首先是试一下线程版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(200);
for (int i = 0; i < 100000; i++) {
executor.submit(() -> {
try {
// 线程睡眠 10ms,可以等同于模拟业务耗时10ms
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {

}
});
}
executor.close();
System.out.printf("totalMillis:%dms\n", System.currentTimeMillis() - start);


耗时5897ms

1
2
3
4
5
6
7
8
9
10
11
12
13
14
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
// 模拟业务处理
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {

}
});
}
executor.close();
System.out.printf("totalMillis:%dms\n", System.currentTimeMillis() - start);


耗时154ms
相对来说还是能快很多的
而核心的虚拟线程实现主要来自于调度

1
2
3
4
5
6
7
8
9
10
11
12
static Thread newVirtualThread(Executor scheduler,
String name,
int characteristics,
Runnable task) {
if (ContinuationSupport.isSupported()) {
return new VirtualThread(scheduler, name, characteristics, task);
} else {
if (scheduler != null)
throw new UnsupportedOperationException();
return new BoundVirtualThread(name, characteristics, task);
}
}

在创建虚拟线程过程中,我们需要去处理调度器,初始时调度器为空

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);

// choose scheduler if not specified
if (scheduler == null) {
Thread parent = Thread.currentThread();
if (parent instanceof VirtualThread vparent) {
scheduler = vparent.scheduler;
} else {
scheduler = DEFAULT_SCHEDULER;
}
}

this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
this.runContinuation = this::runContinuation;
}

而这个默认调度器就是

1
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();

对应创建的就是个ForkJoinPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction<ForkJoinPool> pa = () -> {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
parallelism = Integer.parseInt(parallelismValue);
} else {
parallelism = Runtime.getRuntime().availableProcessors();
}
if (maxPoolSizeValue != null) {
maxPoolSize = Integer.parseInt(maxPoolSizeValue);
parallelism = Integer.min(parallelism, maxPoolSize);
} else {
maxPoolSize = Integer.max(parallelism, 256);
}
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
} else {
minRunnable = Integer.max(parallelism / 2, 1);
}
Thread.UncaughtExceptionHandler handler = (t, e) -> { };
boolean asyncMode = true; // FIFO
return new ForkJoinPool(parallelism, factory, handler, asyncMode,
0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
};
return AccessController.doPrivileged(pa);
}

可以看到参数都是通过系统参数获取,或者用系统的cpu数量来决定并行度,主体的逻辑就是既然系统线程开销大,那我就在系统线程内部模拟一个更小颗粒度的,在线程内部进行调度的模型,以此来减少系统切换开销,只不过细节还有很多需要研究,有兴趣的可以留言探讨