Skip to content

Commit

Permalink
HADOOP-18340 deleteOnExit does not work with S3AFileSystem
Browse files Browse the repository at this point in the history
  • Loading branch information
Huaxiang Sun committed Aug 2, 2022
1 parent d07256a commit 38cd6ad
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -385,6 +387,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private ArnResource accessPoint;

/**
 * A cache of files that should be deleted when the FileSystem is closed
 * or the JVM is exited.
 * Paths are qualified before they are added, and every access to the set
 * is made while holding the set's own monitor
 * ({@code synchronized (deleteOnExit)}).
 */
private final Set<Path> deleteOnExit = new TreeSet<>();

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -3059,6 +3067,24 @@ public void removeKeys(
@AuditEntryPoint
public boolean delete(Path f, boolean recursive) throws IOException {
// Public entry point: verify the filesystem is still open first.
checkNotClosed();
// The deletion itself is in a helper which skips that check, so that
// processDeleteOnExit() can also invoke it.
return deleteWithoutCloseCheck(f, recursive);
}

/**
* Same as delete(), except that it does not check if fs is closed.
*
* @param f the path to delete.
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if the path existed and then was deleted; false if there
* was no path in the first place, or the corner cases of root path deletion
* have surfaced.
* @throws IOException due to inability to delete a directory or file.
*/

@VisibleForTesting
protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
final Path path = qualify(f);
// span covers delete, getFileStatus, fake directory operations.
try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
Expand Down Expand Up @@ -3800,6 +3826,61 @@ UploadResult waitForUploadCompletion(String key, UploadInfo uploadInfo)
}
}

/**
 * Mark a path to be deleted when this filesystem is closed.
 * This override bypasses checking for existence: the path is qualified
 * and recorded whether or not anything is currently stored there.
 *
 * @param f the path to delete; this may be unqualified.
 * @return true, always.
 * @throws IOException IO failure
 */
@Override
public boolean deleteOnExit(Path f) throws IOException {
  final Path target = makeQualified(f);
  // all access to the set is guarded by the set's own monitor.
  synchronized (deleteOnExit) {
    deleteOnExit.add(target);
  }
  return true;
}

/**
 * Remove a path from the set of paths to delete when the FileSystem
 * is closed.
 * @param f the path whose scheduled deletion is to be cancelled;
 *          it is qualified before the lookup.
 * @return true if the path was found in the delete-on-exit list.
 */
@Override
public boolean cancelDeleteOnExit(Path f) {
  final Path target = makeQualified(f);
  final boolean wasScheduled;
  synchronized (deleteOnExit) {
    wasScheduled = deleteOnExit.remove(target);
  }
  return wasScheduled;
}

/**
 * Delete all paths that were marked as delete-on-exit. Each path is
 * deleted recursively, so directories are removed along with everything
 * under them. There is no check that the filesystem is still open:
 * the deletion goes through {@code deleteWithoutCloseCheck()}.
 *
 * A failure to delete a path is logged and otherwise ignored; the path
 * is removed from the delete-on-exit set whether or not its deletion
 * succeeded, so the set is empty once this method returns normally.
 *
 * The lock on the delete-on-exit set is held for the whole operation.
 * The time to process it is {@code O(paths)}, with the actual time
 * dependent on how long each delete takes to complete, successfully
 * or not.
 */
@Override
protected void processDeleteOnExit() {
  synchronized (deleteOnExit) {
    final Iterator<Path> iter = deleteOnExit.iterator();
    while (iter.hasNext()) {
      final Path path = iter.next();
      try {
        deleteWithoutCloseCheck(path, true);
      } catch (IOException e) {
        // best-effort cleanup: note the failure and carry on.
        LOG.info("Ignoring failure to deleteOnExit for path {}", path);
        // With SLF4J-style loggers a trailing Throwable is logged with its
        // stack trace and is not substituted into the message; a "{}"
        // placeholder for it would be printed literally.
        LOG.debug("The exception for deleteOnExit is", e);
      }
      // drop the entry even on failure, so it is never retried.
      iter.remove();
    }
  }
}

