Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1502,12 +1502,11 @@
</property>

<property>
<name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
<value>3600000</value>
<name>fs.s3a.metadatastore.metadata.ttl</name>
<value>15m</value>
<description>
This value sets how long a directory listing in the MS is considered as
authoritative. The value is in milliseconds.
MetadataStore should be authoritative to use this configuration knob.
This value sets how long a metadata in the MS is valid. The value is in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

text only makes sense for us developers.

"This value sets how long an entry in the MetadataStore is considered valid. After this time has expired, in non-authoritative mode, S3Guard will check the S3 store for the existence/state of an object and use that as its source of truth.
This is needed when other applications are writing/updating
data in the samme store without going through S3Guard."

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true for non auth and auth mode as well.
Do we want to only have this in non auth mode?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If yes, I'll create another test, which will test this functionality and modify my pr accordingly.

milliseconds.
</description>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,10 @@ private Constants() {
/**
* How long a directory listing in the MS is considered as authoritative.
*/
public static final String METADATASTORE_AUTHORITATIVE_DIR_TTL =
"fs.s3a.metadatastore.authoritative.dir.ttl";
public static final long DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL =
TimeUnit.MINUTES.toMillis(60);
public static final String METADATASTORE_METADATA_TTL =
"fs.s3a.metadatastore.metadata.ttl";
public static final long DEFAULT_METADATASTORE_METADATA_TTL =
TimeUnit.MINUTES.toSeconds(15);

/** read ahead buffer size to prevent connection re-establishments. */
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
Expand Down Expand Up @@ -244,7 +245,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

private AWSCredentialProviderList credentials;

private S3Guard.ITtlTimeProvider ttlTimeProvider;
private ITtlTimeProvider ttlTimeProvider;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -388,9 +389,11 @@ public void initialize(URI name, Configuration originalConf)
getMetadataStore(), allowAuthoritative);
}
initMultipartUploads(conf);
long authDirTtl = conf.getLong(METADATASTORE_AUTHORITATIVE_DIR_TTL,
DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
if(hasMetadataStore()) {
long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.SECONDS);
ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
}
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
Expand Down Expand Up @@ -1341,7 +1344,7 @@ childDst, length, getDefaultBlockSize(childDst), username,
}
}

metadataStore.move(srcPaths, dstMetas);
metadataStore.move(srcPaths, dstMetas, ttlTimeProvider);

if (!src.getParent().equals(dst.getParent())) {
LOG.debug("source & dest parents are different; fix up dir markers");
Expand Down Expand Up @@ -1722,7 +1725,7 @@ void deleteObjectAtPath(Path f, String key, boolean isFile)
instrumentation.directoryDeleted();
}
deleteObject(key);
metadataStore.delete(f);
metadataStore.delete(f, ttlTimeProvider);
}

