聊聊 dubbo 的线程池

之前没注意到这一块,只是比较模糊的印象 dubbo 自己基于 ThreadPoolExecutor 定义了几个线程池,但是没具体看过,主要是觉得就是为了避免使用 jdk 自带的那几个(java.util.concurrent.Executors),防止出现那些问题
看下代码目录主要是这几个

  • FixedThreadPool:创建一个复用固定个数线程的线程池。
    简单看下代码
    1
    2
    3
    4
    5
    6
    public Executor getExecutor(URL url) {
    String name = url.getParameter("threadname", "Dubbo");
    int threads = url.getParameter("threads", 200);
    int queues = url.getParameter("queues", 0);
    return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, (BlockingQueue)(queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues))), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
    可以看到核心线程数跟最大线程数一致,也就是说就不会在核心线程数和最大线程数之间动态变化了
  • LimitedThreadPool:创建一个线程池,这个线程池中线程个数随着需要量动态增加,但是数量不超过配置的阈值的个数,另外空闲线程不会被回收,会一直存在。
    1
    2
    3
    4
    5
    6
    7
    public Executor getExecutor(URL url) {
    String name = url.getParameter("threadname", "Dubbo");
    int cores = url.getParameter("corethreads", 0);
    int threads = url.getParameter("threads", 200);
    int queues = url.getParameter("queues", 0);
    return new ThreadPoolExecutor(cores, threads, 9223372036854775807L, TimeUnit.MILLISECONDS, (BlockingQueue)(queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues))), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
    这个特点主要是创建了保活时间特别长,即可以认为不会被回收了
  • EagerThreadPool :创建一个线程池,这个线程池当所有核心线程都处于忙碌状态时候,创建新的线程来执行新任务,而不是把任务放入线程池阻塞队列。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public Executor getExecutor(URL url) {
    String name = url.getParameter("threadname", "Dubbo");
    int cores = url.getParameter("corethreads", 0);
    int threads = url.getParameter("threads", 2147483647);
    int queues = url.getParameter("queues", 0);
    int alive = url.getParameter("alive", 60000);
    TaskQueue<Runnable> taskQueue = new TaskQueue(queues <= 0 ? 1 : queues);
    EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, (long)alive, TimeUnit.MILLISECONDS, taskQueue, new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    taskQueue.setExecutor(executor);
    return executor;
    }
    这个是改动最多的一个了,因为需要实现这个机制,有兴趣的可以详细看下
  • CachedThreadPool: 创建一个自适应线程池,当线程处于空闲1分钟时候,线程会被回收,当有新请求到来时候会创建新线程
    1
    2
    3
    4
    5
    6
    7
    8
    public Executor getExecutor(URL url) {
    String name = url.getParameter("threadname", "Dubbo");
    int cores = url.getParameter("corethreads", 0);
    int threads = url.getParameter("threads", 2147483647);
    int queues = url.getParameter("queues", 0);
    int alive = url.getParameter("alive", 60000);
    return new ThreadPoolExecutor(cores, threads, (long)alive, TimeUnit.MILLISECONDS, (BlockingQueue)(queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues))), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
    这里可以看到线程池的配置,核心是 0,最大线程数是 2147483647,保活时间是一分钟
    只是非常简略的介绍下,有兴趣可以自行阅读代码。