/**
* Close the filesystem. This shuts down all transfers.
* @throws IOException IO problem
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;

import org.junit.Test;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;

/**
 * Test deleteOnExit for S3A.
 * The following cases for deleteOnExit are tested:
 *  1. A nonexistent file, which is added to the deleteOnExit set.
 *  2. An existing file.
 *  3. A file which is added to the deleteOnExit set first, then created.
 *  4. A directory with some files under it.
 */
public class ITestS3ADeleteOnExit extends AbstractS3ATestBase {

  private static final String PARENT_DIR_PATH_STR = "testDeleteOnExitDir";
  private static final String NON_EXIST_FILE_PATH_STR =
      PARENT_DIR_PATH_STR + "/nonExistFile";
  private static final String INORDER_FILE_PATH_STR =
      PARENT_DIR_PATH_STR + "/inOrderFile";
  private static final String OUT_OF_ORDER_FILE_PATH_STR =
      PARENT_DIR_PATH_STR + "/outOfOrderFile";
  private static final String SUBDIR_PATH_STR =
      PARENT_DIR_PATH_STR + "/subDir";
  private static final String FILE_UNDER_SUBDIR_PATH_STR =
      SUBDIR_PATH_STR + "/subDirFile";

  /**
   * Register paths for deletion on a second filesystem instance, close that
   * instance, then verify through the shared test filesystem that every
   * registered path is gone.
   * @throws Exception on any failure.
   */
  @Test
  public void testDeleteOnExit() throws Exception {
    FileSystem fs = getFileSystem();

    Path parentDir = path(PARENT_DIR_PATH_STR);
    Path nonExistFilePath = path(NON_EXIST_FILE_PATH_STR);
    Path inOrderFilePath = path(INORDER_FILE_PATH_STR);
    Path outOfOrderFilePath = path(OUT_OF_ORDER_FILE_PATH_STR);
    Path subDirPath = path(SUBDIR_PATH_STR);
    Path fileUnderSubDirPath = path(FILE_UNDER_SUBDIR_PATH_STR);
    byte[] data = ContractTestUtils.dataset(16, 'a', 26);

    // Get a new filesystem object which is same as fs. It is scoped with
    // try-with-resources so that it is closed even when a setup step or an
    // assertion fails; on the success path, leaving the block is the close()
    // call whose delete-on-exit processing is under test.
    try (FileSystem s3aFs = new S3AFileSystem()) {
      s3aFs.initialize(fs.getUri(), fs.getConf());

      // 1. set up the test directory.
      s3aFs.mkdirs(parentDir);

      // 2. Add a nonexistent file to the deleteOnExit set.
      s3aFs.deleteOnExit(nonExistFilePath);
      ContractTestUtils.assertPathDoesNotExist(s3aFs,
          "File " + NON_EXIST_FILE_PATH_STR + " should not exist", nonExistFilePath);

      // 3. create a file and then add it to the deleteOnExit set.
      writeFile(s3aFs, inOrderFilePath, data);
      ContractTestUtils.assertPathExists(s3aFs,
          "File " + INORDER_FILE_PATH_STR + " should exist", inOrderFilePath);
      s3aFs.deleteOnExit(inOrderFilePath);

      // 4. add a path to the deleteOnExit set first, then create it.
      s3aFs.deleteOnExit(outOfOrderFilePath);
      writeFile(s3aFs, outOfOrderFilePath, data);
      ContractTestUtils.assertPathExists(s3aFs,
          "File " + OUT_OF_ORDER_FILE_PATH_STR + " should exist", outOfOrderFilePath);

      // 5. create a subdirectory, add it to the deleteOnExit set, then
      // create a file under it.
      s3aFs.mkdirs(subDirPath);
      s3aFs.deleteOnExit(subDirPath);
      writeFile(s3aFs, fileUnderSubDirPath, data);
      ContractTestUtils.assertPathExists(s3aFs,
          "Directory " + SUBDIR_PATH_STR + " should exist", subDirPath);
      ContractTestUtils.assertPathExists(s3aFs,
          "File " + FILE_UNDER_SUBDIR_PATH_STR + " should exist", fileUnderSubDirPath);
    }

    // After s3aFs is closed, make sure that all files/directories in the
    // deleteOnExit set are deleted.
    ContractTestUtils.assertPathDoesNotExist(fs,
        "File " + NON_EXIST_FILE_PATH_STR + " should not exist", nonExistFilePath);
    ContractTestUtils.assertPathDoesNotExist(fs,
        "File " + INORDER_FILE_PATH_STR + " should not exist", inOrderFilePath);
    ContractTestUtils.assertPathDoesNotExist(fs,
        "File " + OUT_OF_ORDER_FILE_PATH_STR + " should not exist", outOfOrderFilePath);
    ContractTestUtils.assertPathDoesNotExist(fs,
        "File " + FILE_UNDER_SUBDIR_PATH_STR + " should not exist", fileUnderSubDirPath);
    ContractTestUtils.assertPathDoesNotExist(fs,
        "Directory " + SUBDIR_PATH_STR + " should not exist", subDirPath);
  }

