线程池的使用


什么时候需要用线程池?

答:线程的创建比较昂贵(创建线程需要系统资源,频繁创建和销毁消耗大量时间,导致性能问题);短平快的任务(接收大量小任务)进行分发处理使用线程池而不是一个任务对应一个新线程。

线程池的使用需要注意哪些?

1. 需要手动声明线程池

《阿里巴巴Java开发手册》中讲,禁止使用快捷的工具方法创建线程池,而应手动new ThreadPoolExecutor来创建线程池。

注意:newFixedThreadPool和newCachedThreadPool,可能因为资源耗尽导致OOM问题。

2.注意线程池几个关键参数,创建合适的线程池

1. corePoolSize:线程池核心线程大小
2. maximumPoolSize: 线程池最大线程数量
3. keepAliveTime: 空闲线程存活时间
4. unit:空闲线程存活时间单位
5. workQueue: 工作队列 (1.ArrayBlocking 2.LinkedBlockingQueue 3.SynchronizedQueue 4.PriorityBlockingQueue)
6. ThreadFactory:线程工厂
7. Handler 拒绝策略

3. 熟悉线程池默认工作行为,为线程池设置合适的初始化参数

线程池默认工作行为:
1. 线程池不会初始化corePoolSize个线程,有任务来了才创建工作线程; 2. 当核心线程满了之后不会立即扩容线程池,而是把任务堆积到工作队列中; 3. 当工作队列满了后扩容线程池,一直到线程个数达到maximumPoolSize为止; 4. 如果队列已满且达到了最大线程后还有任务进来,按照拒绝策略处理; 5. 当线程数大于核心线程数时,线程等待keepAliveTime后还是没有任务需要处理的话,收缩线程到核心线程数。

4. 根据实际容量规划需求,通过一些手段改变默认工作行为

1. 声明线程池后立即调用prestartAllCoreThreads方法,来启动所有核心线程池;
2. 传入true给allCoreThreadTimeOut方法,来让线程池在空闲的时候同样回收核心线程。
3. 方法: 
- Java线程池先用工作队列存放来不及处理的任务,满了后再扩容。
- 当工作队列设置很大时,最大线程数这个参数显得无意义,可以优先开启更多线程,而把队列当成一个后备方案。
4. 思路:
- 重写队列offer方法,造成这个队列已满的假象;
- 这样达到最大线程后会触发拒绝策略,实现一个自定义的拒绝策略处理程序;
- 再把任务真正插入队列。Tomcat线程池也实现了类似的效果()。

 5. 注意线程池的混用

要根据任务的‘轻重缓急’来指定线程池的核心参数,包括线程数、回收策略和任务队列
- 对于执行比较慢、数量不大的IO任务,或许要考虑更多的线程数,而不需要太大的队列
- 对于吞吐量较大的计算型任务,线程数量不宜过多,可以是CPU核数*2(CPU 密集型任务,过多的线程只会增加线程切换的开销),但可能需要较长的队列来做缓冲。
- Java 8 的 parallel stream 功能,可以让我们很方便地并行处理集合中的元素,其背后是共享同一个 ForkJoinPool,默认并行度是 CPU 核数 -1- 对于 CPU 绑定的任务来说,使用这样的配置比较合适,但如果集合操作涉及同步 IO 操作的话(比如数据库操作、外部服务调用等),建议自定义一个 ForkJoinPool(或普通线程池)。

6. 确保线程池是在复用的

- 使用线程池目的是复用,每次new一个线程池会很糟糕;
- 如果从类库来获取一个线程池(标准/自定义类库),请务必查看源码,以确认线程实例化方式和配置符合预期。
- 复用线程池,要根据任务的性质来选用不同的线程池;
- IO绑定任务与CPU绑定任务,如果希望减少任务间干扰,需要隔离线程池。

示例: 

//案例:线上遇到的一个事故:当天直播,我们调用一个外部服务去发送短信,发送短信接口正常时可以在100ms内响应,TPS100的发送量,CachedThreadPool能稳定在占用10个左右线程的情况下满足需求。
//在某个时间段,外部短信服务不可用了(貌似是短信平台调整敏感词),我们调用这个服务的超时又特别长,比如1分钟,直播前5分钟要发送短信,而1分钟产生 6000 个发送短信的任务,需要6000个线程,
//没多久因为无法创建线程导致了OOM,整个应用程序崩溃。
//我们知道线程是需要分配一定的内存空间作为线程栈的,比如1MB,因此无限制创建线程必然会导致OOM
//从源码中可以看到,newCachedThreadPool这种线程池的最大线程数是Integer.MAX_VALUE,可以认为是无上限的,而其工作队列SynchronousQueue是一个没有存储空间的阻塞队列
//这就以为着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
              60L, TimeUnit.SECONDS,
              new SynchronousQueue());
}
//这里的核心线程数是0,keepAliveTime是60秒,在60秒之后所有的线程都是可以回收的。
//所以这就会出现如果业务操作并发量较大,可能一下子开启几千个线程而又不会撑爆内存。下边案例就是。

//案例:某项目生产环境时不时有报警提示线程数过多,超过2000个,收到报警后查看监控发现,瞬时线程数比较多但过一会儿又降下来,线程数抖动很力换,而应用的访问量变化不大。
//wrong:每次都创建一个线程池
class ThreadPoolHelper {
    public static ThreadPoolExecutor getThreadPool() {
        //线程池没有复用
        return (ThreadPoolExecutor) Executors.newCachedThreadPool();
    }
}
//right:使用静态字段来存放线程池的引用
class ThreadPoolHelper {
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50,
            2, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());

    public static ThreadPoolExecutor getRightThreadPool() {
        return threadPoolExecutor;
    }
}

监控

//简单的打印线程池基本内部信息,包括线程数、活跃线程数、完成了多少任务,以及队列中还有多少积压任务等信息:
private void printStats(ThreadPoolExecutor threadPool) {
   Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
        log.info("=========================");
        log.info("Pool Size: {}", threadPool.getPoolSize());
        log.info("Active Threads: {}", threadPool.getActiveCount());
        log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
        log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());

        log.info("=========================");
    }, 0, 1, TimeUnit.SECONDS);
}

相关