Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.common.utils.helix;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -29,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -102,20 +102,21 @@ public static IdealState cloneIdealState(IdealState idealState) {
* @param updater A function that returns an updated ideal state given an input ideal state
* @return updated ideal state if successful, null if not
*/
public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName,
final Function<IdealState, IdealState> updater, RetryPolicy policy, final boolean noChangeOk) {
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy policy, boolean noChangeOk) {
// NOTE: ControllerMetrics could be null because this method might be invoked by Broker.
ControllerMetrics controllerMetrics = ControllerMetrics.get();
try {
long startTimeMs = System.currentTimeMillis();
IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
int retries = policy.attempt(new Callable<Boolean>() {
int retries = policy.attempt(new Callable<>() {
@Override
public Boolean call() {
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
IdealState idealState = dataAccessor.getProperty(idealStateKey);

// Make a copy of the the idealState above to pass it to the updater
// Make a copy of the idealState above to pass it to the updater
// NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
// list fields
IdealState idealStateCopy = cloneIdealState(idealState);
Expand Down Expand Up @@ -197,20 +198,21 @@ private boolean shouldCompress(IdealState is) {
numChars += entry.getValue().length();
}
numChars *= is.getNumPartitions();
if (_minNumCharsInISToTurnOnCompression > 0
&& numChars > _minNumCharsInISToTurnOnCompression) {
return true;
}
return _minNumCharsInISToTurnOnCompression > 0 && numChars > _minNumCharsInISToTurnOnCompression;
}
return false;
}
});
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
}
return idealStateWrapper._idealState;
} catch (Exception e) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
}
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
}
}
Expand Down