  /**
   * Create (overwriting) a file and write the given bytes to it.
   * @param fs filesystem to create the file in.
   * @param path path of the file.
   * @param data bytes to write.
   * @throws Exception failure to create or write the file.
   */
  private static void writeFile(FileSystem fs, Path path, byte[] data)
      throws Exception {
    FSDataOutputStream stream = fs.create(path, true);
    try {
      stream.write(data);
    } finally {
      IOUtils.closeStream(stream);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.URI;
import java.util.Date;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.junit.Test;
import org.mockito.ArgumentMatcher;

/**
 * Unit test of deleteOnExit for S3A, run against a mocked S3 client.
 */
public class TestS3ADeleteOnExit extends AbstractS3AMockTest {

  /**
   * Filesystem which counts the paths registered through
   * {@link #deleteOnExit(Path)} and not yet passed to
   * {@link #deleteWithoutCloseCheck(Path, boolean)}.
   */
  static class TestS3AFileSystem extends S3AFileSystem {
    private int deleteOnDnExitCount;

    /**
     * @return registrations minus completed deleteWithoutCloseCheck() calls.
     */
    public int getDeleteOnDnExitCount() {
      return deleteOnDnExitCount;
    }

    @Override
    public boolean deleteOnExit(Path f) throws IOException {
      // one more path is awaiting deletion.
      deleteOnDnExitCount += 1;
      return super.deleteOnExit(f);
    }

    // This is specifically designed for deleteOnExit processing.
    // In this specific test, deleteWithoutCloseCheck() is only reached
    // through processDeleteOnExit(), so each completed call balances one
    // earlier registration.
    @Override
    protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
      final boolean deleted = super.deleteWithoutCloseCheck(f, recursive);
      // only reached when the superclass call did not throw.
      deleteOnDnExitCount -= 1;
      return deleted;
    }
  }

  /**
   * Register one path for deletion, close the filesystem and verify that
   * the registration was matched by a delete call.
   * @throws Exception on any failure.
   */
  @Test
  public void testDeleteOnExit() throws Exception {
    final Configuration configuration = createConfiguration();
    // unset S3CSE property from config to avoid pathIOE.
    configuration.unset(Constants.S3_ENCRYPTION_ALGORITHM);

    final TestS3AFileSystem filesystem = new TestS3AFileSystem();
    filesystem.initialize(URI.create(FS_S3A + "://" + BUCKET), configuration);
    final AmazonS3 mockS3 = filesystem.getAmazonS3ClientForTesting("mocking");

    final Path file = new Path("/file");
    // the object key is the path without its leading "/".
    final String objectKey = file.toUri().getPath().substring(1);

    // stub the metadata request for the key to return a one-byte object.
    final ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentLength(1L);
    metadata.setLastModified(new Date(2L));
    when(mockS3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, objectKey))))
        .thenReturn(metadata);

    filesystem.deleteOnExit(file);
    filesystem.close();
    assertEquals(0, filesystem.getDeleteOnDnExitCount());
  }

  /**
   * Build a matcher accepting only non-null metadata requests which are
   * for the given bucket and key.
   * @param bucket expected bucket name.
   * @param key expected object key.
   * @return the matcher.
   */
  private ArgumentMatcher<GetObjectMetadataRequest> correctGetMetadataRequest(
      String bucket, String key) {
    return request -> {
      if (request == null) {
        return false;
      }
      if (!request.getBucketName().equals(bucket)) {
        return false;
      }
      return request.getKey().equals(key);
    };
  }
}

0 comments on commit 38cd6ad

Please sign in to comment.