Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ public synchronized PinotResourceManagerResponse deleteSegments(String tableName
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix", tableNameWithType);
HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig);
return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted");
} catch (final Exception e) {
LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@

import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.ZNRecord;
Expand All @@ -40,10 +44,12 @@
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,22 +59,34 @@ public class SegmentDeletionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentDeletionManager.class);
private static final long MAX_DELETION_DELAY_SECONDS = 300L; // Maximum of 5 minutes back-off to retry the deletion
private static final long DEFAULT_DELETION_DELAY_SECONDS = 2L;

// Retention date format will be written as suffix to deleted segments under `Deleted_Segments` folder. for example:
// `Deleted_Segments/myTable/myTable_mySegment_0__RETENTION_UNTIL__202202021200` to indicate that this segment
// file will be permanently deleted after Feb 2nd 2022 12PM.
private static final String DELETED_SEGMENTS = "Deleted_Segments";
private static final String RETENTION_UNTIL_SEPARATOR = "__RETENTION_UNTIL__";
private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMddHHmm";
private static final SimpleDateFormat RETENTION_DATE_FORMAT;

static {
RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
}

private final ScheduledExecutorService _executorService;
private final String _dataDir;
private final String _helixClusterName;
private final HelixAdmin _helixAdmin;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final int _defaultDeletedSegmentsRetentionInDays;
private final long _defaultDeletedSegmentsRetentionMs;

public SegmentDeletionManager(String dataDir, HelixAdmin helixAdmin, String helixClusterName,
ZkHelixPropertyStore<ZNRecord> propertyStore, int deletedSegmentsRetentionInDays) {
_dataDir = dataDir;
_helixAdmin = helixAdmin;
_helixClusterName = helixClusterName;
_propertyStore = propertyStore;
_defaultDeletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
_defaultDeletedSegmentsRetentionMs = TimeUnit.DAYS.toMillis(deletedSegmentsRetentionInDays);

_executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
Expand All @@ -84,22 +102,29 @@ public void stop() {
_executorService.shutdownNow();
}

public void deleteSegments(final String tableName, final Collection<String> segmentIds) {
deleteSegmentsWithDelay(tableName, segmentIds, DEFAULT_DELETION_DELAY_SECONDS);
public void deleteSegments(String tableName, Collection<String> segmentIds) {
deleteSegments(tableName, segmentIds, null);
}

public void deleteSegments(String tableName, Collection<String> segmentIds,
@Nullable TableConfig tableConfig) {
Long deletedSegmentsRetentionMs = getRetentionMsFromTableConfig(tableConfig);
deleteSegmentsWithDelay(tableName, segmentIds, deletedSegmentsRetentionMs, DEFAULT_DELETION_DELAY_SECONDS);
}

protected void deleteSegmentsWithDelay(final String tableName, final Collection<String> segmentIds,
final long deletionDelaySeconds) {
protected void deleteSegmentsWithDelay(String tableName, Collection<String> segmentIds,
Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
_executorService.schedule(new Runnable() {
@Override
public void run() {
deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletionDelaySeconds);
deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletedSegmentsRetentionMs,
deletionDelaySeconds);
}
}, deletionDelaySeconds, TimeUnit.SECONDS);
}

protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableName, Collection<String> segmentIds,
long deletionDelay) {
Long deletedSegmentsRetentionMs, long deletionDelay) {
// Check if segment got removed from ExternalView or IdealState
ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableName);
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
Expand Down Expand Up @@ -151,7 +176,7 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
}
segmentsToDelete.removeAll(propStoreFailedSegs);

removeSegmentsFromStore(tableName, segmentsToDelete);
removeSegmentsFromStore(tableName, segmentsToDelete, deletedSegmentsRetentionMs);
}

LOGGER.info("Deleted {} segments from table {}:{}", segmentsToDelete.size(), tableName,
Expand All @@ -160,27 +185,35 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
if (!segmentsToRetryLater.isEmpty()) {
long effectiveDeletionDelay = Math.min(deletionDelay * 2, MAX_DELETION_DELAY_SECONDS);
LOGGER.info("Postponing deletion of {} segments from table {}", segmentsToRetryLater.size(), tableName);
deleteSegmentsWithDelay(tableName, segmentsToRetryLater, effectiveDeletionDelay);
deleteSegmentsWithDelay(tableName, segmentsToRetryLater, deletedSegmentsRetentionMs, effectiveDeletionDelay);
return;
}
}

public void removeSegmentsFromStore(String tableNameWithType, List<String> segments) {
removeSegmentsFromStore(tableNameWithType, segments, null);
}

public void removeSegmentsFromStore(String tableNameWithType, List<String> segments,
@Nullable Long deletedSegmentsRetentionMs) {
for (String segment : segments) {
removeSegmentFromStore(tableNameWithType, segment);
removeSegmentFromStore(tableNameWithType, segment, deletedSegmentsRetentionMs);
}
}

protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
protected void removeSegmentFromStore(String tableNameWithType, String segmentId,
@Nullable Long deletedSegmentsRetentionMs) {
// Ignore HLC segments as they are not stored in Pinot FS
if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
return;
}
if (_dataDir != null) {
long retentionMs = deletedSegmentsRetentionMs == null
? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs;
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId));
PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
if (_defaultDeletedSegmentsRetentionInDays <= 0) {
if (retentionMs <= 0) {
// delete the segment file instantly if retention is set to zero
try {
if (pinotFS.delete(fileToDeleteURI, true)) {
Expand All @@ -193,8 +226,9 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId
}
} else {
// move the segment file to deleted segments first and let retention manager handler the deletion
URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName,
URIUtils.encode(segmentId));
String deletedFileName = deletedSegmentsRetentionMs == null ? URIUtils.encode(segmentId)
: getDeletedSegmentFileName(URIUtils.encode(segmentId), deletedSegmentsRetentionMs);
URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, deletedFileName);
try {
if (pinotFS.exists(fileToDeleteURI)) {
// Overwrites the file if it already exists in the target directory.
Expand Down Expand Up @@ -223,9 +257,8 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId

/**
* Removes aged deleted segments from the deleted directory
* @param retentionInDays: retention for deleted segments in days
*/
public void removeAgedDeletedSegments(int retentionInDays) {
public void removeAgedDeletedSegments() {
if (_dataDir != null) {
URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
Expand Down Expand Up @@ -254,8 +287,8 @@ public void removeAgedDeletedSegments(int retentionInDays) {
int numFilesDeleted = 0;
for (String targetFile : targetFiles) {
URI targetURI = URIUtils.getUri(targetFile);
Date dateToDelete = DateTime.now().minusDays(retentionInDays).toDate();
if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
long deletionTimeMs = getDeletionTimeMsFromFile(targetFile, pinotFS.lastModified(targetURI));
if (System.currentTimeMillis() >= deletionTimeMs) {
if (!pinotFS.delete(targetURI, true)) {
LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString());
} else {
Expand All @@ -278,4 +311,37 @@ public void removeAgedDeletedSegments(int retentionInDays) {
LOGGER.info("dataDir is not configured, won't delete any expired segments from deleted directory.");
}
}

private String getDeletedSegmentFileName(String fileName, long deletedSegmentsRetentionMs) {
return fileName + RETENTION_UNTIL_SEPARATOR + RETENTION_DATE_FORMAT.format(new Date(
System.currentTimeMillis() + deletedSegmentsRetentionMs));
}

private long getDeletionTimeMsFromFile(String targetFile, long lastModifiedTime) {
String[] split = StringUtils.splitByWholeSeparator(targetFile, RETENTION_UNTIL_SEPARATOR);
if (split.length == 2) {
try {
return RETENTION_DATE_FORMAT.parse(split[1]).getTime();
} catch (Exception e) {
LOGGER.warn("No retention suffix found for file: {}", targetFile);
}
}
LOGGER.info("Fallback to using default cluster retention config: {} ms", _defaultDeletedSegmentsRetentionMs);
return lastModifiedTime + _defaultDeletedSegmentsRetentionMs;
}

@Nullable
private static Long getRetentionMsFromTableConfig(@Nullable TableConfig tableConfig) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Annotate the return as @Nullable

if (tableConfig != null) {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
if (!StringUtils.isEmpty(validationConfig.getDeletedSegmentsRetentionPeriod())) {
try {
return TimeUtils.convertPeriodToMillis(validationConfig.getDeletedSegmentsRetentionPeriod());
} catch (Exception e) {
LOGGER.warn("Unable to parse deleted segment retention config for table {}", tableConfig.getTableName(), e);
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,13 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {

private static final Logger LOGGER = LoggerFactory.getLogger(RetentionManager.class);

private final int _deletedSegmentsRetentionInDays;

public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
_deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();

LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", getIntervalInSeconds());
}

@Override
Expand All @@ -97,8 +93,8 @@ protected void processTable(String tableNameWithType) {

@Override
protected void postprocess() {
LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
LOGGER.info("Removing aged deleted segments for all tables");
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments();
}

private void manageRetentionForTable(TableConfig tableConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -103,7 +102,7 @@ private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long
SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager();

// Verify that the removeAgedDeletedSegments() method in deletion manager is actually called.
verify(deletionManager, times(1)).removeAgedDeletedSegments(anyInt());
verify(deletionManager, times(1)).removeAgedDeletedSegments();

// Verify that the deleteSegments method is actually called.
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
Expand Down Expand Up @@ -177,7 +176,7 @@ public Void answer(InvocationOnMock invocationOnMock)
throws Throwable {
return null;
}
}).when(deletionManager).removeAgedDeletedSegments(anyInt());
}).when(deletionManager).removeAgedDeletedSegments();
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);

// If and when PinotHelixResourceManager.deleteSegments() is invoked, make sure that the segments deleted
Expand Down Expand Up @@ -229,7 +228,7 @@ public void testRealtimeLLCCleanup()
SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager();

// Verify that the removeAgedDeletedSegments() method in deletion manager is actually called.
verify(deletionManager, times(1)).removeAgedDeletedSegments(anyInt());
verify(deletionManager, times(1)).removeAgedDeletedSegments();

// Verify that the deleteSegments method is actually called.
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
Expand Down
Loading