Skip to content

Commit

Permalink
Merge pull request #17 from apache/master
Browse files Browse the repository at this point in the history
 Update the latest code
  • Loading branch information
CrazyHZM authored Dec 6, 2018
2 parents 8d30d5e + 63bf28d commit f4da633
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
Expand All @@ -32,6 +33,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers
Expand Down Expand Up @@ -62,7 +64,14 @@ public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url;

if (url.getProtocol().equals(Constants.REGISTRY_PROTOCOL)) {
Map<String, String> queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
this.url = url.clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
} else {
this.url = url;
}

this.consumerUrl = consumerUrl;
setRouters(routers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,75 @@

/**
* LeastActiveLoadBalance
* <p>
* Filter the number of invokers with the least number of active calls and count the weights and quantities of these invokers.
* If there is only one invoker, use the invoker directly;
* if there are multiple invokers and the weights are not the same, then random according to the total weight;
* if there are multiple invokers and the same weight, then randomly called.
*/
public class LeastActiveLoadBalance extends AbstractLoadBalance {

public static final String NAME = "leastactive";

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeight = 0; // The sum of with warmup weights
int firstWeight = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// The sum of the warmup weights of all the least active invokes
int totalWeight = 0;
// The weight of the first least active invoke
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;

// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
// Get the active number of the invoke
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoke configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexes[0] = i; // Reset
totalWeight = afterWarmup; // Reset
firstWeight = afterWarmup; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexes[leastCount++] = i; // Record index number of this invoker
totalWeight += afterWarmup; // Add this invoker's with warmup weight to totalWeight.
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexs
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexs order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight) + 1;
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
Expand All @@ -81,4 +108,4 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.junit.Ignore;
import org.junit.Test;


Expand Down Expand Up @@ -87,6 +88,7 @@ public void run() {


@Test
@Ignore
public void testCustomExecutor() {
Executor mockedExecutor = mock(Executor.class);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
Expand Down

0 comments on commit f4da633

Please sign in to comment.