JAVA实现动态线程池

  • 时间:2025-11-28 23:06 作者: 来源: 阅读:8
  • 扫一扫,手机访问
摘要:为什么需要动态线程池 在业内,线程池的核心线程数、最大线程数,通常会有一些“最佳实践”。比如对于CPU密集型任务,核心线程数可以设置成CPU核心+1,IO密集型任务,可以设置成核心线程数的2~3倍。 但这些值都是理论上的“最佳”值,实际上最佳值可能与这个相差甚远,可能我们在我们的代码上了生产后,我们才发现线程池设置的参数不合理,那么,为了调整这个线程池参数,我们可能需要重启服务。如果是紧急情况,可

为什么需要动态线程池

在业内,线程池的核心线程数、最大线程数,通常会有一些“最佳实践”。比如对于CPU密集型任务,核心线程数可以设置成CPU核心+1,IO密集型任务,可以设置成核心线程数的2~3倍。

但这些值都是理论上的“最佳”值,实际上最佳值可能与这个相差甚远,可能我们在我们的代码上了生产后,我们才发现线程池设置的参数不合理,那么,为了调整这个线程池参数,我们可能需要重启服务。如果是紧急情况,可能重启可能就来不及了。

这个时候,如果能够不用重启动态的调整线程池的参数,就会很舒服。另外,如果我们有条件,能通过压测计算出线程池的最佳配置的话,那么动态线程池能够节省我们的很多实践。

动态线程池实现概览


为了让大家更清晰,我们总体上介绍动态线程池的组成部分。

我们会定义一个动态线程池管理器DynamicThreadPoolManager,负责注册动态线程池,并在配置中心的配置发生变更时,去更新线程池的参数

动态线程池DynamicThreadPoolExecutor,负责在创建线程的时候,自动的去注册当前线程,并提供更新线程池参数的方法。

ThreadPoolConfigProperties,监听配置中心参数的变更,当监听到变更是,调用DynamicThreadPoolManager实现线程池参数更新

ThreadPoolConfiguration,统一将线程池定义在一个地方,并将线程池注册成Bean(为后续的监控做准备)

以上部分就是动态线程池的主要部分了。此外,为了能更好的调整我们的线程池参数,我们需要监控线程池的状态,因此监控也是不可缺少的一环。

监控的话,主要是俩个类。

1、ThreadPoolMonitor负责发现,并注册线程池的Micrometer指标。

2、ThreadPoolEndpoint,将线程池的状态以端点的形式对外暴露,以便prometheus之类的监控组件可以采集这些指标

OK,在对大框架有了了解后,接下来我们来介绍各个部分的组成部分。

动态线程池组成部分

pom依赖


<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.0.13</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
  <version>2022.0.0.0</version>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>

nacos版本:3.1.0

1、线程池配置

线程池的参数配置如下,其中orderService是线程池的名称,剩下的就是线程池的参数。


threadpool:
  configs:
    orderService:
      corePoolSize: 26
      maximumPoolSize: 50
      queueCapacity: 600
      keepAliveTime: 60
      rejectedHandler: CallerRunsPolicy
    userService:
      corePoolSize: 5
      maximumPoolSize: 20
      queueCapacity: 200
      keepAliveTime: 120
      rejectedHandler: AbortPolicy
    emailService:
      corePoolSize: 2
      maximumPoolSize: 8
      queueCapacity: 100
      keepAliveTime: 300
      rejectedHandler: DiscardPolicy

2、动态线程池DynamicThreadPoolExecutor

核心就俩个方法

提供创建线程池的方法,在方法中会自动将当前线程池注册到线程池管理器中提供更新参数的方法

