统一管理项目的线程池

本贴最后更新于 446 天前,其中的信息可能已经时异事殊

一、问题描述

频繁的创建、销毁线程和线程池,会给系统带来额外的开销。未经池化及统一管理的线程,则会导致系统内线程数上限不可控。

例如如下代码,每次发送邮件都会创建一个新的线程池,并且业务结束之后线程池也未随之销毁

public static boolean sendMail(MailInfo mailInfo, MailServerInfo mailServerInfo) {
    try {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<Boolean> future = executorService.submit(() -> {
            try {
                return asyncSendEmail(mailInfo, mailServerInfo);
            } catch (Exception e) {
                return false;
            }
        });
        return future.get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        return false;
    }
}

这种情况下,随着访问数增加,系统内线程数持续增长,CPU 负载逐步提高。极端情况下,甚至可能会导致 CPU 资源被吃满,整个服务不可用。

为了解决上述问题,可增加统一线程池配置,替换掉自建线程和线程池。

二、自建线程池

在 ThreadPoolConfig 中,创建统一线程池配置类,并交由 Spring 容器管理

@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer, SecureInvokeConfigurer {
    /**
     * 项目共用线程池
     */
    public static final String CHATTERHUB_EXECUTOR = "chatterHubExecutor";
    /**
     * websocket通信线程池
     */
    public static final String WS_EXECUTOR = "websocketExecutor";


    // GPT服务线程池
    public static final String AICHAT_EXECUTOR = "aichatExecutor";


    @Bean(CHATTERHUB_EXECUTOR)
    @Primary
    /**
     * @Primary是一个注解(Annotation),通常用于Java Spring框架中的Bean定义。它的作用是指定一个候选的Bean,
     * 当存在多个相同类型的Bean时,Spring容器会优先选择被@Primary注解标记的Bean作为注入的目标对象。
     */
    public ThreadPoolTaskExecutor chatterHubExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("chatterHub-executor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //满了调用线程执行,认为重要任务
        executor.setThreadFactory(new MyThreadFactory(executor));
        executor.initialize();

        return executor;
    }

.......

这里面做了两件事,创建一个统一线程池,并且还通过实现 AsyncConfigurer 设置了 @async 注解也使用我们的统一线程池,这样方便统一管理。

我们的线程池没有用 Excutors 快速创建。是因为 Excutors 创建的线程池用的无界队列,有 oom 的风险(小考点****)

executor.setThreadNamePrefix("chattethub-executor-")设置线程前缀,这样排查 cpu 占用,死锁问题或者其他 bug 的时候根据线程名,可以比较容易看出是业务问题还是底层框架问题。

三、优雅停机

当项目关闭的时候,需要通过 jvm 的 shutdownHook 回调线程池,等队列里任务执行完再停机。保证任务不丢失。

shutdownHook 会回调 spring 容器,所以我们实现 spring 的 DisposableBean 的 destroy 方法也可以达到一样的效果,在里面调用 executor.shutdown()并等待线程池执行完毕。

由于我们用的就是 spring 管理的线程池

image

连优雅停机的事,都可以直接交给 spring 自己来管理了,非常方便。

内部源码,点进去可以看见。

@Override
public void destroy() {
    shutdown();
}

/**
 * Perform a shutdown on the underlying ExecutorService.
 * @see java.util.concurrent.ExecutorService#shutdown()
 * @see java.util.concurrent.ExecutorService#shutdownNow()
 */
public void shutdown() {
    if (logger.isDebugEnabled()) {
        logger.debug("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }
    if (this.executor != null) {
        if (this.waitForTasksToCompleteOnShutdown) {
            this.executor.shutdown();
        }
        else {
            for (Runnable remainingTask : this.executor.shutdownNow()) {
                cancelRemainingTask(remainingTask);
            }
        }
        awaitTerminationIfNecessary(this.executor);
    }
}

四、线程池使用

我们放进容器的线程池设置了 beanName。

image

业务需要用,也可以根据 beanName 取出想用的线程池。

image

也可以直接在方法上面加异步注解 @Async

image

五、异常捕获

搭建我们的项目的线程池,千万别忘了一点,就是线程运行抛异常了,要怎么处理。昨天一位哥们在群里问到这个问题,才发现很容易被人忽略,我们来看一看。

public static void main(String[] args) {
    Thread thread =new Thread(()->{
        log.info("111");
        throw new RuntimeException("运行时异常了");
    });
    thread.start();
}

看看这样一个语句,子线程执行报错,会打印错误日志吗

image

结果是这样的,异常并不会打印日志,只会在控制台输出。为啥呢?

如果出了问题,却不打印 error 日志,那问题就被隐藏了,非常危险

想要搞明白这个,首先要明白子线程的异常抛到哪里去了?

异常的去处

传统模式下,我们一般会通过 try catch 的方法去捕获线程的异常,并且打印到日志中。

Thread thread =new Thread(()->{
    try{
        log.info("111");
        throw new RuntimeException("运行时异常了");
    }catch (Exception e){
        log.error("异常发生",e);
    }
});
thread.start();

image

你会发现一个有意思的现象,当我们捕获了异常,就没有控制台的告警了,全都是日志打印。

其实,如果一个异常未被捕获,从线程中抛了出来。JVM 会回调一个方法dispatchUncaughtException

 /**
 * Dispatch an uncaught exception to the handler. This method is
 * intended to be called only by the JVM.
 */
private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
}

这个方法在Thread​类中,会进行默认的异常处理,其实就是获取一个默认的异常处理器。默认的异常处理器是

ThreadGroup 实现的异常捕获方法。前面看到的控制台ERR打印​,就出自这里。

image

如何捕获线程异常

我们要做的很简单,就是给线程添加一个异常捕获处理器​,以后抛了异常,就给它转成 error 日志。这样才能及时发现问题。

Thread 有两个属性,一个类静态变量,一个实例对象。都可以设置异常捕获。区别在于一个生效的范围是单个 thread 对象,一个生效的范围是全局的 thread。

// null unless explicitly set
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;

// null unless explicitly set
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;

我们一般选择给每个 thread 实例都加一个异常捕获。毕竟别人的 thread 咱们别管,只管自己创建的 thread。

Thread thread = new Thread(() -> {
    log.info("111");
    throw new RuntimeException("运行时异常了");
});
Thread.UncaughtExceptionHandler uncaughtExceptionHandler =(t,e)->{
    log.error("Exception in thread ",e);
};
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
thread.start();

image

线程池的异常捕获

我们工作中一般不直接创建对象,都用的线程池。这下要怎么去给线程设置异常捕获呢?

用线程池的ThreadFactory​,创建线程的工厂,创建线程的时候给线程添加异常捕获。不了解的去补一补线程池基础

private static ExecutorService executor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(500), 
    new NamedThreadFactory("refresh-ipDetail",null, false,
                           new MyUncaughtExceptionHandler()));

这是我的 ip 解析线程池,直接在工厂里添加一个异常捕获处理器就好了。它在创建 thread 的时候,会把这个异常捕获赋值给thread

image

如果是这么简单,那一切到这儿就结束了。

Spring 自身的线程池异常处理

我们还有两个线程池,用到了 Spring 的线程池。由于 Spring 的封装,想要给线程工厂设置一个捕获器,可是很困难的。

 @Bean(WS_EXECUTOR)
public ThreadPoolTaskExecutor websocketExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(16);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(1000);//支持同时推送1000人
    executor.setThreadNamePrefix("websocket-executor-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送
    executor.initialize();
    return executor;
}

image

可以看到它自己实现了 ThreadFactory。在CustomizableThreadFactory​类的位置

image

点进去可以看见它内部封装好的创建线程的方法

image

压根就没有机会去设置一个线程捕获器。

它的抽象类 ​ExecutorConfigurationSupport ​​将自己赋值给线程工厂,提供了一个解耦的机会。

image

如果我们把这个线程工厂换了,那么它的线程创建方法就会失效。线程名,优先级啥的全都得我们一并做了。而我们只是想扩展一个线程捕获。

**这时候一个设计模式浮出脑海,**装饰模式(Wrapper、decorator)1

装饰器模式不会改变原有的功能,而是在功能前后做一个扩展点 。完全适合我们这次的改动。

首先先写一个自己的线程工厂,把 spring 的线程工厂传进来。调用它的线程创建后,再扩展设置我们的异常捕获

public class MyThreadFactory implements ThreadFactory {

    private ThreadFactory original;

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = original.newThread(r);
        thread.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());//异常捕获
        return thread;
    }
}

