HADOOP-16729 out of band deletes #952

Closed

@@ -1502,12 +1502,10 @@
</property>

<property>
- <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
- <value>3600000</value>
+ <name>fs.s3a.metadatastore.metadata.ttl</name>
+ <value>15m</value>
<description>
- This value sets how long a directory listing in the MS is considered as
- authoritative. The value is in milliseconds.
- MetadataStore should be authoritative to use this configuration knob.
+ This value sets how long an entry in a MetadataStore is valid.
</description>
</property>

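The renamed property is read with Configuration.getTimeDuration() (see the initialize() change further down), so the value accepts a unit suffix such as ms, s, m, h or d, and a bare number is taken in the default unit, here milliseconds. A minimal sketch of the parsing; only the property name and default come from this patch, the class around them is scaffolding:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    public class MetadataTtlSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.set("fs.s3a.metadatastore.metadata.ttl", "15m");
        // "15m" parses to 900000; a bare "900000" would parse the same way.
        long ttlMillis = conf.getTimeDuration(
            "fs.s3a.metadatastore.metadata.ttl",
            TimeUnit.MINUTES.toMillis(15),  // default when the key is unset
            TimeUnit.MILLISECONDS);
        System.out.println(ttlMillis);      // 900000
      }
    }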
@@ -353,10 +353,14 @@ private Constants() {
/**
* How long a directory listing in the MS is considered as authoritative.
*/
- public static final String METADATASTORE_AUTHORITATIVE_DIR_TTL =
-     "fs.s3a.metadatastore.authoritative.dir.ttl";
- public static final long DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL =
-     TimeUnit.MINUTES.toMillis(60);
+ public static final String METADATASTORE_METADATA_TTL =
+     "fs.s3a.metadatastore.metadata.ttl";
+
+ /**
+  * Default TTL in milliseconds: 15 minutes.
+  */
+ public static final long DEFAULT_METADATASTORE_METADATA_TTL =
+     TimeUnit.MINUTES.toMillis(15);

/** read ahead buffer size to prevent connection re-establishments. */
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
@@ -464,7 +464,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername(),
- null, null);
+ summary.getETag(), null);
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
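The listing path now records each object's ETag (S3ObjectSummary.getETag() in the AWS SDK) in the S3AFileStatus it builds. Keeping the ETag is what lets S3Guard tell an out-of-band overwrite apart from the version it has on record. A hedged sketch of such a check, assuming S3AFileStatus exposes the ETag it was constructed with; this is not code from the patch:

    // Illustrative only: compare a store entry's ETag with a fresh S3 probe.
    static boolean changedOutOfBand(S3AFileStatus stored, S3AFileStatus live) {
      String storedTag = stored.getETag();
      String liveTag = live.getETag();
      return storedTag != null && liveTag != null
          && !storedTag.equals(liveTag);
    }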
@@ -126,6 +126,7 @@
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+ import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
@@ -244,7 +245,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

private AWSCredentialProviderList credentials;

- private S3Guard.ITtlTimeProvider ttlTimeProvider;
+ private ITtlTimeProvider ttlTimeProvider;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
@@ -388,9 +389,11 @@ public void initialize(URI name, Configuration originalConf)
getMetadataStore(), allowAuthoritative);
}
initMultipartUploads(conf);
- long authDirTtl = conf.getLong(METADATASTORE_AUTHORITATIVE_DIR_TTL,
-     DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
- ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+ if (hasMetadataStore()) {
+   long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+       DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+   ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+ }
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
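ITtlTimeProvider has moved from a nested interface in S3Guard to a top-level type, as the import and field changes above show. Only getNow() is visible in this diff; a minimal sketch of the contract consistent with that usage, with the TTL accessor name an assumption:

    // Sketch: the clock + TTL pair threaded through the metadata store calls.
    public interface ITtlTimeProvider {
      long getNow();          // current time in millis, stamps last_updated
      long getMetadataTtl();  // configured TTL in millis (assumed name)
    }

    // A fixed-TTL implementation in the spirit of S3Guard.TtlTimeProvider.
    class FixedTtlTimeProvider implements ITtlTimeProvider {
      private final long ttlMillis;
      FixedTtlTimeProvider(long ttlMillis) { this.ttlMillis = ttlMillis; }
      @Override public long getNow() { return System.currentTimeMillis(); }
      @Override public long getMetadataTtl() { return ttlMillis; }
    }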
@@ -1341,7 +1344,7 @@ childDst, length, getDefaultBlockSize(childDst), username,
}
}

- metadataStore.move(srcPaths, dstMetas);
+ metadataStore.move(srcPaths, dstMetas, ttlTimeProvider);

if (!src.getParent().equals(dst.getParent())) {
LOG.debug("source & dest parents are different; fix up dir markers");
@@ -1722,7 +1725,7 @@ void deleteObjectAtPath(Path f, String key, boolean isFile)
instrumentation.directoryDeleted();
}
deleteObject(key);
- metadataStore.delete(f);
+ metadataStore.delete(f, ttlTimeProvider);
}

/**
@@ -2143,7 +2146,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive)
}
}
}
- metadataStore.deleteSubtree(f);
+ metadataStore.deleteSubtree(f, ttlTimeProvider);
} else {
LOG.debug("delete: Path is a file");
deleteObjectAtPath(f, key, true);
@@ -2466,7 +2469,10 @@ S3AFileStatus innerGetFileStatus(final Path f,
LOG.debug("Getting path status for {} ({})", path, key);

// Check MetadataStore, if any.
- PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
+ PathMetadata pm = null;
+ if (hasMetadataStore()) {
+   pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider);
+ }
Set<Path> tombstones = Collections.emptySet();
if (pm != null) {
if (pm.isDeleted()) {
@@ -2501,7 +2507,7 @@ S3AFileStatus innerGetFileStatus(final Path f,
LOG.debug("S3Guard metadata for {} is outdated, updating it",
path);
return S3Guard.putAndReturn(metadataStore, s3AFileStatus,
- instrumentation);
+ instrumentation, ttlTimeProvider);
}
}
}
@@ -2534,12 +2540,14 @@ S3AFileStatus innerGetFileStatus(final Path f,
null, null);
}
// entry was found, save in S3Guard
- return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
+ return S3Guard.putAndReturn(metadataStore, s3FileStatus,
+     instrumentation, ttlTimeProvider);
} else {
// there was no entry in S3Guard
// retrieve the data and update the metadata store in the process.
return S3Guard.putAndReturn(metadataStore,
- s3GetFileStatus(path, key, tombstones), instrumentation);
+ s3GetFileStatus(path, key, tombstones), instrumentation,
+     ttlTimeProvider);
}
}
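S3Guard.getWithTtl() is where the TTL takes effect on reads: an entry is only trusted if its last_updated stamp is young enough by the provider's clock. A simplified sketch that just drops expired entries; getLastUpdated() is assumed as the counterpart of the setLastUpdated() calls in this patch, and the real method must also handle legacy entries with no last_updated stamp and tombstones that have expired:

    // Hedged sketch of TTL filtering on read; not the patch's exact logic.
    static PathMetadata getWithTtlSketch(MetadataStore ms, Path path,
        ITtlTimeProvider time) throws IOException {
      PathMetadata pm = ms.get(path);
      if (pm == null) {
        return null;
      }
      long age = time.getNow() - pm.getLastUpdated();
      if (age > time.getMetadataTtl()) {
        return null;  // stale: caller falls through to querying S3 itself
      }
      return pm;
    }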

@@ -3191,11 +3199,12 @@ void finishedWrite(String key, long length, String eTag, String versionId)
// See note about failure semantics in S3Guard documentation
try {
if (hasMetadataStore()) {
- S3Guard.addAncestors(metadataStore, p, username);
+ S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider);
S3AFileStatus status = createUploadFileStatus(p,
S3AUtils.objectRepresentsDirectory(key, length), length,
getDefaultBlockSize(p), username, eTag, versionId);
- S3Guard.putAndReturn(metadataStore, status, instrumentation);
+ S3Guard.putAndReturn(metadataStore, status, instrumentation,
+     ttlTimeProvider);
}
} catch (IOException e) {
if (failOnMetadataWriteError) {
@@ -3860,12 +3869,12 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
}

@VisibleForTesting
- protected S3Guard.ITtlTimeProvider getTtlTimeProvider() {
+ public ITtlTimeProvider getTtlTimeProvider() {
return ttlTimeProvider;
}

@VisibleForTesting
- protected void setTtlTimeProvider(S3Guard.ITtlTimeProvider ttlTimeProvider) {
+ protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
this.ttlTimeProvider = ttlTimeProvider;
}
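With the getter made public and the setter left @VisibleForTesting, tests can swap in a deterministic clock and age entries without sleeping. A hypothetical fake, all names invented:

    // Hypothetical test double: controllable clock with a fixed TTL.
    class FakeTimeProvider implements ITtlTimeProvider {
      private long now;
      private final long ttlMillis;
      FakeTimeProvider(long start, long ttlMillis) {
        this.now = start;
        this.ttlMillis = ttlMillis;
      }
      void tick(long millis) { now += millis; }  // advance time by hand
      @Override public long getNow() { return now; }
      @Override public long getMetadataTtl() { return ttlMillis; }
    }

    // e.g. in a test: fs.setTtlTimeProvider(new FakeTimeProvider(0, 15 * 60_000));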

@@ -189,8 +189,10 @@
* directory helps prevent unnecessary queries during traversal of an entire
* sub-tree.
*
- * Some mutating operations, notably {@link #deleteSubtree(Path)} and
- * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * Some mutating operations, notably
+ * {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
+ * {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider)},
+ * are less efficient with this schema.
* They require mutating multiple items in the DynamoDB table.
*
* By default, DynamoDB access is performed within the same AWS region as
@@ -471,14 +473,15 @@ private void initDataAccessRetries(Configuration config) {

@Override
@Retries.RetryTranslated
- public void delete(Path path) throws IOException {
-   innerDelete(path, true);
+ public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+     throws IOException {
+   innerDelete(path, true, ttlTimeProvider);
}

@Override
@Retries.RetryTranslated
public void forgetMetadata(Path path) throws IOException {
- innerDelete(path, false);
+ innerDelete(path, false, null);
}

/**
@@ -487,10 +490,13 @@ public void forgetMetadata(Path path) throws IOException {
* There is no check as to whether the entry exists in the table first.
* @param path path to delete
* @param tombstone flag to create a tombstone marker
+ * @param ttlTimeProvider The time provider to set last_updated. Must not
+ *     be null if tombstone is true.
* @throws IOException I/O error.
*/
@Retries.RetryTranslated
- private void innerDelete(final Path path, boolean tombstone)
+ private void innerDelete(final Path path, boolean tombstone,
+     ITtlTimeProvider ttlTimeProvider)
throws IOException {
checkPath(path);
LOG.debug("Deleting from table {} in region {}: {}",
@@ -505,8 +511,13 @@ private void innerDelete(final Path path, boolean tombstone)
// on that of S3A itself
boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
if (tombstone) {
+ Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider "
+     + "must not be null");
+ final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+ // update the last updated field of record when putting a tombstone
+ pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
- new DDBPathMetadata(PathMetadata.tombstone(path)));
+ new DDBPathMetadata(pmTombstone));
writeOp.retry(
"Put tombstone",
path.toString(),
@@ -524,7 +535,8 @@

@Override
@Retries.RetryTranslated
- public void deleteSubtree(Path path) throws IOException {
+ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+     throws IOException {
checkPath(path);
LOG.debug("Deleting subtree from table {} in region {}: {}",
tableName, region, path);
@@ -537,7 +549,7 @@ public void deleteSubtree(Path path) throws IOException {

for (DescendantsIterator desc = new DescendantsIterator(this, meta);
desc.hasNext();) {
- innerDelete(desc.next().getPath(), true);
+ innerDelete(desc.next().getPath(), true, ttlTimeProvider);
}
}

@@ -731,7 +743,8 @@ Collection<DDBPathMetadata> completeAncestry(
@Override
@Retries.RetryTranslated
public void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException {
+ Collection<PathMetadata> pathsToCreate, ITtlTimeProvider ttlTimeProvider)
+     throws IOException {
if (pathsToDelete == null && pathsToCreate == null) {
return;
}
@@ -754,7 +767,11 @@ public void move(Collection<Path> pathsToDelete,
}
if (pathsToDelete != null) {
for (Path meta : pathsToDelete) {
- newItems.add(new DDBPathMetadata(PathMetadata.tombstone(meta)));
+ Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider"
+     + " must not be null");
+ final PathMetadata pmTombstone = PathMetadata.tombstone(meta);
+ pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+ newItems.add(new DDBPathMetadata(pmTombstone));
}
}

@@ -1024,14 +1041,37 @@ public void destroy() throws IOException {
}

@Retries.RetryTranslated
- private ItemCollection<ScanOutcome> expiredFiles(long modTime,
-     String keyPrefix) throws IOException {
-   String filterExpression =
-       "mod_time < :mod_time and begins_with(parent, :parent)";
-   String projectionExpression = "parent,child";
-   ValueMap map = new ValueMap()
-       .withLong(":mod_time", modTime)
-       .withString(":parent", keyPrefix);
+ private ItemCollection<ScanOutcome> expiredFiles(PruneMode pruneMode,
+     long cutoff, String keyPrefix) throws IOException {
+
+   String filterExpression;
+   String projectionExpression;
+   ValueMap map;
+
+   switch (pruneMode) {
+   case ALL_BY_MODTIME:
+     filterExpression =
+         "mod_time < :mod_time and begins_with(parent, :parent)";
+     projectionExpression = "parent,child";
+     map = new ValueMap()
+         .withLong(":mod_time", cutoff)
+         .withString(":parent", keyPrefix);
+     break;
+   case TOMBSTONES_BY_LASTUPDATED:
+     filterExpression =
+         "last_updated < :last_updated and begins_with(parent, :parent) "
+         + "and is_deleted = :is_deleted";
+     projectionExpression = "parent,child";
+     map = new ValueMap()
+         .withLong(":last_updated", cutoff)
+         .withString(":parent", keyPrefix)
+         .withBoolean(":is_deleted", true);
+     break;
+   default:
+     throw new UnsupportedOperationException("Unsupported prune mode: "
+         + pruneMode);
+   }

return readOp.retry(
"scan",
keyPrefix,
@@ -1041,20 +1081,31 @@ private ItemCollection<ScanOutcome> expiredFiles(long modTime,

@Override
@Retries.RetryTranslated
- public void prune(long modTime) throws IOException {
-   prune(modTime, "/");
+ public void prune(PruneMode pruneMode, long cutoff) throws IOException {
+   prune(pruneMode, cutoff, "/");
}

/**
* Prune files, in batches. There's a sleep between each batch.
- * @param modTime Oldest modification time to allow
+ *
+ * @param pruneMode The mode of operation for the prune. For details see
+ *     {@link MetadataStore#prune(PruneMode, long)}
+ * @param cutoff Oldest modification time to allow
* @param keyPrefix The prefix for the keys that should be removed
* @throws IOException Any IO/DDB failure.
* @throws InterruptedIOException if the prune was interrupted
*/
@Override
@Retries.RetryTranslated
- public void prune(long modTime, String keyPrefix) throws IOException {
+ public void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
+     throws IOException {
+   final ItemCollection<ScanOutcome> items =
+       expiredFiles(pruneMode, cutoff, keyPrefix);
+   innerPrune(items);
+ }
+
+ private void innerPrune(ItemCollection<ScanOutcome> items)
+     throws IOException {
int itemCount = 0;
try {
Collection<Path> deletionBatch =
@@ -1064,7 +1115,7 @@ public void prune(long modTime, String keyPrefix) throws IOException {
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT,
TimeUnit.MILLISECONDS);
Set<Path> parentPathSet = new HashSet<>();
- for (Item item : expiredFiles(modTime, keyPrefix)) {
+ for (Item item : items) {
DDBPathMetadata md = PathMetadataDynamoDBTranslation
.itemToPathMetadata(item, username);
Path path = md.getFileStatus().getPath();
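The prune API now separates whole-store cleanup by modification time from expiry of tombstones by last_updated. Hypothetical caller code, assuming PruneMode is the enum referenced by the MetadataStore#prune(PruneMode, long) javadoc above, with ms a MetadataStore and timeProvider an ITtlTimeProvider:

    // Remove all entries whose files were modified more than a day ago.
    long dayAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, dayAgo);

    // Separately expire tombstones whose last_updated is outside the TTL.
    long ttlCutoff = System.currentTimeMillis() - timeProvider.getMetadataTtl();
    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, ttlCutoff);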