Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,14 @@ public SuccessResponse reloadAllSegmentsDeprecated2(
@ApiOperation(value = "Delete a segment", notes = "Delete a segment")
public SuccessResponse deleteSegment(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName) {
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "Retention period for the deleted segments (e.g. 12h, 3d); Using 0d or -1d will instantly "
+ "delete segments without retention") @QueryParam("retention") String retentionPeriod) {
segmentName = URIUtils.decode(segmentName);
TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName));
deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName), retentionPeriod);
return new SuccessResponse("Segment deleted");
}

Expand All @@ -587,14 +589,17 @@ public SuccessResponse deleteSegment(
@ApiOperation(value = "Delete all segments", notes = "Delete all segments")
public SuccessResponse deleteAllSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) {
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Retention period for the deleted segments (e.g. 12h, 3d); Using 0d or -1d will instantly "
+ "delete segments without retention") @QueryParam("retention") String retentionPeriod) {
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type must not be null", Status.BAD_REQUEST);
}
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false));
deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false),
retentionPeriod);
return new SuccessResponse("All segments of table " + tableNameWithType + " deleted");
}

Expand All @@ -607,6 +612,8 @@ public SuccessResponse deleteAllSegments(
notes = "Delete the segments in the JSON array payload")
public SuccessResponse deleteSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Retention period for the deleted segments (e.g. 12h, 3d); Using 0d or -1d will instantly "
+ "delete segments without retention") @QueryParam("retention") String retentionPeriod,
List<String> segments) {
int numSegments = segments.size();
if (numSegments == 0) {
Expand All @@ -622,16 +629,17 @@ public SuccessResponse deleteSegments(
TableType tableType = isRealtimeSegment ? TableType.REALTIME : TableType.OFFLINE;
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
deleteSegmentsInternal(tableNameWithType, segments);
deleteSegmentsInternal(tableNameWithType, segments, retentionPeriod);
if (numSegments <= 5) {
return new SuccessResponse("Deleted segments: " + segments + " from table: " + tableNameWithType);
} else {
return new SuccessResponse("Deleted " + numSegments + " segments from table: " + tableNameWithType);
}
}

private void deleteSegmentsInternal(String tableNameWithType, List<String> segments) {
PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments);
private void deleteSegmentsInternal(String tableNameWithType, List<String> segments, String retentionPeriod) {
PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
retentionPeriod);
if (!response.isSuccessful()) {
throw new ControllerApplicationException(LOGGER,
"Failed to delete segments from table: " + tableNameWithType + ", error message: " + response.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
Expand Down Expand Up @@ -694,21 +695,32 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
}

public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
return deleteSegments(tableNameWithType, segmentNames, null);
}

/**
* Delete a list of segments from ideal state and remove them from the local storage.
*
* @param tableNameWithType Table name with type suffix
* @param segmentNames List of names of segment to be deleted
* @param retentionPeriod The retention period of the deleted segments.
* @return Request response
*/
public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames,
@Nullable String retentionPeriod) {
try {
LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix", tableNameWithType);
HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig);
if (retentionPeriod != null) {
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames,
TimeUtils.convertPeriodToMillis(retentionPeriod));
} else {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig);
}
return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted");
} catch (final Exception e) {
LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,16 @@ public void stop() {
}

public void deleteSegments(String tableName, Collection<String> segmentIds) {
deleteSegments(tableName, segmentIds, null);
deleteSegments(tableName, segmentIds, (Long) null);
}

public void deleteSegments(String tableName, Collection<String> segmentIds,
@Nullable TableConfig tableConfig) {
Long deletedSegmentsRetentionMs = getRetentionMsFromTableConfig(tableConfig);
deleteSegments(tableName, segmentIds, getRetentionMsFromTableConfig(tableConfig));
}

public void deleteSegments(String tableName, Collection<String> segmentIds,
@Nullable Long deletedSegmentsRetentionMs) {
deleteSegmentsWithDelay(tableName, segmentIds, deletedSegmentsRetentionMs, DEFAULT_DELETION_DELAY_SECONDS);
}

Expand Down