第二步,替换 spring 线程池的线程工厂。

image

一个完美的装饰器模式就这么写完了。

有人问,这么复杂,为啥得要用 spring 的线程池,不用原生的呢?因为 spring 提供了很多优雅关闭等功能啊,看看 上面的文章。


  1. 装饰模式(Wrapper、decorator)

    装饰设计(装饰者模式 / 装饰器模式) (refactoringguru.cn)

    定义

    装饰模式是一种结构型设计模式**,** 允许你通过将对象放入包含行为的特殊封装对象中来为原对象绑定新的行为**。**

    装饰设计模式

    动态地给子类添加额外更多的功能。

    并且在不需要额外创造更多子类的情况下,将对象的功能继续扩展

    代码拓展问题

    在项目开发后期,如果需要对该类进行拓展的话,有可能会造成代码量的激增,并且使代码难以管理

    假设你正在开发一个提供通知功能的库**,** 其他程序可使用它向用户发送关于重要事件的通知**。**

    库的最初版本基于** 通知器Noti­fi­er 类,** 其中只有很少的几个成员变量**,** 一个构造函数和一个** send发送方法。** 该方法可以接收来自客户端的消息参数**,** 并将该消息发送给一系列的邮箱**,** 邮箱列表则是通过构造函数传递给通知器的**。** 作为客户端的第三方程序仅会创建和配置通知器对象一次**,** 然后在有重要事件发生时对其进行调用**。**

    使用装饰模式前的库结构

    程序可以使用通知器类向预定义的邮箱发送重要事件通知。

    此后某个时刻**,** 你会发现库的用户希望使用除邮件通知之外的功能**。** 许多用户会希望接收关于紧急事件的手机短信**,** 还有些用户希望在微信上接收消息**,** 而公司用户则希望在 QQ 上接收消息**。**

    实现其他类型通知后的库结构

    每种通知类型都将作为通知器的一个子类得以实现。

    这有什么难的呢**?** 首先扩展** 通知器,** 然后在新的子类中加入额外的通知方法**。** 现在客户端要对所需通知形式的对应类进行初始化**,** 然后使用该类发送后续所有的通知消息**。**

    但是很快有人会问**:** ​** “为什么不同时使用多种通知形式呢?** 如果房子着火了**,** 你大概会想在所有渠道中都收到相同的消息吧**。 **

    你可以尝试创建一个特殊子类来将多种通知方法组合在一起以解决该问题**。** 但这种方式会使得代码量迅速膨胀**,** 不仅仅是程序库代码**,** 客户端代码也会如此**。**

    创建组合类后的程序库结构

    子类组合数量爆炸。

    你必须找到其他方法来规划通知类的结构**,** 否则它们的数量会在不经意之间打破吉尼斯纪录**。**

    真实世界类比

    装饰模式示例

    穿上多件衣服将获得组合性的效果。

    穿衣服是使用装饰的一个例子**。** 觉得冷时**,** 你可以穿一件毛衣**。** 如果穿毛衣还觉得冷**,** 你可以再套上一件夹克**。** 如果遇到下雨**,** 你还可以再穿一件雨衣**。** 所有这些衣物都** “扩展”** 了你的基本行为**,** 但它们并不是你的一部分**,** 如果你不再需要某件衣物**,** 可以方便地随时脱掉**。**

    使代码对修改 关闭,对增加 开发

    结构

    image

    实现代码

    装饰者模式:若要扩展功能,装饰者提供了比集成更有弹性的替代方案,动态地将责任附加到对象上。

    • 先简单描述下装饰者模式发挥作用的地方,当我们设计好了一个类,我们需要给这个类添加一些辅助的功能,并且不希望改变这个类的代码,这时候就是装饰者模式大展雄威的时候了。这里还体现了一个原则:类应该对扩展开放,对修改关闭。

    • 需求:设计游戏的装备系统,基本要求,要可以计算出每种装备在镶嵌了各种宝石后的攻击力和描述:

    • 1、装备的超类:IEquip.java

      • public interface IEquip {
        
            /**
             * 计算攻击力
             */
            public int caculateAttack();
        
            /**
             * 装备的描述
             */
            public String description();
        }
        
    • 2、各个装备的实现类:

      • eg:武器的实现类: ArmEquip.java
      • public class ArmEquip implements IEquip {
        
            @Override
            public int caculateAttack() {
                return 20;
            }
        
            @Override
            public String description() {
                return "屠龙宝刀";
            }
        }
        
    • 3、装饰品的超类(装饰品也属于装备):IEquipDecorator.java

      • public interface IEuipDecorator extends IEquip {
        }
        
    • 4、装饰品的实现类:

      • eg:蓝宝石的实现类(可累加): BlueGemDecorator.java

        • public class BlueGemDecorator implements IEuipDecorator {
          
              private IEquip iEquip;
          
              public BlueGemDecorator(IEquip iEquip) {
                  this.iEquip = iEquip;
              }
          
              /**
               * 累加攻击力
               */
              @Override
              public int caculateAttack() {
                  return 5 + iEquip.caculateAttack();
              }
          
              @Override
              public String description() {
                  return iEquip.description() + "+ 蓝宝石";
              }
          }
          
      • 红宝石装饰类

        • public class RedGemDecorator implements IEuipDecorator {
          
              private IEquip iEquip;
          
              public RedGemDecorator(IEquip iEquip) {
                  this.iEquip = iEquip;
              }
          
              /**
               * 累加攻击力
               */
              @Override
              public int caculateAttack() {
                  return 15 + iEquip.caculateAttack();
              }
          
              @Override
              public String description() {
                  return iEquip.description() + "+ 红宝石";
              }
          }
          
    • 5、最后测试:计算攻击力和查看描述:

       Log.e("---", "一个镶嵌2颗红宝石,1颗蓝宝石的靴子: ");
       IEquip iEquip = new RedGemDecorator(new RedGemDecorator(new BlueGemDecorator(new ShoeEquip())));
       Log.e("---", "攻击力:" + iEquip.caculateAttack());
       Log.e("---", "描述语:" + iEquip.description());
      
      
      

    上述装饰类都继承自一个基类 IEquip。同时,每个装饰类中,都有 IEquip 类

    这样就能够实现 “嵌套” 装饰了!!!

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3190 引用 • 8214 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...