官方目前建议使用的负载均衡包括以下几种:
- random(随机算法)
- localPref(本地优先算法)
- roundRobin(轮询算法)
- consistentHash(一致性 hash 算法)
所以我们接下来分析以下以上四种负载均衡的源码是怎样的.
随机算法
我们先看一下 SOFARPC 的源码实现:
- @Override
- public ProviderInfo doSelect(SofaRequest invocation, List<ProviderInfo> providerInfos) {
- ProviderInfo providerInfo = null;
- int size = providerInfos.size(); // 总个数
- int totalWeight = 0; // 总权重
- boolean isWeightSame = true; // 权重是否都一样
- for (int i = 0; i <size; i++) {
- int weight = getWeight(providerInfos.get(i));
- totalWeight += weight; // 累计总权重
- if (isWeightSame && i> 0 && weight != getWeight(providerInfos.get(i - 1))) {
- isWeightSame = false; // 计算所有权重是否一样
- }
- }
- if (totalWeight> 0 && !isWeightSame) {
- // 如果权重不相同且权重大于 0 则按总权重数随机
- int offset = random.nextInt(totalWeight);
- // 并确定随机值落在哪个片断上
- for (int i = 0; i <size; i++) {
- offset -= getWeight(providerInfos.get(i));
- if (offset < 0) {
- providerInfo = providerInfos.get(i);
- break;
- }
- }
- } else {
- // 如果权重相同或权重为 0 则均等随机
- providerInfo = providerInfos.get(random.nextInt(size));
- }
- return providerInfo;
- }
上面主要做了几件事:
获取所有的 provider
遍历 provier, 如果当前的 provider 的权重和上一个 provider 的权重不一样, 那么就做个标记
如果权重不相同那么就随机取一个 0 到总权重之间的值, 遍历 provider 去减随机数, 如果减到小于 0, 那么就返回那个 provider
如果没有权重相同, 那么用随机函数取一个 provider
我们再来看看 dubbo 是怎么实现的:
- @Override
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- int length = invokers.size(); // Number of invokers
- boolean sameWeight = true; // Every invoker has the same weight?
- int firstWeight = getWeight(invokers.get(0), invocation);
- int totalWeight = firstWeight; // The sum of weights
- for (int i = 1; i <length; i++) {
- int weight = getWeight(invokers.get(i), invocation);
- totalWeight += weight; // Sum
- if (sameWeight && weight != firstWeight) {
- sameWeight = false;
- }
- }
- if (totalWeight> 0 && !sameWeight) {
- // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
- int offset = ThreadLocalRandom.current().nextInt(totalWeight);
- // Return a invoker based on the random value.
- for (int i = 0; i <length; i++) {
- offset -= getWeight(invokers.get(i), invocation);
- if (offset < 0) {
- return invokers.get(i);
- }
- }
- }
- // If all invokers have the same weight value or totalWeight=0, return evenly.
- return invokers.get(ThreadLocalRandom.current().nextInt(length));
- }
获取 invoker 的数量
获取第一个 invoker 的权重, 并复制给 firstWeight
循环 invoker 集合, 把它们的权重全部相加, 并复制给 totalWeight, 如果权重不相等, 那么 sameWeight 为 false
如果 invoker 集合的权重并不是全部相等的, 那么获取一个随机数在 1 到 totalWeight 之间, 赋值给 offset 属性
循环遍历 invoker 集合, 获取权重并与 offset 相减, 当 offset 减到小于零, 那么就返回这个 inovker
如果权重相等, 那么直接在 invoker 集合里面取一个随机数返回
从上面我们可以看到, 基本上 SOFARPC 和 dubbo 的负载均衡实现是一致的.
本地优先算法
在负载均衡时使用保持本机优先. 这个相信大家也比较好理解. 在所有的可选地址中, 找到本机发布的地址, 然后进行调用.
- @Override
- public ProviderInfo doSelect(SofaRequest invocation, List<ProviderInfo> providerInfos) {
- String localhost = SystemInfo.getLocalHost();
- if (StringUtils.isEmpty(localhost)) {
- return super.doSelect(invocation, providerInfos);
- }
- List<ProviderInfo> localProviderInfo = new ArrayList<ProviderInfo>();
- for (ProviderInfo providerInfo : providerInfos) { // 解析 IP, 看是否和本地一致
- if (localhost.equals(providerInfo.getHost())) {
- localProviderInfo.add(providerInfo);
- }
- }
- if (CommonUtils.isNotEmpty(localProviderInfo)) { // 命中本机的服务端
- return super.doSelect(invocation, localProviderInfo);
- } else { // 没有命中本机上的服务端
- return super.doSelect(invocation, providerInfos);
- }
- }
查看本机的 host, 如果为空, 那么直接调用父类随机算法
遍历所有的 provider, 如果服务提供方的 host 和服务调用方的 host 一致, 那么保存到集合里
如果存在服务提供方的 host 和服务调用方的 host 一致, 那么就在这些集合中选取
如果不一致, 那么就在所有 provider 中选取
轮询算法
我们首先来看看 SOFARPC 的轮训是怎么实现的:
- private final ConcurrentMap<String, PositiveAtomicCounter> sequences = new ConcurrentHashMap<String, PositiveAtomicCounter>();
- @Override
- public ProviderInfo doSelect(SofaRequest request, List<ProviderInfo> providerInfos) {
- String key = getServiceKey(request); // 每个方法级自己轮询, 互不影响
- int length = providerInfos.size(); // 总个数
- PositiveAtomicCounter sequence = sequences.get(key);
- if (sequence == null) {
- sequences.putIfAbsent(key, new PositiveAtomicCounter());
- sequence = sequences.get(key);
- }
- return providerInfos.get(sequence.getAndIncrement() % length);
- }
- private String getServiceKey(SofaRequest request) {
- StringBuilder builder = new StringBuilder();
- builder.append(request.getTargetAppName()).append("#")
- .append(request.getMethodName());
- return builder.toString();
- }
从上面的代码我们可以看出, SOFARPC 的轮询做的很直接简单. 就是 new 了一个 map, 然后把每个服务的服务名拼上方法名存到 map 里面, 然后每次 value 值自增 1 对 provider 取模.
我们再看 dubbo 的实现方式:
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
- if (map == null) {
- methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
- map = methodWeightMap.get(key);
- }
- int totalWeight = 0;
- long maxCurrent = Long.MIN_VALUE;
- long now = System.currentTimeMillis();
- Invoker<T> selectedInvoker = null;
- WeightedRoundRobin selectedWRR = null;
- for (Invoker<T> invoker : invokers) {
- String identifyString = invoker.getUrl().toIdentityString();
- WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
- int weight = getWeight(invoker, invocation);
- if (weight <0) {
- weight = 0;
- }
- if (weightedRoundRobin == null) {
- weightedRoundRobin = new WeightedRoundRobin();
- weightedRoundRobin.setWeight(weight);
- map.putIfAbsent(identifyString, weightedRoundRobin);
- weightedRoundRobin = map.get(identifyString);
- }
- if (weight != weightedRoundRobin.getWeight()) {
- //weight changed
- weightedRoundRobin.setWeight(weight);
- }
- long cur = weightedRoundRobin.increaseCurrent();
- weightedRoundRobin.setLastUpdate(now);
- if (cur> maxCurrent) {
- maxCurrent = cur;
- selectedInvoker = invoker;
- selectedWRR = weightedRoundRobin;
- }
- totalWeight += weight;
- }
- if (!updateLock.get() && invokers.size() != map.size()) {
- if (updateLock.compareAndSet(false, true)) {
- try {
- // copy -> modify -> update reference
- ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
- newMap.putAll(map);
- Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, WeightedRoundRobin> item = it.next();
- if (now - item.getValue().getLastUpdate()> RECYCLE_PERIOD) {
- it.remove();
- }
- }
- methodWeightMap.put(key, newMap);
- } finally {
- updateLock.set(false);
- }
- }
- }
- if (selectedInvoker != null) {
- selectedWRR.sel(totalWeight);
- return selectedInvoker;
- }
- // should not happen here
- return invokers.get(0);
- }
dubbo 的轮询的实现里面还加入了权重在里面, sofarpc 的权重轮询是放到另外一个类当中去做的, 因为性能太差了而被弃用了.
我们举个例子来简单看一下 dubbo 的加权轮询是怎么做的:
假定有 3 台 dubbo provider:
- 10.0.0.1:20884, weight=2
- 10.0.0.1:20886, weight=3
- 10.0.0.1:20888, weight=4
- totalWeight=9;
那么第一次调用的时候:
- 10.0.0.1:20884, weight=2 selectedWRR -> current = 2
- 10.0.0.1:20886, weight=3 selectedWRR -> current = 3
- 10.0.0.1:20888, weight=4 selectedWRR -> current = 4
- selectedInvoker-> 10.0.0.1:20888
调用 selectedWRR.sel(totalWeight);
10.0.0.1:20888, weight=4 selectedWRR -> current = -5
返回 10.0.0.1:20888 这个实例
那么第二次调用的时候:
- 10.0.0.1:20884, weight=2 selectedWRR -> current = 4
- 10.0.0.1:20886, weight=3 selectedWRR -> current = 6
- 10.0.0.1:20888, weight=4 selectedWRR -> current = -1
- selectedInvoker-> 10.0.0.1:20886
调用 selectedWRR.sel(totalWeight);
10.0.0.1:20886 , weight=4 selectedWRR -> current = -3
返回 10.0.0.1:20886 这个实例
那么第三次调用的时候:
- 10.0.0.1:20884, weight=2 selectedWRR -> current = 6
- 10.0.0.1:20886, weight=3 selectedWRR -> current = 0
- 10.0.0.1:20888, weight=4 selectedWRR -> current = 3
- selectedInvoker-> 10.0.0.1:20884
调用 selectedWRR.sel(totalWeight);
10.0.0.1:20884, weight=2 selectedWRR -> current = -3
返回 10.0.0.1:20884 这个实例
一致性 hash 算法
在 SOFARPC 中有两种方式实现一致性 hash 算法, 一种是带权重的一种是不带权重的, 我对比了一下, 两边的代码基本上是一样的, 所以我直接分析带权重的代码就好了.
下面我们来分析一下代码:
- private final ConcurrentHashMap<String, Selector> selectorCache = new ConcurrentHashMap<String, Selector>();
- @Override
- public ProviderInfo doSelect(SofaRequest request, List<ProviderInfo> providerInfos) {
- String interfaceId = request.getInterfaceName();
- String method = request.getMethodName();
- String key = interfaceId + "#" + method;
- // 判断是否同样的服务列表
- int hashcode = providerInfos.hashCode();
- Selector selector = selectorCache.get(key);
- // 原来没有
- if (selector == null ||
- // 或者服务列表已经变化
- selector.getHashCode() != hashcode) {
- selector = new Selector(interfaceId, method, providerInfos, hashcode);
- selectorCache.put(key, selector);
- }
- return selector.select(request);
- }
上面的 doSelect 方法就是获取到相同服务的 Selector, 如果没有就新建一个. Selector 是 WeightConsistentHashLoadBalancer 里面的内部类, 我们接下来看看这个内部类的实现.
- public Selector(String interfaceId, String method, List<ProviderInfo> actualNodes, int hashcode) {
- this.interfaceId = interfaceId;
- this.method = method;
- this.hashcode = hashcode;
- // 创建虚拟节点环 (provider 创建虚拟节点数 = 真实节点权重 * 32)
- this.virtualNodes = new TreeMap<Long, ProviderInfo>();
- // 设置越大越慢, 精度越高
- int num = 32;
- for (ProviderInfo providerInfo : actualNodes) {
- for (int i = 0; i <num * providerInfo.getWeight() / 4; i++) {
- byte[] digest = HashUtils.messageDigest(providerInfo.getHost() + providerInfo.getPort() + i);
- for (int h = 0; h < 4; h++) {
- long m = HashUtils.hash(digest, h);
- virtualNodes.put(m, providerInfo);
- }
- }
- }
- }
Selector 内部类中就是构建了一个 TreeMap 实例, 然后遍历所有的 provider, 每个 provider 虚拟的节点数是 (真实节点权重 * 32) 个.
虚拟好节点后, 我们直接调用 Selector#select 方法在 hash 环中得到相应的 provider.
- public ProviderInfo select(SofaRequest request) {
- String key = buildKeyOfHash(request.getMethodArgs());
- byte[] digest = HashUtils.messageDigest(key);
- return selectForKey(HashUtils.hash(digest, 0));
- }
- /**
- * 获取第一参数作为 hash 的 key
- *
- * @param args the args
- * @return the string
- */
- private String buildKeyOfHash(Object[] args) {
- if (CommonUtils.isEmpty(args)) {
- return StringUtils.EMPTY;
- } else {
- return StringUtils.toString(args[0]);
- }
- }
- /**
- * Select for key.
- *
- * @param hash the hash
- * @return the provider
- */
- private ProviderInfo selectForKey(long hash) {
- Map.Entry<Long, ProviderInfo> entry = virtualNodes.ceilingEntry(hash);
- if (entry == null) {
- entry = virtualNodes.firstEntry();
- }
- return entry.getValue();
- }
这上面主要是获取第一参数作为 hash 的 key, 然后对它进行 hash. 所以我感觉这里可能有一个问题就是如果一个某个服务里面很多个参数一样的服务, 那么是不是都会打到那同一台机器上呢?
dubbo 的实现方式也和 SOFARPC 类似, 这里不再赘述.
来源: https://www.cnblogs.com/luozhiyun/p/11308414.html