Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to set targetInstance for reloadSegment #14393

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
allow to set targetInstance for reloadSegment
  • Loading branch information
klsince committed Nov 5, 2024
commit 50780a3c5e87d2333e706caeca76612ad4a1b547
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,7 @@ public Response listSegmentLineage(
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
String.format("Exception while listing segment lineage: %s for table: %s.", e.getMessage(),
tableNameWithType),
Status.INTERNAL_SERVER_ERROR, e);
tableNameWithType), Status.INTERNAL_SERVER_ERROR, e);
}
}

Expand Down Expand Up @@ -360,8 +359,8 @@ public Map<String, Object> getSegmentMetadata(

private JsonNode getExtraMetaData(String tableName, String segmentName, List<String> columns) {
try {
TableMetadataReader tableMetadataReader = new TableMetadataReader(_executor,
_connectionManager, _pinotHelixResourceManager);
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
return tableMetadataReader.getSegmentMetadata(tableName, segmentName, columns,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
} catch (InvalidConfigException e) {
Expand Down Expand Up @@ -390,19 +389,21 @@ public SuccessResponse reloadSegment(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload")
@DefaultValue("false") boolean forceDownload, @Context HttpHeaders headers) {
@DefaultValue("false") boolean forceDownload,
@ApiParam(value = "Name of the target instance to reload") @QueryParam("targetInstance") @Nullable
String targetInstance, @Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
long startTimeMs = System.currentTimeMillis();
segmentName = URIUtils.decode(segmentName);
String tableNameWithType = getExistingTable(tableName, segmentName);
Pair<Integer, String> msgInfo =
_pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
_pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload, targetInstance);
boolean zkJobMetaWriteSuccess = false;
if (msgInfo.getLeft() > 0) {
int numReloadMsgSent = msgInfo.getLeft();
if (numReloadMsgSent > 0) {
try {
if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
startTimeMs,
msgInfo.getLeft())) {
startTimeMs, numReloadMsgSent)) {
zkJobMetaWriteSuccess = true;
} else {
LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}",
Expand All @@ -414,11 +415,11 @@ public SuccessResponse reloadSegment(
}
return new SuccessResponse(
String.format("Submitted reload job id: %s, sent %d reload messages. Job meta ZK storage status: %s",
msgInfo.getRight(), msgInfo.getLeft(), zkJobMetaWriteSuccess ? "SUCCESS" : "FAILED"));
} else {
throw new ControllerApplicationException(LOGGER,
"Failed to find segment: " + segmentName + " in table: " + tableName, Status.NOT_FOUND);
msgInfo.getRight(), numReloadMsgSent, zkJobMetaWriteSuccess ? "SUCCESS" : "FAILED"));
}
throw new ControllerApplicationException(LOGGER,
String.format("Failed to find segment: %s in table: %s on %s", segmentName, tableName,
targetInstance == null ? "every instance" : targetInstance), Status.NOT_FOUND);
}

