-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add controller API for reload segment task status #8828
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8fcac1f
939f992
b609d6b
a67de69
0402c7a
cf5362b
d1c19b8
0bdd724
016ae6e
e77d3eb
6a666ca
b8f9b3a
38a0d0d
1252a1c
a9bb169
9c68c6d
a23755e
a992c0f
77f1c6f
51069f0
9ee7b86
7dbfe3f
00f0170
02d421c
4287096
ca6dcd9
5404a06
9483cc4
2eff6ce
f03d7bf
ba386fb
9759922
ae6ca35
c1a94fe
966e46f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.common.metadata.controllerjob; | ||
|
|
||
| public enum ControllerJobType { | ||
| RELOAD_SEGMENT, | ||
| RELOAD_ALL_SEGMENTS | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.base.Strings; | ||
| import com.google.common.collect.BiMap; | ||
| import io.swagger.annotations.Api; | ||
| import io.swagger.annotations.ApiKeyAuthDefinition; | ||
| import io.swagger.annotations.ApiOperation; | ||
|
|
@@ -33,6 +34,7 @@ | |
| import io.swagger.annotations.SwaggerDefinition; | ||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.LinkedHashMap; | ||
|
|
@@ -57,11 +59,13 @@ | |
| import javax.ws.rs.core.Response; | ||
| import javax.ws.rs.core.Response.Status; | ||
| import org.apache.commons.httpclient.HttpConnectionManager; | ||
| import org.apache.commons.lang3.tuple.Pair; | ||
| import org.apache.helix.store.zk.ZkHelixPropertyStore; | ||
| import org.apache.helix.zookeeper.datamodel.ZNRecord; | ||
| import org.apache.pinot.common.exception.InvalidConfigException; | ||
| import org.apache.pinot.common.lineage.SegmentLineage; | ||
| import org.apache.pinot.common.metadata.ZKMetadataProvider; | ||
| import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; | ||
| import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; | ||
| import org.apache.pinot.common.utils.SegmentName; | ||
| import org.apache.pinot.common.utils.URIUtils; | ||
|
|
@@ -71,10 +75,12 @@ | |
| import org.apache.pinot.controller.api.exception.ControllerApplicationException; | ||
| import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; | ||
| import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; | ||
| import org.apache.pinot.controller.util.CompletionServiceHelper; | ||
| import org.apache.pinot.controller.util.ConsumingSegmentInfoReader; | ||
| import org.apache.pinot.controller.util.TableMetadataReader; | ||
| import org.apache.pinot.controller.util.TableTierReader; | ||
| import org.apache.pinot.spi.config.table.TableType; | ||
| import org.apache.pinot.spi.utils.CommonConstants; | ||
| import org.apache.pinot.spi.utils.JsonUtils; | ||
| import org.apache.pinot.spi.utils.builder.TableNameBuilder; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -459,9 +465,25 @@ public SuccessResponse reloadSegment( | |
| TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE; | ||
| String tableNameWithType = | ||
| ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); | ||
| int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload); | ||
| if (numMessagesSent > 0) { | ||
| return new SuccessResponse("Sent " + numMessagesSent + " reload messages"); | ||
| Pair<Integer, String> msgInfo = | ||
| _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload); | ||
| boolean zkJobMetaWriteSuccess = false; | ||
| if (msgInfo.getLeft() > 0) { | ||
| try { | ||
| if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(), | ||
| msgInfo.getLeft())) { | ||
| zkJobMetaWriteSuccess = true; | ||
| } else { | ||
| LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}", | ||
| tableNameWithType, segmentName); | ||
| } | ||
| } catch (Exception e) { | ||
| LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}", | ||
| tableNameWithType, segmentName, e); | ||
| } | ||
| return new SuccessResponse( | ||
| String.format("Submitted reload job id: %s, sent %d reload messages. Job meta ZK storage status: %s", | ||
| msgInfo.getRight(), msgInfo.getLeft(), zkJobMetaWriteSuccess ? "SUCCESS" : "FAILED")); | ||
| } else { | ||
| throw new ControllerApplicationException(LOGGER, | ||
| "Failed to find segment: " + segmentName + " in table: " + tableName, Status.NOT_FOUND); | ||
|
|
@@ -558,7 +580,7 @@ public SuccessResponse reloadSegmentDeprecated1( | |
| LOGGER); | ||
| int numMessagesSent = 0; | ||
| for (String tableNameWithType : tableNamesWithType) { | ||
| numMessagesSent += _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, false); | ||
| numMessagesSent += _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, false).getLeft(); | ||
| } | ||
| return new SuccessResponse("Sent " + numMessagesSent + " reload messages"); | ||
| } | ||
|
|
@@ -577,6 +599,107 @@ public SuccessResponse reloadSegmentDeprecated2( | |
| return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr); | ||
| } | ||
|
|
||
| @GET | ||
| @Path("segments/segmentReloadStatus/{jobId}") | ||
| @Produces(MediaType.APPLICATION_JSON) | ||
| @ApiOperation(value = "Get status for a submitted reload operation", | ||
| notes = "Get status for a submitted reload operation") | ||
| public ServerReloadControllerJobStatusResponse getReloadJobStatus( | ||
| @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId) | ||
| throws Exception { | ||
| Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId); | ||
| if (controllerJobZKMetadata == null) { | ||
| throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId, | ||
| Status.NOT_FOUND); | ||
| } | ||
|
|
||
| String tableNameWithType = | ||
| controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); | ||
| Map<String, List<String>> serverToSegments; | ||
|
|
||
| String singleSegmentName = null; | ||
| if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.JOB_TYPE) | ||
| .equals(ControllerJobType.RELOAD_SEGMENT.toString())) { | ||
| // No need to query servers where this segment is not supposed to be hosted | ||
saurabhd336 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME); | ||
| serverToSegments = new HashMap<>(); | ||
| List<String> segmentList = Arrays.asList(singleSegmentName); | ||
| _pinotHelixResourceManager.getServers(tableNameWithType, singleSegmentName).forEach(server -> { | ||
| serverToSegments.put(server, segmentList); | ||
| }); | ||
| } else { | ||
| serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); | ||
| } | ||
|
|
||
| BiMap<String, String> serverEndPoints = | ||
| _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); | ||
| CompletionServiceHelper completionServiceHelper = | ||
| new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints); | ||
|
|
||
| List<String> serverUrls = new ArrayList<>(); | ||
Jackie-Jiang marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| BiMap<String, String> endpointsToServers = serverEndPoints.inverse(); | ||
| for (String endpoint : endpointsToServers.keySet()) { | ||
| String reloadTaskStatusEndpoint = | ||
| endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp=" | ||
| + controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS); | ||
| if (singleSegmentName != null) { | ||
| reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName; | ||
| } | ||
| serverUrls.add(reloadTaskStatusEndpoint); | ||
| } | ||
|
|
||
| CompletionServiceHelper.CompletionServiceResponse serviceResponse = | ||
saurabhd336 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000); | ||
|
|
||
| ServerReloadControllerJobStatusResponse serverReloadControllerJobStatusResponse = | ||
| new ServerReloadControllerJobStatusResponse(); | ||
| serverReloadControllerJobStatusResponse.setSuccessCount(0); | ||
|
|
||
| int totalSegments = 0; | ||
| for (Map.Entry<String, List<String>> entry: serverToSegments.entrySet()) { | ||
| totalSegments += entry.getValue().size(); | ||
| } | ||
| serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments); | ||
| serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size()); | ||
| serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount); | ||
|
|
||
| for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { | ||
| String responseString = streamResponse.getValue(); | ||
| try { | ||
| ServerReloadControllerJobStatusResponse response = | ||
| JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class); | ||
|
Comment on lines
+669
to
+670
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we using
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed that this is risky assumption. We can change this to |
||
| serverReloadControllerJobStatusResponse.setSuccessCount( | ||
| serverReloadControllerJobStatusResponse.getSuccessCount() + response.getSuccessCount()); | ||
| } catch (Exception e) { | ||
| serverReloadControllerJobStatusResponse.setTotalServerCallsFailed( | ||
| serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1 | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| // Add ZK fields | ||
| serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata); | ||
|
|
||
| // Add derived fields | ||
| long submissionTime = | ||
| Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); | ||
| double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0); | ||
| int remainingSegments = serverReloadControllerJobStatusResponse.getTotalSegmentCount() | ||
| - serverReloadControllerJobStatusResponse.getSuccessCount(); | ||
|
|
||
| double estimatedRemainingTimeInMinutes = -1; | ||
| if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) { | ||
| estimatedRemainingTimeInMinutes = | ||
| ((double) remainingSegments / (double) serverReloadControllerJobStatusResponse.getSuccessCount()) | ||
| * timeElapsedInMinutes; | ||
| } | ||
|
|
||
| serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes); | ||
| serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes); | ||
|
|
||
| return serverReloadControllerJobStatusResponse; | ||
| } | ||
|
|
||
| @POST | ||
| @Path("segments/{tableName}/reload") | ||
| @Authenticate(AccessType.UPDATE) | ||
|
|
@@ -586,7 +709,8 @@ public SuccessResponse reloadAllSegments( | |
| @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, | ||
| @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, | ||
| @ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload") | ||
| @DefaultValue("false") boolean forceDownload) { | ||
| @DefaultValue("false") boolean forceDownload) | ||
| throws JsonProcessingException { | ||
| TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName); | ||
| TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr); | ||
| // When rawTableName is provided but w/o table type, Pinot tries to reload both OFFLINE | ||
|
|
@@ -597,14 +721,31 @@ public SuccessResponse reloadAllSegments( | |
| if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) { | ||
| tableTypeFromRequest = TableType.OFFLINE; | ||
| } | ||
| List<String> tableNamesWithType = ResourceUtils | ||
| .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER); | ||
| Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>(); | ||
| List<String> tableNamesWithType = | ||
| ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, | ||
| LOGGER); | ||
| Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>(); | ||
| for (String tableNameWithType : tableNamesWithType) { | ||
| int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload); | ||
| numMessagesSentPerTable.put(tableNameWithType, numMsgSent); | ||
| Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload); | ||
| Map<String, String> tableReloadMeta = new HashMap<>(); | ||
| tableReloadMeta.put("numMessagesSent", String.valueOf(msgInfo.getLeft())); | ||
| tableReloadMeta.put("reloadJobId", msgInfo.getRight()); | ||
| perTableMsgData.put(tableNameWithType, tableReloadMeta); | ||
| // Store in ZK | ||
| try { | ||
| if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(), | ||
| msgInfo.getLeft())) { | ||
| tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS"); | ||
| } else { | ||
| tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED"); | ||
| LOGGER.error("Failed to add reload all segments job meta into zookeeper for table: {}", tableNameWithType); | ||
| } | ||
| } catch (Exception e) { | ||
| tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED"); | ||
| LOGGER.error("Failed to add reload all segments job meta into zookeeper for table: {}", tableNameWithType, e); | ||
| } | ||
| } | ||
| return new SuccessResponse("Sent " + numMessagesSentPerTable + " reload messages"); | ||
| return new SuccessResponse("Segment reload details: " + JsonUtils.objectToString(perTableMsgData)); | ||
| } | ||
|
|
||
| @Deprecated | ||
|
|
@@ -622,7 +763,7 @@ public SuccessResponse reloadAllSegmentsDeprecated1( | |
| LOGGER); | ||
| int numMessagesSent = 0; | ||
| for (String tableNameWithType : tableNamesWithType) { | ||
| numMessagesSent += _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, false); | ||
| numMessagesSent += _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, false).getLeft(); | ||
| } | ||
| return new SuccessResponse("Sent " + numMessagesSent + " reload messages"); | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.