From 85c4ba066481e0c303bc2928b493e578af79a146 Mon Sep 17 00:00:00 2001 From: Hang Chen Date: Sat, 11 Dec 2021 12:31:50 +0800 Subject: [PATCH] Use current resourceUsage value as historyUsage when leader change in ThresholdShedder (#13136) ### Motivation Fix #13119 ### Modification 1. User current resourceUsage value as historyUsage value when leader change in ThresholdShedder to speed up getting the actual historyUsage value. (cherry picked from commit 6d9d24d50db5418ddbb845d2c7a2be2b9ac72893) --- .../loadbalance/impl/ThresholdShedder.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java index 39965921d65c0..727be9ba80730 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java @@ -38,13 +38,9 @@ public class ThresholdShedder implements LoadSheddingStrategy { private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class); - private final Multimap selectedBundlesCache = ArrayListMultimap.create(); - private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05; - private static final double MB = 1024 * 1024; - private final Map brokerAvgResourceUsage = new HashMap<>(); @Override @@ -139,25 +135,27 @@ private double getBrokerAvgUsage(final LoadData loadData, final double historyPe for (Map.Entry entry : loadData.getBrokerData().entrySet()) { LocalBrokerData localBrokerData = entry.getValue().getLocalData(); String broker = entry.getKey(); - updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf); - totalUsage += brokerAvgResourceUsage.getOrDefault(broker, 0.0); + totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf); totalBrokers++; } return totalBrokers > 0 ? totalUsage / totalBrokers : 0; } - private void updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, final double historyPercentage, - final ServiceConfiguration conf) { - double historyUsage = - brokerAvgResourceUsage.getOrDefault(broker, 0.0); - historyUsage = historyUsage * historyPercentage - + (1 - historyPercentage) * localBrokerData.getMaxResourceUsageWithWeight( + private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, + final double historyPercentage, final ServiceConfiguration conf) { + Double historyUsage = + brokerAvgResourceUsage.get(broker); + double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight( conf.getLoadBalancerCPUResourceWeight(), conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(), conf.getLoadBalancerBandwithInResourceWeight(), conf.getLoadBalancerBandwithOutResourceWeight()); + historyUsage = historyUsage == null + ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage; + brokerAvgResourceUsage.put(broker, historyUsage); + return historyUsage; } }