Skip to content

HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem #2701

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
Expand All @@ -50,6 +51,7 @@ public class SFTPFileSystem extends FileSystem {

private SFTPConnectionPool connectionPool;
private URI uri;
private final AtomicBoolean closed = new AtomicBoolean(false);

private static final int DEFAULT_SFTP_PORT = 22;
private static final int DEFAULT_MAX_CONNECTION = 5;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class SFTPFileSystem extends FileSystem {
"Destination path %s already exist, cannot rename!";
public static final String E_FAILED_GETHOME = "Failed to get home directory";
public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
public static final String E_FS_CLOSED = "FileSystem is closed!";

/**
* Set configuration from UI.
Expand Down Expand Up @@ -138,8 +141,9 @@ private void setConfigurationFromURI(URI uriInfo, Configuration conf)
* @throws IOException
*/
private ChannelSftp connect() throws IOException {
Configuration conf = getConf();
checkNotClosed();

Configuration conf = getConf();
String host = conf.get(FS_SFTP_HOST, null);
int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
Expand Down Expand Up @@ -703,6 +707,31 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
}

@Override
public void close() throws IOException {
if (closed.getAndSet(true)) {
return;
}
try {
super.close();
} finally {
if (connectionPool != null) {
connectionPool.shutdown();
}
}
}

/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed.get()) {
throw new IOException(uri + ": " + E_FS_CLOSED);
}
}

@VisibleForTesting
SFTPConnectionPool getConnectionPool() {
return connectionPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,15 @@ public void testMkDirs() throws IOException {
assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
is(1));
}

@Test
public void testCloseFileSystemClosesConnectionPool() throws Exception {
SFTPFileSystem fs = (SFTPFileSystem) sftpFs;
fs.getHomeDirectory();
assertThat(fs.getConnectionPool().getLiveConnCount(), is(1));
fs.close();
assertThat(fs.getConnectionPool().getLiveConnCount(), is(0));
///making sure that re-entrant close calls are safe
fs.close();
}
}