Merged
@@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
@@ -204,32 +205,12 @@ public List<Map<TableType, List<String>>> getSegments(
String endTimestampStr,
@ApiParam(value = "Whether to exclude the segments overlapping with the timestamps, false by default")
@QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping) {
- long startTimestamp;
- long endTimestamp;
- try {
-   startTimestamp = Strings.isNullOrEmpty(startTimestampStr) ? Long.MIN_VALUE : Long.parseLong(startTimestampStr);
-   endTimestamp = Strings.isNullOrEmpty(endTimestampStr) ? Long.MAX_VALUE : Long.parseLong(endTimestampStr);
- } catch (NumberFormatException e) {
-   throw new ControllerApplicationException(LOGGER,
-       "Failed to parse the start/end timestamp. Please make sure they are in 'millisSinceEpoch' format.",
-       Response.Status.BAD_REQUEST, e);
- }
- Preconditions.checkArgument(startTimestamp < endTimestamp,
-     "The value of startTimestamp should be smaller than the one of endTimestamp. Start timestamp: %d. End "
-         + "timestamp: %d", startTimestamp, endTimestamp);

- List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName,
-     Constants.validateTableType(tableTypeStr), LOGGER);
boolean shouldExcludeReplacedSegments = Boolean.parseBoolean(excludeReplacedSegments);
- List<Map<TableType, List<String>>> resultList = new ArrayList<>(tableNamesWithType.size());
- for (String tableNameWithType : tableNamesWithType) {
-   TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-   List<String> segments =
-       _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, shouldExcludeReplacedSegments, startTimestamp,
-           endTimestamp, excludeOverlapping);
-   resultList.add(Collections.singletonMap(tableType, segments));
- }
- return resultList;
+ return selectSegments(tableName, tableTypeStr, shouldExcludeReplacedSegments,
+     startTimestampStr, endTimestampStr, excludeOverlapping)
+     .stream()
+     .map(pair -> Collections.singletonMap(pair.getKey(), pair.getValue()))
+     .collect(Collectors.toList());
}
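The refactored getSegments() above delegates segment selection to a shared helper and then converts each (table type, segment list) pair into a singleton map. A minimal standalone sketch of that conversion, using java.util's Map.Entry as a stand-in for Apache Commons Pair and made-up table-type/segment names:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PairToMapSketch {
    // Stand-in for the Pair<TableType, List<String>> results that selectSegments() returns.
    static List<Map.Entry<String, List<String>>> selectSegments() {
        return List.<Map.Entry<String, List<String>>>of(
            new SimpleEntry<>("OFFLINE", List.of("seg_0", "seg_1")),
            new SimpleEntry<>("REALTIME", List.of("seg_2")));
    }

    public static void main(String[] args) {
        // Same shape as the refactored getSegments(): map each (type, segments)
        // pair to a singleton map and collect the maps into a list.
        List<Map<String, List<String>>> result = selectSegments().stream()
            .map(pair -> Collections.singletonMap(pair.getKey(), pair.getValue()))
            .collect(Collectors.toList());
        System.out.println(result.size());
        System.out.println(result.get(0).containsKey("OFFLINE"));
    }
}
```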

@GET
@@ -884,6 +865,52 @@ public SuccessResponse deleteSegments(
}
}

@DELETE
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("/segments/{tableName}/select")
Contributor:

Can you please elaborate on why a new API is needed instead of adding more parameters to the existing segment deletion API?

ankitsultana (Contributor), Jun 13, 2023:

We want to be able to delete data older than a given age using an API. At present I think we only have APIs that either delete all segments or delete one particular segment.

This functionality should be useful to everyone, since it's a common requirement when you have a lot of customers using Pinot.

Re: using an existing API, we could consider that, but as far as I know there's no clean way of integrating this deletion behavior into the existing APIs. Let us know if you have any API in mind.

Author (Contributor):

There's an existing delete API that takes a list of segments, but it's not user friendly: users need to call the GET API with a time window and then invoke the deletion API. Moreover, the new delete API flips the GET API's defaults for excludeOverlapping (false -> true) and excludeReplacedSegments (true -> false) to match more intuitive deletion semantics. If a segment's min-max time only partially overlaps the given time window, the segment may contain data outside that window, and most customers don't expect to delete such data; thus excludeOverlapping defaults to true, meaning partially overlapping segments are skipped for deletion. excludeReplacedSegments defaults to false, meaning segments are deleted regardless of their replaced state.

@jtao15 do you have any objections?

snleee (Contributor), Jul 1, 2023:

@wirybeaver We deprecated GET /segments/{tableName}/select in favor of GET /segments/{tableName} (we added the deprecation annotation).

Can we add new parameters to the DELETE /segments/{tableName} API?

wirybeaver (Author), Jul 5, 2023:

@snleee I actually copied the parameters from GET /segments/{tableName}, and both GET /segments/{tableName} and this delete function use the same private method selectSegments. It's just a coincidence that the path is identical to the deprecated GET /segments/{tableName}/select. I can rename the path to DELETE /segments/{tableName}/choose.

The existing DELETE /segments/{tableName} is annotated @ApiOperation(value = "Delete all segments", notes = "Delete all segments"), which is why I want a different path.

Contributor:

@wirybeaver We can change the convention to be:

"Delete all segments" -> "delete all segments that satisfy the filter".

startTime, endTime, excludeReplacedSegments: please take a look at /segments/{tableName}; it already has those filters for fetching segments.

We can keep backward compatibility by picking all segments when no filter is provided. What do you think?

wirybeaver (Author), Jul 6, 2023:

@snleee Thanks for your suggestion. From a safety perspective, since this is a delete API, my hunch is to require users to provide non-empty startTime and endTime, which would break the current "delete all segments if no filter is provided" behavior. That's another reason to have a different path: users know they are doing a dangerous operation when they use DELETE /segments/{tableName}, while DELETE /segments/{tableName}/choose has a safety guard even if they forget to add the timestamps.

I'm OK with adding the filters to DELETE /segments/{tableName} and removing DELETE /segments/{tableName}/choose if you think the lack of protection against a missing time window is not a major concern.

wirybeaver (Author):

@snleee Just a gentle reminder for your thoughts.

wirybeaver (Author):

@snleee @jtao15 Could you share your thoughts when you are available?

snleee (Contributor), Jul 13, 2023:

@wirybeaver Sorry for the late response on this.

The current behavior of the DELETE /segments/{tableName} API is already to delete all segments in the table.

Adding a filter (start, end) means that we will delete the segments matching the filter. I think that keeps backward compatibility and doesn't break any existing behavior?

Am I missing something?

wirybeaver (Author), Jul 13, 2023:

I wanted to enforce that both start and end are non-empty, to avoid customers accidentally deleting all segments by forgetting to input (start, end). However, if start and end must be non-empty, the current DELETE /segments/{tableName} behavior would fail, which is a breaking change.

If you think we don't need to validate for empty start and end, I'm good with reusing the existing API.

Contributor:

I see, you need the protection. In that case, I'm good with keeping the separate API. It would be nice if DELETE /segments also got the extra filters, just for feature parity with GET /segments; anyway, that can be addressed in a separate PR.

wirybeaver (Author):

Will make a cosmetic change: use DELETE /segments/{tableName}/choose instead of DELETE /segments/{tableName}/select.

@Authenticate(AccessType.DELETE)
@ApiOperation(value = "Delete selected segments. An optional 'excludeReplacedSegments' parameter is used to get the"
+ " list of segments which have not yet been replaced (determined by segment lineage entries) and can be queried"
+ " from the table. The value is false by default.",
// TODO: more and more filters can be added later on, like excludeErrorSegments, excludeConsumingSegments, etc.
notes = "Delete the selected segments")
public SuccessResponse deleteSegmentsWithTimeWindow(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
Contributor:

Should this be required? Do we have a default value for this? If not, how does it work for hybrid tables?

wirybeaver (Author):

If not specified, both the realtime and offline parts will be deleted, which makes sense to me.

@ApiParam(value = "Whether to ignore replaced segments for deletion, i.e. segments which have been replaced"
+ " as specified in the segment lineage entries and cannot be queried from the table, false by default")
@QueryParam("excludeReplacedSegments") @DefaultValue("false") boolean excludeReplacedSegments,
@ApiParam(value = "Start timestamp (inclusive) in milliseconds", required = true) @QueryParam("startTimestamp")
String startTimestampStr,
@ApiParam(value = "End timestamp (exclusive) in milliseconds", required = true) @QueryParam("endTimestamp")
String endTimestampStr,
@ApiParam(value = "Whether to ignore segments that are partially overlapping with the [start, end)"
+ " for deletion, true by default")
@QueryParam("excludeOverlapping") @DefaultValue("true") boolean excludeOverlapping,
Contributor:

Should this value be false by default?

wirybeaver (Author):

IIUC, excludeOverlapping here means excluding partially overlapping segments.

Say the segment's min and max times are [100L, 200L] and the input time window is [150L, 250L). Since the segment contains some data before 150L, customers likely prefer not to delete it, so I set the default value to true, meaning partially overlapping segments won't be deleted.
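The overlap semantics discussed in this thread can be sketched as a pair of predicates. Helper names are hypothetical; this is not Pinot's actual implementation, which works off segment ZK metadata, but it matches the behavior described above and the test cases in this PR (end exclusive):

```java
public class OverlapSketch {
    // True if the segment's [minTime, maxTime] lies entirely inside [start, end);
    // partially overlapping segments are the ones excludeOverlapping=true skips.
    static boolean fullyContained(long minTime, long maxTime, long start, long end) {
        return minTime >= start && maxTime < end;
    }

    // True if the segment touches the window at all (full or partial overlap).
    static boolean overlaps(long minTime, long maxTime, long start, long end) {
        return minTime < end && maxTime >= start;
    }

    public static void main(String[] args) {
        // Segment [100, 200] vs window [150, 250): overlaps, but only partially,
        // so with excludeOverlapping=true it is NOT selected for deletion.
        System.out.println(overlaps(100, 200, 150, 250));
        System.out.println(fullyContained(100, 200, 150, 250));
        // Segment [150, 200] is fully inside [150, 250) and would be deleted.
        System.out.println(fullyContained(150, 200, 150, 250));
    }
}
```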

wirybeaver (Author):

Rephrased the ApiParam value: "keep segments" -> "ignore segments for deletion".

@ApiParam(value = "Retention period for the table segments (e.g. 12h, 3d); If not set, the retention period "
+ "will default to the first config that's not null: the table config, then the cluster setting, then '7d'. "
+ "Using 0d or -1d will instantly delete segments without retention")
@QueryParam("retention") String retentionPeriod) {
if (Strings.isNullOrEmpty(startTimestampStr) || Strings.isNullOrEmpty(endTimestampStr)) {
throw new ControllerApplicationException(LOGGER, "start and end timestamp must be non-empty", Status.BAD_REQUEST);
}

int numSegments = 0;
for (Pair<TableType, List<String>> tableTypeSegments : selectSegments(
tableName, tableTypeStr, excludeReplacedSegments, startTimestampStr, endTimestampStr, excludeOverlapping)) {
TableType tableType = tableTypeSegments.getLeft();
List<String> segments = tableTypeSegments.getRight();
numSegments += segments.size();
if (segments.isEmpty()) {
continue;
}
String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
deleteSegmentsInternal(tableNameWithType, segments, retentionPeriod);
}
return new SuccessResponse("Deleted " + numSegments + " segments from table: " + tableName);
}
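The non-empty-timestamp guard at the top of this handler is the safety property argued for in the review thread: a bare DELETE on this path can never wipe a whole table. A minimal sketch of that guard in isolation (class and exception type are hypothetical stand-ins for the controller's own):

```java
public class RequiredWindowGuard {
    // Mirrors the handler's first check: a deletion request must supply both
    // timestamps, so forgetting the time window fails fast instead of deleting everything.
    static void validateWindow(String start, String end) {
        if (start == null || start.isEmpty() || end == null || end.isEmpty()) {
            throw new IllegalArgumentException("start and end timestamp must be non-empty");
        }
    }

    public static void main(String[] args) {
        validateWindow("10", "21"); // a complete window passes silently
        try {
            validateWindow("", "21"); // a missing start timestamp is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```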

private void deleteSegmentsInternal(String tableNameWithType, List<String> segments, String retentionPeriod) {
PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
retentionPeriod);
@@ -1018,9 +1045,9 @@ private TableTierReader.TableTierDetails getTableTierInternal(String tableName,
public List<Map<TableType, List<String>>> getSelectedSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
- @ApiParam(value = "Start timestamp (inclusive)") @QueryParam("startTimestamp") @DefaultValue("")
+ @ApiParam(value = "Start timestamp (inclusive) in milliseconds") @QueryParam("startTimestamp") @DefaultValue("")
String startTimestampStr,
- @ApiParam(value = "End timestamp (exclusive)") @QueryParam("endTimestamp") @DefaultValue("")
+ @ApiParam(value = "End timestamp (exclusive) in milliseconds") @QueryParam("endTimestamp") @DefaultValue("")
String endTimestampStr,
@ApiParam(value = "Whether to exclude the segments overlapping with the timestamps, false by default")
@QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping) {
@@ -1130,4 +1157,34 @@ private SuccessResponse updateZKTimeIntervalInternal(String tableNameWithType) {
}
return new SuccessResponse("Successfully updated time interval zk metadata for table: " + tableNameWithType);
}

private List<Pair<TableType, List<String>>> selectSegments(
String tableName, String tableTypeStr, boolean excludeReplacedSegments, String startTimestampStr,
String endTimestampStr, boolean excludeOverlapping) {
long startTimestamp;
long endTimestamp;
try {
startTimestamp = Strings.isNullOrEmpty(startTimestampStr) ? Long.MIN_VALUE : Long.parseLong(startTimestampStr);
endTimestamp = Strings.isNullOrEmpty(endTimestampStr) ? Long.MAX_VALUE : Long.parseLong(endTimestampStr);
} catch (NumberFormatException e) {
throw new ControllerApplicationException(LOGGER,
"Failed to parse the start/end timestamp. Please make sure they are in 'millisSinceEpoch' format.",
Response.Status.BAD_REQUEST, e);
}
Preconditions.checkArgument(startTimestamp < endTimestamp,
"startTimestamp must be smaller than endTimestamp. Start timestamp: %s. End timestamp: %s",
startTimestamp, endTimestamp);

List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName,
Constants.validateTableType(tableTypeStr), LOGGER);
List<Pair<TableType, List<String>>> resultList = new ArrayList<>(tableNamesWithType.size());
for (String tableNameWithType : tableNamesWithType) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
List<String> segments =
_pinotHelixResourceManager.getSegmentsFor(tableNameWithType, excludeReplacedSegments, startTimestamp,
endTimestamp, excludeOverlapping);
resultList.add(Pair.of(tableType, segments));
}
return resultList;
}
}
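The selectSegments() helper above treats an empty timestamp string as unbounded, defaulting to Long.MIN_VALUE / Long.MAX_VALUE, and then requires start strictly less than end. A small sketch of that defaulting logic (helper name is hypothetical):

```java
public class TimestampDefaults {
    // Mirrors selectSegments(): empty means unbounded, otherwise millisSinceEpoch.
    static long parseOrDefault(String s, long dflt) {
        return (s == null || s.isEmpty()) ? dflt : Long.parseLong(s);
    }

    public static void main(String[] args) {
        long start = parseOrDefault("", Long.MIN_VALUE);
        long end = parseOrDefault("1688169600000", Long.MAX_VALUE);
        System.out.println(start == Long.MIN_VALUE);
        System.out.println(end);
        // startTimestamp must be strictly smaller than endTimestamp,
        // matching the Preconditions check in the helper.
        System.out.println(start < end);
    }
}
```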
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
@@ -165,6 +166,39 @@ public void testSegmentCrcApi()
checkCrcRequest(rawTableName, segmentMetadataTable, 9);
}

@Test
public void testDeleteSegmentsWithTimeWindow()
throws Exception {
// Adding table and segment
String rawTableName = "deleteWithTimeWindowTestTable";
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
Contributor:

Can you add a test case where the time window covers both the realtime and offline tables?

TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1)
.setDeletedSegmentsRetentionPeriod("0d").build();
PinotHelixResourceManager resourceManager = TEST_INSTANCE.getHelixResourceManager();
resourceManager.addTable(tableConfig);
SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(rawTableName,
10L, 20L, TimeUnit.MILLISECONDS);
resourceManager.addNewSegment(offlineTableName, segmentMetadata, "downloadUrl");

// Send query and verify
ControllerRequestURLBuilder urlBuilder = TEST_INSTANCE.getControllerRequestURLBuilder();
// case 1: no overlapping
String reply = ControllerTest.sendDeleteRequest(urlBuilder.forSegmentDeleteWithTimeWindowAPI(
rawTableName, 0L, 10L));
assertTrue(reply.contains("Deleted 0 segments"));

// case 2: partial overlapping
reply = ControllerTest.sendDeleteRequest(urlBuilder.forSegmentDeleteWithTimeWindowAPI(
rawTableName, 10L, 20L));
assertTrue(reply.contains("Deleted 0 segments"));

// case 3: fully within the time window
reply = ControllerTest.sendDeleteRequest(urlBuilder.forSegmentDeleteWithTimeWindowAPI(
rawTableName, 10L, 21L));
assertTrue(reply.contains("Deleted 1 segments"));
}

private void checkCrcRequest(String tableName, Map<String, SegmentMetadata> metadataTable, int expectedSize)
throws Exception {
String crcMapStr = ControllerTest.sendGetRequest(
@@ -39,26 +39,38 @@ private SegmentMetadataMockUtils() {
}

public static SegmentMetadata mockSegmentMetadata(String tableName, String segmentName, int numTotalDocs,
- String crc) {
+ String crc, long startTime, long endTime, TimeUnit timeUnit) {
SegmentMetadata segmentMetadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(segmentMetadata.getTableName()).thenReturn(tableName);
Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(numTotalDocs);
Mockito.when(segmentMetadata.getCrc()).thenReturn(crc);
- Mockito.when(segmentMetadata.getStartTime()).thenReturn(1L);
- Mockito.when(segmentMetadata.getEndTime()).thenReturn(10L);
+ Mockito.when(segmentMetadata.getStartTime()).thenReturn(startTime);
+ Mockito.when(segmentMetadata.getEndTime()).thenReturn(endTime);
Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(
-     new Interval(TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS),
-         TimeUnit.MILLISECONDS.convert(10, TimeUnit.DAYS)));
- Mockito.when(segmentMetadata.getTimeUnit()).thenReturn(TimeUnit.DAYS);
+     new Interval(TimeUnit.MILLISECONDS.convert(startTime, timeUnit),
+         TimeUnit.MILLISECONDS.convert(endTime, timeUnit)));
+ Mockito.when(segmentMetadata.getTimeUnit()).thenReturn(timeUnit);
return segmentMetadata;
}
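The mock above builds its Interval by converting the start/end times from the supplied unit to milliseconds. The conversions for the legacy defaults (1 and 10 days) and for the new millisecond-based test segments look like this:

```java
import java.util.concurrent.TimeUnit;

public class IntervalConversionSketch {
    public static void main(String[] args) {
        // Legacy defaults: (1, 10) in TimeUnit.DAYS become millisecond bounds.
        long startMs = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
        long endMs = TimeUnit.MILLISECONDS.convert(10, TimeUnit.DAYS);
        System.out.println(startMs);
        System.out.println(endMs);
        // With TimeUnit.MILLISECONDS (as in the new delete test, segment [10, 20]),
        // values pass through unchanged.
        System.out.println(TimeUnit.MILLISECONDS.convert(20, TimeUnit.MILLISECONDS));
    }
}
```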

public static SegmentMetadata mockSegmentMetadata(String tableName, String segmentName, int numTotalDocs,
String crc) {
return mockSegmentMetadata(tableName, segmentName, numTotalDocs, crc, 1L, 10L, TimeUnit.DAYS);
}

public static SegmentMetadata mockSegmentMetadata(String tableName) {
String uniqueNumericString = Long.toString(System.nanoTime());
return mockSegmentMetadata(tableName, tableName + uniqueNumericString, 100, uniqueNumericString);
}

public static SegmentMetadata mockSegmentMetadata(String tableName, long startTime,
long endTime, TimeUnit timeUnit) {
String uniqueNumericString = Long.toString(System.nanoTime());
return mockSegmentMetadata(tableName, tableName + uniqueNumericString, 100,
uniqueNumericString, startTime, endTime, timeUnit);
}

public static SegmentMetadata mockSegmentMetadata(String tableName, String segmentName) {
String uniqueNumericString = Long.toString(System.nanoTime());
return mockSegmentMetadata(tableName, segmentName, 100, uniqueNumericString);
@@ -414,6 +414,14 @@ public String forSegmentListAPI(String tableName, @Nullable String tableType, bo
return url.append(parameter).toString();
}

public String forSegmentDeleteWithTimeWindowAPI(String tableName, long startTimeInMilliSeconds,
long endTimeInMilliSeconds) {
StringBuilder url = new StringBuilder();
url.append(StringUtil.join("/", _baseUrl, "segments", tableName,
String.format("select?startTimestamp=%d&endTimestamp=%d", startTimeInMilliSeconds, endTimeInMilliSeconds)));
return url.toString();
}
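The URL shape produced by forSegmentDeleteWithTimeWindowAPI can be sketched without the builder class. The base URL and table name below are hypothetical; the path and query-parameter layout follow the method above:

```java
public class UrlBuilderSketch {
    // Mirrors forSegmentDeleteWithTimeWindowAPI(): join the path parts and append
    // the required startTimestamp/endTimestamp query parameters.
    static String forSegmentDeleteWithTimeWindow(String baseUrl, String table, long start, long end) {
        return String.join("/", baseUrl, "segments", table,
            String.format("select?startTimestamp=%d&endTimestamp=%d", start, end));
    }

    public static void main(String[] args) {
        // Matches the test's "fully within the time window" case: window [10, 21).
        String url = forSegmentDeleteWithTimeWindow("http://localhost:9000", "myTable", 10L, 21L);
        System.out.println(url);
    }
}
```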

private void appendUrlParameter(StringBuilder url, String urlParameterKey, String urlParameterValue) {
if (url.length() == 0) {
url.append("?").append(urlParameterKey).append("=").append(urlParameterValue);