Java 线程池系列-实战篇

线程池在实际使用过程中,有时候在理解比较偏理论的时候会出现一些判断错误,这里我们就来看一个实际的案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1, 1,
0, TimeUnit.MINUTES,
new MyArrayBlockingQueue<>(2));
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
Thread.sleep(100);
for (int j = 0; j < 3; j++) {
threadPoolExecutor.execute(() -> {
int a = 0;
});
}
System.out.println("===============> 详情任务 - 任务处理完成");
}
System.out.println("都执行完成了");
}

MyArrayBlockingQueue 是我复制的 ArrayBlockingQueue 加了点日志,可以认为就是一样的,这种情况下
执行过程是怎么样的呢, 队列长度是 2,核心线程数和最大线程数都是 1,提交任务是采用了两层循环,内层是循环三次,往线程池里提交任务,然后内层循环完了以后会重新睡眠 100 毫秒
在进入下一次外层循环,如果能一眼看出来问题的说明对线程池了解得很深入了,如果没有的话我们就一起来看下
先说下结论,这个代码会出现拒绝异常

考虑下是什么原因呢,是不是我线程数太少了,放大一些,感觉符合直觉一点
修改成

1
2
3
4
private static final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(100, 100,
0, TimeUnit.MINUTES,
new MyArrayBlockingQueue<>(2));

然而还是一样

只不过晚了点出现,那么问题出在哪呢
为什么我要去重写这个 MyArrayBlockingQueue,就是为了找到原因,其实很多讲解线程池的都是讲了线程池的参数,什么队列是链表的,数组的
但是没有讲到我是怎么往队列塞任务,怎么从队列取任务的呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) {
return false;
}
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

这里是往队列里塞任务,注意这里需要获得锁,
而对于获取任务呢

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

注意这里也需要获得锁,当我一个线程池的线程数进入稳定状态,也就是保持一定数量的线程不变的情况下
上面是一种比较可能的情况,即核心线程数等于最大线程数,那么我在提交任务的时候是非常快的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

再来看下这段代码,第一步只需要判断是否为空,第二步就是判断核心线程数量,明显我说的情况,前面两步就直接过去了
然后就是判断线程池运行状态和往队列里塞任务了,但是线程运行完一个任务主动从队列里获取则需要更多的逻辑
这样就造成了我往队列里塞任务会比获取任务快很多,队列一满,就会抛出拒绝异常
即使我把线程数量放大到 100 还是一样,只不过会出现的慢一点,那么口说无凭,我们来验证下,提交任务过快,那么我在提交
方法里做个延迟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) {
return false;
}
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

这样就没啥问题了

除了最后这个加延时,其他的直接用 ArrayBlockingQueue 就可以实验,实操一下会对这个逻辑有更深的理解