@Slf4j
public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
    private final String threadPoolName;

    /**
     * 私有构造函数,确保只能通过create方法创建实例
     */
    private DynamicThreadPoolExecutor(String poolName, ThreadPoolConfig config) {
        super(config.getCorePoolSize(),
              config.getMaximumPoolSize(),
              config.getKeepAliveTime(),
              TimeUnit.SECONDS,
              new ResizableCapacityLinkedBlockingQueue<>(config.getQueueCapacity()),
              new NamedThreadFactory(poolName),
              createRejectedHandler(config.getRejectedHandler()));
        
        this.threadPoolName = poolName;
    }

    /**
     * 统一的创建方法 - 核心方法
     */
    public static DynamicThreadPoolExecutor create(String poolName, ThreadPoolConfig config) {
        // 验证配置
        config.validate();

        // 创建实例
        DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(poolName, config);
        
        // 自动注册到管理器
        DynamicThreadPoolManager.registerThreadPool(poolName, executor);

        log.info("Created dynamic thread pool: {} with config: {}", poolName, config);
        return executor;
    }

    /**
     * 快捷创建方法
     */
    public static DynamicThreadPoolExecutor create(String poolName, 
                                                  int corePoolSize, 
                                                  int maximumPoolSize,
                                                  long keepAliveTime,
                                                  int queueCapacity) {
        ThreadPoolConfig config = new ThreadPoolConfig();
        config.setCorePoolSize(corePoolSize);
        config.setMaximumPoolSize(maximumPoolSize);
        config.setKeepAliveTime(keepAliveTime);
        config.setQueueCapacity(queueCapacity);
        
        return create(poolName, config);
    }

    // 其他原有方法保持不变...
    private static RejectedExecutionHandler createRejectedHandler(String handlerType) {
        switch (handlerType) {
            case "CallerRunsPolicy": return new CallerRunsPolicy();
            case "DiscardPolicy": return new DiscardPolicy();
            case "DiscardOldestPolicy": return new DiscardOldestPolicy();
            case "AbortPolicy":
            default: return new AbortPolicy();
        }
    }

    public void updateConfig(ThreadPoolConfig newConfig) {
        newConfig.validate();
        
        setMaximumPoolSize(newConfig.getMaximumPoolSize());
        setCorePoolSize(newConfig.getCorePoolSize());
        setKeepAliveTime(newConfig.getKeepAliveTime(), TimeUnit.SECONDS);
        
        ResizableCapacityLinkedBlockingQueue<Runnable> queue = 
            (ResizableCapacityLinkedBlockingQueue<Runnable>) getQueue();
        queue.setCapacity(newConfig.getQueueCapacity());           
        
        setRejectedExecutionHandler(createRejectedHandler(newConfig.getRejectedHandler()));
        
        log.info("ThreadPool [{}] config updated: core={}, max={}, queue={}, keepAlive={}s", 
                 threadPoolName, newConfig.getCorePoolSize(), newConfig.getMaximumPoolSize(),
                 newConfig.getQueueCapacity(), newConfig.getKeepAliveTime());
    }
}

3、动态线程池管理器DynamicThreadPoolManager

提供注册线程池的方法提供更新线程池配置的方法(底层还是委托打的某个线程池去更新)

@Slf4j
public class DynamicThreadPoolManager {
    
    private final static Map<String, DynamicThreadPoolExecutor> threadPoolMap = new ConcurrentHashMap<>();

    /**
     * 注册线程池
     */
    public static void registerThreadPool(String poolName, DynamicThreadPoolExecutor executor) {
        threadPoolMap.put(poolName, executor);
        
        log.info("Registered thread pool: {}", poolName);
    }

    /**
     * 获取线程池
     */
    public DynamicThreadPoolExecutor getThreadPool(String poolName) {
        DynamicThreadPoolExecutor executor = threadPoolMap.get(poolName);
        if (executor == null) {
            throw new IllegalArgumentException("Thread pool not found: " + poolName);
        }
        return executor;
    }

    /**
     * 更新线程池配置
     */
    public static synchronized void updateThreadPoolConfig(String poolName, ThreadPoolConfig newConfig) {
        DynamicThreadPoolExecutor executor = threadPoolMap.get(poolName);
        if (executor == null) {
            return;
        }
        executor.updateConfig(newConfig);
    }

    /**
     * 批量更新所有线程池配置
     */
    public static void updateAllThreadPoolConfigs(Map<String, ThreadPoolConfig> newConfigs) {
        for (Map.Entry<String, ThreadPoolConfig> entry : newConfigs.entrySet()) {
            updateThreadPoolConfig(entry.getKey(), entry.getValue());
        }
        log.info("Updated {} thread pool configs", newConfigs.size());
    }

    /**
     * 获取所有线程池名称
     */
    public Set<String> getAllThreadPoolNames() {
        return new HashSet<>(threadPoolMap.keySet());
    }
}

4、读取配置中心配置ThreadPoolConfigProperties

监听配置中心配置发生修改时,变更线程池的参数


@Slf4j
@Component
@RefreshScope
@ConfigurationProperties(prefix = "threadpool")
@Data
public class ThreadPoolConfigProperties {

