"学习与实践动态线程池,提升性能,优化并发处理"

2024-06-13

相关文档

美团线程池实践:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html 线程池思想解析:https://www.javadoop.com/post/java-thread-pool

引言

在后台项目开发过程中,我们常常借助线程池来实现多线程任务,以此提升系统的吞吐率和响应性;而线程池的参数配置却是一个难以合理评估的值,虽然业界也针对cpu密集型,IO密集型等场景给出了一些参数配置的经验与方案,但是实际业务场景中通常会因为流量的随机性,业务的更迭性等情况出现预计和实际运行情况偏差较大的情况;而不合理的线程池参数,可能导致服务器负载升高,服务不可用,内存溢出等严重问题;一旦遇到参数不合理的问题,还需要重新上线修改,并且存在反复修改的情况,而这期间花费的时间可能带来更大的风险,甚至导致严重业务事故;那么有没有一种方式能有效感知上述问题并及时避免以上问题呢?或许动态线程池可以。

什么是动态线程池

简单来说,动态线程池就是能在不重新部署应用的情况下动态实时变更其核心参数,并且能对其核心参数及运行状态进行监控及告警;以便开发人员可以及时感知到实际业务中因为各种随机情况导致线程池异常的场景,并依据动态变更能力快速调整并验证参数的合理性。

为什么需要动态线程池,存在什么痛点

线程池在给我们业务带来性能和吞吐提升的同时,也存在诸多风险和问题,其中主要原因就在于我们难以设置出合理的线程池参数,一方面线程池的运行机制不是很好理解,配置合理强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,同时实际场景中流量的随机性,业务的更迭性也导致业界难以有一套成熟或开箱即用的经验策略来帮助开发人员参考。而线程池参数难以合理设置的特性又不得不让我们关注以下三个痛点问题:

1.运行情况难感知:在业务使用线程池的过程中,线程池的运行情况对于开发人员来说很难感知,我们难以知道每个线程池创建了多少个线程,是否有队列积压,线程池运行状态怎么样,线程池是否已经耗尽... 直到出现线上问题或收到客诉才后知后觉;(我们能否对系统中用到的线程池进行一个整体的把控,在线程池任务积压,任务拒绝等问题发生时,甚至问题发生前进行及时感知,让开发人员能未雨绸缪,尽早发现和解决问题呢?-线程池监控,异常告警)

流量突增导致预估和实际情况偏差较大,同时由于未能及时感知并解决积压情况,最终引发客诉 case1:广告主大批量删除物料后异步清理附属表出现任务积压 问题描述:广告主批量删除计划物料后,对应物料附属表数据未及时删除,导致广告主关键词等物料数上限得不到释放而影响创建新物料,引发线上客诉。 问题原因:广告主删除计划物料后,系统会同步删除计划物料主表信息,然后通过线程池的方式异步删除计划物料附属表数据。临近大促广告主物料增删频率及单次批量操作的物料数量都有明显增加,由于核心线程设置较小同时队列设置过长,导致计划主表同步删除后异步删除附属表的任务出现队列积压,对应的关键词等物料数上限得不到释放而影响新物料创建,引发线上客诉。



2.线程拒绝难定位:当拒绝发生后,即使我们迅速感知到了线程池运行异常,也经常会因为拒绝持续时间较短而拿不到问题发生时的线程堆栈,因此通常难以快速定位甚至无法定位到是哪里的原因导致的拒绝,比如是流量的突增将线程池打满,还是某个业务逻辑耗时较长将线程池中的线程拖住;(我们有没有一种方式能在线程池拒绝后去更容易的定位到问题呢?-自动触发线程池堆栈打印,分析工具)

case2: 线程池拒绝具有随机性,当拒绝时长较短时,难以定位问题原因 问题描述:某业务接口内部计算逻辑较多,且存在多处外部接口调用逻辑,上线后不定时出现线程池拒绝异常,由于持续时间不长,问题发生后无法通过jstack去获取问题发生时现场的线程堆栈, 很难定位是什么原因导致了线程池拒绝;由于没有较好的排查手段,只能通过逐步搂日志的方式排查,而排查过程又可能因为日志较多或者日志不全出现问题定位时间长或者是根本无法定位的情况。 问题原因:某外部某接口不稳定,在性能较差且流量较大时就容易把调用线程池打满,导致可用率下降



