在项目实践中,会用到很多池化的技术,比如线程池、数据库连接池、HTTP连接池等,这种池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

其中,线程池的定义是管理一系列的资源池,并提供管理和限制线程资源的方式。每个线程池还维护一些基本统计信息,比如已完成的任务数量。

在《Java并发编程的艺术》中,总结了线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

当频繁创建、销毁线程和线程池,会给系统带来额外的开销,未经池化及统一管理的线程,会导致系统内线程数上限不可控。因此,为解决上述问题,可增加统一线程池配置,替换掉自建线程和线程池。

在线程池实现中,很多都基于Executor框架,Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题(this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误)。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。Executor 框架结构主要由三大部分组成:

  • 1、任务(Runnable /Callable),执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
  • 2、任务的执行(Executor),包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
  • 3、异步计算的结果(Future),Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

因此得到Executor框架流程图:

Executor框架流程图

1.主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。

2.把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable task))。

3.如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。

4.最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

自建线程池

自建线程池
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
@EnableAsync
public class ThreadPoolConfig implement AsyncConfigurer {
    /**
     * 项目共用线程池
     */
    public static final String MALLCHAT_EXECUTOR = "mallchatExecutor";
    /**
     * websocket通信线程池
     */
    public static final String WS_EXECUTOR = "websocketExecutor";

    @Override
    public Executor getAsyncExecutor() {
        //返回项目主线程池
        return mallchatExecutor();
    }

    @Bean(MALLCHAT_EXECUTOR)
    @Primary
    public ThreadPoolTaskExecutor mallchatExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //线程池优雅停机的关键,等任务执行完再停机,保证任务不丢失。
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(200);
        //线程前缀
        executor.setThreadNamePrefix("mallchat-executor-");
        //拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//满了调用线程执行,认为重要任务
        //使用自己修改后的线程工厂
        executor.setThreadFactory(new MyThreadFactory(executor));
        executor.initialize();
        return executor;
    }
}
上面的代码创建了一个统一线程池,并通过实现AsyncConfigurer设置了@Async注解也使用我们统一的线程池,方便统一管理。

我们的线程池没有使用Executors快速创建,这是因为Executors创建的线程池用的是无界队列,有oom的风险。

通过设置线程前缀便于排查cpu占用,死锁问题或其他bug时,可以根据线程名看出是业务问题还是底层框架问题。

优雅停机

当项目关闭时,需要通过jvm的shutdownHook回调线程池,等队列中的任务执行完后再停机,保证任务不丢失。

通过shutdownHook回调实现优雅停机
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.Callable;

//省略import
public class ThreadUtil {
    //懒汉式单例创建线程池:用于执行定时、顺序任务
    static class SeqOrScheduledTargetThreadPoolLazyHolder {
        //线程池:用于定时任务、顺序排队执行任务
        static final ScheduledThreadPoolExecutor EXECUTOR =
                new ScheduledThreadPoolExecutor(1,
                        new CustomThreadFactory("seq"));

        static {
            //注册JVM关闭时的钩子函数
            Runtime.getRuntime().addShutdownHook(
                    new ShutdownHookThread("定时和顺序任务线程池",
                            new Callable<Void>() {
                                @Override
                                public Void call() throws Exception {
                                    //优雅地关闭线程池
                                    shutdownThreadPoolGracefully(EXECUTOR);
                                    return null;
                                }
                            }));
        }
    }
    //省略不相关代码
}

由于shutdownHook会回调spring容器,因此我们实现spring的DisposableBean的destory方法可以达到同样的效果,在里面调用executor.shutdown()并等待线程池执行完毕。

