大家好,又见面了,我是你们的朋友全栈君。
一、dubbo版本说明
基于dubbo版本2.6.2讲解
二、负载均衡的接口关系
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
1.根据@SPI(RandomLoadBalance.NAME)注解可以知道dubbo的默认负载均衡策略是RandomLoadBalance。
2.根据@Adaptive(“loadbalance”)注解我们可以在URL中通过loadbalance=xxx配置来动态指定select方法的负载均衡算法。
名称解释:
@SPI:扩展点注解
@Adaptive: 扩展点自适应注解
URL(元数据) 格式 :dubbo://ip:port/service.DemoService?anyhost=true&application=srcAnalysisClient&check=false&dubbo=2.8.4&generic=false&interface=service.DemoService&loadbalance=consistenthash&methods=sayHello,retMap&pid=14648&sayHello.timeout=20000&side=consumer×tamp=1493522325563
public abstract class AbstractLoadBalance implements LoadBalance {
// 计算权重
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 随着服务的启动时间越来越长,慢慢提升权重,直到weight
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 为空校验
if (invokers == null || invokers.isEmpty())
return null;
// 如果只有一个可用服务直接返回该服务
if (invokers.size() == 1)
return invokers.get(0);
// 调用具体的负载均衡算法
return doSelect(invokers, url, invocation);
}
// 需要子类实现 (设计模式:模板模式)
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
// 获取当前服务的权重
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 获取服务配置的权重值,默认值为100
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
// 获取服务的启动时间
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 求差值,得到预热时间(运行了多久)
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 获取配置的总预热时间,默认值10分钟
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
// 服务运行的时间少于预热时间,那么需要重新计算权重weight(即需要降权)
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}
三、dubbo内置的负载均衡策略
(1)RandomLoadBalance:加权随机负载均衡(dubbo默认的策略)
(2) RoundRobinLoadBalance:加权轮询负载均衡
(3) LeastActiveLoadBalance:最少活跃调用数负载均衡
(4) ConsistentHashLoadBalance:一致性哈希负载均衡
1.加权随机负载均衡(RandomLoadBalance)
Dubbo的加权随机负载均衡是按照权重设置为Provider分配随机概率。比如,有10个 Provider,并不是说,每个 Provider 的概率都是一样的,而是要结合这10个 Provider 的权重来分配概率。
Provider的权重可以在Dubbo Admin平台中设置,默认权重值是100。
调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
源码:
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 可用服务数量
int totalWeight = 0; // 所有服务权重总和
boolean sameWeight = true; // 所有服务的权重是否都一样
// 遍历所有服务
for (int i = 0; i < length; i++) {
// 计算权重
int weight = getWeight(invokers.get(i), invocation);
// 权重累加
totalWeight += weight;
// 比较每个服务的权重值是否一样
if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
// 如果不是所有的服务权重都相同,那么基于权重来随机选择。权重越大的,被选中的概率越大。
if (totalWeight > 0 && !sameWeight) {
// 从[0,totalWeight)范围中随机获取一个位移
int offset = random.nextInt(totalWeight);
// 遍历所有服务
for (int i = 0; i < length; i++) {
// 累减权重
offset -= getWeight(invokers.get(i), invocation);
// 如果位移小于0,则选中服务
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果所有服务的权重都相同或者权重总和小于等于零,直接random.nextInt随机从invokers中选择一个
return invokers.get(random.nextInt(length));
}
}
举例:
Provider |
Weight |
i |
A |
10 |
0 |
B |
20 |
1 |
C |
30 |
2 |
D |
40 |
3 |
totalWeight = 100
| | | | |
+-------------------------------50---------------------------------+ totalWeight
1 10 30 60 100
|---A----|----B-----|--------c----------|-------------D------------| 概率区间
假如 offset = random.nextInt(100)为50
第一次遍历i=0, offset = 50-10 = 40
第二次遍历i=1, offset = 40-20 = 20
第三次遍历i=2, offset = 20-30 = -10,小于零则选中服务C
2.加权轮询负载均衡(RoundRobinLoadBalance)
根据设置的权重判断轮询的比例。
存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。解决办法 :结合权重,把第二台机(性能低的)的权重设置低一点。
源码:
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
// key = interface + methodname,value = 轮询序号
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// key = interface + methodname
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // 可用服务数量
int maxWeight = 0; // 最终保存了服务中最大的权重值
int minWeight = Integer.MAX_VALUE; // 最终保存了服务中最小的权重值
// key为服务,value为服务的权重计数器
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;// 所有服务权重总和
// 遍历所有服务
for (int i = 0; i < length; i++) {
// 计算权重
int weight = getWeight(invokers.get(i), invocation);
// 选则最大的权重值并赋值给maxWeight
maxWeight = Math.max(maxWeight, weight);
// 选择最小的权重值并赋值给minWeight
minWeight = Math.min(minWeight, weight);
if (weight > 0) {
// 存储服务及其权重值
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
// 权重累加
weightSum += weight;
}
}
// 获取当前的轮询序号
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 获取当前的轮询序号,用于取模。轮询号自增。
int currentSequence = sequence.getAndIncrement();
// 服务提供者之间的权重有差别,需要按权重轮询
if (maxWeight > 0 && minWeight < maxWeight) {
// 当前轮询序号 与 服务提供者权重总和 取模
int mod = currentSequence % weightSum;
// 从0循环直到最大权重
for (int i = 0; i < maxWeight; i++) {
// 遍历服务
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
// 服务提供者
final Invoker<T> k = each.getKey();
// 服务的权重值
final IntegerWrapper v = each.getValue();
// mod为0并且对应的服务提供者的权重计算器大于0
if (mod == 0 && v.getValue() > 0) {
// 返回服务提供者
return k;
}
// mode不等于0,服务提供者的权重计数器大于0
if (v.getValue() > 0) {
// 服务提供者的权重计数器减1
v.decrement();
// mod减1
mod--;
}
}
}
}
// 如果各服务提供者权重都相同,则直接对服务提供者取模。
return invokers.get(currentSequence % length);
}
// Integer包装类,主要包含了一个自减方法,用于服务提供者的权重计数
private static final class IntegerWrapper {
private int value;
public IntegerWrapper(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
public void decrement() {
this.value--;
}
}
}
问题:以上版本代码在mod == 0 && v.getValue() > 0 条件成立时才会被返回相应的 Invoker。假如 mod 很大,比如 10000,50000,甚至更大时,doSelect 方法需要进行很多次计算才能将 mod 减为0。由此可知,doSelect 的效率与 mod 有关,时间复杂度为 O(mod)。mod 又受最大权重 maxWeight 的影响,因此当某个服务提供者配置了非常大的权重,此时 RoundRobinLoadBalance 会产生比较严重的性能问题。该问题最初是在issue #2578中被反馈出来。
参考:【dubbo】负载均衡 RoundRobinLoadBalance Dubbo-2.6.5 提供的最新算法!线程安全性有什么影响?_qfzhangwei的专栏-CSDN博客_dubbo 线程安全
3.最少活跃调用数负载均衡(LeastActiveLoadBalance)
最少活跃调用数,如果活跃数相同则随机调用。
活跃数指调用前后计数差。使慢的提供者收到更少的请求,因为越慢的提供者的调用前后计数差会越大。举个例子:每个服务维护一个活跃数计数器。当A机器开始处理请求,该计数器加1,此时A还未处理完成。若处理完毕则计数器减1。而B机器接受到请求后很快处理完毕。那么A,B的活跃数分别是1,0。当又产生了一个新的请求,则选择B机器去执行(B活跃数最小),这样使慢的机器A收到少的请求。
Dubbo中使用ActiveLimitFilter过滤器来计算每个接口方法的活跃数。
源码:
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 可用服务数量
int leastActive = -1; // 最少活跃数的初始值是-1
int leastCount = 0; // 用来记录具有相同最少活跃数(leastActive)的服务提供者的数量
int[] leastIndexs = new int[length]; // 用来记录具有相同最少活跃数(leastActive)的服务提供者的索引
int totalWeight = 0; // 所有服务权重总和
int firstWeight = 0; //初始值 用于比较
boolean sameWeight = true; // 所有服务提供者的权重是否都相同
// 遍历所有服务提供者
for (int i = 0; i < length; i++) {
// 服务提供者
Invoker<T> invoker = invokers.get(i);
// 获取当前这个提供者的活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取当前服务配置的权重值 (后面版本已改为用getWeight获取权限)
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
// 第一个服务 或者 当前提供者活跃数小于最少活跃数
if (leastActive == -1 || active < leastActive) {
// 记录当前最少活跃数
leastActive = active;
// 重置计数
leastCount = 1;
// 在leastIndexs的0位置记录当前提供者(最少活跃数的提供者)的索引
leastIndexs[0] = i;
// 总权重就是当前服务提供者的权重
totalWeight = weight;
// 将当前提供者的权重赋值给firstWeight
firstWeight = weight;
// 重置权重是否相同标识为true
sameWeight = true;
} else if (active == leastActive) { // 当前提供者的活跃数等于最少活跃数
// 记录当前提供者的索引
leastIndexs[leastCount++] = i;
// 权重累加
totalWeight += weight;
// 所有提供者的权重是否相同
if (sameWeight && i > 0 && weight != firstWeight) {
sameWeight = false;
}
}
}
// 如果恰好有一个提供者具有最少活跃数,那么直接返回这个提供者
if (leastCount == 1) {
// 从leastIndexs的0位置获取提供者在invokers中的索引
return invokers.get(leastIndexs[0]);
}
// 活跃数相同
// 如果每个提供者有不同的权重 并且 总权重大于0
if (!sameWeight && totalWeight > 0) {
// 从[0,totalWeight)范围中随机获取一个位移
int offsetWeight = random.nextInt(totalWeight);
// 遍历具有相同活跃数的提供者
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
// 累减权重
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
// 如果位移小于等于0,则选中服务
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// 所有提供者权重相同或者总权重等于0,直接random.nextInt随机从invokers中选择一个
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
}
4.一致性哈希负载均衡(ConsistentHashLoadBalance)
一致性哈希负载均衡可以让参数相同的请求每次都路由到相同的服务提供者上。这种负载均衡的方式可以让请求相对平均。当某一服务提供者“挂”了,原本发往该提供者的请求,基于虚拟节点,会平摊到其他提供者,不会引起剧烈的变动。
Dubbo框架使用了优化过的Ketama一致性Hash.这种算法为每个真实的节点再创建多个虚拟节点,让节点在环形(通过TreeMap实现)上的分布更加均匀,后续的调用也会随之更加均匀。
源码:
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// key = 接口名 + 方法名
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 把所有可以调用的提供者列表进行Hash
int identityHashCode = System.identityHashCode(invokers);
// 查询一致性hash选择器
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 若不存在选择器 或者 当前服务提供者列表的hashCode和之前的不相等,说明服务提供者列表发生了变化,则重新创建选择器
if (selector == null || selector.identityHashCode != identityHashCode) {
// 创建一致性hash选择器
selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
// 重新查询一致性hash选择器
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 从选择器中返回服务提供者
return selector.select(invocation);
}
// 一致性hash选择器
private static final class ConsistentHashSelector<T> {
// 存储虚拟节点
private final TreeMap<Long, Invoker<T>> virtualInvokers;
// 虚拟节点个数
private final int replicaNumber;
// 服务提供者列表hashCode
private final int identityHashCode;
// 参数位置数组
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 创建TreeMap来保存虚拟提供者节点。TreeMap是按照key的值从小到大排序的。
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
// 获取服务提供者的Url元数据
URL url = invokers.get(0).getUrl();
// 获取所配置的虚拟结点数,缺省值160个。如果要修改,请配置<dubbo:parameter key="hash.nodes" value="320" />
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 获取需要hash的参数位置,缺省是0,0指的是方法里的第一个参数。如果要修改,请配置<dubbo:parameter key="hash.arguments" value="0,1" />
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
// 创建参数位置数组
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 对每个提供者生成replicaNumber个虚拟结点,并存放于TreeMap中
for (Invoker<T> invoker : invokers) {
// 服务提供者的 address = ip:port
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
// address+i 做摘要
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
// 计算位置
long m = hash(digest, h);
// 将提供者放入TreeMap
virtualInvokers.put(m, invoker);
}
}
}
}
// 选择服务提供者
public Invoker<T> select(Invocation invocation) {
// 根据请求参数来生成Key
String key = toKey(invocation.getArguments());
// 根据这个参数生成消息摘要
byte[] digest = md5(key);
// 调用hash(digest, 0),将消息摘要转换为hashCode,这里仅取0-31位来生成HashCode
// 调用sekectForKey方法选择结点
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
// 根据需要hash的参数的位置从args数组中取得参数并进行拼接
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
// 以下部分形成了环形查询
// tailMap方法其实就是是返回键值大于或等于key的那部分,再使用firstEntry方法获取这部分的第一个
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
// 如果entry为null说明这个hash值就是最大的了,要想找对应的服务提供者,就要找TreeMap的第一个元素
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
// 返回服务提供者
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes;
try {
bytes = value.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.update(bytes);
return md5.digest();
}
}
}
参考资料:
【dubbo】负载均衡 RoundRobinLoadBalance Dubbo-2.6.5 提供的最新算法!线程安全性有什么影响?_qfzhangwei的专栏-CSDN博客_dubbo 线程安全
深度解析dubbo负载均衡之LeastActiveLoadBalance_猿上生活-CSDN博客
Dubbo源码学习–ConsistentHashLoadBalance负载均衡(五)_井底之蛙-CSDN博客
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/157325.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...