3.参数问题难以快速调整:在定位到某个线程池参数设置不合理的情况后,我们需要根据情况随即进行调整,但是"修改->打包->审批->发布"的时间很可能会扩大问题的影响甚至是事故严重程度;同时因为线程池参数难以合理设置的原因,可能导致我们要重复进行上述"修改->打包->审批->发布"的流程...(有没有一种方法能快速修改并验证参数设置的合理性呢?-参数动态调整)

线程池参数设置不合理,难以快速调整参数,业务风险上升 case3:应用JSF接口修改为异步调用后出现可用率下降 问题描述:将应用中部分JSF接口切换为异步模式后,对应可用率有明显下降 问题原因:在修改为异步模式的JSF接口中,部分业务在拿到future对象后使用ThenApply做了一些耗时的操作,另外还有一部分在ThenApply里面又调用了另外一个异步方法;而thenApply的执行会使用jsf的callBack线程池,由于线程池线程配置较小,并且部分回调方法耗时较长,导致callBack线程池被打满,子任务请求线程时进入阻塞队列排队,出现接口超时可用率下降。

业界动态线程池动态线程池调研

当前业界已存在部分动态线程池组件,其主体功能及大体思想类似,但存在以下几个问题

1.与外部中间件耦合较多,难以二次开发加以使用;

2.使用灵活性受限,难以根据业务自身特点进行定制化(自动触发线程池堆栈打印,一键清空队列,callback线程池等)

综合考虑上述问题,决定结合公司中间件及自身业务特点实现一套集线程池监控,异常告警,线程栈自动获取,动态刷新为一体的动态线程池组件。

如何实现动态线程池

整体方案

线程池监控及告警

要实现线程池监控及告警,我们需要关注以下几个要点

1.如何获取到待监控的线程池信息

在实际业务中我们通常想要知道应用中有哪些线程池,每个线程池各个参数在每个时刻的运行情况是怎么样的;对于第一种场景,我们可以构建一个线程池管理器,用于管理应用中使用到的业务线程池,为此我们可以在应用初始化时将这些线程池按名称和实际对象注册到管理器;后续使用时就可以根据名称从管理中心拉取到对应线程池;

public class ThreadPoolManager {
    // 线程池管理器
    private static final ConcurrentHashMap< String, Executor > REGISTER_MAP_BY_NAME = new ConcurrentHashMap<  >();
    private static final ConcurrentHashMap< Executor, String > REGISTER_MAP_BY_EXECUTOR = new ConcurrentHashMap<  >();

    // 注册线程池 
    public static void registerExecutor(String threadPoolName, Executor executor) {
        REGISTER_MAP_BY_NAME.putIfAbsent(threadPoolName, executor);
        REGISTER_MAP_BY_EXECUTOR.putIfAbsent(executor, threadPoolName);
    }

    // 根据名称获取线程池
    public static Executor getExecutorByName(String threadPoolName) {
        return REGISTER_MAP_BY_NAME.get(threadPoolName);
    }

    // 根据线程池获取名称
    public static String getNameByExecutor(Executor executor) {
        return REGISTER_MAP_BY_EXECUTOR.get(executor);
    }
    
    // 获取所有线程池名称
    public static Set< String > getAllExecutorNames() {
        return REGISTER_MAP_BY_NAME.keySet();
    }
}

对于第二种场景,线程池的核心实现类ThreadPoolExecutor提供了多个参数查询方法,我们可以借助这些方法查询某一时刻该线程池的运行快照

getCorePoolSize() // 核心线程数
getMaximumPoolSize() // 最大线程数
getQueue() // 阻塞队列,获取队列大小,容量等
getActiveCount() // 活跃线程数
getTaskCount() // 历史已完成和正在执行的任务数量
getCompletedTaskCount() // 已完成任务数

2.如何将监控的信息保存和展示出来

