Skip to content

Commit

Permalink
[improve][broker] PIP-192 Added LeastResourceUsageWithWeight broker a…
Browse files Browse the repository at this point in the history
…ssignment strategy to the load balancer extensions (#18964)

Master Issue: #16691

### Motivation
We will start raising PRs to implement PIP-192, #16691

### Modifications
This PR Added `LeastResourceUsageWithWeight` to the load balancer extensions and its unit test.

For the pip-192 project, this LeastResourceUsageWithWeight
- is a copy of the existing broker-bundle assignment strategy, `org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight`.
- uses the load data from the load balancer extensions.
  • Loading branch information
heesung-sn authored Dec 29, 2022
1 parent ae98392 commit a8da549
Show file tree
Hide file tree
Showing 4 changed files with 496 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.strategy;

import java.util.List;
import java.util.Optional;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.common.naming.ServiceUnitId;

/**
* The broker selection strategy is designed to select the broker according to different implementations.
*/
public interface BrokerSelectionStrategy {

/**
* Choose an appropriate broker according to different load balancing implementations.
*
* @param brokers
* The candidate brokers list.
* @param bundle
* The input bundle to select the owner broker
* @param context
* The context contains information needed for selection (load data, config, and etc).
*/
Optional<String> select(List<String> brokers, ServiceUnitId bundle, LoadManagerContext context);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.strategy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.common.naming.ServiceUnitId;

/**
* Placement strategy which selects a broker based on which one has the least resource usage with weight.
* This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
* cause cluster fluctuations due to short-term load jitter.
*/
@Slf4j
public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy {
private static final double MAX_RESOURCE_USAGE = 1.0d;
// Maintain this list to reduce object creation.
private final ArrayList<String> bestBrokers;
private final Set<String> noLoadDataBrokers;
private final Map<String, Double> brokerAvgResourceUsageWithWeight;

public LeastResourceUsageWithWeight() {
this.bestBrokers = new ArrayList<>();
this.brokerAvgResourceUsageWithWeight = new HashMap<>();
this.noLoadDataBrokers = new HashSet<>();
}

// A broker's max resource usage with weight using its historical load and short-term load data with weight.
private double getMaxResourceUsageWithWeight(final String broker, final BrokerLoadData brokerLoadData,
final ServiceConfiguration conf, boolean debugMode) {
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsageWithWeight =
updateAndGetMaxResourceUsageWithWeight(broker, brokerLoadData, conf, debugMode);

if (maxUsageWithWeight > overloadThreshold) {
log.warn(
"Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+ "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+ "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+ "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
broker, maxUsageWithWeight * 100,
brokerLoadData.getCpu().percentUsage(), brokerLoadData.getMemory().percentUsage(),
brokerLoadData.getDirectMemory().percentUsage(), brokerLoadData.getBandwidthIn().percentUsage(),
brokerLoadData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
}

if (debugMode) {
log.info("Broker {} has max resource usage with weight percentage: {}%",
broker, maxUsageWithWeight * 100);
}
return maxUsageWithWeight;
}

/**
* Update and get the max resource usage with weight of broker according to the service configuration.
*
* @param broker The broker name.
* @param brokerData The broker load data.
* @param conf The service configuration.
* @param debugMode The debug mode to print computed load states and decision information.
* @return the max resource usage with weight of broker
*/
private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerLoadData brokerData,
ServiceConfiguration conf, boolean debugMode) {
final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
double resourceUsage = brokerData.getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
historyUsage = historyUsage == null
? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
if (debugMode) {
log.info(
"Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+ "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+ "OUT weight: {} ",
broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
}
brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
return historyUsage;
}

/**
* Find a suitable broker to assign the given bundle to.
* This method is not thread safety.
*
* @param candidates The candidates for which the bundle may be assigned.
* @param bundleToAssign The data for the bundle to assign.
* @param context The load manager context.
* @return The name of the selected broker as it appears on ZooKeeper.
*/
@Override
public Optional<String> select(List<String> candidates, ServiceUnitId bundleToAssign, LoadManagerContext context) {
var conf = context.brokerConfiguration();
if (candidates.isEmpty()) {
log.info("There are no available brokers as candidates at this point for bundle: {}", bundleToAssign);
return Optional.empty();
}

bestBrokers.clear();
noLoadDataBrokers.clear();
// Maintain of list of all the best scoring brokers and then randomly
// select one of them at the end.
double totalUsage = 0.0d;

// TODO: use loadBalancerDebugModeEnabled too.
boolean debugMode = log.isDebugEnabled();
for (String broker : candidates) {
var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
if (brokerLoadDataOptional.isEmpty()) {
log.warn("There is no broker load data for broker:{}. Skipping this broker.", broker);
noLoadDataBrokers.add(broker);
continue;
}

var brokerLoadData = brokerLoadDataOptional.get();

double usageWithWeight =
getMaxResourceUsageWithWeight(broker, brokerLoadData, context.brokerConfiguration(), debugMode);
totalUsage += usageWithWeight;
}

if (candidates.size() > noLoadDataBrokers.size()) {
final double avgUsage = totalUsage / (candidates.size() - noLoadDataBrokers.size());
final double diffThreshold =
conf.getLoadBalancerAverageResourceUsageDifferenceThresholdPercentage() / 100.0;
if (debugMode) {
log.info("Computed avgUsage:{}, diffThreshold:{}", avgUsage, diffThreshold);
}
candidates.forEach(broker -> {
Double avgResUsage = brokerAvgResourceUsageWithWeight.getOrDefault(broker, MAX_RESOURCE_USAGE);
if ((avgResUsage + diffThreshold <= avgUsage && !noLoadDataBrokers.contains(broker))) {
bestBrokers.add(broker);
}
});
}

if (bestBrokers.isEmpty()) {
// Assign randomly as all brokers are overloaded.
log.warn("Assign randomly as none of the brokers are underloaded. candidatesSize:{}, "
+ "noLoadDataBrokersSize:{}", candidates.size(), noLoadDataBrokers.size());
for (String broker : candidates) {
bestBrokers.add(broker);
}
}

if (debugMode) {
log.info("Selected {} best brokers: {} from candidate brokers: {}, noLoadDataBrokers:{}",
bestBrokers.size(), bestBrokers,
candidates,
noLoadDataBrokers);
}
return Optional.of(bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.strategy;
Loading

0 comments on commit a8da549

Please sign in to comment.