diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index f314ac9c6e35d..acfca6799f4f8 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -275,7 +275,7 @@ Configuration getConfiguration() {
   * that are currently being written by this client.
   * Note that a file can only be written by a single client.
   */
-  private final Map<Long, DFSOutputStream> filesBeingWritten = new HashMap<>();
+  private final Map<String, DFSOutputStream> filesBeingWritten = new HashMap<>();
 
   /**
    * Same as this(NameNode.getNNAddress(conf), conf);
@@ -502,9 +502,9 @@ public LeaseRenewer getLeaseRenewer() {
   }
 
   /** Get a lease and start automatic renewal */
-  private void beginFileLease(final long inodeId, final DFSOutputStream out) {
+  private void beginFileLease(final String key, final DFSOutputStream out) {
     synchronized (filesBeingWritten) {
-      putFileBeingWritten(inodeId, out);
+      putFileBeingWritten(key, out);
       LeaseRenewer renewer = getLeaseRenewer();
       boolean result = renewer.put(this);
       if (!result) {
@@ -518,9 +518,9 @@ private void beginFileLease(final long inodeId, final DFSOutputStream out) {
   }
 
   /** Stop renewal of lease for the file. */
-  void endFileLease(final long inodeId) {
+  void endFileLease(final String renewLeaseKey) {
     synchronized (filesBeingWritten) {
-      removeFileBeingWritten(inodeId);
+      removeFileBeingWritten(renewLeaseKey);
       // remove client from renewer if no files are open
       if (filesBeingWritten.isEmpty()) {
         getLeaseRenewer().closeClient(this);
@@ -532,10 +532,10 @@ void endFileLease(final long inodeId) {
   * enforced to consistently update its local dfsclients array and
   * client's filesBeingWritten map.
   */
-  public void putFileBeingWritten(final long inodeId,
+  public void putFileBeingWritten(final String key,
       final DFSOutputStream out) {
     synchronized(filesBeingWritten) {
-      filesBeingWritten.put(inodeId, out);
+      filesBeingWritten.put(key, out);
       // update the last lease renewal time only when there was no
       // writes. once there is one write stream open, the lease renewer
       // thread keeps it updated well with in anyone's expiration time.
@@ -546,9 +546,9 @@ public void putFileBeingWritten(final long inodeId,
   }
 
   /** Remove a file. Only called from LeaseRenewer. */
-  public void removeFileBeingWritten(final long inodeId) {
+  public void removeFileBeingWritten(final String key) {
     synchronized(filesBeingWritten) {
-      filesBeingWritten.remove(inodeId);
+      filesBeingWritten.remove(key);
       if (filesBeingWritten.isEmpty()) {
         lastLeaseRenewal = 0;
       }
@@ -580,6 +580,13 @@ void updateLastLeaseRenewal() {
     }
   }
 
+  @VisibleForTesting
+  public int getNumOfFilesBeingWritten() {
+    synchronized (filesBeingWritten) {
+      return filesBeingWritten.size();
+    }
+  }
+
   /**
    * Get all namespaces of DFSOutputStreams.
    */
@@ -640,14 +647,14 @@ void closeConnectionToNamenode() {
   /** Close/abort all files being written. */
   public void closeAllFilesBeingWritten(final boolean abort) {
     for(;;) {
-      final long inodeId;
+      final String key;
       final DFSOutputStream out;
       synchronized(filesBeingWritten) {
         if (filesBeingWritten.isEmpty()) {
          return;
        }
-        inodeId = filesBeingWritten.keySet().iterator().next();
-        out = filesBeingWritten.remove(inodeId);
+        key = filesBeingWritten.keySet().iterator().next();
+        out = filesBeingWritten.remove(key);
      }
      if (out != null) {
        try {
@@ -658,7 +665,7 @@ public void closeAllFilesBeingWritten(final boolean abort) {
          }
        } catch(IOException ie) {
          LOG.error("Failed to " + (abort ? "abort" : "close") + " file: "
-              + out.getSrc() + " with inode: " + inodeId, ie);
+              + out.getSrc() + " with renewLeaseKey: " + key, ie);
        }
      }
    }
@@ -1297,7 +1304,7 @@ public DFSOutputStream create(String src, FsPermission permission,
        src, masked, flag, createParent, replication, blockSize, progress,
        dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes), ecPolicyName, storagePolicy);
-    beginFileLease(result.getFileId(), result);
+    beginFileLease(result.getUniqKey(), result);
    return result;
  }
 
@@ -1352,7 +1359,7 @@ public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
          flag, createParent, replication, blockSize, progress, checksum,
          null, null, null);
    }
-    beginFileLease(result.getFileId(), result);
+    beginFileLease(result.getUniqKey(), result);
    return result;
  }
 
@@ -1497,7 +1504,7 @@ private DFSOutputStream append(String src, int buffersize,
    checkOpen();
    final DFSOutputStream result = callAppend(src, flag, progress,
        favoredNodes);
-    beginFileLease(result.getUniqKey(), result);
+    beginFileLease(result.getUniqKey(), result);
    return result;
  }
 
@@ -2418,8 +2425,8 @@ long rollEdits() throws IOException {
  }
 
  @VisibleForTesting
-  ExtendedBlock getPreviousBlock(long fileId) {
-    return filesBeingWritten.get(fileId).getBlock();
+  ExtendedBlock getPreviousBlock(String key) {
+    return filesBeingWritten.get(key).getBlock();
  }
 
  /**
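Why the map key changes from Long to String: behind an RBF Router, one DFSClient can hold open files in several namespaces, and independent namespaces can hand out the same inode id, so a map keyed by inode id alone silently drops one stream from lease renewal. A minimal, self-contained sketch of that collision (plain java.util maps; the string values are hypothetical stand-ins for DFSOutputStream):

    import java.util.HashMap;
    import java.util.Map;

    public class LeaseKeyCollision {
      public static void main(String[] args) {
        long sameInodeId = 16386L; // both namespaces may assign this id

        // Old scheme: keyed by inode id -- the second put evicts the first,
        // so one open file stops having its lease renewed.
        Map<Long, String> byInode = new HashMap<>();
        byInode.put(sameInodeId, "stream for /ns0/file");
        byInode.put(sameInodeId, "stream for /ns1/file");
        System.out.println(byInode.size()); // 1

        // New scheme: keyed by "<namespace>_<fileId>" -- both streams stay.
        Map<String, String> byUniqKey = new HashMap<>();
        byUniqKey.put("ns0_" + sameInodeId, "stream for /ns0/file");
        byUniqKey.put("ns1_" + sameInodeId, "stream for /ns1/file");
        System.out.println(byUniqKey.size()); // 2
      }
    }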
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 92df7c51b23e1..6ddd56cf72703 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -114,6 +114,7 @@ public class DFSOutputStream extends FSOutputSummer
  protected final String src;
  protected final long fileId;
  private final String namespace;
+  private final String uniqKey;
  protected final long blockSize;
  protected final int bytesPerChecksum;
 
@@ -197,6 +198,14 @@ private DFSOutputStream(DFSClient dfsClient, String src,
    this.src = src;
    this.fileId = stat.getFileId();
    this.namespace = stat.getNamespace();
+    if (this.namespace == null) {
+      String defaultKey = dfsClient.getConfiguration().get(
+          HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
+          HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
+      this.uniqKey = defaultKey + "_" + this.fileId;
+    } else {
+      this.uniqKey = this.namespace + "_" + this.fileId;
+    }
    this.blockSize = stat.getBlockSize();
    this.blockReplication = stat.getReplication();
    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@@ -820,7 +829,7 @@ boolean isClosed() {
 
  void setClosed() {
    closed = true;
-    dfsClient.endFileLease(fileId);
+    dfsClient.endFileLease(getUniqKey());
    getStreamer().release();
  }
 
@@ -923,7 +932,7 @@ protected synchronized void closeImpl() throws IOException {
  protected void recoverLease(boolean recoverLeaseOnCloseException) {
    if (recoverLeaseOnCloseException) {
      try {
-        dfsClient.endFileLease(fileId);
+        dfsClient.endFileLease(getUniqKey());
        dfsClient.recoverLease(src);
        leaseRecovered = true;
      } catch (Exception e) {
@@ -1091,6 +1100,11 @@ public String getNamespace() {
    return namespace;
  }
 
+  @VisibleForTesting
+  public String getUniqKey() {
+    return this.uniqKey;
+  }
+
  /**
   * Return the source of stream.
   */
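The constructor hunk above is where the key is built. Restated as a stand-alone helper, a sketch under stated assumptions (a plain Map stands in for Hadoop's Configuration so it runs by itself; constant names mirror the patch):

    import java.util.Map;

    public class UniqKeyBuilder {
      static final String CONF_KEY = "dfs.client.output.stream.uniq.default.key";
      static final String CONF_DEFAULT = "DEFAULT";

      // Mirrors the constructor logic: "<namespace>_<fileId>", falling back to
      // the configured default prefix when no namespace was reported.
      static String buildUniqKey(String namespace, long fileId,
          Map<String, String> conf) {
        if (namespace == null) {
          return conf.getOrDefault(CONF_KEY, CONF_DEFAULT) + "_" + fileId;
        }
        return namespace + "_" + fileId;
      }

      public static void main(String[] args) {
        System.out.println(buildUniqKey("ns0", 16386L, Map.of())); // ns0_16386
        System.out.println(buildUniqKey(null, 16386L, Map.of()));  // DEFAULT_16386
      }
    }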
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 0f60027269d8f..1233c033ee0e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -1055,7 +1055,7 @@ void abort() throws IOException {
      }
    }
 
-    dfsClient.endFileLease(fileId);
+    dfsClient.endFileLease(getUniqKey());
    final IOException ioe = b.build();
    if (ioe != null) {
      throw ioe;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 8e9a5b62490d0..e3e01fde3a51c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -281,6 +281,10 @@ public interface HdfsClientConfigKeys {
      "dfs.client.fsck.read.timeout";
  int DFS_CLIENT_FSCK_READ_TIMEOUT_DEFAULT = 60 * 1000;
 
+  String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
+      "dfs.client.output.stream.uniq.default.key";
+  String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";
+
  /**
   * These are deprecated config keys to client code.
   */
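When the server side does not report a namespace in HdfsFileStatus, every stream of the client falls back to the same configured prefix, so a client writing to two such clusters should override the prefix per cluster. A hedged usage sketch with the new keys (the value "cluster-a" is illustrative, not from the patch; requires hadoop-hdfs-client on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class UniqKeyConf {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Any value unique per target cluster works here.
        conf.set(HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
            "cluster-a");
        System.out.println(conf.get(
            HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
            HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT));
      }
    }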
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRenewLeaseWithSameINodeId.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRenewLeaseWithSameINodeId.java
new file mode 100644
index 0000000000000..76d64e4eb5583
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRenewLeaseWithSameINodeId.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Testing DFSClient renewLease with same INodeId.
+ */
+public class TestRenewLeaseWithSameINodeId {
+
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+
+  /** The first Router Context for this federated cluster. */
+  private static MiniRouterDFSCluster.RouterContext routerContext;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new MiniRouterDFSCluster(false, 2);
+    cluster.setNumDatanodesPerNameservice(3);
+    cluster.startCluster();
+
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .quota()
+        .build();
+    cluster.addRouterOverrides(routerConf);
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+
+    routerContext = cluster.getRouters().get(0);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    cluster.shutdown();
+  }
+
+  /**
+   * Testing case:
+   * 1. One Router DFSClient writes multiple files in different namespaces
+   *    with the same iNodeId.
+   * 2. The DFSClient lease renewer should work well.
+   */
+  @Test
+  public void testRenewLeaseWithSameINodeId() throws IOException {
+    // Add mount points "/ns0" and "/ns1"
+    Router router = cluster.getRouters().get(0).getRouter();
+    MockResolver resolver = (MockResolver) router.getSubclusterResolver();
+    resolver.addLocation("/ns0", cluster.getNameservices().get(0), "/ns0");
+    resolver.addLocation("/ns1", cluster.getNameservices().get(1), "/ns1");
+
+    DistributedFileSystem fs =
+        (DistributedFileSystem) routerContext.getFileSystem();
+
+    Path path1 = new Path("/ns0/file");
+    Path path2 = new Path("/ns1/file");
+
+    try (FSDataOutputStream ignored1 = fs.create(path1);
+         FSDataOutputStream ignored2 = fs.create(path2)) {
+      HdfsFileStatus fileStatus1 =
+          fs.getClient().getFileInfo(path1.toUri().getPath());
+      HdfsFileStatus fileStatus2 =
+          fs.getClient().getFileInfo(path2.toUri().getPath());
+
+      // The fileId of the files from different namespaces should be the same.
+      assertEquals(fileStatus2.getFileId(), fileStatus1.getFileId());
+
+      // The number of filesBeingWritten of this DFSClient should be two.
+      assertEquals(2, fs.getClient().getNumOfFilesBeingWritten());
+    }
+
+    // The number of filesBeingWritten of this DFSClient should be zero.
+    assertEquals(0, fs.getClient().getNumOfFilesBeingWritten());
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 28fe382b17151..da19904cfbd01 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6454,4 +6454,12 @@
    frequently than this time, the client will give up waiting.
  </description>
</property>
+
+<property>
+  <name>dfs.client.output.stream.uniq.default.key</name>
+  <value>DEFAULT</value>
+  <description>
+    The default prefix key to construct the uniqKey for one DFSOutputStream.
+    If the namespace is DEFAULT, it's best to change this conf to other value.
+  </description>
+</property>
</configuration>
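The key a live stream is registered under can be read back through the @VisibleForTesting accessor added earlier. A sketch, assuming a running cluster (for example the MiniRouterDFSCluster from the test above) and a FileSystem whose output streams wrap DFSOutputStream; the expected key shapes in the comment follow the constructor logic:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSOutputStream;

    public class UniqKeyProbe {
      // Opens the path for write and returns the stream's renew-lease key:
      // e.g. "ns0_16386" when the namespace is reported, "DEFAULT_16386"
      // when it is not (with the default prefix configuration).
      static String probe(FileSystem fs, Path path) throws IOException {
        try (FSDataOutputStream out = fs.create(path)) {
          return ((DFSOutputStream) out.getWrappedStream()).getUniqKey();
        }
      }
    }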
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java
index 6a211ef51c194..ccf45f6afc9a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java
@@ -54,8 +54,8 @@ public static DFSClient getClient(DistributedFileSystem dfs)
    return dfs.dfs;
  }
 
-  public static ExtendedBlock getPreviousBlock(DFSClient client, long fileId) {
-    return client.getPreviousBlock(fileId);
+  public static ExtendedBlock getPreviousBlock(DFSClient client, String renewLeaseKey) {
+    return client.getPreviousBlock(renewLeaseKey);
  }
 
  public static long getFileId(DFSOutputStream out) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 4c1633a1e8605..8b90287e82fe1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -68,8 +68,9 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
 import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -433,7 +434,7 @@ public void testEndLeaseCall() throws Exception {
        EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null, 1024, null);
    DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
    spyDFSOutputStream.closeThreads(anyBoolean());
-    verify(spyClient, times(1)).endFileLease(anyLong());
+    verify(spyClient, times(1)).endFileLease(anyString());
  }
 
  @Test
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
index 37e279dfa5ee7..16a57c6867242 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
@@ -852,13 +852,13 @@ public void testOpenFileWhenNNAndClientCrashAfterAddBlock() throws Exception {
          null);
      create.write(testData.getBytes());
      create.hflush();
-      long fileId = ((DFSOutputStream)create.
-          getWrappedStream()).getFileId();
+      String renewLeaseKey = ((DFSOutputStream)create.
+          getWrappedStream()).getUniqKey();
      FileStatus fileStatus = dfs.getFileStatus(filePath);
      DFSClient client = DFSClientAdapter.getClient(dfs);
      // add one dummy block at NN, but not write to DataNode
      ExtendedBlock previousBlock =
-          DFSClientAdapter.getPreviousBlock(client, fileId);
+          DFSClientAdapter.getPreviousBlock(client, renewLeaseKey);
      DFSClientAdapter.getNamenode(client).addBlock(
          pathString, client.getClientName(),
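The matcher swap in TestDFSOutputStream (anyLong() to anyString()) follows directly from the new endFileLease signature. A minimal, self-contained Mockito sketch of the same verification pattern, with FakeClient as a hypothetical stand-in for DFSClient:

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    public class MatcherSketch {
      static class FakeClient {
        void endFileLease(String renewLeaseKey) { /* no-op for the sketch */ }
      }

      public static void main(String[] args) {
        FakeClient client = mock(FakeClient.class);
        client.endFileLease("ns0_16386");
        // With a String parameter, anyLong() would no longer compile;
        // the verification uses anyString() instead.
        verify(client, times(1)).endFileLease(anyString());
      }
    }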