Skip to content

HADOOP-XXXXX. S3Guard tombstones can mislead about directory empty status and other fixes - wip #1077

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void initialize(URI name, Configuration originalConf)
DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);

setMetadataStore(S3Guard.getMetadataStore(this));
setMetadataStore(S3Guard.getMetadataStore(this, ttlTimeProvider));
allowAuthoritativeMetadataStore = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
DEFAULT_METADATASTORE_AUTHORITATIVE);
allowAuthoritativePaths = S3Guard.getAuthoritativePaths(this);
Expand Down Expand Up @@ -1767,7 +1767,7 @@ void deleteObjectAtPath(Path f, String key, boolean isFile)
instrumentation.directoryDeleted();
}
deleteObject(key);
metadataStore.delete(f, ttlTimeProvider);
metadataStore.delete(f);
}

/**
Expand Down Expand Up @@ -2193,6 +2193,13 @@ void removeKeys(
*/
@Retries.RetryTranslated
public boolean delete(Path f, boolean recursive) throws IOException {
if (f.isRoot()) {
if (!recursive) {
return false;
}
LOG.debug("Deleting root content recursively");
}

try {
entryPoint(INVOCATION_DELETE);
boolean outcome = innerDelete(innerGetFileStatus(f, true), recursive);
Expand Down Expand Up @@ -2293,7 +2300,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive)
}
try(DurationInfo ignored =
new DurationInfo(LOG, false, "Delete metastore")) {
metadataStore.deleteSubtree(f, ttlTimeProvider);
metadataStore.deleteSubtree(f);
}
} else {
LOG.debug("delete: Path is a file: {}", key);
Expand Down Expand Up @@ -2679,6 +2686,7 @@ S3AFileStatus innerGetFileStatus(final Path f,
DirListingMetadata children =
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
if (children != null) {
// todo check what is listed
tombstones = children.listTombstones();
}
LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
Expand Down Expand Up @@ -4066,6 +4074,7 @@ public ITtlTimeProvider getTtlTimeProvider() {
@VisibleForTesting
protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
// Replace the time provider held by this filesystem instance...
this.ttlTimeProvider = ttlTimeProvider;
// ...and hand the same instance to the metadata store, so the store
// does not keep using the provider it was given previously.
metadataStore.setTtlTimeProvider(ttlTimeProvider);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public static List<Path> convertToPaths(
// metastore entries
deleted.forEach(path -> {
try {
metadataStore.delete(path, getStoreContext().getTimeProvider());
metadataStore.delete(path);
} catch (IOException e) {
// trouble: we failed to delete the far end entry
// try with the next one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public synchronized void sourceObjectsDeleted(

/**
* Complete the rename: pass the source paths and the destination
* metadata entries to the metadata store's {@code move()} call together
* with the current operation state, then invoke the superclass's
* {@code completeRename()}.
* @throws IOException on a failure
*/
@Override
public void completeRename() throws IOException {
metadataStore.move(sourcePaths, destMetas,
getStoreContext().getTimeProvider(),
getOperationState());
metadataStore.move(sourcePaths, destMetas, getOperationState());
super.completeRename();
}

Expand All @@ -147,12 +145,10 @@ public IOException renameFailed(final Exception ex) {
try (DurationInfo ignored = new DurationInfo(LOG,
"Cleaning up deleted paths")) {
// the destination paths are updated; the source is left alone.
metadataStore.move(new ArrayList<>(0), destMetas,
getStoreContext().getTimeProvider(),
getOperationState());
metadataStore.move(new ArrayList<>(0), destMetas, getOperationState());
for (Path deletedPath : deletedPaths) {
// this is not ideal in that it may leave parent stuff around.
metadataStore.delete(deletedPath, getStoreContext().getTimeProvider());
metadataStore.delete(deletedPath);
}
deleteParentPaths();
} catch (IOException | SdkBaseException e) {
Expand Down Expand Up @@ -185,7 +181,7 @@ private void deleteParentPaths() throws IOException {
PathMetadata md = metadataStore.get(parent, true);
if (md != null && md.isEmptyDirectory() == Tristate.TRUE) {
// if we're confident that this is empty: delete it.
metadataStore.delete(parent, getStoreContext().getTimeProvider());
metadataStore.delete(parent);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@
* sub-tree.
*
* Some mutating operations, notably
* {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
* {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider, BulkOperationState)}
* {@link MetadataStore#deleteSubtree(Path)} and
* {@link MetadataStore#move(Collection, Collection, BulkOperationState)}
* are less efficient with this schema.
* They require mutating multiple items in the DynamoDB table.
*
Expand Down Expand Up @@ -356,7 +356,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
* Time source. This is used during writes when parent
* entries need to be created, and when listing children
* to leave expired entries out of the result.
*/
private ITtlTimeProvider timeProvider;
private ITtlTimeProvider ttlTimeProvider;

/**
* A utility function to create DynamoDB instance.
Expand Down Expand Up @@ -391,11 +391,13 @@ private DynamoDB createDynamoDB(
* FS via {@link S3AFileSystem#shareCredentials(String)}; this will
* increment the reference counter of these credentials.
* @param fs {@code S3AFileSystem} associated with the MetadataStore
* @param ttlTp the time provider to use for metadata expiry
* @throws IOException on a failure
*/
@Override
@Retries.OnceRaw
public void initialize(FileSystem fs) throws IOException {
public void initialize(FileSystem fs, ITtlTimeProvider ttlTp)
throws IOException {
Preconditions.checkNotNull(fs, "Null filesystem");
Preconditions.checkArgument(fs instanceof S3AFileSystem,
"DynamoDBMetadataStore only supports S3A filesystem.");
Expand Down Expand Up @@ -433,7 +435,7 @@ public void initialize(FileSystem fs) throws IOException {
this::retryEvent
);

timeProvider = new S3Guard.TtlTimeProvider(conf);
this.ttlTimeProvider = ttlTp;
initTable();

instrumentation.initialized();
Expand All @@ -453,7 +455,7 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
instrumentation = context.getInstrumentation().getS3GuardInstrumentation();
username = context.getUsername();
executor = context.createThrottledExecutor();
timeProvider = Preconditions.checkNotNull(
ttlTimeProvider = Preconditions.checkNotNull(
context.getTimeProvider(),
"ttlTimeProvider must not be null");
}
Expand All @@ -468,7 +470,8 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
*
* This is used to operate the metadata store directly beyond the scope of the
* S3AFileSystem integration, e.g. command line tools.
* Generally, callers should use {@link #initialize(FileSystem)}
* Generally, callers should use
* {@link MetadataStore#initialize(FileSystem, ITtlTimeProvider)}
* with an initialized {@code S3AFileSystem} instance.
*
* Without a filesystem to act as a reference point, the configuration itself
Expand All @@ -479,13 +482,14 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
* using the base fs.s3a.* options, as there is no bucket to infer per-bucket
* settings from.
*
* @see #initialize(FileSystem)
* @see MetadataStore#initialize(FileSystem, ITtlTimeProvider)
* @throws IOException if there is an error
* @throws IllegalArgumentException if the configuration is incomplete
*/
@Override
@Retries.OnceRaw
public void initialize(Configuration config) throws IOException {
public void initialize(Configuration config,
ITtlTimeProvider ttlTp) throws IOException {
conf = config;
// use the bucket as the DynamoDB table name if not specified in config
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
Expand All @@ -512,7 +516,7 @@ public void initialize(Configuration config) throws IOException {
TimeUnit.SECONDS,
"s3a-ddb-" + tableName);
initDataAccessRetries(conf);
timeProvider = new S3Guard.TtlTimeProvider(conf);
this.ttlTimeProvider = ttlTp;
initTable();
}

Expand Down Expand Up @@ -540,16 +544,16 @@ private void initDataAccessRetries(Configuration config) {

/**
* Delete the metadata entry for a path.
* Delegates to {@code innerDelete} with the tombstone flag set to
* {@code true} and no ancestor state.
* @param path path to delete
* @throws IOException I/O error.
*/
@Override
@Retries.RetryTranslated
public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
public void delete(Path path)
throws IOException {
innerDelete(path, true, ttlTimeProvider, null);
innerDelete(path, true, null);
}

/**
* Forget the metadata for a path.
* Logs the request at debug level, then delegates to {@code innerDelete}
* with the tombstone flag set to {@code false} and no ancestor state,
* so no tombstone marker is requested for the path.
* @param path path to forget
* @throws IOException I/O error.
*/
@Override
@Retries.RetryTranslated
public void forgetMetadata(Path path) throws IOException {
LOG.debug("Forget metadata for {}", path);
innerDelete(path, false, null, null);
innerDelete(path, false, null);
}

/**
Expand All @@ -558,15 +562,12 @@ public void forgetMetadata(Path path) throws IOException {
* There is no check as to whether the entry exists in the table first.
* @param path path to delete
* @param tombstone flag to create a tombstone marker
* @param ttlTimeProvider The time provider to set last_updated. Must not
* be null if tombstone is true.
* @param ancestorState ancestor state for logging
* @throws IOException I/O error.
*/
@Retries.RetryTranslated
private void innerDelete(final Path path,
final boolean tombstone,
final ITtlTimeProvider ttlTimeProvider,
final AncestorState ancestorState)
throws IOException {
checkPath(path);
Expand Down Expand Up @@ -615,7 +616,7 @@ private void innerDelete(final Path path,

@Override
@Retries.RetryTranslated
public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
public void deleteSubtree(Path path)
throws IOException {
checkPath(path);
LOG.debug("Deleting subtree from table {} in region {}: {}",
Expand All @@ -639,7 +640,7 @@ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
desc.hasNext();) {
final Path pathToDelete = desc.next().getPath();
futures.add(submit(executor, () -> {
innerDelete(pathToDelete, true, ttlTimeProvider, state);
innerDelete(pathToDelete, true, state);
return null;
}));
if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) {
Expand Down Expand Up @@ -776,7 +777,12 @@ public DirListingMetadata listChildren(final Path path) throws IOException {
final List<PathMetadata> metas = new ArrayList<>();
for (Item item : items) {
DDBPathMetadata meta = itemToPathMetadata(item, username);
metas.add(meta);
// handle expiry - only add not expired entries to listing.
if (meta.getLastUpdated() == 0 ||
!meta.isExpired(ttlTimeProvider.getMetadataTtl(),
ttlTimeProvider.getNow())) {
metas.add(meta);
}
}

// Minor race condition here - if the path is deleted between
Expand Down Expand Up @@ -823,13 +829,11 @@ DirListingMetadata getDirListingMetadataFromDirMetaAndList(Path path,
* Callers are required to synchronize on ancestorState.
* @param pathsToCreate paths to create
* @param ancestorState ongoing ancestor state.
* @param ttlTimeProvider Must not be null
* @return the full ancestry paths
*/
private Collection<DDBPathMetadata> completeAncestry(
final Collection<DDBPathMetadata> pathsToCreate,
final AncestorState ancestorState,
final ITtlTimeProvider ttlTimeProvider) throws PathIOException {
final AncestorState ancestorState) throws PathIOException {
// Key on path to allow fast lookup
Map<Path, DDBPathMetadata> ancestry = new HashMap<>();
LOG.debug("Completing ancestry for {} paths", pathsToCreate.size());
Expand Down Expand Up @@ -913,9 +917,7 @@ private Collection<DDBPathMetadata> completeAncestry(
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
@Retries.RetryTranslated
public void addAncestors(
final Path qualifiedPath,
final ITtlTimeProvider ttlTimeProvider,
public void addAncestors(final Path qualifiedPath,
@Nullable final BulkOperationState operationState) throws IOException {

Collection<DDBPathMetadata> newDirs = new ArrayList<>();
Expand Down Expand Up @@ -1000,10 +1002,8 @@ public void addAncestors(
*/
@Override
@Retries.RetryTranslated
public void move(
@Nullable Collection<Path> pathsToDelete,
public void move(@Nullable Collection<Path> pathsToDelete,
@Nullable Collection<PathMetadata> pathsToCreate,
final ITtlTimeProvider ttlTimeProvider,
@Nullable final BulkOperationState operationState) throws IOException {
if (pathsToDelete == null && pathsToCreate == null) {
return;
Expand Down Expand Up @@ -1032,8 +1032,7 @@ public void move(
newItems.addAll(
completeAncestry(
pathMetaToDDBPathMeta(pathsToCreate),
ancestorState,
extractTimeProvider(ttlTimeProvider)));
ancestorState));
}
}
// sort all the new items topmost first.
Expand Down Expand Up @@ -1222,7 +1221,7 @@ public void put(
public void put(
final Collection<? extends PathMetadata> metas,
@Nullable final BulkOperationState operationState) throws IOException {
innerPut(pathMetaToDDBPathMeta(metas), operationState, timeProvider);
innerPut(pathMetaToDDBPathMeta(metas), operationState, ttlTimeProvider);
}

/**
Expand All @@ -1236,15 +1235,15 @@ public void put(
* create entries in the table without parents.
* @param metas metadata entries to write.
* @param operationState (nullable) operational state for a bulk update
* @param ttlTimeProvider
* @param ttlTp The time provider for metadata expiry
* @throws IOException failure.
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Retries.RetryTranslated
private void innerPut(
final Collection<DDBPathMetadata> metas,
@Nullable final BulkOperationState operationState,
final ITtlTimeProvider ttlTimeProvider) throws IOException {
final ITtlTimeProvider ttlTp) throws IOException {
if (metas.isEmpty()) {
// Happens when someone calls put() with an empty list.
LOG.debug("Ignoring empty list of entries to put");
Expand All @@ -1258,7 +1257,7 @@ private void innerPut(
Item[] items;
synchronized (ancestorState) {
items = pathMetadataToItem(
completeAncestry(metas, ancestorState, ttlTimeProvider));
completeAncestry(metas, ancestorState));
}
LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
tableName, region);
Expand Down Expand Up @@ -1644,7 +1643,7 @@ private void removeAuthoritativeDirFlag(
try {
LOG.debug("innerPut on metas: {}", metas);
if (!metas.isEmpty()) {
innerPut(metas, state, timeProvider);
innerPut(metas, state, ttlTimeProvider);
}
} catch (IOException e) {
String msg = String.format("IOException while setting false "
Expand Down Expand Up @@ -2320,15 +2319,20 @@ public AncestorState initiateBulkWrite(
return new AncestorState(this, operation, dest);
}

/**
* Replace the time provider held by this metadata store.
* @param ttlTimeProvider the new time provider to store
*/
@Override
public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
this.ttlTimeProvider = ttlTimeProvider;
}

/**
* Extract a time provider from the argument or fall back to the
* one currently held in this store's field.
* @param ttlTimeProvider nullable time source passed in as an argument.
* @param ttlTp nullable time source passed in as an argument.
* @return a non-null time source.
*/
private ITtlTimeProvider extractTimeProvider(
@Nullable ITtlTimeProvider ttlTimeProvider) {
return ttlTimeProvider != null ? ttlTimeProvider : timeProvider;
@Nullable ITtlTimeProvider ttlTp) {
return ttlTp != null ? ttlTp : this.ttlTimeProvider;
}

/**
Expand Down
Loading