动态调整线程池参数实践

动态调整线程池参数实践欢迎大家关注我的微信公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。一、线程池遇到的挑战我们上一篇《一文读懂线程池的实现原理》已经从线程池如何维护自身状态、线程池如何管理任务、线程池如何管理线程三个维度来深入剖析线程池的底层原理与源码剖析,这让我们对线程池的原理有了较为深入的理解。这对我们多线程编程有很大的帮助,但在使用线程池时还是会面临几个棘手的问题。开发人员个人经验与水平参差不齐,配置线程池参数都是按照自己想法来,没有统一.

大家好,又见面了,我是你们的朋友全栈君。

欢迎大家关注我的微信公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。

一、线程池遇到的挑战

我们上一篇 《一文读懂线程池的实现原理 》已经从线程池如何维护自身状态、线程池如何管理任务、线程池如何管理线程三个维度来深入剖析线程池的底层原理与源码剖析,这让我们对线程池的原理有了较为深入的理解。这对我们多线程编程有很大的帮助,但在使用线程池时还是会面临几个棘手的问题。

  • 开发人员个人经验与水平参差不齐,配置线程池参数都是按照自己想法来,没有统一的一个配置标准。
  • 线程池执行情况与任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大。
  • 当你配置好线程池后,有的时间段流量高峰期,导致线程池忙不过来;有的时间段流量低峰期,线程池比较空闲。这就会导致资源调度失衡,降低了系统的稳定性。

我们先来看下美团调研的业界一些线程池参数配置方案:

在这里插入图片描述

  • 第一种方案是出自《Java并发编程实践》,显然和业务场景有所偏离。
  • 第二种方案也不太合理,为什么呢?我们一个项目里一般来说不止一个自定义线程池吧?比如有专门处理数据异步持久化的线程池,有专门处理查询请求的线程池,这样去做一个简单的线程隔离。但是如果都用这样的参数配置的话,显然是不合理的。
  • 第三种方案虽然考虑到了业务场景,但这是理想状态。流量是不可能这么均衡的,就拿美团来说,下午 3、4 点的流量,能和 12 点左右午饭时的流量比吗?

基于上面线程池的几个痛点,那有没有好的解决方案呢?有的,那就是动态调整线程池参数。

尽管业界没有一些成熟的经验配置策略,那么我们是不是可以从修改线程池参数的成本入手?毕竟每次线上线程池故障的话,都得修改代码里的线程池相应的参数,然后再部署上线,这个过程在对可用性要求极高的项目中那是极其慢的,可能给公司造成巨大的损失。

既然改代码里的线程池相应的参数并上线这个过程慢,那我们是不是可以把相应的线程池参数配置到分布式配置中心上去?实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:

在这里插入图片描述

二、动态化线程池

2.1 整体设计

动态化线程池的核心设计包括以下三个方面:

2.1.1 简化线程池配置

线程池构造参数有 7 个,但是最核心的是 3 个:corePoolSize、maximumPoolSize、workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:

  • 并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。
  • 并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。

所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求。

2.1.2 参数可动态修改

为了解决参数不好配,修改参数成本高等问题。在 Java 线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。

2.1.3 增加线程池监控

对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。

在这里插入图片描述
2.2 功能架构

动态化线程池提供如下功能:

  • 动态调参:支持线程池参数动态调整、界面化操作;包括修改线程池核心大小、最大核心大小、队列长度等;参数修改后及时生效。
  • 任务监控:支持应用粒度、线程池粒度、任务粒度的 Transaction 监控;可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、P95/P99线等。
  • 负载告警:支持告警规则配置,当超过阈值时(线程池队列任务积压到一定值、线程池负载数达到一定阈值)会通知相关的开发负责人。
  • 操作监控:创建、修改和删除线程池都会通知到应用的开发负责人。
  • 操作日志:可以查看线程池参数的修改记录,谁在什么时候修改了线程池参数、修改前的参数值是什么。
  • 权限校验:只有应用开发负责人才能够修改应用的线程池参数。

在这里插入图片描述

三、Apollo 分布式配置中心

了解完动态化线程池的整体设计与功能架构后,我相信你也可以设计出一款动态线程池组件出来的。下面跟着老周来实践一下动态调整线程池参数,可能不像上面设计的那样那么全面,但会把动态调整线程池参数的核心给实现一下。

不难发现动态化线程池的核心是配置管理,那我们就得找一个分布式配置中心,这里老周用的 Apollo,还有其它的像 Spring Cloud Config、disconf、某些大型互联网公司自研的分布式配置中心等,根据自己的项目情况以及使用场景来选择就行。

3.1 Apollo 总体设计

Apollo(阿波罗)是携程框架部门研发的分布式配置中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。

3.1.1 基础模型