监管了应用中的业务线程池,也能获取到某一时刻各线程池的运行情况快照,但要实现线程池数据监控还需要我们在每个时刻去采集线程池运行信息,并将其保存下来,同时还需要将这些数据用一个可视化页面展示出来供我们观察才行,否则我们只知道某一时刻的线程池情况也意义不大。为此,我们需要考虑上面看到的过程,例如使用Micrometer采集性能数据,使用Prometheus时序数据库存储指标数据,使用Grafana展示数据;而现在,我们只需要根据pfinder的埋点要求将对应要监控的线程池指标配置到上报逻辑即可,剩下的数据分时采集,数据存储,数据展示可以完全交给pfinder来完成。

// 已经设置埋点的线程池
public static ConcurrentHashSet< String > monitorThreadPool = new ConcurrentHashSet<  >();

// 监控埋点注册
public static void monitorRegister() {
    log.info("===> monitor register start...");
    // 1.获取所有线程池
    Set< String > allExecutorNames = ThreadPoolManager.getAllExecutorNames();
    // 2.遍历线程池,注册埋点
    allExecutorNames.forEach(executorName-> {
        if (!monitorThreadPool.contains(executorName)) {
            monitorThreadPool.add(executorName);
            Executor executor = ThreadPoolManager.getExecutorByName(executorName);
            collect(executor, executorName);
        }
    });
    log.info("===> monitor register end...");
}

// pfinder指标埋点
public static void collect(Executor executorService, String threadPoolName) {
    ThreadPoolExecutor executor = (ThreadPoolExecutor)executorService;
    String prefix = "thread.pool."+threadPoolName;
    gauge1 = PfinderContext.getMetricRegistry().gauges(prefix)
            .gauge(() -> executor.isShutdown() ? 0 : executor.getCorePoolSize())
            .tags(MetricTag.of("type_dimension", "core_size")).build();
    gauge2 = PfinderContext.getMetricRegistry().gauges(prefix)
            .gauge(() -> executor.isShutdown() ? 0 : executor.getMaximumPoolSize())
            .tags(MetricTag.of("type_dimension", "max_size"))
            .build();
    gauge4 = PfinderContext.getMetricRegistry().gauges(prefix)
            .gauge(() -> executor.isShutdown() ? 0 : executor.getQueue().size())
            .tags(MetricTag.of("type_dimension", "queue_size"))
            .build();
}

3.如何监听到异常并告警

线程池运行过程中,我们可能更多关注线程池拒绝前感知线程池队列是否有积压,线程数是否已达设置核心或最大线程数高点以及线程池拒绝异常;由于使用pfinder作为线程池监控组件,其中线程池队列是否有积压,线程数是否已达设置核心,最大线程数高点等异常监听及告警可以直接依赖pfinder的告警配置来实现; 例如下图中配置队列积压超过阈值时的报警

而线程池拒绝异常,我们可以在线程池初始化时包装线程池的拒绝策略,在执行实际拒绝策略前抛出告警;

@Slf4j
public class RejectInvocationHandler implements InvocationHandler {

    private final Object target;

    @Value("${jtool.pool.reject.alarm.key}")
    private String key;

    public RejectInvocationHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ExecutorService executor = (ExecutorService)args[1];
        if (Strings.equals(method.getName(), "rejectedExecution")) {
            try {
                rejectBefore(executor);
            }
            catch (Exception exp) {
                log.error("==> Exception while do rejectBefore for pool [{}]", executor, exp);
            }
        }
        return method.invoke(target, args);
    }

    private void rejectBefore(ExecutorService executor) {
        // 触发报警  
        rejectAlarm(executor); 
    }

    /**
     * 拒绝报警
     */
    private void rejectAlarm(ExecutorService executor) {
        String alarmKey = Objects.nonNull(key) ? key : ThreadPoolConst.UMP_ALARM_KEY;
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executor;
        String threadPoolName = ThreadPoolManager.getNameByExecutor(threadPoolExecutor);
        String errorMsg = String.format("===> 线程池拒绝报警 key: [%s], cur executor: [%s], core size: [%s], max size: [%s], queue size: [%s], curQueue size: [%s]",
                alarmKey, threadPoolName, threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getQueue().size()+threadPoolExecutor.getQueue().remainingCapacity(), threadPoolExecutor.getQueue().size());
        log.error(errorMsg);
        Profiler.businessAlarm(alarmKey, errorMsg);
    }

}

