Skip to content

Commit

Permalink
HDFS-16748. RBF: DFSClient should uniquely identify writing files by …
Browse files Browse the repository at this point in the history
…namespace id and iNodeId via RBF (#4813). Contributed by ZanderXu.

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
  • Loading branch information
ZanderXu authored Sep 5, 2022
1 parent ac42519 commit be4c638
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ Configuration getConfiguration() {
* that are currently being written by this client.
* Note that a file can only be written by a single client.
*/
private final Map<Long, DFSOutputStream> filesBeingWritten = new HashMap<>();
private final Map<String, DFSOutputStream> filesBeingWritten = new HashMap<>();

/**
* Same as this(NameNode.getNNAddress(conf), conf);
Expand Down Expand Up @@ -502,9 +502,9 @@ public LeaseRenewer getLeaseRenewer() {
}

/** Get a lease and start automatic renewal */
private void beginFileLease(final long inodeId, final DFSOutputStream out) {
private void beginFileLease(final String key, final DFSOutputStream out) {
synchronized (filesBeingWritten) {
putFileBeingWritten(inodeId, out);
putFileBeingWritten(key, out);
LeaseRenewer renewer = getLeaseRenewer();
boolean result = renewer.put(this);
if (!result) {
Expand All @@ -518,9 +518,9 @@ private void beginFileLease(final long inodeId, final DFSOutputStream out) {
}

/** Stop renewal of lease for the file. */
void endFileLease(final long inodeId) {
void endFileLease(final String renewLeaseKey) {
synchronized (filesBeingWritten) {
removeFileBeingWritten(inodeId);
removeFileBeingWritten(renewLeaseKey);
// remove client from renewer if no files are open
if (filesBeingWritten.isEmpty()) {
getLeaseRenewer().closeClient(this);
Expand All @@ -532,10 +532,10 @@ void endFileLease(final long inodeId) {
* enforced to consistently update its local dfsclients array and
* client's filesBeingWritten map.
*/
public void putFileBeingWritten(final long inodeId,
public void putFileBeingWritten(final String key,
final DFSOutputStream out) {
synchronized(filesBeingWritten) {
filesBeingWritten.put(inodeId, out);
filesBeingWritten.put(key, out);
// update the last lease renewal time only when there was no
// writes. once there is one write stream open, the lease renewer
// thread keeps it updated well with in anyone's expiration time.
Expand All @@ -546,9 +546,9 @@ public void putFileBeingWritten(final long inodeId,
}

/** Remove a file. Only called from LeaseRenewer. */
public void removeFileBeingWritten(final long inodeId) {
public void removeFileBeingWritten(final String key) {
synchronized(filesBeingWritten) {
filesBeingWritten.remove(inodeId);
filesBeingWritten.remove(key);
if (filesBeingWritten.isEmpty()) {
lastLeaseRenewal = 0;
}
Expand Down Expand Up @@ -580,6 +580,13 @@ void updateLastLeaseRenewal() {
}
}

@VisibleForTesting
public int getNumOfFilesBeingWritten() {
synchronized (filesBeingWritten) {
return filesBeingWritten.size();
}
}

/**
* Get all namespaces of DFSOutputStreams.
*/
Expand Down Expand Up @@ -640,14 +647,14 @@ void closeConnectionToNamenode() {
/** Close/abort all files being written. */
public void closeAllFilesBeingWritten(final boolean abort) {
for(;;) {
final long inodeId;
final String key;
final DFSOutputStream out;
synchronized(filesBeingWritten) {
if (filesBeingWritten.isEmpty()) {
return;
}
inodeId = filesBeingWritten.keySet().iterator().next();
out = filesBeingWritten.remove(inodeId);
key = filesBeingWritten.keySet().iterator().next();
out = filesBeingWritten.remove(key);
}
if (out != null) {
try {
Expand All @@ -658,7 +665,7 @@ public void closeAllFilesBeingWritten(final boolean abort) {
}
} catch(IOException ie) {
LOG.error("Failed to " + (abort ? "abort" : "close") + " file: "
+ out.getSrc() + " with inode: " + inodeId, ie);
+ out.getSrc() + " with renewLeaseKey: " + key, ie);
}
}
}
Expand Down Expand Up @@ -1297,7 +1304,7 @@ public DFSOutputStream create(String src, FsPermission permission,
src, masked, flag, createParent, replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes), ecPolicyName, storagePolicy);
beginFileLease(result.getFileId(), result);
beginFileLease(result.getUniqKey(), result);
return result;
}

Expand Down Expand Up @@ -1352,7 +1359,7 @@ public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
flag, createParent, replication, blockSize, progress, checksum,
null, null, null);
}
beginFileLease(result.getFileId(), result);
beginFileLease(result.getUniqKey(), result);
return result;
}

Expand Down Expand Up @@ -1497,7 +1504,7 @@ private DFSOutputStream append(String src, int buffersize,
checkOpen();
final DFSOutputStream result = callAppend(src, flag, progress,
favoredNodes);
beginFileLease(result.getFileId(), result);
beginFileLease(result.getUniqKey(), result);
return result;
}

Expand Down Expand Up @@ -2418,8 +2425,8 @@ long rollEdits() throws IOException {
}

@VisibleForTesting
ExtendedBlock getPreviousBlock(long fileId) {
return filesBeingWritten.get(fileId).getBlock();
ExtendedBlock getPreviousBlock(String key) {
return filesBeingWritten.get(key).getBlock();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class DFSOutputStream extends FSOutputSummer
protected final String src;
protected final long fileId;
private final String namespace;
private final String uniqKey;
protected final long blockSize;
protected final int bytesPerChecksum;

Expand Down Expand Up @@ -197,6 +198,14 @@ private DFSOutputStream(DFSClient dfsClient, String src,
this.src = src;
this.fileId = stat.getFileId();
this.namespace = stat.getNamespace();
if (this.namespace == null) {
String defaultKey = dfsClient.getConfiguration().get(
HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
this.uniqKey = defaultKey + "_" + this.fileId;
} else {
this.uniqKey = this.namespace + "_" + this.fileId;
}
this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication();
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
Expand Down Expand Up @@ -820,7 +829,7 @@ boolean isClosed() {

void setClosed() {
closed = true;
dfsClient.endFileLease(fileId);
dfsClient.endFileLease(getUniqKey());
getStreamer().release();
}

Expand Down Expand Up @@ -923,7 +932,7 @@ protected synchronized void closeImpl() throws IOException {
protected void recoverLease(boolean recoverLeaseOnCloseException) {
if (recoverLeaseOnCloseException) {
try {
dfsClient.endFileLease(fileId);
dfsClient.endFileLease(getUniqKey());
dfsClient.recoverLease(src);
leaseRecovered = true;
} catch (Exception e) {
Expand Down Expand Up @@ -1091,6 +1100,11 @@ public String getNamespace() {
return namespace;
}

@VisibleForTesting
public String getUniqKey() {
return this.uniqKey;
}

/**
* Return the source of stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ void abort() throws IOException {
}
}

dfsClient.endFileLease(fileId);
dfsClient.endFileLease(getUniqKey());
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ public interface HdfsClientConfigKeys {
"dfs.client.fsck.read.timeout";
int DFS_CLIENT_FSCK_READ_TIMEOUT_DEFAULT = 60 * 1000;

String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
"dfs.client.output.stream.uniq.default.key";
String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

/**
* These are deprecated config keys to client code.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;

/**
* Testing DFSClient renewLease with same INodeId.
*/
public class TestRenewLeaseWithSameINodeId {

/** Federated HDFS cluster. */
private static MiniRouterDFSCluster cluster;

/** The first Router Context for this federated cluster. */
private static MiniRouterDFSCluster.RouterContext routerContext;

@BeforeClass
public static void globalSetUp() throws Exception {
cluster = new MiniRouterDFSCluster(false, 2);
cluster.setNumDatanodesPerNameservice(3);
cluster.startCluster();

Configuration routerConf = new RouterConfigBuilder()
.metrics()
.rpc()
.quota()
.build();
cluster.addRouterOverrides(routerConf);
cluster.startRouters();

// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();

routerContext = cluster.getRouters().get(0);
}

@AfterClass
public static void tearDown() throws Exception {
cluster.shutdown();
}

/**
* Testing case:
* 1. One Router DFSClient writing multi files from different namespace with same iNodeId.
* 2. DFSClient Lease Renewer should work well.
*/
@Test
public void testRenewLeaseWithSameINodeId() throws IOException {
// Add mount point "/ns0" and "/ns1"
Router router = cluster.getRouters().get(0).getRouter();
MockResolver resolver = (MockResolver) router.getSubclusterResolver();
resolver.addLocation("/ns0", cluster.getNameservices().get(0), "/ns0");
resolver.addLocation("/ns1", cluster.getNameservices().get(1), "/ns1");

DistributedFileSystem fs = (DistributedFileSystem) routerContext.getFileSystem();

Path path1 = new Path("/ns0/file");
Path path2 = new Path("/ns1/file");

try (FSDataOutputStream ignored1 = fs.create(path1);
FSDataOutputStream ignored2 = fs.create(path2)) {
HdfsFileStatus fileStatus1 = fs.getClient().getFileInfo(path1.toUri().getPath());
HdfsFileStatus fileStatus2 = fs.getClient().getFileInfo(path2.toUri().getPath());

// The fileId of the files from different new namespaces should be same.
assertEquals(fileStatus2.getFileId(), fileStatus1.getFileId());

// The number of fileBeingWritten of this DFSClient should be two.
assertEquals(2, fs.getClient().getNumOfFilesBeingWritten());
}

// The number of fileBeingWritten of this DFSClient should be zero.
assertEquals(0, fs.getClient().getNumOfFilesBeingWritten());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6454,4 +6454,12 @@
frequently than this time, the client will give up waiting.
</description>
</property>
<property>
<name>dfs.client.output.stream.uniq.default.key</name>
<value>DEFAULT</value>
<description>
The default prefix key to construct the uniqKey for one DFSOutputStream.
If the namespace is DEFAULT, it's best to change this conf to other value.
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public static DFSClient getClient(DistributedFileSystem dfs)
return dfs.dfs;
}

public static ExtendedBlock getPreviousBlock(DFSClient client, long fileId) {
return client.getPreviousBlock(fileId);
public static ExtendedBlock getPreviousBlock(DFSClient client, String renewLeaseKey) {
return client.getPreviousBlock(renewLeaseKey);
}

public static long getFileId(DFSOutputStream out) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import org.mockito.Mockito;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

Expand Down Expand Up @@ -433,7 +434,7 @@ public void testEndLeaseCall() throws Exception {
EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null , 1024, null);
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
spyDFSOutputStream.closeThreads(anyBoolean());
verify(spyClient, times(1)).endFileLease(anyLong());
verify(spyClient, times(1)).endFileLease(anyString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,13 +852,13 @@ public void testOpenFileWhenNNAndClientCrashAfterAddBlock() throws Exception {
null);
create.write(testData.getBytes());
create.hflush();
long fileId = ((DFSOutputStream)create.
getWrappedStream()).getFileId();
String renewLeaseKey = ((DFSOutputStream)create.
getWrappedStream()).getUniqKey();
FileStatus fileStatus = dfs.getFileStatus(filePath);
DFSClient client = DFSClientAdapter.getClient(dfs);
// add one dummy block at NN, but not write to DataNode
ExtendedBlock previousBlock =
DFSClientAdapter.getPreviousBlock(client, fileId);
DFSClientAdapter.getPreviousBlock(client, renewLeaseKey);
DFSClientAdapter.getNamenode(client).addBlock(
pathString,
client.getClientName(),
Expand Down

0 comments on commit be4c638

Please sign in to comment.