Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SchemaUtils;
Expand Down Expand Up @@ -112,14 +113,20 @@ public static String constructPropertyStorePathForInstancePartitions(String inst
return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, instancePartitionsName);
}

public static String constructPropertyStorePathForControllerJob() {
return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX);
}

public static String constructPropertyStorePathForResource(String resourceName) {
return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName);
}

public static String constructPropertyStorePathForControllerJob(ControllerJobType jobType) {
if (jobType == ControllerJobType.TABLE_REBALANCE) {
return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType.name());
} else {
// For other types, we will continue to use the root path until we migrate
// other types to use separate nodes based on jobType like TABLE_REBALANCE
return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX);
}
}

public static String constructPropertyStorePathForResourceConfig(String resourceName) {
return StringUtil.join("/", PROPERTYSTORE_TABLE_CONFIGS_PREFIX, resourceName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,5 @@
package org.apache.pinot.common.metadata.controllerjob;

public enum ControllerJobType {
RELOAD_SEGMENT,
RELOAD_ALL_SEGMENTS,
FORCE_COMMIT
RELOAD_SEGMENT, RELOAD_ALL_SEGMENTS, FORCE_COMMIT, TABLE_REBALANCE
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
Expand Down Expand Up @@ -165,7 +166,8 @@ public JsonNode getForceCommitJobStatus(
@ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String forceCommitJobId)
throws Exception {
Map<String, String> controllerJobZKMetadata =
_pinotHelixResourceManager.getControllerJobZKMetadata(forceCommitJobId);
_pinotHelixResourceManager.getControllerJobZKMetadata(forceCommitJobId,
ControllerJobType.FORCE_COMMIT);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + forceCommitJobId,
Response.Status.NOT_FOUND);
Expand Down Expand Up @@ -227,8 +229,8 @@ public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
ConsumingSegmentInfoReader consumingSegmentInfoReader =
new ConsumingSegmentInfoReader(_executor, _connectionManager, _pinotHelixResourceManager);
return consumingSegmentInfoReader
.getConsumingSegmentsInfo(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
return consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
String.format("Failed to get consuming segments info for table %s. %s", realtimeTableName, e.getMessage()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ public SuccessResponse reloadSegmentDeprecated2(
public ServerReloadControllerJobStatusResponse getReloadJobStatus(
@ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
throws Exception {
Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.
getControllerJobZKMetadata(reloadJobId, ControllerJobType.RELOAD_SEGMENT);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
Status.NOT_FOUND);
Expand Down
Loading