Skip to content

Commit a1a738b

Browse files
committed
HDFS-17632. RBF: Support listOpenFiles for routers
1 parent 4ddf19c commit a1a738b

File tree

3 files changed

+260
-4
lines changed

3 files changed

+260
-4
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
2323
import org.apache.hadoop.conf.Configuration;
2424
import org.apache.hadoop.crypto.CryptoProtocolVersion;
25+
import org.apache.hadoop.fs.BatchedRemoteIterator;
2526
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
2627
import org.apache.hadoop.fs.CacheFlag;
2728
import org.apache.hadoop.fs.ContentSummary;
@@ -1977,8 +1978,45 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
19771978
  /**
   * List open files across every namespace that {@code path} resolves to,
   * merged into a single batch ordered by inode id.
   *
   * The call is fanned out concurrently to all resolved locations. Because
   * each namespace paginates independently, entries are truncated at the
   * smallest per-namespace maximum inode id ("minOfMax") so that the returned
   * batch is a consistent cursor: every namespace is guaranteed to have
   * reported all of its open files with ids up to that point, and the caller
   * can safely use the last returned id as {@code prevId} for the next call.
   *
   * @param prevId the cursor: list open files with inode id greater than this
   * @param openFilesTypes the types of open files to list
   * @param path restrict the listing to files under this (mount) path
   * @return one batch of open-file entries, sorted by inode id
   * @throws IOException if the operation check or any remote call fails
   */
  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
      EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
    // Resolve the mount path to all remote namespace locations and invoke
    // listOpenFiles on each of them concurrently with the same cursor.
    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
    RemoteMethod method =
        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
            prevId, openFilesTypes, new RemoteParam());
    Map<RemoteLocation, BatchedEntries> results =
        rpcClient.invokeConcurrent(locations, method, true, false, -1, BatchedEntries.class);

    // Get the largest inodeIds for each namespace, and the smallest inodeId of them
    // then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
    long minOfMax = Long.MAX_VALUE;
    for (BatchedEntries nsEntries : results.values()) {
      // Only need to care about namespaces that still have more files to report:
      // an exhausted namespace cannot be missing entries below any cutoff.
      if (!nsEntries.hasMore()) {
        continue;
      }
      long max = 0;
      for (int i = 0; i < nsEntries.size(); i++) {
        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
      }
      minOfMax = Math.min(minOfMax, max);
    }
    // Concatenate all entries into one result, sorted by inodeId.
    List<OpenFileEntry> routerEntries = new ArrayList<>();
    boolean hasMore = false;
    for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) {
      BatchedEntries nsEntries = entry.getValue();
      hasMore |= nsEntries.hasMore();
      for (int i = 0; i < nsEntries.size(); i++) {
        OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
        if (ofe.getId() > minOfMax) {
          // Truncated above the cutoff; a later call with the new prevId
          // will re-fetch this entry from its namespace.
          // NOTE(review): assumes each namespace returns entries in ascending
          // inode-id order, so breaking here skips only ids > minOfMax —
          // confirm against the NameNode's listOpenFiles contract.
          hasMore = true;
          break;
        }
        routerEntries.add(ofe);
      }
    }
    routerEntries.sort(Comparator.comparingLong(OpenFileEntry::getId));
    return new BatchedRemoteIterator.BatchedListEntries<>(routerEntries, hasMore);
  }
19832021

19842022
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import java.io.IOException;
21+
import java.io.OutputStream;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.EnumSet;
25+
import java.util.List;
26+
27+
import org.junit.jupiter.api.AfterAll;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
32+
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.fs.BatchedRemoteIterator;
34+
import org.apache.hadoop.fs.RemoteIterator;
35+
import org.apache.hadoop.hdfs.DFSClient;
36+
import org.apache.hadoop.hdfs.DFSConfigKeys;
37+
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
38+
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
39+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
40+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
41+
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
42+
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
43+
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
44+
45+
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry;
46+
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
47+
import static org.junit.jupiter.api.Assertions.assertEquals;
48+
import static org.junit.jupiter.api.Assertions.assertTrue;
49+
50+
/**
 * Tests that a router federates {@code listOpenFiles} across namespaces:
 * single- and multi-destination mount points, and iteration across multiple
 * batches when namespaces have divergent inode-id ranges.
 */
