Skip to content

Commit 6f038f3

Browse files
committed
HDFS-17656. [ARR] RouterNamenodeProtocol and RouterUserProtocol support asynchronous RPC. (apache#7159). Contributed by Jian Zhang.
Reviewed-by: Jian Zhang <keepromise@apache.org> Signed-off-by: Jian Zhang <keepromise@apache.org>
1 parent 1d7471f commit 6f038f3

File tree

8 files changed

+713
-13
lines changed

8 files changed

+713
-13
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.async;
19+
20+
import org.apache.hadoop.fs.StorageType;
21+
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
22+
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
23+
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
24+
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
25+
import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
26+
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
27+
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
28+
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
29+
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
30+
import org.apache.hadoop.hdfs.server.namenode.NameNode;
31+
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
32+
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
33+
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
34+
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
35+
import java.io.IOException;
36+
import java.util.Map;
37+
38+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
39+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
40+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
41+
42+
/**
43+
* Module that implements all the asynchronous RPC calls in {@link NamenodeProtocol} in the
44+
* {@link RouterRpcServer}.
45+
*/
46+
public class RouterAsyncNamenodeProtocol extends RouterNamenodeProtocol {
47+
48+
/** RPC server to receive client calls. */
49+
private final RouterRpcServer rpcServer;
50+
/** RPC clients to connect to the Namenodes. */
51+
private final RouterRpcClient rpcClient;
52+
53+
public RouterAsyncNamenodeProtocol(RouterRpcServer server) {
54+
super(server);
55+
this.rpcServer = server;
56+
this.rpcClient = this.rpcServer.getRPCClient();
57+
}
58+
59+
/**
60+
* Asynchronously get a list of blocks belonging to <code>datanode</code>
61+
* whose total size equals <code>size</code>.
62+
*
63+
* @see org.apache.hadoop.hdfs.server.balancer.Balancer
64+
* @param datanode a data node
65+
* @param size requested size
66+
* @param minBlockSize each block should be of this minimum Block Size
67+
* @param hotBlockTimeInterval prefer to get blocks which are belong to
68+
* the cold files accessed before the time interval
69+
* @param storageType the given storage type {@link StorageType}
70+
* @return BlocksWithLocations a list of blocks &amp; their locations
71+
* @throws IOException if size is less than or equal to 0 or
72+
datanode does not exist
73+
*/
74+
@Override
75+
public BlocksWithLocations getBlocks(
76+
DatanodeInfo datanode, long size,
77+
long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
78+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
79+
80+
// Get the namespace where the datanode is located
81+
rpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
82+
asyncApply((AsyncApplyFunction<Map<String, DatanodeStorageReport[]>, Object>) map -> {
83+
String nsId = null;
84+
for (Map.Entry<String, DatanodeStorageReport[]> entry : map.entrySet()) {
85+
DatanodeStorageReport[] dns = entry.getValue();
86+
for (DatanodeStorageReport dn : dns) {
87+
DatanodeInfo dnInfo = dn.getDatanodeInfo();
88+
if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) {
89+
nsId = entry.getKey();
90+
break;
91+
}
92+
}
93+
// Break the loop if already found
94+
if (nsId != null) {
95+
break;
96+
}
97+
}
98+
// Forward to the proper namenode
99+
if (nsId != null) {
100+
RemoteMethod method = new RemoteMethod(
101+
NamenodeProtocol.class, "getBlocks", new Class<?>[]
102+
{DatanodeInfo.class, long.class, long.class, long.class, StorageType.class},
103+
datanode, size, minBlockSize, hotBlockTimeInterval, storageType);
104+
rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
105+
} else {
106+
asyncComplete(null);
107+
}
108+
});
109+
return asyncReturn(BlocksWithLocations.class);
110+
}
111+
112+
/**
113+
* Asynchronously get the current block keys.
114+
*
115+
* @return ExportedBlockKeys containing current block keys
116+
* @throws IOException if there is no namespace available or other ioExceptions.
117+
*/
118+
@Override
119+
public ExportedBlockKeys getBlockKeys() throws IOException {
120+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
121+
122+
RemoteMethod method =
123+
new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
124+
rpcServer.invokeAtAvailableNsAsync(method, ExportedBlockKeys.class);
125+
return asyncReturn(ExportedBlockKeys.class);
126+
}
127+
128+
/**
129+
* Asynchronously get the most recent transaction ID.
130+
*
131+
* @return The most recent transaction ID that has been synced to
132+
* persistent storage, or applied from persistent storage in the
133+
* case of a non-active node.
134+
* @throws IOException if there is no namespace available or other ioExceptions.
135+
*/
136+
@Override
137+
public long getTransactionID() throws IOException {
138+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
139+
140+
RemoteMethod method =
141+
new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
142+
rpcServer.invokeAtAvailableNsAsync(method, long.class);
143+
return asyncReturn(Long.class);
144+
}
145+
146+
/**
147+
* Asynchronously get the transaction ID of the most recent checkpoint.
148+
*
149+
* @return The transaction ID of the most recent checkpoint.
150+
* @throws IOException if there is no namespace available or other ioExceptions.
151+
*/
152+
@Override
153+
public long getMostRecentCheckpointTxId() throws IOException {
154+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
155+
156+
RemoteMethod method =
157+
new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
158+
rpcServer.invokeAtAvailableNsAsync(method, long.class);
159+
return asyncReturn(Long.class);
160+
}
161+
162+
/**
163+
* Asynchronously get the transaction ID of the most recent checkpoint
164+
* for the given NameNodeFile.
165+
*
166+
* @return The transaction ID of the most recent checkpoint
167+
* for the given NameNodeFile.
168+
* @throws IOException if there is no namespace available or other ioExceptions.
169+
*/
170+
@Override
171+
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
172+
throws IOException {
173+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
174+
175+
RemoteMethod method =
176+
new RemoteMethod(NamenodeProtocol.class, "getMostRecentNameNodeFileTxId",
177+
new Class<?>[] {NNStorage.NameNodeFile.class}, nnf);
178+
rpcServer.invokeAtAvailableNsAsync(method, long.class);
179+
return asyncReturn(Long.class);
180+
}
181+
182+
/**
183+
* Asynchronously request name-node version and storage information.
184+
*
185+
* @return {@link NamespaceInfo} identifying versions and storage information
186+
* of the name-node.
187+
* @throws IOException if there is no namespace available or other ioExceptions.
188+
*/
189+
@Override
190+
public NamespaceInfo versionRequest() throws IOException {
191+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
192+
193+
RemoteMethod method =
194+
new RemoteMethod(NamenodeProtocol.class, "versionRequest");
195+
rpcServer.invokeAtAvailableNsAsync(method, NamespaceInfo.class);
196+
return asyncReturn(NamespaceInfo.class);
197+
}
198+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.async;
19+
20+
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
21+
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
22+
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
23+
import org.apache.hadoop.hdfs.server.federation.router.RouterUserProtocol;
24+
25+
import java.io.IOException;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
29+
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
30+
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
31+
import org.apache.hadoop.hdfs.server.namenode.NameNode;
32+
import org.apache.hadoop.security.Groups;
33+
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
34+
import org.apache.hadoop.security.UserGroupInformation;
35+
import org.apache.hadoop.security.authorize.ProxyUsers;
36+
import org.apache.hadoop.tools.GetUserMappingsProtocol;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
41+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
42+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
43+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
44+
45+
/**
46+
* Module that implements all the asynchronous RPC calls in
47+
* {@link RefreshUserMappingsProtocol} {@link GetUserMappingsProtocol} in the
48+
* {@link RouterRpcServer}.
49+
*/
50+
public class RouterAsyncUserProtocol extends RouterUserProtocol {
51+
private static final Logger LOG =
52+
LoggerFactory.getLogger(RouterAsyncUserProtocol.class);
53+
54+
/** RPC server to receive client calls. */
55+
private final RouterRpcServer rpcServer;
56+
/** RPC clients to connect to the Namenodes. */
57+
private final RouterRpcClient rpcClient;
58+
59+
private final ActiveNamenodeResolver namenodeResolver;
60+
61+
public RouterAsyncUserProtocol(RouterRpcServer server) {
62+
super(server);
63+
this.rpcServer = server;
64+
this.rpcClient = this.rpcServer.getRPCClient();
65+
this.namenodeResolver = this.rpcServer.getNamenodeResolver();
66+
}
67+
68+
/**
69+
* Asynchronously refresh user to group mappings.
70+
* @throws IOException raised on errors performing I/O.
71+
*/
72+
@Override
73+
public void refreshUserToGroupsMappings() throws IOException {
74+
LOG.debug("Refresh user groups mapping in Router.");
75+
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
76+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
77+
if (nss.isEmpty()) {
78+
Groups.getUserToGroupsMappingService().refresh();
79+
asyncComplete(null);
80+
} else {
81+
RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
82+
"refreshUserToGroupsMappings");
83+
rpcClient.invokeConcurrent(nss, method);
84+
}
85+
}
86+
87+
/**
88+
* Asynchronously refresh superuser proxy group list.
89+
* @throws IOException raised on errors performing I/O.
90+
*/
91+
@Override
92+
public void refreshSuperUserGroupsConfiguration() throws IOException {
93+
LOG.debug("Refresh superuser groups configuration in Router.");
94+
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
95+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
96+
if (nss.isEmpty()) {
97+
ProxyUsers.refreshSuperUserGroupsConfiguration();
98+
asyncComplete(null);
99+
} else {
100+
RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
101+
"refreshSuperUserGroupsConfiguration");
102+
rpcClient.invokeConcurrent(nss, method);
103+
}
104+
}
105+
106+
/**
107+
* Asynchronously get the groups which are mapped to the given user.
108+
* @param user The user to get the groups for.
109+
* @return The set of groups the user belongs to.
110+
* @throws IOException raised on errors performing I/O.
111+
*/
112+
@Override
113+
public String[] getGroupsForUser(String user) throws IOException {
114+
LOG.debug("Getting groups for user {}", user);
115+
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
116+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
117+
if (nss.isEmpty()) {
118+
asyncComplete(UserGroupInformation.createRemoteUser(user)
119+
.getGroupNames());
120+
} else {
121+
RemoteMethod method = new RemoteMethod(GetUserMappingsProtocol.class,
122+
"getGroupsForUser", new Class<?>[] {String.class}, user);
123+
rpcClient.invokeConcurrent(nss, method, String[].class);
124+
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, String[]>, String[]>)
125+
results -> merge(results, String.class));
126+
}
127+
return asyncReturn(String[].class);
128+
}
129+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/**
20+
* This package contains classes that facilitate asynchronous operations within the Hadoop
21+
* Distributed File System (HDFS) Federation router. These classes are designed to work with
22+
* the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that
23+
* can improve the performance and responsiveness of HDFS operations.
24+
*/
25+
@InterfaceAudience.Private
26+
@InterfaceStability.Evolving
27+
28+
package org.apache.hadoop.hdfs.server.federation.async;
29+
30+
import org.apache.hadoop.classification.InterfaceAudience;
31+
import org.apache.hadoop.classification.InterfaceStability;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
2222
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
2323
import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
24+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
2425

2526
import java.io.IOException;
2627
import java.net.InetSocketAddress;
@@ -627,12 +628,15 @@ private List<String> getDestinationNameServices(
627628
Map<RemoteLocation, HdfsFileStatus> responses =
628629
rpcClient.invokeConcurrent(
629630
locations, method, false, false, HdfsFileStatus.class);
631+
if (rpcServer.isAsync()) {
632+
responses = syncReturn(Map.class);
633+
}
630634
for (RemoteLocation location : locations) {
631635
if (responses.get(location) != null) {
632636
nsIds.add(location.getNameserviceId());
633637
}
634638
}
635-
} catch (IOException ioe) {
639+
} catch (Exception ioe) {
636640
LOG.error("Cannot get location for {}: {}",
637641
src, ioe.getMessage());
638642
}

0 commit comments

Comments
 (0)