如下图即是 Apollo 的基础模型:

  1. 用户在配置中心对配置进行修改并发布
  2. 配置中心通知 Apollo 客户端有配置更新
  3. Apollo 客户端从配置中心拉取最新的配置、更新本地配置并通知到应用

在这里插入图片描述
3.1.2 架构模块

在这里插入图片描述
上图简要描述了 Apollo 的总体设计,我们可以从下往上看:

  • Config Service 提供配置的读取、推送等功能,服务对象是 Apollo 客户端
  • Admin Service 提供配置的修改、发布等功能,服务对象是 Apollo Portal(管理界面)
  • Config Service 和 Admin Service 都是多实例、无状态部署,所以需要将自己注册到 Eureka 中并保持心跳
  • 在 Eureka 之上我们架了一层 Meta Server 用于封装 Eureka 的服务发现接口
  • Client 通过域名访问 Meta Server 获取 Config Service 服务列表(IP+Port),而后直接通过 IP+Port 访问服务,同时在 Client 侧会做 load balance、错误重试
  • Portal 通过域名访问 Meta Server 获取 Admin Service 服务列表(IP+Port),而后直接通过 IP+Port 访问服务,同时在 Portal 侧会做 load balance、错误重试
  • 为了简化部署,我们实际上会把 Config Service、Eureka 和 Meta Server 三个逻辑角色部署在同一个 JVM 进程中

3.2 服务端设计

3.2.1 配置发布后的实时推送设计

在配置中心中,一个重要的功能就是配置发布后实时推送到客户端。本文重点分析配置更新推送方式,下面我们简要看一下这块是怎么设计实现的。

在这里插入图片描述
上图简要描述了配置发布的大致过程:

  1. 用户在 Portal 操作配置发布
  2. Portal 调用 Admin Service 的接口操作发布
  3. Admin Service 发布配置后,发送 ReleaseMessage 给各个 Config Service
  4. Config Service 收到 ReleaseMessage 后,通知对应的客户端

上图的发送 ReleaseMessage 的实现方式详情请往下继续看:

Admin Service 在配置发布后,需要通知所有的 Config Service 有配置发布,从而 Config Service 可以通知对应的客户端来拉取最新的配置。

从概念上来看,这是一个典型的消息使用场景,Admin Service 作为 producer 发出消息,各个 Config Service 作为 consumer 消费消息。通过一个消息组件(Message Queue)就能很好的实现 Admin Service 和 Config Service 的解耦。

在实现上,考虑到 Apollo 的实际使用场景,以及为了尽可能减少外部依赖,我们没有采用外部的消息中间件,而是通过数据库实现了一个简单的消息队列。

在这里插入图片描述
3.3 客户端设计

在这里插入图片描述
上图简要描述了 Apollo 客户端的实现原理:

  1. 客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。(通过 Http Long Polling 实现)
  2. 客户端还会定时从 Apollo 配置中心服务端拉取应用的最新配置。
  3. 客户端从 Apollo 配置中心服务端获取到应用的最新配置后,会保存在内存中。
  4. 客户端会把从服务端获取到的配置在本地文件系统缓存一份。
  5. 应用程序可以从 Apollo 客户端获取最新的配置、订阅配置更新通知。

我们从基础模型、服务端设计、客户端设计三个维度来分析了 Apollo 总体设计,相信你对 Apollo 分布式配置中心有了全面且清晰的理解了。为了照顾没有用过 Apollo 这款分布式配置中心的同学,老周这里还是简单给个 Apollo 开发样例演示,希望对你后面的动态调整线程池参数实践有所帮助。

四、动态调整线程池参数实践

我们了解原理以及架构后,那我们开始实践了。

4.1 服务端安装

请看官方文档进行相应的安装:https://ctripcorp.github.io/apollo/#/zh/deployment/quick-start

执行启动脚本后,当看到如下输出后,就说明启动成功了!

在这里插入图片描述
启动成功后访问地址:http://localhost:8070

在这里插入图片描述
默认输入用户名:apollo、密码:admin,进行登录。

在这里插入图片描述
点击 SampleApp,我们看到在 DEV 环境包含一个 timeout 配置项,100 是这个配置项的值,下面我们在应用程序读取这个配置项:

在这里插入图片描述
4.2 应用程序

4.2.1 引入依赖

<dependencies>
    <dependency>
		<groupId>com.ctrip.framework.apollo</groupId>
		<artifactId>apollo-client</artifactId>
		<version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
</dependencies>	

4.2.2 样例测试

/** * 动态获取Apollo配置 * 注意要配置:-Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080 * * @author 微信公众号【老周聊架构】 */
public class GetApolloConfigTest { 
   
