如何解决线程池任务拒绝异常(RejectedExecutionException)
在开发过程中,我们常常会遇到各种意想不到的问题。这不,测试小哥急匆匆地找上门来,告知在使用ThreadPoolExecutor
执行异步任务时,测试环境出现了报错:
java.util.concurrent.RejectedExecutionException: task java.util.concurrent.FutureTask@1e19e316 rejected from java.util.concurrent.ThreadPoolExecutor@647b9364[running, pool size = 12, active threads = 12, queued tasks = 32, completed tasks = 44]
奇怪的是,这段代码在本地运行一切正常,可一到测试环境就抛出RejectedExecutionException
异常,这究竟是怎么回事呢?
让我们深入代码内部寻找答案:
List<List<String>> partitionedIds = Lists.partition(externalUserIds, 100); List<CompletableFuture<List<ExternalUserRecord>>> futureList = partitionedIds.stream() .map(batch -> CompletableFuture.supplyAsync( () -> externalUserRecordService.batchGetExternalUserRecord(batch), executor)) .collect(Collectors.toList());
经过分析,发现问题的关键在于:externalUserIds
数据会按照100个一组进行分批处理。比如说,如果externalUserIds
包含5000条数据,就会被拆分成50组。而每一组数据都会提交一个异步任务,这就导致可能同时有50个任务提交到线程池。
要知道,线程池的队列默认最大容量是32。一旦提交的任务数量超过这个队列容量,就会抛出RejectedExecutionException
异常。
那为什么本地运行正常,测试环境却出错呢?原因其实很简单,本地的数据量比较小,假设externalUserIds
只有300条数据,分批后仅仅产生3组任务,远未达到队列的最大容量。而测试环境的数据量可能非常大,externalUserIds
可能包含上万条数据,产生的任务数量远远超过了32,这就触发了异常。
找到了问题根源,接下来看看都有哪些解决方案。
方案1:扩大队列容量(当前已采用方案)
在ThreadPoolExecutor
的配置中,将queueCapacity
从32提升到10000,具体代码如下:
private static final int QUEUE_CAPACITY = 10000; @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE); threadPoolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE); threadPoolTaskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS); threadPoolTaskExecutor.setQueueCapacity(QUEUE_CAPACITY); threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); threadPoolTaskExecutor.setThreadFactory(new CustomizableThreadFactory("excellent-mall-pool-thread-")); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; }
这种方案的优点是能够确保所有任务都不会被拒绝,都能得到执行。但缺点也很明显,大量任务排队等待执行,可能会导致任务等待时间过长。
方案2:限制并发任务数量
借助`Semaphore`来控制同时执行的任务数量,防止一次性提交过多任务,代码实现如下:
Semaphore semaphore = new Semaphore(10); List<CompletableFuture<List<ExternalUserRecord>>> futureList = partitionedIds.stream() .map(batch -> CompletableFuture.supplyAsync(() -> { try { semaphore.acquire(); return externalUserRecordService.batchGetExternalUserRecord(batch); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return Collections.emptyList(); } finally { semaphore.release(); } }, executor)) .collect(Collectors.toList());
该方案的优势在于能够有效控制最大并发任务数,避免线程池因任务过多而超负荷运行。不过,如果任务总量较多,这种限制可能会对整体的执行速度产生一定影响。
方案3:分批有序执行任务
摒弃一次性提交所有任务的方式,改为按批次执行,等待前一批任务执行完毕后,再提交下一批任务,代码如下:
for (List<String> batch : partitionedIds) { List<CompletableFuture<List<ExternalUserRecord>>> futureList = batch.stream() .map(ids -> CompletableFuture.supplyAsync( () -> externalUserRecordService.batchGetExternalUserRecord(ids), executor)) .collect(Collectors.toList()); // 等待本批任务执行完再提交下一批 futureList.forEach(CompletableFuture::join); }
这种方案的好处是不会让线程池瞬间承受大量任务的压力,有效降低了线程池的负担。但由于需要依次等待每一批任务完成,所以任务整体的执行时间可能会稍有延长。
总结
总结一下,本次出现线程池任务拒绝异常的根本原因是线程池队列容量不足,导致部分任务被拒绝。目前最直接的解决办法是增大queueCapacity
,不过从长远来看,更推荐使用Semaphore
限制并发任务数,或者采用按批次执行任务的方式,避免瞬间提交大量任务。
在实际项目中,我们可以根据具体的业务需求,权衡这三种方案的利弊,选择最合适的方案,以此保障系统的稳定性和执行效率。
今天的分享就先到这里,还有个查询耗时8s的页面等着我去优化,这又是一个值得深入研究的问题,先留个悬念,后续再和大家分享。