diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java index 3abb6d6283558..35dd7282101b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java @@ -91,6 +91,11 @@ private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData ServiceConfiguration conf) { final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage(); Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker); + LocalBrokerData localData = brokerData.getLocalData(); + // If the broker restarted or MsgRate is 0, should use current resourceUsage to cover the historyUsage + if (localData.getBundles().size() == 0 || (localData.getMsgRateIn() == 0 && localData.getMsgRateOut() == 0)){ + historyUsage = null; + } double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight( conf.getLoadBalancerCPUResourceWeight(), conf.getLoadBalancerMemoryResourceWeight(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java index 0de4cbde52616..4f86536c54a5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java @@ -23,6 +23,7 @@ import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; +import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -131,6 +132,16 @@ public void testLeastResourceUsageWithWeight() { brokerDataMap.put("2", brokerData2); brokerDataMap.put("3", brokerData3); assertEquals(strategy.selectBroker(candidates, bundleData, loadData, conf), Optional.of("2")); + + // test restart broker can load bundle as one of the best brokers. + brokerData1 = initBrokerData(35,100); + brokerData2 = initBrokerData(20,100); + brokerData3 = initBrokerData(0,100); + brokerData3.getLocalData().setBundles(Collections.emptySet()); + brokerDataMap.put("1", brokerData1); + brokerDataMap.put("2", brokerData2); + brokerDataMap.put("3", brokerData3); + assertEquals(strategy.selectBroker(candidates, bundleData, loadData, conf), Optional.of("3")); } public void testLeastResourceUsageWithWeightWithArithmeticException() @@ -180,6 +191,12 @@ private BrokerData initBrokerData(double usage, double limit) { localBrokerData.setDirectMemory(new ResourceUsage(usage, limit)); localBrokerData.setBandwidthIn(new ResourceUsage(usage, limit)); localBrokerData.setBandwidthOut(new ResourceUsage(usage, limit)); + // add msgRate and bundle for update resource usage check. + localBrokerData.setMsgRateIn(100.00); + localBrokerData.setMsgRateOut(100.00); + Set bundles = new HashSet<>(); + bundles.add("0x00000000_0xffffffff"); + localBrokerData.setBundles(bundles); BrokerData brokerData = new BrokerData(localBrokerData); TimeAverageBrokerData timeAverageBrokerData = new TimeAverageBrokerData(); brokerData.setTimeAverageData(timeAverageBrokerData);