-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add a new controller endpoint for segment deletion with a time window #10758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
62fae96
98c7fea
30e5ca2
3a99498
052b1e5
6b58a73
f23eb6c
2d0507f
cd03c63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,7 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.Executor; | ||
| import java.util.stream.Collectors; | ||
| import javax.annotation.Nullable; | ||
| import javax.inject.Inject; | ||
| import javax.ws.rs.Consumes; | ||
|
|
@@ -204,32 +205,12 @@ public List<Map<TableType, List<String>>> getSegments( | |
| String endTimestampStr, | ||
| @ApiParam(value = "Whether to exclude the segments overlapping with the timestamps, false by default") | ||
| @QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping) { | ||
| long startTimestamp; | ||
| long endTimestamp; | ||
| try { | ||
| startTimestamp = Strings.isNullOrEmpty(startTimestampStr) ? Long.MIN_VALUE : Long.parseLong(startTimestampStr); | ||
| endTimestamp = Strings.isNullOrEmpty(endTimestampStr) ? Long.MAX_VALUE : Long.parseLong(endTimestampStr); | ||
| } catch (NumberFormatException e) { | ||
| throw new ControllerApplicationException(LOGGER, | ||
| "Failed to parse the start/end timestamp. Please make sure they are in 'millisSinceEpoch' format.", | ||
| Response.Status.BAD_REQUEST, e); | ||
| } | ||
| Preconditions.checkArgument(startTimestamp < endTimestamp, | ||
| "The value of startTimestamp should be smaller than the one of endTimestamp. Start timestamp: %d. End " | ||
| + "timestamp: %d", startTimestamp, endTimestamp); | ||
|
|
||
| List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, | ||
| Constants.validateTableType(tableTypeStr), LOGGER); | ||
| boolean shouldExcludeReplacedSegments = Boolean.parseBoolean(excludeReplacedSegments); | ||
| List<Map<TableType, List<String>>> resultList = new ArrayList<>(tableNamesWithType.size()); | ||
| for (String tableNameWithType : tableNamesWithType) { | ||
| TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); | ||
| List<String> segments = | ||
| _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, shouldExcludeReplacedSegments, startTimestamp, | ||
| endTimestamp, excludeOverlapping); | ||
| resultList.add(Collections.singletonMap(tableType, segments)); | ||
| } | ||
| return resultList; | ||
| return selectSegments(tableName, tableTypeStr, shouldExcludeReplacedSegments, | ||
| startTimestampStr, endTimestampStr, excludeOverlapping) | ||
| .stream() | ||
| .map(pair -> Collections.singletonMap(pair.getKey(), pair.getValue())) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| @GET | ||
|
|
@@ -884,6 +865,52 @@ public SuccessResponse deleteSegments( | |
| } | ||
| } | ||
|
|
||
| @DELETE | ||
| @Consumes(MediaType.APPLICATION_JSON) | ||
| @Produces(MediaType.APPLICATION_JSON) | ||
| @Path("/segments/{tableName}/select") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @wirybeaver We deprecated Can we add new parameters to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @snleee I actually copy the parameters from The existing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @wirybeaver
We can keep the backward compatibility by picking all segments if no filter is provided. How do you think?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @snleee Thanks for your suggestion. From the perspective of safety, since it's a delete api, my hunt is to enforce users MUST input non-empty startTime and endTime, which will break the compatibility that delete all segment if no filter is provides. That's another reason to have different PATH. User would know they are doing a dangerous operation if they use I am OK to add filters into
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @snleee Just a gentle reminder for your thoughts.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @wirybeaver sorry for the late response on this. The current behavior for Adding filter (start, end) means that we will delete the segments matching the filter. I think that we are keeping the backward compatibility and not breaking any existing behavior? Am I missing something?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to enforce both start and end to be non empty to avoid the case that customers incidentally delete all segments due to forgetting inputing (start, end). However, since start and end must be non empty, current If you thought we don't need to validate empty start and end, I am good to reuse the existing api
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. You need the protection. In that case, I'm good with keeping the separate API. It would be nice to have if 'DELETE /segments' also gets extra filter just to have the feature parity with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will have a cosmetic change.use |
||
| @Authenticate(AccessType.DELETE) | ||
| @ApiOperation(value = "Delete selected segments. An optional 'excludeReplacedSegments' parameter is used to get the" | ||
| + " list of segments which has not yet been replaced (determined by segment lineage entries) and can be queried" | ||
| + " from the table. The value is false by default.", | ||
| // TODO: more and more filters can be added later on, like excludeErrorSegments, excludeConsumingSegments, etc. | ||
| notes = "List all segments") | ||
| public SuccessResponse deleteSegmentsWithTimeWindow( | ||
| @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, | ||
| @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be required? do we have a default value for this?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If not specified, both realtime and offline parts will be deleted, which make sense to me. |
||
| @ApiParam(value = "Whether to ignore replaced segments for deletion, which have been replaced" | ||
| + " specified in the segment lineage entries and cannot be queried from the table, false by default") | ||
| @QueryParam("excludeReplacedSegments") @DefaultValue("false") boolean excludeReplacedSegments, | ||
| @ApiParam(value = "Start timestamp (inclusive) in milliseconds", required = true) @QueryParam("startTimestamp") | ||
| String startTimestampStr, | ||
| @ApiParam(value = "End timestamp (exclusive) in milliseconds", required = true) @QueryParam("endTimestamp") | ||
| String endTimestampStr, | ||
| @ApiParam(value = "Whether to ignore segments that are partially overlapping with the [start, end)" | ||
| + "for deletion, true by default") | ||
| @QueryParam("excludeOverlapping") @DefaultValue("true") boolean excludeOverlapping, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this value be false by default?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, the excludeOverlapping here means exclude partially overlapped segments. Let say the segment's min and max time is [100L, 200L]. And the input [start_time, end time] is [150L, 250L]. Since segment contains some data before 150L, I guess customer prefer not deleting this segment, and thus I set the default value to true, indicating "partial overlapping segments won't be delelted"
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rephrase the ApiParam value. keep segments -> ignore segments for deletion. |
||
| @ApiParam(value = "Retention period for the table segments (e.g. 12h, 3d); If not set, the retention period " | ||
| + "will default to the first config that's not null: the table config, then to cluster setting, then '7d'. " | ||
| + "Using 0d or -1d will instantly delete segments without retention") | ||
| @QueryParam("retention") String retentionPeriod) { | ||
| if (Strings.isNullOrEmpty(startTimestampStr) || Strings.isNullOrEmpty(endTimestampStr)) { | ||
| throw new ControllerApplicationException(LOGGER, "start and end timestamp must by non empty", Status.BAD_REQUEST); | ||
| } | ||
|
|
||
| int numSegments = 0; | ||
| for (Pair<TableType, List<String>> tableTypeSegments : selectSegments( | ||
| tableName, tableTypeStr, excludeReplacedSegments, startTimestampStr, endTimestampStr, excludeOverlapping)) { | ||
| TableType tableType = tableTypeSegments.getLeft(); | ||
| List<String> segments = tableTypeSegments.getRight(); | ||
| numSegments += segments.size(); | ||
| if (segments.isEmpty()) { | ||
| continue; | ||
| } | ||
| String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName); | ||
| deleteSegmentsInternal(tableNameWithType, segments, retentionPeriod); | ||
| } | ||
| return new SuccessResponse("Deleted " + numSegments + " segments from table: " + tableName); | ||
| } | ||
|
|
||
| private void deleteSegmentsInternal(String tableNameWithType, List<String> segments, String retentionPeriod) { | ||
| PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments, | ||
| retentionPeriod); | ||
|
|
@@ -1018,9 +1045,9 @@ private TableTierReader.TableTierDetails getTableTierInternal(String tableName, | |
| public List<Map<TableType, List<String>>> getSelectedSegments( | ||
| @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, | ||
| @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, | ||
| @ApiParam(value = "Start timestamp (inclusive)") @QueryParam("startTimestamp") @DefaultValue("") | ||
| @ApiParam(value = "Start timestamp (inclusive) in milliseconds") @QueryParam("startTimestamp") @DefaultValue("") | ||
| String startTimestampStr, | ||
| @ApiParam(value = "End timestamp (exclusive)") @QueryParam("endTimestamp") @DefaultValue("") | ||
| @ApiParam(value = "End timestamp (exclusive) in milliseconds") @QueryParam("endTimestamp") @DefaultValue("") | ||
| String endTimestampStr, | ||
| @ApiParam(value = "Whether to exclude the segments overlapping with the timestamps, false by default") | ||
| @QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping) { | ||
|
|
@@ -1130,4 +1157,34 @@ private SuccessResponse updateZKTimeIntervalInternal(String tableNameWithType) { | |
| } | ||
| return new SuccessResponse("Successfully updated time interval zk metadata for table: " + tableNameWithType); | ||
| } | ||
|
|
||
| private List<Pair<TableType, List<String>>> selectSegments( | ||
| String tableName, String tableTypeStr, boolean excludeReplacedSegments, String startTimestampStr, | ||
| String endTimestampStr, boolean excludeOverlapping) { | ||
| long startTimestamp; | ||
| long endTimestamp; | ||
| try { | ||
| startTimestamp = Strings.isNullOrEmpty(startTimestampStr) ? Long.MIN_VALUE : Long.parseLong(startTimestampStr); | ||
| endTimestamp = Strings.isNullOrEmpty(endTimestampStr) ? Long.MAX_VALUE : Long.parseLong(endTimestampStr); | ||
| } catch (NumberFormatException e) { | ||
| throw new ControllerApplicationException(LOGGER, | ||
| "Failed to parse the start/end timestamp. Please make sure they are in 'millisSinceEpoch' format.", | ||
| Response.Status.BAD_REQUEST, e); | ||
| } | ||
| Preconditions.checkArgument(startTimestamp < endTimestamp, | ||
| "The value of startTimestamp should be smaller than the one of endTimestamp. Start timestamp: %d. End " | ||
| + "timestamp: %d", startTimestamp, endTimestamp); | ||
|
|
||
| List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, | ||
| Constants.validateTableType(tableTypeStr), LOGGER); | ||
| List<Pair<TableType, List<String>>> resultList = new ArrayList<>(tableNamesWithType.size()); | ||
| for (String tableNameWithType : tableNamesWithType) { | ||
| TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); | ||
| List<String> segments = | ||
| _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, excludeReplacedSegments, startTimestamp, | ||
| endTimestamp, excludeOverlapping); | ||
| resultList.add(Pair.of(tableType, segments)); | ||
| } | ||
| return resultList; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.TimeUnit; | ||
| import org.apache.pinot.controller.helix.ControllerTest; | ||
| import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; | ||
| import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; | ||
|
|
@@ -165,6 +166,39 @@ public void testSegmentCrcApi() | |
| checkCrcRequest(rawTableName, segmentMetadataTable, 9); | ||
| } | ||
|
|
||
| @Test | ||
| public void testDeleteSegmentsWithTimeWindow() | ||
wirybeaver marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| throws Exception { | ||
| // Adding table and segment | ||
| String rawTableName = "deleteWithTimeWindowTestTable"; | ||
| String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add a test case that timeindow covers both realtime and offline table |
||
| TableConfig tableConfig = | ||
| new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1) | ||
| .setDeletedSegmentsRetentionPeriod("0d").build(); | ||
| PinotHelixResourceManager resourceManager = TEST_INSTANCE.getHelixResourceManager(); | ||
| resourceManager.addTable(tableConfig); | ||
| SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(rawTableName, | ||
| 10L, 20L, TimeUnit.MILLISECONDS); | ||
| resourceManager.addNewSegment(offlineTableName, segmentMetadata, "downloadUrl"); | ||
|
|
||
| // Send query and verify | ||
| ControllerRequestURLBuilder urlBuilder = TEST_INSTANCE.getControllerRequestURLBuilder(); | ||
| // case 1: no overlapping | ||
| String reply = ControllerTest.sendDeleteRequest(urlBuilder.forSegmentDeleteWithTimeWindowAPI( | ||
| rawTableName, 0L, 10L)); | ||
| assertTrue(reply.contains("Deleted 0 segments")); | ||
|
|
||
| // case 2: partial overlapping | ||
| reply = ControllerTest.sendDeleteRequest(urlBuilder.forSegmentDeleteWithTimeWindowAPI( | ||
| rawTableName, 10L, 20L)); | ||
| assertTrue(reply.contains("Deleted 0 segments")); | ||
|
|
||
| // case 3: fully within the time window | ||
| reply = ControllerTest.sendDeleteRequest(urlBuilder.forSegmentDeleteWithTimeWindowAPI( | ||
| rawTableName, 10L, 21L)); | ||
| assertTrue(reply.contains("Deleted 1 segments")); | ||
| } | ||
|
|
||
| private void checkCrcRequest(String tableName, Map<String, SegmentMetadata> metadataTable, int expectedSize) | ||
| throws Exception { | ||
| String crcMapStr = ControllerTest.sendGetRequest( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please elaborate on why a new api is needed instead of adding more parameters for the existing segment deletion api?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to be able to delete data older than a given age using an API. At present I think we only have APIs that either delete all segments or delete one particular segment.
This functionality should be useful to everyone since this is a common requirement when you have a lot of customers using Pinot.
re: using an existing API, we could consider that but afaik there's no clean way of integrating this behavior of deletion in the existing APIs. Let us know if you have any API in mind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an delete api to delete list of segments, but it's not user friendly. Users need to use the get api with time window and then invoke the deletion api. Moreover, the new delete api flip the default value of excludeOverlapping (false -> true) and excludeReplacedSegments (true -> false) of get api to cater with more intuitive deletion requirement. If the segment min-max partially overlap with the give time window, the segments might contain data out of given time window. Most of customers don't expect to delete those data. Thus excludeOverlapping is set to true, meaning ignore segment that are partially overlapped for deletion. For excludeReplacedSegments, the default value is set to false, meaning completely delete segments regardless of the replaced states.
@jtao15 do you have any objections?