public class TestRouterListOpenFiles {
  // Two sub-clusters (ns0, ns1) so multi-destination mounts can be exercised.
  final private static int NUM_SUBCLUSTERS = 2;
  // Small per-namenode response batch so multi-batch iteration is cheap to hit.
  final private static int BATCH_SIZE = 3;
  private static StateStoreDFSCluster cluster;
  private static MiniRouterDFSCluster.RouterContext routerContext;
  private static RouterClientProtocol routerProtocol;
  // Direct clients to each namenode, used to create files on a chosen ns.
  private static DFSClient client0;
  private static DFSClient client1;
  // Client that goes through the router, exercising the federated path.
  private static DFSClient routerClient;

  @BeforeAll
  public static void setup() throws Exception {
    cluster = new StateStoreDFSCluster(false, NUM_SUBCLUSTERS,
        MultipleDestinationMountTableResolver.class);
    Configuration conf = new RouterConfigBuilder().stateStore().heartbeat().admin().rpc().build();
    conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns0,ns1");
    conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, true);
    // Cap each namenode's listOpenFiles response size to force paging.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
    cluster.addRouterOverrides(conf);
    cluster.startCluster(conf);
    cluster.startRouters();
    cluster.waitClusterUp();
    routerContext = cluster.getRandomRouter();
    routerProtocol = routerContext.getRouterRpcServer().getClientProtocolModule();
    routerClient = routerContext.getClient();
    client0 = cluster.getNamenode("ns0", null).getClient();
    client1 = cluster.getNamenode("ns1", null).getClient();
  }

  @AfterAll
  public static void cleanup() {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * Reset both namespaces to the same inode-id base before each test so that
   * tests do not depend on ids left behind by previously-run tests.
   */
  @BeforeEach
  public void resetInodeId() {
    cluster.getNamenode("ns0", null).getNamenode().getNamesystem().getFSDirectory()
        .resetLastInodeIdWithoutChecking(12345);
    cluster.getNamenode("ns1", null).getNamenode().getNamesystem().getFSDirectory()
        .resetLastInodeIdWithoutChecking(12345);
  }

  @Test
  public void testSingleDestination() throws Exception {
    String testPath = "/" + getMethodName();
    // Mount resolves to ns0 only; open files on ns1 must not be reported.
    createMountTableEntry(routerContext.getRouter(), testPath, DestinationOrder.HASH,
        Collections.singletonList("ns0"));

    // Create 2 dirs with the same name on 2 different nss
    client0.mkdirs(testPath);
    client1.mkdirs(testPath);
    // Open 2 files with different names
    OutputStream os0 = client0.create(testPath + "/file0", true);
    OutputStream os1 = client1.create(testPath + "/file1", true);

    BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
            testPath);
    // Should list only the entry on ns0
    assertEquals(1, result.size());
    assertEquals(testPath + "/file0", result.get(0).getFilePath());
    os0.close();
    os1.close();
  }

  @Test
  public void testMultipleDestinations() throws Exception {
    String testPath = "/" + getMethodName();
    // Mount resolves to both namespaces; entries from each must be merged.
    createMountTableEntry(routerContext.getRouter(), testPath, DestinationOrder.HASH_ALL,
        cluster.getNameservices());

    // Create 2 dirs with the same name on 2 different nss
    client0.mkdirs(testPath);
    client1.mkdirs(testPath);

    // Open 2 files with different names
    OutputStream os0 = client0.create(testPath + "/file0", true);
    OutputStream os1 = client1.create(testPath + "/file1", true);
    BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
            testPath);
    // Should list both entries on ns0 and ns1
    assertEquals(2, result.size());
    assertEquals(testPath + "/file0", result.get(0).getFilePath());
    assertEquals(testPath + "/file1", result.get(1).getFilePath());
    // Also verify the end-to-end client iterator through the router.
    RemoteIterator<OpenFileEntry> ite = routerClient.listOpenFiles(testPath);
    while (ite.hasNext()) {
      OpenFileEntry ofe = ite.next();
      assertTrue(ofe.getFilePath().equals(testPath + "/file0") || ofe.getFilePath()
          .equals(testPath + "/file1"));
    }
    os0.close();
    os1.close();

    // Open 2 files with same name (one per namespace); both must be reported.
    os0 = client0.create(testPath + "/file2", true);
    os1 = client1.create(testPath + "/file2", true);
    result =
        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
            testPath);
    // Should list both entries
    assertEquals(2, result.size());
    assertEquals(testPath + "/file2", result.get(0).getFilePath());
    assertEquals(testPath + "/file2", result.get(1).getFilePath());
    ite = routerClient.listOpenFiles(testPath);
    while (ite.hasNext()) {
      OpenFileEntry ofe = ite.next();
      assertTrue(ofe.getFilePath().equals(testPath + "/file2"));
    }
    os0.close();
    os1.close();
  }

  /**
   * Iterating across multiple batches must report every open file exactly
   * once even when the namespaces' inode-id ranges are far apart — this is
   * the case the router's minOfMax cursor logic exists for. Both orderings
   * (ns0 lower / ns1 lower) are covered.
   */
  @Test
  public void testMultipleDestinationsMultipleBatches() throws Exception {
    String testPath = "/" + getMethodName();
    createMountTableEntry(routerContext.getRouter(), testPath, DestinationOrder.HASH_ALL,
        cluster.getNameservices());

    // Make ns1 have a much bigger inodeid than ns0
    cluster.getNamenode("ns0", null).getNamenode().getNamesystem().getFSDirectory()
        .resetLastInodeIdWithoutChecking((long) 1E6);
    cluster.getNamenode("ns1", null).getNamenode().getNamesystem().getFSDirectory()
        .resetLastInodeIdWithoutChecking((long) 2E6);
    runBatchListOpenFilesTest(testPath);

    // Rerun the test with ns0 having a much bigger inodeid than ns1
    cluster.getNamenode("ns0", null).getNamenode().getNamesystem().getFSDirectory()
        .resetLastInodeIdWithoutChecking((long) 4E6);
    cluster.getNamenode("ns1", null).getNamenode().getNamesystem().getFSDirectory()
        .resetLastInodeIdWithoutChecking((long) 3E6);
    runBatchListOpenFilesTest(testPath);
  }

  /**
   * Opens 3 batches' worth of files on each namespace, iterates through the
   * router, and checks each file is reported exactly once. Cleans up so the
   * helper can run twice against the same path.
   */
  private static void runBatchListOpenFilesTest(String testPath) throws IOException {
    // Create 2 dirs with the same name on 2 different nss
    client0.mkdirs(testPath);
    client1.mkdirs(testPath);
    // Open 3 batches on both namespaces
    OutputStream[] oss0 = new OutputStream[3 * BATCH_SIZE];
    OutputStream[] oss1 = new OutputStream[3 * BATCH_SIZE];
    for (int i = 0; i < 3 * BATCH_SIZE; i++) {
      oss0[i] = client0.create(testPath + "/file0a_" + i, true);
      oss1[i] = client1.create(testPath + "/file1a_" + i, true);
    }
    RemoteIterator<OpenFileEntry> ite = routerClient.listOpenFiles(testPath);
    List<OpenFileEntry> allEntries = new ArrayList<>();
    while (ite.hasNext()) {
      allEntries.add(ite.next());
    }
    // All files should be reported once
    assertEquals(3 * 2 * BATCH_SIZE, allEntries.size());

    // Clean up
    for (int i = 0; i < 3 * BATCH_SIZE; i++) {
      oss0[i].close();
      oss1[i].close();
    }
    client0.delete(testPath, true);
    client1.delete(testPath, true);
  }
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2060,8 +2060,12 @@ void resetLastInodeId(long newValue) throws IOException {
20602060
}
20612061
}
20622062

2063-
/** Should only be used for tests to reset to any value */
2064-
void resetLastInodeIdWithoutChecking(long newValue) {
2063+
  /**
   * Should only be used for tests to reset to any value.
   * Unlike {@code resetLastInodeId(long)}, the new value is applied directly
   * — presumably skipping that method's validation; confirm before reusing
   * outside tests.
   * @param newValue new value to set to
   */
  @VisibleForTesting
  public void resetLastInodeIdWithoutChecking(long newValue) {
    inodeId.setCurrentValue(newValue);
  }
20672071

0 commit comments

Comments
 (0)