自动触发线程堆栈打印

感知到拒绝线程池拒绝异常后,我们需要及时去定位线程池拒绝原因,但现在我们可能只知道是哪个线程池发生了线程池拒绝异常,却难以知道是什么原因导致的,我们常常希望在线程池拒绝时能拿到应用的线程堆栈信息,并依据其分析拒绝原因;但是线程拒绝常常发生速度很快,我们很难捕捉到拒绝时刻的全局线程堆栈快照;为此,我们考虑在线程池拒绝发生时自动触发线程池堆栈打印到日志;

public class RejectInvocationHandler implements InvocationHandler {
...
private void rejectBefore(ExecutorService executor) {
        // 打印线程堆栈到日志的间隔条件
        if (CommonProperty.canPrintStackTrace()) { 
            // 触发报警 
            rejectAlarm(executor); 
            // 触发线程堆栈打印 
            printThreadStack(executor); 
        }
    }
...
}

...
/**
 * 打印线程堆栈信息
 */
public static void printThreadStack(Executor executor) {
    if (!CommonProperty.logPrintFlag) {
        log.info("===> 线程池堆栈打印关闭:[{}]", CommonProperty.logPrintFlag);
        return;
    }
    logger.info("n=================>>> 线程池拒绝堆栈打印start,触发提交拒接处理的线程池:【{}】", executor);
    Map< Thread, StackTraceElement[] > allStackTraces = Thread.getAllStackTraces();
    log.info("===> allStackTraces size :[{}]", allStackTraces.size());
    StringBuilder stringBuilder = new StringBuilder();
    allStackTraces.entrySet().stream()
            .sorted(Comparator.comparing(entry -> entry.getKey().getName()))
            .forEach(threadEntry -> {
                Thread thread = threadEntry.getKey();
                stringBuilder.append(Strings.format("n线程:[{}] 时间: [{}]n", thread.getName(), new Date()));
                stringBuilder.append(Strings.format("tjava.lang.Thread.State: {}n", thread.getState()));
                StackTraceElement[] stack = threadEntry.getValue();
                for (StackTraceElement stackTraceElement : stack) {
                    stringBuilder.append("tt").append(stackTraceElement.toString()).append("n");
                }
                stringBuilder.append("n");
                logger.info(stringBuilder.toString());
                stringBuilder.delete(0, stringBuilder.length());
            });
    logger.info("==============>>> end");
}
...

打印后的信息如下所示

拿到问题发生时的堆栈信息后,我们就可以根据拒绝线程的名称去分析拒绝原因了,看看是否有什么原因导致线程被卡住;为了更方便的分析,可以根据简单的根据日志规则去分析拒绝线程池问题发生时各线程池的运行状态是什么,大部分都集中到了哪个方法的哪个位置

