多线程 采用spring线程池ThreadPoolTaskExecutor提高程序处理能力 笔记
场景:
MQ批量推送消息过来,逐条订阅,逐条 一开始程序还行,处理能力还可以,因为数据不是很多,涉及的程序处理过程虽然多,但是勉强撑过去。
但是随着业务增长数据由原来的每分钟10条数据,增加到了100条数据,由于之前程序处理能力有限,导致mq上出现消息堆积,越堆越多。
解决方案A:
因为是分布式系统,多部署几个消费者,解决问题。
解决方案B:
采用多线程处理。只要服务器资源够,那么久可以提高生产效率,开启10个线程。那么相当于之前的一个程序处理,变成了10个程序处理,效率提高10倍。(其他因素不考虑的情况下,比如数据库连接数,CPU,内存消耗等)。
经搜,发现采用spring的线程池的居多,因为简单,采用jdk原生线程池的,用得不好的情况下问题还挺多。。
经测,采用了spring线程池解决问题。
例子:
新建一个配置文件,指定固定线程数,以及其他连个参数即可。
配置类 TaskPoolConfig
这里指定两个业务类型线程池。
ThreadPoolTaskExecutor 里面有很多参数,但大多有默认值,可不用设置,只需设置我们自己的线程池大小即可。
import java.util.concurrent.Executor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration
@EnableAsync
public class TaskPoolConfig {
/**msg线程池 * * @return */ @Bean("msgExecutor") public Executor msgExecutor(){ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setThreadNamePrefix(" msgExecutor=="); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskExecutor.setAwaitTerminationSeconds(600); return threadPoolTaskExecutor; } /**money线程池 * * @return */ @Bean("moneyExecutor") public Executor moneyExecutor(){ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setThreadNamePrefix(" moneyExecutor=="); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskExecutor.setAwaitTerminationSeconds(600); return threadPoolTaskExecutor; } }
业务处理类 AsyncService.java
import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; @Slf4j @Service public class AsyncService { @Async("msgExecutor") public void msgExecutor(String num) { log.info("msgExecutor ======"+num+"======="); try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } log.info("msgExecutor ======"+num+"======="); } @Async("moneyExecutor") public void moneyExecutor(String num) { log.info("moneyExecutor ======"+num+"======="); try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } log.info("moneyExecutor ======"+num+"======="); } }
测试类:(注意,调用类与实现业务类必须是两个分开的类)
import lombok.extern.slf4j.Slf4j; @Slf4j public class TaskPoolTest extends BaseTest { @Autowired private AsyncService asyncService; @Test public void showTest() throws Exception { for (int i=0;i<10;i++) { asyncService.moneyExecutor("" + i); } System.out.println("other==============="); Thread.sleep(10000); for (int i=100;i<110;i++) { asyncService.msgExecutor("" + i); } } }
执行结果:
结果日志中可以看到,执行的程序最终执行完成后 。虽然执行test已完成,都执行了shuting down 了,但程序依旧会等待到所有线程执行完结后才终止。
只是得益于这两个句:
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskExecutor.setAwaitTerminationSeconds(600);
结论:spring提供的线程池简单好用,提供服务利用率。在多地方可以考虑使用。