    private Map<String, ThreadPoolConfig> configs = new HashMap<>();

    /**
     * 监听刷新事件
     */
    @EventListener
    public void handleRefreshEvent(RefreshEvent event) {
        log.info("RefreshEvent received, updating thread pools");
        // 强制更新线程池配置
        DynamicThreadPoolManager.updateAllThreadPoolConfigs(this.configs);
    }

    public ThreadPoolConfig getConfig(String threadPoolName) {
        return this.configs.get(threadPoolName);
    }
}

@Data
public class ThreadPoolConfig {
    private int corePoolSize = 5;
    private int maximumPoolSize = 10;
    private int queueCapacity = 100;
    private long keepAliveTime = 60L;
    private String rejectedHandler = "AbortPolicy";
    
    // 验证配置合法性
    public void validate() {
        if (corePoolSize < 0) {
            throw new IllegalArgumentException("corePoolSize must be >= 0");
        }
        if (maximumPoolSize <= 0) {
            throw new IllegalArgumentException("maximumPoolSize must be > 0");
        }
        if (corePoolSize > maximumPoolSize) {
            throw new IllegalArgumentException("corePoolSize must be <= maximumPoolSize");
        }
        if (queueCapacity <= 0) {
            throw new IllegalArgumentException("queueCapacity must be > 0");
        }
        if (keepAliveTime < 0) {
            throw new IllegalArgumentException("keepAliveTime must be >= 0");
        }
    }
}

5、创建线程池


@Configuration
public class ThreadPoolConfiguration {
    
    @Autowired
    private ThreadPoolProperties threadPoolProperties;

    @Resource
    private ThreadPoolConfigProperties threadPoolConfigProperties;
    
    /**
     * 订单服务线程池 - 通过配置创建
     */
    @Bean
    @ConditionalOnProperty(prefix = "threadpool.configs.orderService", name = "corePoolSize")
    public DynamicThreadPoolExecutor orderService() {
        ThreadPoolConfig config = threadPoolConfigProperties.getConfig("orderService");
        return DynamicThreadPoolExecutor.create("orderService", config);
    }
    
    /**
     * 用户服务线程池 - 使用参数创建
     */
    @Bean("userThreadPool")
    public DynamicThreadPoolExecutor userThreadPool() {
        return DynamicThreadPoolExecutor.create("userThreadPool", 3, 10, 1000, 100);
    }
    
    /**
     * 邮件服务线程池
     */
    @Bean("emailThreadPool")
    public DynamicThreadPoolExecutor emailThreadPool() {
        ThreadPoolConfig config = new ThreadPoolConfig();
        config.setCorePoolSize(2);
        config.setMaximumPoolSize(5);
        config.setQueueCapacity(50);
        config.setKeepAliveTime(120L);
        return DynamicThreadPoolExecutor.create("emailThreadPool", config);
    }
}

动态线程池部分看完,我们看下线程池的监控部分。

线程池监控部分

1、 ThreadPoolMonitor

发现Spring容器中所有的线程池Bean,并为这些线程池注册监控指标


@Component
public class ThreadPoolMonitor {
    
    private final MeterRegistry meterRegistry;
    private final ApplicationContext applicationContext;
    
    // 存储所有线程池的引用
    private final Map<String, ThreadPoolExecutor> threadPools = new ConcurrentHashMap<>();
    
    public ThreadPoolMonitor(MeterRegistry meterRegistry,
                             ApplicationContext applicationContext) {
        this.meterRegistry = meterRegistry;
        this.applicationContext = applicationContext;
        initializeMonitoring();
    }
    
    private void initializeMonitoring() {
        // 1. 自动发现所有线程池
        discoverThreadPools();
        
        // 2. 为监控系统注册Micrometer指标
        registerMicrometerMetrics();
    }
    
    private void discoverThreadPools() {
        // 发现各种类型的线程池...
        Map<String, ThreadPoolExecutor> executors = 
            applicationContext.getBeansOfType(ThreadPoolExecutor.class);
        threadPools.putAll(executors);
        
        // 还可以发现其他类型的线程池...
    }
    