public class ThreadLogAnalyzer {
    public static void main(String[] args) {
        String logFilePath = "/Users/huyongjia1/Desktop/huyongjia/demo/jsfdemo/MyJtool/src/main/resources/reject.monitor 21.log";
        String threadPoolNameLike = "simpleTestExecutor";
        int threadCount = 0;
        HashMap< String, Integer > statusMap = new HashMap<  >();
        HashMap< String, Integer > methodMap = new HashMap<  >();

        try (BufferedReader br = new BufferedReader(new FileReader(logFilePath))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (line.contains("=================>>> 线程池拒绝堆栈打印start,触发提交拒接处理的线程池")) {
                    System.out.println("开始读整个线程");
                }
                if (line.contains(threadPoolNameLike)) {
                    threadCount++;
                    String curStatus = br.readLine();
                    if (curStatus.contains("java.lang.Thread.State")) {
                        statusMap.put(curStatus, (statusMap.getOrDefault(curStatus, 0) + 1));
                        String methodTrace = br.readLine();
                        methodMap.put(methodTrace, (methodMap.getOrDefault(methodTrace, 0) + 1));
                    }
                }
                if (line.contains("==============>>> end")) {
                    System.out.println("结束读整个线程");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.println(Strings.format("===> 当前线程名[{}]共计:{}", threadPoolNameLike, threadCount));
        System.out.println("n===> 状态分析结果:");
        for (Map.Entry< String, Integer > statusEntry : statusMap.entrySet()) {
            System.out.println(Strings.format("t {} {}", statusEntry.getKey(), statusEntry.getValue()));
        }
        System.out.println("n===> 方法分析结果:");
        ArrayList< Map.Entry< String, Integer >> methodEntryList = Lists.newArrayList(methodMap.entrySet());
        methodEntryList.sort(new Comparator< Map.Entry< String, Integer >>() {
            @Override
            public int compare(Map.Entry< String, Integer > o1, Map.Entry< String, Integer > o2) {
                return o2.getValue() - o1.getValue();
            }
        });
        for (Map.Entry< String, Integer > methodEntry : methodEntryList) {
            System.out.println(Strings.format("{} {} {}%", methodEntry.getKey(), methodEntry.getValue(), (methodEntry.getValue() / (double)threadCount) * 100));
        }
    }
}

例如当前收到simpleTestExecutor线程池拒绝告警,利用堆栈信息分析如下,可以看到该线程池共4个线程,其中3个的运行状态为TIMED_WAITING,并且都停在了sleep逻辑处

线程池参数动态刷新

要实现线程池参数动态刷新,我们需要关注以下几个要点:

1.哪些参数需要变更

在使用线程池时,我们通常需要配置多个参数,但是实际上我们只需要灵活配置好corePoolSize(核心线程数),maximumPoolSize(最大线程数),workQueue(队列长度)这三个核心参数就可以应对大部分场景了;

2.运行中的线程池如何变更参数

从前面我们可以知道线程池的核心实现类ThreadPoolExecutor提供了改变corePoolSize,maximumPoolSize的两个快捷方法:

1. setCorePoolSize(int corePoolSize)

2. setMaximumPoolSize(int maximumPoolSize)

我们只需要通过rpc或者http的方式将想要变更的参数传递到应用再利用上述方法设置进去即可;而队列长度的变更却相对麻烦点,因为我们常使用的阻塞队列LinkedBlockingQueue将队列大小设置为成了一个final类型的变量,我们无法快捷变更,那该怎么办呢,其中一个思想就是自定义一个LinkedBlockQueue,修改capacity为非final类型,同时考虑并发问题对其中涉及到的方法进行修改;(可参考RabbitMq中的VariableLinkedBlockingQueue)

3.应用集群场景下如何实现一键参数变更

实际情况下,我们的应用是已集群的方式部署的,这时我们可以借助ducc全局配置工具将要变更的参数传递到集群的各个机器,各机器根据再根据参数中的线程池名称去线程池管理中心拿到对应的线程进行参数变更即可;

/**
 * ducc控制线程池刷新方法, 需要动态刷新的线程池信息列表,举例如下:
 * value:
 * [
 *   {
 *     "threadPoolName": "my_pool",
 *     "corePoolSize": "10",
 *     "maximumPoolSize": "20",
 *     "queueCapacity": "100"
 *   }
 * ]
 */
@LafValue("jtool.pool.refresh")
public void refresh(@JsonConverter List< ThreadPoolProperties > threadPoolProperties) {
    String jsonString = JSON.toJSONString(threadPoolProperties);
    log.info("===> refresh thread pool properties [{}]", jsonString);
    threadPoolProperties = JSONObject.parseArray(jsonString, ThreadPoolProperties.class);
    refresh(threadPoolProperties);
}

public static boolean refresh(List< ThreadPoolProperties > threadPoolProperties) {
    if (Objects.isNull(threadPoolProperties)) {
        log.warn("refresh param is empty!");
        return false;
    }
    log.info("Executor refresh param: [{}]", threadPoolProperties);
    // 1.根据参数获取对应的线程池
    threadPoolProperties.forEach(threadPoolProperty -> {
        String threadPoolName = threadPoolProperty.getThreadPoolName();
        Executor executor = ThreadPoolManager.getExecutorByName(threadPoolName);
        if (Objects.isNull(executor)) {
            log.warn("Register not find this executor: {}", threadPoolName);
            return;
        }
        // 2. 线程池刷新
        refreshExecutor(executor, threadPoolName, threadPoolProperty);
        log.info("Refresh thread pool finish, threadPoolName: [{}]", threadPoolName);
    });
    return true;
}

实践效果

线程池监控

达成目的:对应用中的线程池情况做整体把控,能方便获取各线程池的运行情况

接入pfinder监控后的效果如下

1.pfinder监控:注册的监控线程池会自动上报线程池的活跃,核心,最大,队列大小,队列容量,任务数等指标;监控地址在pfinder当前服务的业务监控中,正确接入后可以看到已经被监控的线程池列表,监控埋点名称: thread.pool.{线程池名称}

如果要看某一个线程池的指标数据,可以单独进入某个线程池的监控,其中通过数据选择要看的指标(type_dimension)

可在展示维度中选择要具体展示的维度,比如分组,实例等

1.当前监控数据为分钟级维度,即按分钟粒度展示线程池参数的瞬时值,如果需要更精细化的数据,可以选择pfinder的秒级监控

2.支持监控指标:

监控指标 指标值
线程池核心参数 core_size
线程池最大线程参数 max_size
当前活跃线程数 active_size
阻塞队列容量(设置的大小) queue_capacity
阻塞队列当前大小(是否有排队) queue_size
线程池完成任务数 completed_task_count

此外,可以通过JSF接口查看当前时刻应用中的线程信息快照,当前时刻被监控的线程池有哪些:

感知异常告警

达成目的:及时感知线程池异常情况,避免问题放大

以队列积压和线程池拒绝告警为例

队列积压告警(邮件):

线程池拒绝告警(咚咚):

自动触发线程堆栈打印

达成目的:自动记录问题发生时的线程堆栈,为线程池拒绝异常排查提供思路,并加快问题定位

实践案例1:

大促期间对核心接口的压测途中,突然收到偶发机器的JSF线程池拒绝告警

实际分析发现拒绝时间较短,整体持续时间不到1分钟

下图因为线程不够瞬间打上去的线程数持续时间

由此来看我们难以通过人工触发jstack的方式在短时间内获取到问题发生时的线程堆栈,也因此无法定位到具体拒绝原因;因此我们尝试借助线程池拒绝时自动打印的线程堆栈分析,自动打印机制会在线程池发生拒绝策略的同时将全局线程堆栈打印到机器对应的日志目录

下图为问题发生时自动打印的堆栈日志

借助分析工具发现512个线程,511个都卡在了同一位置

从堆栈和分析结果可以比较容易的定位到时哪里出现了问题,最终发现我们在每次记录关键日志时通过InetAddress.getLocalHost()方法获取了本机ip,由于获取的方法在特定情况下可能出现加锁的情况,所以可能会先间歇性的线程阻塞;(网上相似案例:https://qa.1r1g.com/sf/ask/4489560281/)

实践案例2:

业务中某线程池偶尔会出现线程池拒绝异常,同样时间仅持续秒级,报警信息如下

通过自动触发的线程堆栈进行分析,发现该线程池中大量线程在拒绝时积压在某接口的jsf调用等待上

由此再结合方法监控及日志可以比较容易定位到该时刻接口性能波动导致

参数动态刷新

达成目的:迅速修改线程池参数,降低问题风险

如果需要对某线程池的参数做变更,只需将修改后的参数设置到ducc并重新发布即可

实践案例:

业务中部分场景从同步JSF调用改为异步后,可用率出现下降,通过分析发现是JSF的JSF-CLI-CB线程池设置较小,出现等待超时导致;借助动态线程池的动态配置能力修改对应ducc发布后问题得到改善

调整前后对比

审核编辑 黄宇

文章推荐

相关推荐