Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add threshold for each unload round for uniform load shedder #13915

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
add threshold for each unload round for uniform load shedder
  • Loading branch information
hangc0276 committed Jan 25, 2022
commit b001103e32413f565f3192d47361e944a7bcdc1d
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
Expand All @@ -32,21 +33,22 @@
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This strategy tends to distribute load uniformly across all brokers. This strategy checks laod difference between
* This strategy tends to distribute load uniformly across all brokers. This strategy checks load difference between
* broker with highest load and broker with lowest load. If the difference is higher than configured thresholds
* {@link ServiceConfiguration#getLoadBalancerMsgRateDifferenceShedderThreshold()} and
* {@link ServiceConfiguration#getLoadBalancerMsgRateDifferenceShedderThreshold()} then it finds out bundles which can
* be unloaded to distribute traffic evenly across all brokers.
*
*/
@Slf4j
public class UniformLoadShedder implements LoadSheddingStrategy {

private static final Logger log = LoggerFactory.getLogger(UniformLoadShedder.class);

private static final int MB = 1024 * 1024;
private static final double MAX_UNLOAD_PERCENTAGE = 0.2;
private static final int MIN_UNLOAD_MESSAGE = 1000;
private static final int MIN_UNLOAD_THROUGHPUT = 1 * MB;
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double EPS = 1e-6;

Expand Down Expand Up @@ -125,12 +127,14 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue());
}
MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxMsgRate.getValue() - minMsgRate.getValue()) / 2));
(int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
MutableInt msgThroughputRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxThroughputRate.getValue() - minThroughputRate.getValue()) / 2));
(int) ((maxThroughputRate.getValue() - minThroughputRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
LocalBrokerData overloadedBrokerData = brokersData.get(overloadedBroker.getValue()).getLocalData();

if (overloadedBrokerData.getBundles().size() > 1) {
if (overloadedBrokerData.getBundles().size() > 1
&& (msgRateRequiredFromUnloadedBundles.getValue() >= MIN_UNLOAD_MESSAGE
|| msgThroughputRequiredFromUnloadedBundles.getValue() >= MIN_UNLOAD_THROUGHPUT)) {
// Sort bundles by throughput, then pick the bundle which can help to reduce load uniformly with
// under-loaded broker
loadBundleData.entrySet().stream()
Expand Down