    private void registerMicrometerMetrics() {
        threadPools.forEach((name, executor) -> {
            Tags tags = Tags.of("pool.name", name);
            
            // 注册各种监控指标
            Gauge.builder("custom.threadpool.core.size", executor, ThreadPoolExecutor::getCorePoolSize)
                    .tags(tags)
                    .description("核心线程数")
                    .register(meterRegistry);

            Gauge.builder("custom.threadpool.max.size", executor, ThreadPoolExecutor::getMaximumPoolSize)
                    .tags(tags)
                    .description("最大线程数")
                    .register(meterRegistry);

            Gauge.builder("custom.threadpool.active.count", executor, ThreadPoolExecutor::getActiveCount)
                    .tags(tags)
                    .description("活跃线程数")
                    .register(meterRegistry);

            Gauge.builder("custom.threadpool.pool.size", executor, ThreadPoolExecutor::getPoolSize)
                    .tags(tags)
                    .description("当前线程数")
                    .register(meterRegistry);

            Gauge.builder("custom.threadpool.queue.size", executor,
                            e -> e.getQueue().size())
                    .tags(tags)
                    .description("队列大小")
                    .register(meterRegistry);

            Gauge.builder("custom.threadpool.completed.tasks", executor,
                            ThreadPoolExecutor::getCompletedTaskCount)
                    .tags(tags)
                    .description("已完成任务数")
                    .register(meterRegistry);

            System.out.println("已注册线程池监控: " + name);
        });
    }
    
    /**
     * 为自定义端点提供完整状态数据
     */
    public Map<String, ThreadPoolStatus> getThreadPoolStatus() {
        Map<String, ThreadPoolStatus> statusMap = new HashMap<>();
        threadPools.forEach((name, executor) -> {
            statusMap.put(name, buildThreadPoolStatus(executor));
        });
        return statusMap;
    }
    
    private ThreadPoolStatus buildThreadPoolStatus(ThreadPoolExecutor executor) {
        return ThreadPoolStatus.builder()
            .corePoolSize(executor.getCorePoolSize())
            .maximumPoolSize(executor.getMaximumPoolSize())
            .poolSize(executor.getPoolSize())
            .activeCount(executor.getActiveCount())
            .largestPoolSize(executor.getLargestPoolSize())
            .taskCount(executor.getTaskCount())
            .completedTaskCount(executor.getCompletedTaskCount())
            .queueSize(executor.getQueue().size())
            .queueRemainingCapacity(executor.getQueue().remainingCapacity())
            .utilization(calculateUtilization(executor))
            .build();
    }
    
    private double calculateUtilization(ThreadPoolExecutor executor) {
        return executor.getMaximumPoolSize() > 0 ? 
            (double) executor.getActiveCount() / executor.getMaximumPoolSize() : 0;
    }
}

2、ThreadPoolEndpoint

对外暴露线程池的监控指标


@Component
@Endpoint(id = "threadpools")
public class ThreadPoolEndpoint {
    
    private final ThreadPoolMonitor monitor;
    
    public ThreadPoolEndpoint(ThreadPoolMonitor monitor) {
        this.monitor = monitor;
    }

    @Resource
    private ThreadPoolConfigProperties threadPoolConfigProperties;

    @ReadOperation
    public Map<String, Object> getThreadPools() {
        Map<String, Object> result = new HashMap<>();
        result.put("timestamp", Instant.now().toString());
        result.put("threadPools", monitor.getThreadPoolStatus());
        return result;
    }
    
    @ReadOperation
    public ThreadPoolStatus getThreadPool(@Selector String poolName) {
        // 返回单个线程池的详细状态
        return monitor.getThreadPoolStatus().get(poolName);
    }
}

请求localhost:8888/actuator/threadpools可以看到对外暴露的线程池状态。

最后还有一些可优化的点:

配置文件中,拒绝策略采用的是字符串,容易拼错,其实可以改成用枚举会更方便监控可以完善一下,比如监控任务的平均执行时间、任务的等待时间。动态线程池完善,目前线程池是不支持上下文传递的,可以优化目前只要是配置文件变更,都会被监听到。我们可以只针对线程池配置文件的变更做处理…等等

这篇的监控还是比较简单,企业级的应该要接入prometheus+grafana。

上面对于阻塞队列的可变性其实我并没有细说,因为对于LinkedBlockingQueue,capacity定义成final了,也就是说是不可变的,因此如果我们还需要自己实现一个可变的队列。

这俩个后面有空我在写俩篇文章介绍。

此外,如果有其他问题,也欢迎与我沟通。

  • 全部评论(0)
手机二维码手机访问领取大礼包
返回顶部