Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -91,6 +92,7 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
import org.apache.pinot.controller.recommender.RecommenderDriver;
import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
import org.apache.pinot.controller.util.CompletionServiceHelper;
Expand Down Expand Up @@ -617,6 +619,22 @@ private ObjectNode validateConfig(TableConfig tableConfig, Schema schema, @Nulla
}
}

@GET
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.UPDATE)
@Path("/rebalanceStatus/{jobId}")
@ApiOperation(value = "Gets the current status of a rebalance operation",
notes = "Gets the current status of a rebalance operation")
public RebalanceResult rebalanceStatus(@ApiParam(value = "Rebalance Job Id", required = true) @PathParam("jobId") String jobId)
throws JsonProcessingException {
Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(jobId);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId,
Response.Status.NOT_FOUND);
}
return JsonUtils.stringToObject(controllerJobZKMetadata.get("rebalanceResult"), RebalanceResult.class);
}

@POST
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.UPDATE)
Expand Down Expand Up @@ -662,28 +680,31 @@ public RebalanceResult rebalance(
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
externalViewStabilizationTimeoutInMs);

String rebalanceJobId = UUID.randomUUID().toString();
ZkBasedTableRebalanceObserver rebalanceObserver = new ZkBasedTableRebalanceObserver(rebalanceJobId, _pinotHelixResourceManager);

try {
if (dryRun || downtime) {
// For dry-run or rebalance with downtime, directly return the rebalance result as it should return immediately
return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig);
return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceObserver, rebalanceJobId);
} else {
// Make a dry-run first to get the target assignment
rebalanceConfig.setProperty(RebalanceConfigConstants.DRY_RUN, true);
RebalanceResult dryRunResult = _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig);
RebalanceResult dryRunResult = _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceObserver, rebalanceJobId);

if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
// If dry-run succeeded, run rebalance asynchronously
rebalanceConfig.setProperty(RebalanceConfigConstants.DRY_RUN, false);
_executorService.submit(() -> {
try {
_pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig);
_pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceObserver, rebalanceJobId);
} catch (Throwable t) {
LOGGER.error("Caught exception/error while rebalancing table: {}", tableNameWithType, t);
}
});
return new RebalanceResult(RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(),
dryRunResult.getSegmentAssignment());
dryRunResult.getSegmentAssignment(), rebalanceJobId, System.currentTimeMillis());
} else {
// If dry-run failed or is no-op, return the dry-run result
return dryRunResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
Expand Down Expand Up @@ -2129,7 +2130,7 @@ public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId
return addControllerJobToZK(jobId, jobMetadata);
}

private boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata) {
public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata) {
String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
Stat stat = new Stat();
ZNRecord tableJobsZnRecord = _propertyStore.get(jobResourcePath, stat, AccessOption.PERSISTENT);
Expand Down Expand Up @@ -3096,13 +3097,15 @@ private PinotResourceManagerResponse enableInstance(String instanceName, boolean
"Instance: " + instanceName + (enableInstance ? " enable" : " disable") + " failed, timeout");
}

public RebalanceResult rebalanceTable(String tableNameWithType, Configuration rebalanceConfig)
public RebalanceResult rebalanceTable(String tableNameWithType, Configuration rebalanceConfig, TableRebalanceObserver rebalanceObserver, String rebalanceId)
throws TableNotFoundException {
TableConfig tableConfig = getTableConfig(tableNameWithType);
if (tableConfig == null) {
throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType);
}
return new TableRebalancer(_helixZkManager).rebalance(tableConfig, rebalanceConfig);
TableRebalancer tableRebalancer = new TableRebalancer(_helixZkManager);
tableRebalancer.registerObserver(rebalanceObserver);
return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,62 @@ public class RebalanceResult {
private final Map<InstancePartitionsType, InstancePartitions> _instanceAssignment;
private final Map<String, Map<String, String>> _segmentAssignment;
private final String _description;
private final String _rebalanceId;
//indicates the time at which the rebalance was started (for status: IN_PROGRESS)
private final Long _startTimestamp;
//indicates the time at which the rebalance finished.
private Long _finishTimestamp;

public RebalanceResult(@JsonProperty(value = "status", required = true) Status status,
@JsonProperty(value = "description", required = true) String description,
@JsonProperty("instanceAssignment") @Nullable Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
@JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment, @JsonProperty(value = "rebalanceId", required = true) String rebalanceId, @JsonProperty(value = "startTimestamp", required = true) Long startTimestamp) {
_status = status;
_description = description;
_instanceAssignment = instanceAssignment;
_segmentAssignment = segmentAssignment;
_rebalanceId = rebalanceId;
_startTimestamp = startTimestamp;
}

@JsonCreator
public RebalanceResult(@JsonProperty(value = "status", required = true) Status status,
@JsonProperty(value = "description", required = true) String description,
@JsonProperty("instanceAssignment") @Nullable Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
@JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment) {
@JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment, @JsonProperty(value = "rebalanceId", required = true) String rebalanceId, @JsonProperty(value = "startTimestamp", required = true) Long startTimestamp, @JsonProperty(value = "finishTimestamp", required = true) Long finishTimestamp) {
_status = status;
_description = description;
_instanceAssignment = instanceAssignment;
_segmentAssignment = segmentAssignment;
_rebalanceId = rebalanceId;
_startTimestamp = startTimestamp;
_finishTimestamp = finishTimestamp;
}

@JsonProperty
public Status getStatus() {
return _status;
}

@JsonProperty
public String getRebalanceId() {
return _rebalanceId;
}

@JsonProperty
public Long getStartTimestamp() {
return _startTimestamp;
}

@JsonProperty
public Long getFinishTimestamp() {
return _finishTimestamp;
}

public void setFinishTimestamp(Long timestamp) {
_finishTimestamp = timestamp;
}

@JsonProperty
public String getDescription() {
return _description;
Expand All @@ -66,6 +105,6 @@ public Map<String, Map<String, String>> getSegmentAssignment() {
}

public enum Status {
NO_OP, DONE, FAILED, IN_PROGRESS
NO_OP, DONE, FAILED, IN_PROGRESS, STARTED
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.pinot.controller.helix.core.rebalance;

public interface TableRebalanceObserver {
void onNext(RebalanceResult rebalanceResult);
void onComplete();
void onError(Exception e);
}
Loading