    public static void main(String[] args) throws InterruptedException { 
   
        Config config = ConfigService.getAppConfig();
        config.addChangeListener(new ConfigChangeListener() { 
   
            @Override
            public void onChange(ConfigChangeEvent changeEvent) { 
   
                System.out.println("Changes for namespace " + changeEvent.getNamespace());
                for (String key : changeEvent.changedKeys()) { 
   
                    ConfigChange change = changeEvent.getChange(key);
                    System.out.println(String.format("Found change - key: %s, oldValue: %s, newValue: %s, changeType: %s", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()));
                }
            }
        });
        Thread.sleep(1000000L);
    }
}

我们现在把配置项默认的值 100 改为 200 程序输出结果如下:

在这里插入图片描述
控制台会出现以下日志,表明动态获取 Apollo 配置成功了。

在这里插入图片描述

4.3 动态线程池

上面我们把 Apollo 的动态监听修改配置的功能整明白了以后,再把线程池和 Apollo 结合起来构建动态线程池那就方便多了。首先我们用默认值构建一个线程池,然后线程池会监听 Apollo 关于相关配置项,如果相关配置有变化则刷新相关参数。

在这里插入图片描述

代码演示:

/** * 动态线程池工厂 * * @author 微信公众号【老周聊架构】 */
@Slf4j
@Component
public class DynamicThreadPoolFactory { 

/** 这里是你的namespace,我这里是默认的application **/
private static final String NAME_SPACE = "application";
/** 线程执行器 **/
private volatile ThreadPoolExecutor executor;
/** 核心线程数 **/
private Integer corePoolSize = 10;
/** 最大值线程数 **/
private Integer maximumPoolSize = 20;
/** 待执行任务的队列的长度 **/
private Integer workQueueSize = 1000;
/** 线程空闲时间 **/
private Long keepAliveTime = 1000L;
/** 线程名 **/
private String threadName;
public DynamicThreadPoolFactory() { 

Config config = ConfigService.getConfig(NAME_SPACE);
init(config);
listen(config);
}
/** * 初始化 */
private void init(Config config) { 

if (executor == null) { 

synchronized (DynamicThreadPoolFactory.class) { 

if (executor == null) { 

String corePoolSizeProperty = config.getProperty(ParamsEnum.CORE_POOL_SIZE.getParam(), corePoolSize.toString());
String maximumPoolSizeProperty = config.getProperty(ParamsEnum.MAXIMUM_POOL_SIZE.getParam(), maximumPoolSize.toString());
String keepAliveTImeProperty = config.getProperty(ParamsEnum.KEEP_ALIVE_TIME.getParam(), keepAliveTime.toString());
BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize);
executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty),
Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty);
}
}
}
}
/** * 监听器 */
private void listen(Config config) { 

config.addChangeListener(new ConfigChangeListener() { 

@Override
public void onChange(ConfigChangeEvent changeEvent) { 

log.info("命名空间发生变化={}", changeEvent.getNamespace());
for (String key : changeEvent.changedKeys()) { 

ConfigChange change = changeEvent.getChange(key);
String newValue = change.getNewValue();
refreshThreadPool(key, newValue);
log.info("发生变化key={},oldValue={},newValue={},changeType={}", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType());
}
}
});
}
/** * 刷新线程池 */
private void refreshThreadPool(String key, String newValue) { 

if (executor == null) { 

return;
}
if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) { 

executor.setCorePoolSize(Integer.valueOf(newValue));
log.info("修改核心线程数key={},value={}", key, newValue);
}
if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) { 

executor.setMaximumPoolSize(Integer.valueOf(newValue));
log.info("修改最大线程数key={},value={}", key, newValue);
}
if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) { 

executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);
log.info("修改线程空闲时间key={},value={}", key, newValue);
}
}
public ThreadPoolExecutor getExecutor(String threadName) { 

return executor;
}
}
@AllArgsConstructor
public enum ParamsEnum { 

CORE_POOL_SIZE("corePoolSize", "核心线程数"),
MAXIMUM_POOL_SIZE("maximumPoolSize", "最大线程数"),
KEEP_ALIVE_TIME("keepAliveTime", "线程空闲时间"),
;
@Getter
private String param;
@Getter
private String desc;
}
/** * 动态线程池执行器 * * @author 微信公众号【老周聊架构】 */
@Component
public class DynamicThreadExecutor { 

@Resource
private DynamicThreadPoolFactory threadPoolFactory;
public void execute(String bizName, Runnable job) { 

threadPoolFactory.getExecutor(bizName).execute(job);
}
public Future<?> sumbit(String bizName, Runnable job) { 

return threadPoolFactory.getExecutor(bizName).submit(job);
}
}
@Slf4j
public class DynamicThreadPoolExecutorTest { 

@Resource
private DynamicThreadExecutor dynamicThreadExecutor;
/** * 记得 IDEA VM options 要记得加下面的参数 * -Dapp.id=SampleApp -Denv=DEV -Dapollo.meta=http://localhost:8080 */
@Test
public void testExecute() throws InterruptedException { 

while (true) { 

dynamicThreadExecutor.execute("bizName", new Runnable() { 

@Override
public void run() { 

System.out.println("bizInfo");
}
});
TimeUnit.SECONDS.sleep(1);
}
}
}

