大家好,又见面了,我是你们的朋友全栈君。
今天我来学习一下Dubbo负载均衡之一的最小活跃策略-LeastActiveLoadBalance
首先,让我们对负载均衡做一个简单的介绍。所谓集负载均衡,其含义就是指将负载(工作任务)进行平衡、分摊到多个操作单元上进行运行。负载均衡、集群容错、服务降级这三个概念在微服务中非常重要。从调用顺序来看,一次完整的RPC调用首先是负载均衡、其次是集群容错、最后是服务降级:负载均衡解决了选哪一个的问题、集群容错解决了换哪一个的问题、而服务降级则是解决了全错了怎么办的问题
今天我们要学习的策略是最小活跃策略-LeastActiveLoadBalance。顾名思义,这种策略就是寻找调用活跃度最低的服务提供者。
话不多说,直接分析源码
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokers
int totalWeight = 0;
// The weight of the first least active invoker
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;
// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoker's configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
// save for later use
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexes
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexes order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
通过代码我们可以看出, 作者通过
RpcStatus.getStatus(invoker.getUrl(),invocation.getMethodName()).getActive()获取了每个提供者的活跃度,通过 getWeight(invoker, invocation) 获取了每个提供者的权重;最后再返回最低活跃提供者
如果只有一个最低活跃提供者,直接返回
if (leastCount == 1) {
// 如果只存在一个最低活跃的提供者 直接返回
return invokers.get(leastIndexes[0]);
}
如果存在多个、但是不是所有提供者的权重都是一样的:先生成一个随机数(0-最小总权重),循环用这个随机数-最小活跃度提供者们的权重 当减到小于0时 返回此时的最小活跃提供者
if (!sameWeight && totalWeight > 0) {
// 生成一个0-最小活跃度提供者总权重之间的整数
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// 循环用这个随机数-最小活跃度提供者们的权重 当减到小于0时 返回此时的最小活跃提供者
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
当所有最小活跃提供者权重都一样:随机返回其中一个
invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
通过上面的代码我们可以看出这个最小活跃的大致逻辑,那么每个提供者的活跃度又是怎么算的呢?
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.computeIfAbsent(uri, k -> new ConcurrentHashMap<>());
return map.computeIfAbsent(methodName, k -> new RpcStatus());
}
是通过url和methodName来查出来的活跃度,url我们可以理解为一个提供者的ip,methodName的dubbo接口的方法,可见该活跃度是方法级别的活跃度而非接口级别!此处是取活跃度的地方,那么什么地方又是增加活跃度的呢?请接着往下看
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
RpcStatus appStatus = getStatus(url);
RpcStatus methodStatus = getStatus(url, methodName);
if (methodStatus.active.get() == Integer.MAX_VALUE) {
return false;
}
for (int i; ; ) {
i = methodStatus.active.get();
if (i == Integer.MAX_VALUE || i + 1 > max) {
return false;
}
if (methodStatus.active.compareAndSet(i, i + 1)) {
break;
}
}
appStatus.active.incrementAndGet();
return true;
}
在RpcStatus中有一个beginCount的方法就是用来计数的,而调用这个方法的ActiveLimitFilter的dubbo的filter,这说明每一次发生rpc调用的时都会执行beginCount方法。
由此,我们可以清晰地看出最小活跃策略就是取目标rpc接口方法调用次数最少的一个提供者来作为本次rpc调用的提供者。但是这种最小活跃只是针对当前消费者,而不能针对整个集群,所以该最小活跃策略仍然不能将资源充分运用起来。
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/157494.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...