/**
Expand Down Expand Up @@ -522,15 +523,14 @@ public SuccessResponse resetSegments(
public ServerReloadControllerJobStatusResponse getReloadJobStatus(
@ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
throws Exception {
Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.
getControllerJobZKMetadata(reloadJobId, ControllerJobType.RELOAD_SEGMENT);
Map<String, String> controllerJobZKMetadata =
_pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, ControllerJobType.RELOAD_SEGMENT);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
Status.NOT_FOUND);
}

String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
Map<String, List<String>> serverToSegments;

String singleSegmentName =
Expand Down Expand Up @@ -571,7 +571,7 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
serverReloadControllerJobStatusResponse.setSuccessCount(0);

int totalSegments = 0;
for (Map.Entry<String, List<String>> entry: serverToSegments.entrySet()) {
for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
totalSegments += entry.getValue().size();
}
serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
Expand All @@ -587,17 +587,15 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
serverReloadControllerJobStatusResponse.getSuccessCount() + response.getSuccessCount());
} catch (Exception e) {
serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1
);
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1);
}
}

// Add ZK fields
serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);

// Add derived fields
long submissionTime =
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
long submissionTime = Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0);
int remainingSegments = serverReloadControllerJobStatusResponse.getTotalSegmentCount()
- serverReloadControllerJobStatusResponse.getSuccessCount();
Expand Down Expand Up @@ -625,7 +623,9 @@ public SuccessResponse reloadAllSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload")
@DefaultValue("false") boolean forceDownload, @Context HttpHeaders headers)
@DefaultValue("false") boolean forceDownload,
@ApiParam(value = "Name of the target instance to reload") @QueryParam("targetInstance") @Nullable
String targetInstance, @Context HttpHeaders headers)
throws JsonProcessingException {
tableName = DatabaseUtils.translateTableName(tableName, headers);
long startTimeMs = System.currentTimeMillis();
Expand All @@ -644,15 +644,20 @@ public SuccessResponse reloadAllSegments(
LOGGER);
Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
for (String tableNameWithType : tableNamesWithType) {
Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
Pair<Integer, String> msgInfo =
_pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload, targetInstance);
int numReloadMsgSent = msgInfo.getLeft();
if (numReloadMsgSent <= 0) {
continue;
}
Map<String, String> tableReloadMeta = new HashMap<>();
tableReloadMeta.put("numMessagesSent", String.valueOf(msgInfo.getLeft()));
tableReloadMeta.put("numMessagesSent", String.valueOf(numReloadMsgSent));
tableReloadMeta.put("reloadJobId", msgInfo.getRight());
perTableMsgData.put(tableNameWithType, tableReloadMeta);
// Store in ZK
try {
if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(), startTimeMs,
msgInfo.getLeft())) {
numReloadMsgSent)) {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
} else {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
Expand All @@ -663,6 +668,11 @@ public SuccessResponse reloadAllSegments(
LOGGER.error("Failed to add reload all segments job meta into zookeeper for table: {}", tableNameWithType, e);
}
}
if (perTableMsgData.isEmpty()) {
throw new ControllerApplicationException(LOGGER,
String.format("Failed to find any segments in table: %s on %s", tableName,
targetInstance == null ? "every instance" : targetInstance), Status.NOT_FOUND);
}
return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ public void updateSchema(Schema schema, boolean reload, boolean forceTableSchema
LOGGER.info("Reloading tables with name: {}", schemaName);
List<String> tableNamesWithType = getExistingTableNamesWithType(schemaName, null);
for (String tableNameWithType : tableNamesWithType) {
reloadAllSegments(tableNameWithType, false);
reloadAllSegments(tableNameWithType, false, null);
}
}
}
Expand Down Expand Up @@ -2605,8 +2605,10 @@ public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMeta
sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
}

public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean forceDownload) {
LOGGER.info("Sending reload message for table: {} with forceDownload: {}", tableNameWithType, forceDownload);
public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean forceDownload,
@Nullable String targetInstance) {
LOGGER.info("Sending reload message for table: {} with forceDownload: {}, and target: {}", tableNameWithType,
forceDownload, targetInstance == null ? "every instance" : targetInstance);

if (forceDownload) {
TableType tt = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
Expand All @@ -2617,7 +2619,7 @@ public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean

Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setInstanceName(targetInstance == null ? "%" : targetInstance);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, forceDownload);
Expand All @@ -2635,9 +2637,10 @@ public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean
return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
}

public Pair<Integer, String> reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload) {
LOGGER.info("Sending reload message for segment: {} in table: {} with forceDownload: {}", segmentName,
tableNameWithType, forceDownload);
public Pair<Integer, String> reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload,
@Nullable String targetInstance) {
LOGGER.info("Sending reload message for segment: {} in table: {} with forceDownload: {}, and target: {}",
segmentName, tableNameWithType, forceDownload, targetInstance == null ? "every instance" : targetInstance);

if (forceDownload) {
TableType tt = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
Expand All @@ -2649,7 +2652,7 @@ public Pair<Integer, String> reloadSegment(String tableNameWithType, String segm

Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setInstanceName(targetInstance == null ? "%" : targetInstance);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setPartition(segmentName);
recipientCriteria.setSessionSpecific(true);
Expand Down
Loading