调用executor.shutdown()实现优雅停机
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TestService{
    public static void main(String[] args) {
        //创建固定 3 个线程的线程池,测试使用,工作中推荐ThreadPoolExecutor
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        //向线程池提交 10 个任务
        for (int i = 1; i <= 10; i++) {
            final int index = i;
            threadPool.submit(() -> {
                System.out.println("正在执行任务 " + index);
                //休眠 3 秒,模拟任务执行
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        //休眠 4 秒
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //关闭线程池
        threadPool.shutdown();
    }
}

在上述代码中,构造了一个包含固定3线程数的线程池,循环提交10个任务,每个任务休眠3秒,但主程序休眠4秒后,会调用shutdown方法。理论上在第二个线程时间循环中,线程池就已经被停止了,3s+4s最多执行完6个任务,但是我们在程序运行过程中丝毫感受不到线程何时被停止了,这个过程就是优雅停机的作用。

调用shutdownNow()实现停机
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * 尝试停止所有正在执行的任务,停止处理等待的任务,
 * 并返回等待处理的任务列表。
 *
 * @return 从未开始执行的任务列表
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks; // 用于存储未执行的任务的列表
    final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁
    mainLock.lock(); // 加锁以确保独占访问
    try {
        checkShutdownAccess(); // 检查是否有关闭的权限
        advanceRunState(STOP); // 将执行器的状态更新为STOP
        interruptWorkers(); // 中断所有工作线程
        tasks = drainQueue(); // 清空队列并将结果放入任务列表中
    } finally {
        mainLock.unlock(); // 无论try块如何退出都要释放锁
    }
    tryTerminate(); // 如果条件允许,尝试终止执行器
    
    return tasks; // 返回队列中未被执行的任务列表
}

与shutdown不同的是shutdownNow会尝试终止所有的正在执行的任务,清空队列,停止失败会抛出异常,并且返回未被执行的任务列表。

结合**shutdown()+awaitTermination(long timeout, TimeUnit unit)**实现优雅停机:

awaitTermination(long timeout, TimeUnit unit)是可以允许我们在调用shutdown方法后,再设置一个等待时间,如设置为5秒,则表示shutdown后5秒内线程池彻底终止,返回true,否则返回false;

这种方式里,我们将shutdown()结合awaitTermination(long timeout, TimeUnit unit)方法去使用,注意在调用 awaitTermination() 方法时,应该设置合理的超时时间,以避免程序长时间阻塞而导致性能问题,而且由于这个方法在超时后也会抛出异常,因此,我们在使用的时候要捕获并处理异常。

线程池使用

对于不同的线程池,通过设置beanName区分,

设置beanName区分不同线程池
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
    /**
     * 项目共用线程池
     */
    public static final String MALLCHAT_EXECUTOR = "mallchatExecutor";
    /**
     * websocket通信线程池
     */
    public static final String WS_EXECUTOR = "websocketExecutor";
    
    @Bean(MALLCHAT_EXECUTOR)
    @Primary
    public ThreadPoolTaskExecutor mallchatExecutor() {
        //具体的设置
    }
    
    @Bean(WS_EXECUTOR)
    public ThreadPoolTaskExecutor websocketExecutor() {
        //具体的设置
    }

当业务需要用时,可以通过

1
2
3
4
@Qualifier(ThreadPoolConfig.MALLCHAT_EXECUTOR)
//指定特定的线程执行器
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
threadPoolTaskExecutor.execute()->webSocketService.scanSuccess(eventKey);

或者直接在方法上加异步注解@Async实现线程池使用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    @Async
    @TransactionalEventListener(classes = GroupMemberAddEvent.class,fallbackExecution = true)
    public void sendAddMsg(GroupMemberAddEvent event){
        List<GroupMember> memberList = event.getMemberList();
        RoomGroup roomGroup = event.getRoomGroup();
        Long inviteUid = event.getInviteUid();
        User user = userInfoCache.get(inviteUid);
        List<Long> uidList = memberList.stream().map(GroupMember::getUid).collect(Collectors.toList());
        //组装系统消息,用于通知群成员
        ChatMessageReq chatMessageReq = RoomAdapter.buildGroupAddMessage(roomGroup, user, userInfoCache.getBatch(uidList));
        chatService.sendMsg(chatMessageReq,User.UID_SYSTEM);
    }

异常捕获

普通的异常被我们捕获之后,日志会记录相关的异常提示,但是没有控制台相关的异常信息。如果一个异常未被捕获,从线程中抛出来了,JVM会回调一个dispatchUncaughtException。

1
2
3
4
5
6
7
 /**
 * Dispatch an uncaught exception to the handler. This method is
 * intended to be called only by the JVM.
 */
private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
}

这个方法在Thread类中,会进行默认的异常处理,也就是获取一个默认的异常处理器,这个异常处理器是ThreadGroup实现的异常捕获方法。

为了捕获线程异常,我们给线程添加一个异常捕获处理器,当出现异常时,将异常转化为error日志。

Thread有两个属性,一个是实例对象,一个是类静态变量,都可以设置异常捕获,区别在于一个生效的范围是单个thread对象,一个生效的范围是全局的thread。

1
2
3
4
5
// null unless explicitly set-实例对象-单个线程对象
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;

// null unless explicitly set-类静态变量-全局所有线程
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;

一般选择为每个Thread实例都加一个异常捕获,也就是只对自己开发的线程负责。

1
2
3
4
5
6
7
8
9
Thread thread = new Thread(() -> {
    log.info("111");
    throw new RuntimeException("运行时异常了");
});
Thread.UncaughtExceptionHandler uncaughtExceptionHandler =(t,e)->{
    log.error("Exception in thread ",e);
};
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
thread.start();

线程池异常捕获

使用线程池的ThreadFactory,创建线程的工厂,创建线程的时候给线程添加异常捕获。

比如在ip解析线程池中,直接在工厂中添加一个异常捕获处理器,在创建thread时,将这个异常捕获处理器赋值给thread。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private static ExecutorService executor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(500), 
    new NamedThreadFactory("refresh-ipDetail",null, false,
                           new MyUncaughtExceptionHandler()));