/**
Expand Down Expand Up @@ -2143,7 +2146,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive)
}
}
}
metadataStore.deleteSubtree(f);
metadataStore.deleteSubtree(f, ttlTimeProvider);
} else {
LOG.debug("delete: Path is a file");
deleteObjectAtPath(f, key, true);
Expand Down Expand Up @@ -2466,7 +2469,10 @@ S3AFileStatus innerGetFileStatus(final Path f,
LOG.debug("Getting path status for {} ({})", path, key);

// Check MetadataStore, if any.
PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
PathMetadata pm = null;
if(hasMetadataStore()) {
pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider);
}
Set<Path> tombstones = Collections.emptySet();
if (pm != null) {
if (pm.isDeleted()) {
Expand Down Expand Up @@ -2501,7 +2507,7 @@ S3AFileStatus innerGetFileStatus(final Path f,
LOG.debug("S3Guard metadata for {} is outdated, updating it",
path);
return S3Guard.putAndReturn(metadataStore, s3AFileStatus,
instrumentation);
instrumentation, ttlTimeProvider);
}
}
}
Expand Down Expand Up @@ -2534,12 +2540,14 @@ S3AFileStatus innerGetFileStatus(final Path f,
null, null);
}
// entry was found, save in S3Guard
return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
return S3Guard.putAndReturn(metadataStore, s3FileStatus,
instrumentation, ttlTimeProvider);
} else {
// there was no entry in S3Guard
// retrieve the data and update the metadata store in the process.
return S3Guard.putAndReturn(metadataStore,
s3GetFileStatus(path, key, tombstones), instrumentation);
s3GetFileStatus(path, key, tombstones), instrumentation,
ttlTimeProvider);
}
}

Expand Down Expand Up @@ -3191,11 +3199,12 @@ void finishedWrite(String key, long length, String eTag, String versionId)
// See note about failure semantics in S3Guard documentation
try {
if (hasMetadataStore()) {
S3Guard.addAncestors(metadataStore, p, username);
S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider);
S3AFileStatus status = createUploadFileStatus(p,
S3AUtils.objectRepresentsDirectory(key, length), length,
getDefaultBlockSize(p), username, eTag, versionId);
S3Guard.putAndReturn(metadataStore, status, instrumentation);
S3Guard.putAndReturn(metadataStore, status, instrumentation,
ttlTimeProvider);
}
} catch (IOException e) {
if (failOnMetadataWriteError) {
Expand Down Expand Up @@ -3860,12 +3869,12 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
}

@VisibleForTesting
protected S3Guard.ITtlTimeProvider getTtlTimeProvider() {
public ITtlTimeProvider getTtlTimeProvider() {
return ttlTimeProvider;
}

@VisibleForTesting
protected void setTtlTimeProvider(S3Guard.ITtlTimeProvider ttlTimeProvider) {
protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
this.ttlTimeProvider = ttlTimeProvider;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@
* directory helps prevent unnecessary queries during traversal of an entire
* sub-tree.
*
* Some mutating operations, notably {@link #deleteSubtree(Path)} and
* {@link #move(Collection, Collection)}, are less efficient with this schema.
* Some mutating operations, notably
* {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
* {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider)},
* are less efficient with this schema.
* They require mutating multiple items in the DynamoDB table.
*
* By default, DynamoDB access is performed within the same AWS region as
Expand Down Expand Up @@ -470,14 +472,15 @@ private void initDataAccessRetries(Configuration config) {

@Override
@Retries.RetryTranslated
public void delete(Path path) throws IOException {
innerDelete(path, true);
public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
throws IOException {
innerDelete(path, true, ttlTimeProvider);
}

@Override
@Retries.RetryTranslated
public void forgetMetadata(Path path) throws IOException {
innerDelete(path, false);
innerDelete(path, false, null);
}

/**
Expand All @@ -486,10 +489,13 @@ public void forgetMetadata(Path path) throws IOException {
* There is no check as to whether the entry exists in the table first.
* @param path path to delete
* @param tombstone flag to create a tombstone marker
* @param ttlTimeProvider The time provider to set last_updated. Must not
* be null if tombstone is true.
* @throws IOException I/O error.
*/
@Retries.RetryTranslated
private void innerDelete(final Path path, boolean tombstone)
private void innerDelete(final Path path, boolean tombstone,
ITtlTimeProvider ttlTimeProvider)
throws IOException {
checkPath(path);
LOG.debug("Deleting from table {} in region {}: {}",
Expand All @@ -504,8 +510,13 @@ private void innerDelete(final Path path, boolean tombstone)
// on that of S3A itself
boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
if (tombstone) {
Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider "
+ "must not be null");
final PathMetadata pmTombstone = PathMetadata.tombstone(path);
// update the last updated field of record when putting a tombstone
pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
new DDBPathMetadata(PathMetadata.tombstone(path)));
new DDBPathMetadata(pmTombstone));
writeOp.retry(
"Put tombstone",
path.toString(),
Expand All @@ -523,7 +534,8 @@ private void innerDelete(final Path path, boolean tombstone)

@Override
@Retries.RetryTranslated
public void deleteSubtree(Path path) throws IOException {
public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
throws IOException {
checkPath(path);
LOG.debug("Deleting subtree from table {} in region {}: {}",
tableName, region, path);
Expand All @@ -536,7 +548,7 @@ public void deleteSubtree(Path path) throws IOException {

for (DescendantsIterator desc = new DescendantsIterator(this, meta);
desc.hasNext();) {
innerDelete(desc.next().getPath(), true);
innerDelete(desc.next().getPath(), true, ttlTimeProvider);
}
}

Expand Down Expand Up @@ -730,7 +742,8 @@ Collection<DDBPathMetadata> completeAncestry(
@Override
@Retries.RetryTranslated
public void move(Collection<Path> pathsToDelete,
Collection<PathMetadata> pathsToCreate) throws IOException {
Collection<PathMetadata> pathsToCreate, ITtlTimeProvider ttlTimeProvider)
throws IOException {
if (pathsToDelete == null && pathsToCreate == null) {
return;
}
Expand All @@ -753,7 +766,11 @@ public void move(Collection<Path> pathsToDelete,
}
if (pathsToDelete != null) {
for (Path meta : pathsToDelete) {
newItems.add(new DDBPathMetadata(PathMetadata.tombstone(meta)));
Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider"
+ " must not be null");
final PathMetadata pmTombstone = PathMetadata.tombstone(meta);
pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
newItems.add(new DDBPathMetadata(pmTombstone));
}
}

Expand Down Expand Up @@ -1023,14 +1040,37 @@ public void destroy() throws IOException {
}

@Retries.RetryTranslated
private ItemCollection<ScanOutcome> expiredFiles(long modTime,
String keyPrefix) throws IOException {
String filterExpression =
"mod_time < :mod_time and begins_with(parent, :parent)";
String projectionExpression = "parent,child";
ValueMap map = new ValueMap()
.withLong(":mod_time", modTime)
.withString(":parent", keyPrefix);
private ItemCollection<ScanOutcome> expiredFiles(PruneMode pruneMode,
long cutoff, String keyPrefix) throws IOException {

String filterExpression;
String projectionExpression;
ValueMap map;

switch (pruneMode) {
case ALL_BY_MODTIME:
filterExpression =
"mod_time < :mod_time and begins_with(parent, :parent)";
projectionExpression = "parent,child";
map = new ValueMap()
.withLong(":mod_time", cutoff)
.withString(":parent", keyPrefix);
break;
case TOMBSTONES_BY_LASTUPDATED:
filterExpression =
"last_updated < :last_updated and begins_with(parent, :parent) "
+ "and is_deleted = :is_deleted";
projectionExpression = "parent,child";
map = new ValueMap()
.withLong(":last_updated", cutoff)
.withString(":parent", keyPrefix)
.withBoolean(":is_deleted", true);
break;
default:
throw new UnsupportedOperationException("Unsupported prune mode: "
+ pruneMode);
}

return readOp.retry(
"scan",
keyPrefix,
Expand All @@ -1040,20 +1080,31 @@ private ItemCollection<ScanOutcome> expiredFiles(long modTime,

@Override
@Retries.RetryTranslated
public void prune(long modTime) throws IOException {
prune(modTime, "/");
public void prune(PruneMode pruneMode, long cutoff) throws IOException {
prune(pruneMode, cutoff, "/");
}

/**
* Prune files, in batches. There's a sleep between each batch.
* @param modTime Oldest modification time to allow
*
* @param pruneMode The mode of operation for the prune For details see
* {@link MetadataStore#prune(PruneMode, long)}
* @param cutoff Oldest modification time to allow
* @param keyPrefix The prefix for the keys that should be removed
* @throws IOException Any IO/DDB failure.
* @throws InterruptedIOException if the prune was interrupted
*/
@Override
@Retries.RetryTranslated
public void prune(long modTime, String keyPrefix) throws IOException {
public void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
throws IOException {
final ItemCollection<ScanOutcome> items =
expiredFiles(pruneMode, cutoff, keyPrefix);
innerPrune(items);
}

private void innerPrune(ItemCollection<ScanOutcome> items)
throws IOException {
int itemCount = 0;
try {
Collection<Path> deletionBatch =
Expand All @@ -1063,7 +1114,7 @@ public void prune(long modTime, String keyPrefix) throws IOException {
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT,
TimeUnit.MILLISECONDS);
Set<Path> parentPathSet = new HashSet<>();
for (Item item : expiredFiles(modTime, keyPrefix)) {
for (Item item : items) {
DDBPathMetadata md = PathMetadataDynamoDBTranslation
.itemToPathMetadata(item, username);
Path path = md.getFileStatus().getPath();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.s3guard;

/**
* This interface is defined for handling TTL expiry of metadata in S3Guard.
*
* TTL can be tested by implementing this interface and setting is as
* {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
* value preferred and flaky tests could be avoided. By default getNow()
* returns the EPOCH in runtime.
*/
public interface ITtlTimeProvider {
long getNow();
long getMetadataTtl();
}
Loading