-
Notifications
You must be signed in to change notification settings - Fork 1.5k
add retention period to deleted segment files and allow table level o… #8176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Jackie-Jiang
merged 10 commits into
apache:master
from
walterddr:segment_retention_to_deleted_segment_file
Feb 18, 2022
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
5295af9
add retention period to deleted segment files and allow table level o…
fd182c6
address diff comments
e986f9b
modify the storage format to readable simple date
046875a
also add segmet deletion manager test for default retention period wi…
74dabbe
do not encode default cluster deletion so that cluster level change a…
8433b3b
make retention manager take tableConfig instead of retention time
62e4ab5
address diff comments
ab894d6
address diff comments
dd04096
fix minor
5540a33
Update pinot-controller/src/main/java/org/apache/pinot/controller/hel…
walterddr File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,17 +20,21 @@ | |
|
|
||
| import java.io.IOException; | ||
| import java.net.URI; | ||
| import java.text.SimpleDateFormat; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Date; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.TimeZone; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.TimeUnit; | ||
| import javax.annotation.Nullable; | ||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.helix.AccessOption; | ||
| import org.apache.helix.HelixAdmin; | ||
| import org.apache.helix.ZNRecord; | ||
|
|
@@ -40,10 +44,12 @@ | |
| import org.apache.pinot.common.metadata.ZKMetadataProvider; | ||
| import org.apache.pinot.common.utils.SegmentName; | ||
| import org.apache.pinot.common.utils.URIUtils; | ||
| import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; | ||
| import org.apache.pinot.spi.config.table.TableConfig; | ||
| import org.apache.pinot.spi.filesystem.PinotFS; | ||
| import org.apache.pinot.spi.filesystem.PinotFSFactory; | ||
| import org.apache.pinot.spi.utils.TimeUtils; | ||
| import org.apache.pinot.spi.utils.builder.TableNameBuilder; | ||
| import org.joda.time.DateTime; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -53,22 +59,34 @@ public class SegmentDeletionManager { | |
| private static final Logger LOGGER = LoggerFactory.getLogger(SegmentDeletionManager.class); | ||
| private static final long MAX_DELETION_DELAY_SECONDS = 300L; // Maximum of 5 minutes back-off to retry the deletion | ||
| private static final long DEFAULT_DELETION_DELAY_SECONDS = 2L; | ||
|
|
||
| // Retention date format will be written as suffix to deleted segments under `Deleted_Segments` folder. for example: | ||
| // `Deleted_Segments/myTable/myTable_mySegment_0__RETENTION_UNTIL__202202021200` to indicate that this segment | ||
| // file will be permanently deleted after Feb 2nd 2022 12PM. | ||
| private static final String DELETED_SEGMENTS = "Deleted_Segments"; | ||
| private static final String RETENTION_UNTIL_SEPARATOR = "__RETENTION_UNTIL__"; | ||
| private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMddHHmm"; | ||
| private static final SimpleDateFormat RETENTION_DATE_FORMAT; | ||
|
|
||
| static { | ||
| RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR); | ||
| RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); | ||
| } | ||
|
|
||
| private final ScheduledExecutorService _executorService; | ||
| private final String _dataDir; | ||
| private final String _helixClusterName; | ||
| private final HelixAdmin _helixAdmin; | ||
| private final ZkHelixPropertyStore<ZNRecord> _propertyStore; | ||
| private final int _defaultDeletedSegmentsRetentionInDays; | ||
| private final long _defaultDeletedSegmentsRetentionMs; | ||
|
|
||
| public SegmentDeletionManager(String dataDir, HelixAdmin helixAdmin, String helixClusterName, | ||
| ZkHelixPropertyStore<ZNRecord> propertyStore, int deletedSegmentsRetentionInDays) { | ||
| _dataDir = dataDir; | ||
| _helixAdmin = helixAdmin; | ||
| _helixClusterName = helixClusterName; | ||
| _propertyStore = propertyStore; | ||
| _defaultDeletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays; | ||
| _defaultDeletedSegmentsRetentionMs = TimeUnit.DAYS.toMillis(deletedSegmentsRetentionInDays); | ||
|
|
||
| _executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { | ||
| @Override | ||
|
|
@@ -84,22 +102,29 @@ public void stop() { | |
| _executorService.shutdownNow(); | ||
| } | ||
|
|
||
| public void deleteSegments(final String tableName, final Collection<String> segmentIds) { | ||
| deleteSegmentsWithDelay(tableName, segmentIds, DEFAULT_DELETION_DELAY_SECONDS); | ||
| public void deleteSegments(String tableName, Collection<String> segmentIds) { | ||
| deleteSegments(tableName, segmentIds, null); | ||
| } | ||
|
|
||
| public void deleteSegments(String tableName, Collection<String> segmentIds, | ||
| @Nullable TableConfig tableConfig) { | ||
| Long deletedSegmentsRetentionMs = getRetentionMsFromTableConfig(tableConfig); | ||
| deleteSegmentsWithDelay(tableName, segmentIds, deletedSegmentsRetentionMs, DEFAULT_DELETION_DELAY_SECONDS); | ||
| } | ||
|
|
||
| protected void deleteSegmentsWithDelay(final String tableName, final Collection<String> segmentIds, | ||
| final long deletionDelaySeconds) { | ||
| protected void deleteSegmentsWithDelay(String tableName, Collection<String> segmentIds, | ||
| Long deletedSegmentsRetentionMs, long deletionDelaySeconds) { | ||
| _executorService.schedule(new Runnable() { | ||
| @Override | ||
| public void run() { | ||
| deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletionDelaySeconds); | ||
| deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletedSegmentsRetentionMs, | ||
| deletionDelaySeconds); | ||
| } | ||
| }, deletionDelaySeconds, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableName, Collection<String> segmentIds, | ||
| long deletionDelay) { | ||
| Long deletedSegmentsRetentionMs, long deletionDelay) { | ||
| // Check if segment got removed from ExternalView or IdealState | ||
| ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableName); | ||
| IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName); | ||
|
|
@@ -151,7 +176,7 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN | |
| } | ||
| segmentsToDelete.removeAll(propStoreFailedSegs); | ||
|
|
||
| removeSegmentsFromStore(tableName, segmentsToDelete); | ||
| removeSegmentsFromStore(tableName, segmentsToDelete, deletedSegmentsRetentionMs); | ||
| } | ||
|
|
||
| LOGGER.info("Deleted {} segments from table {}:{}", segmentsToDelete.size(), tableName, | ||
|
|
@@ -160,27 +185,35 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN | |
| if (!segmentsToRetryLater.isEmpty()) { | ||
| long effectiveDeletionDelay = Math.min(deletionDelay * 2, MAX_DELETION_DELAY_SECONDS); | ||
| LOGGER.info("Postponing deletion of {} segments from table {}", segmentsToRetryLater.size(), tableName); | ||
| deleteSegmentsWithDelay(tableName, segmentsToRetryLater, effectiveDeletionDelay); | ||
| deleteSegmentsWithDelay(tableName, segmentsToRetryLater, deletedSegmentsRetentionMs, effectiveDeletionDelay); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| public void removeSegmentsFromStore(String tableNameWithType, List<String> segments) { | ||
| removeSegmentsFromStore(tableNameWithType, segments, null); | ||
| } | ||
|
|
||
| public void removeSegmentsFromStore(String tableNameWithType, List<String> segments, | ||
| @Nullable Long deletedSegmentsRetentionMs) { | ||
| for (String segment : segments) { | ||
| removeSegmentFromStore(tableNameWithType, segment); | ||
| removeSegmentFromStore(tableNameWithType, segment, deletedSegmentsRetentionMs); | ||
| } | ||
| } | ||
|
|
||
| protected void removeSegmentFromStore(String tableNameWithType, String segmentId) { | ||
| protected void removeSegmentFromStore(String tableNameWithType, String segmentId, | ||
| @Nullable Long deletedSegmentsRetentionMs) { | ||
| // Ignore HLC segments as they are not stored in Pinot FS | ||
| if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) { | ||
| return; | ||
| } | ||
| if (_dataDir != null) { | ||
| long retentionMs = deletedSegmentsRetentionMs == null | ||
| ? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs; | ||
| String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); | ||
| URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); | ||
| PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme()); | ||
| if (_defaultDeletedSegmentsRetentionInDays <= 0) { | ||
| if (retentionMs <= 0) { | ||
| // delete the segment file instantly if retention is set to zero | ||
| try { | ||
| if (pinotFS.delete(fileToDeleteURI, true)) { | ||
|
|
@@ -193,8 +226,9 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId | |
| } | ||
| } else { | ||
| // move the segment file to deleted segments first and let retention manager handler the deletion | ||
| URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, | ||
| URIUtils.encode(segmentId)); | ||
| String deletedFileName = deletedSegmentsRetentionMs == null ? URIUtils.encode(segmentId) | ||
| : getDeletedSegmentFileName(URIUtils.encode(segmentId), deletedSegmentsRetentionMs); | ||
| URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, deletedFileName); | ||
| try { | ||
| if (pinotFS.exists(fileToDeleteURI)) { | ||
| // Overwrites the file if it already exists in the target directory. | ||
|
|
@@ -223,9 +257,8 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId | |
|
|
||
| /** | ||
| * Removes aged deleted segments from the deleted directory | ||
| * @param retentionInDays: retention for deleted segments in days | ||
| */ | ||
| public void removeAgedDeletedSegments(int retentionInDays) { | ||
| public void removeAgedDeletedSegments() { | ||
| if (_dataDir != null) { | ||
| URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS); | ||
| PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme()); | ||
|
|
@@ -254,8 +287,8 @@ public void removeAgedDeletedSegments(int retentionInDays) { | |
| int numFilesDeleted = 0; | ||
| for (String targetFile : targetFiles) { | ||
| URI targetURI = URIUtils.getUri(targetFile); | ||
| Date dateToDelete = DateTime.now().minusDays(retentionInDays).toDate(); | ||
| if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) { | ||
| long deletionTimeMs = getDeletionTimeMsFromFile(targetFile, pinotFS.lastModified(targetURI)); | ||
| if (System.currentTimeMillis() >= deletionTimeMs) { | ||
| if (!pinotFS.delete(targetURI, true)) { | ||
| LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString()); | ||
| } else { | ||
|
|
@@ -278,4 +311,37 @@ public void removeAgedDeletedSegments(int retentionInDays) { | |
| LOGGER.info("dataDir is not configured, won't delete any expired segments from deleted directory."); | ||
| } | ||
| } | ||
|
|
||
| private String getDeletedSegmentFileName(String fileName, long deletedSegmentsRetentionMs) { | ||
| return fileName + RETENTION_UNTIL_SEPARATOR + RETENTION_DATE_FORMAT.format(new Date( | ||
| System.currentTimeMillis() + deletedSegmentsRetentionMs)); | ||
| } | ||
|
|
||
| private long getDeletionTimeMsFromFile(String targetFile, long lastModifiedTime) { | ||
| String[] split = StringUtils.splitByWholeSeparator(targetFile, RETENTION_UNTIL_SEPARATOR); | ||
| if (split.length == 2) { | ||
| try { | ||
| return RETENTION_DATE_FORMAT.parse(split[1]).getTime(); | ||
| } catch (Exception e) { | ||
| LOGGER.warn("No retention suffix found for file: {}", targetFile); | ||
| } | ||
| } | ||
| LOGGER.info("Fallback to using default cluster retention config: {} ms", _defaultDeletedSegmentsRetentionMs); | ||
| return lastModifiedTime + _defaultDeletedSegmentsRetentionMs; | ||
| } | ||
|
|
||
| @Nullable | ||
| private static Long getRetentionMsFromTableConfig(@Nullable TableConfig tableConfig) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (minor) Annotate the return as |
||
| if (tableConfig != null) { | ||
| SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); | ||
| if (!StringUtils.isEmpty(validationConfig.getDeletedSegmentsRetentionPeriod())) { | ||
| try { | ||
| return TimeUtils.convertPeriodToMillis(validationConfig.getDeletedSegmentsRetentionPeriod()); | ||
| } catch (Exception e) { | ||
| LOGGER.warn("Unable to parse deleted segment retention config for table {}", tableConfig.getTableName(), e); | ||
| } | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.