这里可以通过 JDK 自带的 JVisualVM 工具可以查看到相应的线程使用情况。

我们在配置中心修改配置项把核心线程数设置为 50,最大线程数设置为 100:

在这里插入图片描述
你会观察到线程数显著上升

这里还可以在代码中通过打印相应的线程状态,更加直观的从日志上观察到核心线程、最大线程数的修改情况。

private static void threadPoolStatus(ThreadPoolExecutor executor, String name) { 

BlockingQueue<Runnable> queue = executor.getQueue();
System.out.println(Thread.currentThread().getName() + "-" + name + "-:" +
"核心线程数:" + executor.getCorePoolSize() +
" 活动线程数:" + executor.getActiveCount() +
" 最大线程数:" + executor.getMaximumPoolSize() +
" 线程池活跃度:" + divide(executor.getActiveCount(), executor.getMaximumPoolSize()) +
" 任务完成数:" + executor.getCompletedTaskCount() +
" 队列大小:" + (queue.size() + queue.remainingCapacity()) +
" 当前排队线程数:" + queue.size() +
" 队列剩余大小:" + queue.remainingCapacity() +
" 队列使用度:" + divide(queue.size(), queue.size() + queue.remainingCapacity()));
}

这样的话就可以实现动态调整线程池参数,这就很好的解决了我们线程池现有的痛点,不至于线上出了问题还得改代码部署那么漫长的修复时间了,动态线程池大大简化了运维以及开发快速修复相关问题的难度。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/135000.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • IP地址的构成_IP地址由两部分组成

    IP地址的构成_IP地址由两部分组成1、什么是IP地址?IP地址是人们在Internet上为了区分数以亿计的主机而给每台主机分配的一个专门的地址,通过IP地址就可以访问到每一台主机。IP地址由4部分数字组成,每部分数字对应于8位二进制数字,各部分之间用小数点分开,如某一台主机的IP地址为:211.152.65.112。2、IP地址管理机构InternetIP地址由NIC(InternetNetworkInformat…

  • 【C 语言】文件操作 ( fopen 文件打开方式详解 )「建议收藏」

    【C 语言】文件操作 ( fopen 文件打开方式详解 )「建议收藏」r:只读方式打开文件,文件必须存在;文件不存在打开失败;+:读写方式打开文件;w:打开只写文件,文件不存在创建文件,文件存在覆盖文件;a:打开只写文件,文件不存在创建文件,文件存在追加文件;

  • MySQL常见面试题_web面试题

    MySQL常见面试题_web面试题一、存储引擎MySQL常见的两种存储引擎:MyISAM与InnoDB二、字符集及校对规则字符集指的是一种从二进制编码到某类字符符号的映射。校对规则则是指某种字符集下的排序规则。Mysql中每一种字符集都会对应一系列的校对规则。Mysql采用的是类似继承的方式指定字符集的默认值,每个数据库以及每张数据表都有自己的默认值,他们逐层继承。比如:某个库中所有表的默认字符集将是该数据库所指定…

  • Python中psutil模块应用

    Python中psutil模块应用psutil(Pythonsystemandprocessutilities)是python的系统监控及进程的管理的工具,是一个功能很强大的跨平台的系统管理库。官方文档(https://pythonhosted.org/psutil/)psutil是一个第三方的开源项目,因此,需要先安装才能够使用。pip3installpsutil获取psutil版本信息importpsu…

  • python保留小数位的两种方法总结[通俗易懂]

    python保留小数位的两种方法总结[通俗易懂]题目背景:方法一:format函数format有不同用法,代码如下,前者使用了占位符{},使用占位符可以同时输出多个,后者一次只能输出一个importmathres=math.sqrt(a**2+b**2)#使用占位符print(‘{:.3f}’.format(res))#可以同时输出多个print(‘{:.3f}\n{:.2f}’.format(res,res))#不使用占位符只能输出一个print(format(res,’.3f’))运行结果:方法二:’%

  • 网页下载文件错误_python安装报错

    网页下载文件错误_python安装报错如图,使用webdriver的过程中出现如下提示,代码正常,下载地址正常,在正常浏览器中也可以成功下载文件但是模拟浏览器却无法成功获取文件;尝试了开发模式启动、禁用或启用js等等,都没有成功,快要放弃chrome准备改选firefox的时候,看到了一个解决方法:此方法只针对一种情况有效:如果你在下载路径前加了r,转义了原始字符串,如下那么,去掉“r”试一下成功了如有问题请留言…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号