系统中大量使用线程池,有必要对线程池进行监控。 可以监控如下指标: 可以检测到正在执行的线程数。 可以检测任务队列堆积任务数。 可以检测活动线程数。 可以检测最大线程数。 ? 具体如下: queue.size:获取线程池任务队列数量。 taskCount:线程池需要执行的任务数量。 completedTaskCount:线程池在运行过程中已完成的任务数量。 largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。 getPoolSize:线程池的线程数量。 通过扩展线程池进行监控,通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。 如监控任务的平均执行时间,最大执行时间和最小执行时间等。 这几个方法在线程池里是空方法。
线程池配置核心业务线程池和非核心业务线程池 核心业务的线程不够用 可以停掉非核心业务占用的线程 application.properties #线程池配置 gmall.pool.coreSize=8 27 * @Description: 线程池配置参数 */ @Data @Configuration @ConfigurationProperties(prefix = "gmall.pool") 27 * @Description: 配置当前系统的线程池信息 */ @Configuration public class ThreadPoolConfig { /** * 核心线程 //高并发系统的优化 //1、加缓存 //2、开异步 return null; } } 监控线程池: package com.xiepanpan.gmall.portal.controller 28 * @Description: 监控线程池 */ @RestController public class ThreadPoolController { @Qualifier("mainThreadPoolExecutor
最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。 从这个问题中,我们学到了两点: 线程池的使用,需要充分分析业务场景后作出选择,必要的情况下需要自定义线程池; 线程池的运行状况,也需要监控 关于线程池的监控,我参考了《Java编程的艺术》中提供的思路实现的 DEFAULT_QUEUE_SIZE; @Setter private int poolSize = DEFAULT_POOL_SIZE; /** * 用于周期性监控线程池的运行状态 ;(2)拒绝策略是将任务丢弃,但是需要记录错误日志;(3)使用一个调度线程池对业务线程池进行监控。 在查看监控日志的时候,看到下图所示的监控日志: ?
- 线程池名称 = 被监控的线程池2, 提交任务数+1 [ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池2, 活跃线程数峰值 = 被监控的线程池2, 提交任务数+1 [main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1 [被监控的线程池2_0] INFO - 任务0 [被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0 [被监控的线程池 线程池名称 = 被监控的线程池2, 提交任务数+1 [被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0 [main - 任务2 [被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0 [被监控的线程池
接上文线程池原理(1) 线程池的创建 通过ThreadPoolExecutor构造函数实现(推荐) ? 线程池执行流程 任务缓冲 任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。 线程池大小确定 线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。 很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。 I/O 密集型任务(2N):这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。 因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。 如何判断是 CPU 密集任务还是 IO 密集任务?
但是,线程池使用不当也会使服务器资源枯竭,导致异常情况的发生,比如固定线程池的阻塞队列任务数量过多、缓存线程池创建的线程过多导致内存溢出、系统假死等问题。 因此,我们需要一种简单的监控方案来监控线程池的使用情况,比如完成任务数量、未完成任务数量、线程大小等信息。 ExecutorsUtil工具类 以下是我们开发的一个线程池工具类,该工具类扩展ThreadPoolExecutor实现了线程池监控功能,能实时将线程池使用信息打印到日志中,方便我们进行问题排查、系统调优 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止信息 监控到的记录如下 : momentspush-pool-2-thread-90 ExecutorsUtil.java:91 momentspush_monitor: Duration: 599 ms, PoolSize:
废话不多说,开始我们的线程池源码的第二轮阅读。 回顾 简单回顾下上一篇线程池源码中涉及的两个方法,一个是execute() 执行任务的入口,还有一个是addWorker() 最通俗地理解就是是否需要添加新线程。 开始是一个循环,要么执行worker自带的第一个任务(firstTask),要么通过getTask() 获取任务 有任务首先得保证线程池是正常的,以下两种情况均调用wt.interrupt() 给「线程设置中断标志位 」 线程池处于STOP状态,也就是不接受新任务,也不执行队列中的任务 如果线程的标志位已经为true,那么清楚标志位,此时的线程池状态为STOP状态,这里看起来可能比较别扭,有了第一种情况为什么还要第二种 一进来也是一个死循环,可以先聚焦「什么时候会退出循环」,肯定是「不正常的情况」下会退出 当线程池状态不处于RUNNING或者SHUTDOWN的时候,或者是当线程处于SHUTDOWN但是工作队列中没有任务
前言 线程池实现原理-1 addWorker实现 在看addWorker方法之前,我们先看一个例子,了解一下retry的使用 break retry 跳到retry处,且不再进入循环 continue = null || workQueue.isEmpty) * 1.如果当前线程池的状态>SHUTDOWN,addWorker返回false,添加任务失败 * 2.如果当前线程池的状态 workerStarted) addWorkerFailed(w); } return workerStarted; } 仔细理解一下这段代码,其实就能理解,当线程池处于 // 1.核心线程允许被销毁 // 2.核心线程数 > corePoolSize boolean timed = allowCoreThreadTimeOut || wc 2.wc > maximumPoolSize肯定要删除线程了 // 3.workQueue为空可以销毁线程,此时有可能所有线程都被销毁了 // 4.workQueue不为空
在使用slf4j的MDC做日志跟踪的时候,会因为MDC不能跨线程导致跟踪失败,此外,为了监控线上服务器的运行状态,也很有必要对线程池的运行情况进行监控。 下面是一个带有线程池监控且兼容MDC的线程池,建议使用! /** * A SLF4J MDC-compatible {@link ThreadPoolExecutor}. MDC.setContextMap(previous); } } }; } } 参考 如何在线程池中使用 线程池的五种状态 Java线程池监控小结
InterruptedException { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2) e.printStackTrace(); } })); } Thread.sleep(3000); // out => 等待线程 threadPoolExecutor.getQueue().size()); } 获取到堆积大小了,就可以通过打印日志的形式进行输出,也可以通过micrometer + prometheus + grafana进行完整的监控 ,可参考 通过micrometer实时监控线程池的各项指标 拓展: ThreadPoolExecutor支持其他数量监控,例如: ?
提高线程的可管理性:使用线程池可以对线程进行统一的分配,调优和监控。 2 线程池的创建 Executors中提供了一系列静态方法创建线程池: newSingleThreadExecutor:一个单线程的线程池。如果因异常结束,会再创建一个新的,保证按照提交顺序执行。 通过 ctl.get() 得到线程池的当前线程数,如果线程数小于corePoolSize,则调用 **addWorker(commond,true)** 方法创建新的线程执行任务,否则执行步骤2; 2. 且条件2不满足,则返回false * 4.条件2解读:线程池为shutdown状态时且任务队列不为空时,可以新增空任务的线程来处理队列中的任务 */ 如果当前线程是突然终止的,调用addWorker()创建工作线程 2. 当前线程不是突然终止,但当前工作线程数量小于线程池需要维护的线程数量,则创建工作线程。
vs submit() execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否; submit()方法用于提交需要返回值的任务。 线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功 ,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成, ,线程池的状态变为 SHUTDOWN。 线程池不再接受新任务了,但是队列里的任务得执行完毕。 shutdownNow() :关闭线程池,线程的状态变为 STOP。 线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。
主线程: 相当于生产者,只管向线程池提交任务。并不关心线程池是如何执行任务的。因此,并不关心是哪一个线程执行的这个任务。 线程池: 相当于消费者,负责接收任务,并将任务分配到一个空闲的线程中去执行。 python内置进程池 ? >>>执行结果 ? # 必须要有一个 main 测试 >>> if __name__ == "__main__": # Pool 的实例化必须在 main 测试之下 >>>pool = Pool(2) 池的其他操作 操作一 操作二: terminate - 中止进程池,中止所有任务。 # 会阻塞,知道结果产生了 >>> result = a_result.get() 使用线程池来实现并发服务器 ? >>>客户端 ? >>>执行结果 ?
考虑到之前用micrometer + prometheus + grafana搭建过监控体系,于是考虑使用micrometer做一次主动的线程池度量数据采集,最终可以相对实时地展示在grafana的面板中 2、提供两个方法,分别使用线程池实例模拟短时间耗时的任务和长时间耗时的任务。 3、提供一个方法用于清空线程池实例中的任务队列。 F:thread_pool_queue_size,Legend:-线程池积压任务数。 最终效果 多调用几次例子中提供的几个接口,就能得到一个监控线程池呈现的图表: ? 小结 针对线程池ThreadPoolExecutor的各项数据进行监控,有利于及时发现使用线程池的接口的异常,如果想要快速恢复,最有效的途径是:清空线程池中任务队列中积压的任务。 像HTTP客户端的连接池如Apache-Http-Client或者OkHttp等的监控,可以用类似的方式实现,数据收集的时候可能由于加锁等原因会有少量的性能损耗,不过这些都是可以忽略的,如果真的怕有性能影响
位表示工作线程数,最左边3位表示线程池状态。 ); } 第2处 发生拒绝的原因有两个(1)线程池状态非Runing (2)等待队列已满。 /** * 根据当前线程池状态,检查是否可以添加新的任务线程,如果可以则创建并启动任务 * 如果一切正常则返回true。 返回false 的可能如下: * 1.线程池没有处于RUNNING状态 * 2.线程工程创建新的任务线程失败 * @param firstTask 外部启动线程池时需要构造的第一个线程 corePoolSize : maximumPoolSize)) // 最大线程数不能超过2^29,否则影响左边3位的线程池状态值 return false;
所以需要通过线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。线程池的作用包括: 利用线程池管理并复用线程、控制最大并发数等。 实现任务线程队列缓存策略和拒绝机制。 在了解线程池的基本作用后,我们学习一下线程池是如何创建线程的。 这个值的设置非常关键,设置过大会浪费资源,设置的过小会导致线程频繁地创建或销毁。 第2个参数:maximumPoolSize 表示线程池能够容纳同时执行的最大线程数。 从代码第2处来看,队列、线程工程、拒绝处理服务都必须有实例对象,但在实际编码中,很少有程序员对着三者进行实例化,而通过Executors这个线程池静态工厂提供默认实现,那么Executors与ThreadPoolExecutor execute(task1); } } } 当任务被拒绝的时候,拒绝策略会打印出当前线程池的大小以及达到了maximumPoolSize=2 ,且队列已满。
文章目录 一、线程池作用 二、线程池种类 三、线程池工作机制 四、线程池任务调度源码解析 一、线程池作用 ---- 线程池作用 : ① 避免创建线程 : 避免每次使用线程时 , 都需要 创建线程对象 ; ---- 线程池种类 : ① newCachedThreadPool : 可缓存线程池 , 如果 线程池线程个数已满 , 回收空闲线程 , 如果没有空闲线程 , 此时会创建新线程 ; ② newFixedThreadPool 后到的后执行 ) , LIFO 后入先出 ( 后到的先执行 ) ; 三、线程池工作机制 ---- 线程池线程相关概念: 线程数 : 线程池的 有 最大线程数 MaxSzie , 核心线程数 CoreSize , 任务拒绝后 , 处理善后 ; 四、线程池任务调度源码解析 ---- 在 AsyncTask.java 中 , 在静态代码块中 , 自己 自定义创建了线程池 , 没有使用上述四种线程池 ; 创建线程池时传入的参数 * 调用 addWorker 方法会检查运行状态, 和线程运行个数, 避免在不应该添加线程时执行错误操作. * * 2.
文章目录 一、线程池简介 二、线程池初始化方法简介 三、线程池使用示例 一、线程池简介 ---- 线程池一般是实现了 ExecutorService 接口的类 , 一般使用 ThreadPoolExecutor , 合理控制并发数 , 能提高 CPU 使用效率 ; 二、线程池初始化方法简介 ---- 线程池初始化方法简介 : newCachedThreadPool : 创建 可缓存线程池 ; 如果线程池长度超过处理需要 newScheduledThreadPool : 创建 定长周期任务线程池 ; 该线程池支持周期性任务执行 ; newSingleThreadExecutor : 创建 单线程化线程池 ; 该线程只有一个工作线程 是 自己配置的线程池 , 没有使用 Java 默认提供的四种线程池 , Java 提供的四种线程池是 可缓存线程池 , 定长线程池 , 定长周期任务线程池 , 单线程线程池 ; THREAD_POOL_EXECUTOR : 线程池线程分类 : 线程池的线程分为 核心线程 , 非核心线程 两类 ; 非核心线程闲置时间 : 非核心线程 超过一定的闲置时间 , 就会被回收 ; 假设线程池最大线程数是 8 , 核心线程数
concurrent.futures --- 启动并行任务 — Python 3.7.13 文档concurrent.futures 模块提供异步执行可调用对象高层接口异步执行可以由 ThreadPoolExecutor 使用线程或由 **Executor**ThreadPoolExecutor 线程池```pythonimport concurrent.futuresimport urllib.requestURLS = ['http exc)) else: print('%r page is %d bytes' % (url, len(data)))```ProcessPoolExecutor 进程池使用进程池来实现异步执行调用 112272535095293, 115280095190773, 115797848077099, 1099726899285419]def is_prime(n): if n % 2 = 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2)
像这种,提前创建好线程,需要的时候直接使用,我们称之为线程池。这种本质上就是一个生产消费模型。 线程池实现 //ThreadPool.hpp #pragma once #include<iostream> #include<unistd.h> #include<string> #include< tm_sec); return std::string(buffer); } #define SCREEN_TYPE 1 #define FILE_TYPE 2 lg.Enable(SCREEN_TYPE);}while(0) #define EnableFile() do{lg.Enable(FILE_TYPE);}while(0) }; 携带日志的线程池设计 Task>(); tp->Init(); tp->Start(); int cnt=10; while (cnt) { // 不断地向线程池推送任务