public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        log.error("Exception in thread:" , e);
    }
}

由于我们还有两个线程池,用到了Spring线程池,由于Spring的封装,想给线程工厂设置一个捕获器很困难。

比如在websocket线程池中,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 @Bean(WS_EXECUTOR)
public ThreadPoolTaskExecutor websocketExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(16);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(1000);//支持同时推送1000人
    executor.setThreadNamePrefix("websocket-executor-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送
    executor.initialize();
    return executor;
}

查看源码可知,Spring的线程池自己实现了ThreadFactory,我们并没有机会去设置一个自定义的线程捕获器。

而它的抽象类ExecutorConfigurationSupport将自己的值赋值给了线程工厂,提供了一个解耦机会。

1
2
3
4
5
6
package org.springframework.scheduling.concurrent;
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean {
    protected final Log logger = LogFactory.getLog(this.getClass());
    private ThreadFactory threadFactory = this;
    private boolean threadNamePrefixSet = false;
    ...

但是如果我们将这个线程工厂换了,那么它的线程创建方法都会失效,线程名,优先级都需要我们一起设置了,这加大很多无关的工作,而我们只想扩展一个线程捕获。

因此,我们想到了装饰器模式,装饰器模式不会改变原有的功能,只是在功能的前后做一个扩展点,因此适合我们的改动。

首先写一个自己的线程工厂,再把spring的线程工厂传进来,调用它的线程创建后,再扩展设置我们自己的异常捕获。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class MyThreadFactory implements ThreadFactory {

    private ThreadFactory original;

    @Override
    public Thread newThread(Runnable r) {
        //original是组合方式创建线程
        Thread thread = original.newThread(r);//执行spring线程自己的创建逻辑
        //额外装饰我们需要的创建逻辑
        thread.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());//异常捕获
        return thread;
    }
}

然后替换spring的线程池的线程工厂

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
    @Bean(WS_EXECUTOR)
    public ThreadPoolTaskExecutor websocketExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //线程池优雅停机的关键,等任务执行完再停机,保证任务不丢失。
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setCorePoolSize(16);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(1000);
        //线程前缀
        executor.setThreadNamePrefix("websocket-executor-");
        //拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃
        //使用自己修改后的线程工厂
        executor.setThreadFactory(new MyThreadFactory(executor));
        executor.initialize();
        return executor;
    }

通过上述配置,我们就能够实现统一的线程池管理和提高线程池效率。

—END—

参考文献 https://www.yuque.com/snab/mallchat/oo87m9gndo4yx2sw https://javaguide.cn/java/concurrent/java-thread-pool-summary.html