HDFS-17656. [ARR] RouterNamenodeProtocol and RouterUserProtocol supports asynchronous rpc. #7159

Merged: 15 commits, Nov 25, 2024. The diff below shows changes from all commits.
RouterAsyncNamenodeProtocol.java (new file)
@@ -0,0 +1,198 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.async;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import java.io.IOException;
import java.util.Map;

import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;

/**
* Module that implements all the asynchronous RPC calls in {@link NamenodeProtocol} in the
* {@link RouterRpcServer}.
*/
public class RouterAsyncNamenodeProtocol extends RouterNamenodeProtocol {

/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient;

public RouterAsyncNamenodeProtocol(RouterRpcServer server) {
super(server);
this.rpcServer = server;
this.rpcClient = this.rpcServer.getRPCClient();
}

/**
* Asynchronously get a list of blocks belonging to <code>datanode</code>
* whose total size equals <code>size</code>.
*
* @see org.apache.hadoop.hdfs.server.balancer.Balancer
* @param datanode a data node
* @param size requested size
* @param minBlockSize each returned block should be at least this size
* @param hotBlockTimeInterval prefer blocks that belong to cold files
* last accessed before this time interval
* @param storageType the given storage type {@link StorageType}
* @return BlocksWithLocations a list of blocks &amp; their locations
* @throws IOException if size is less than or equal to 0 or the
* datanode does not exist
*/
@Override
public BlocksWithLocations getBlocks(
DatanodeInfo datanode, long size,
long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

// Get the namespace where the datanode is located
rpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
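// The storage-report call above seeds the async chain; the asyncApply
// below runs once the per-namespace report map is available.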
asyncApply((AsyncApplyFunction<Map<String, DatanodeStorageReport[]>, Object>) map -> {
String nsId = null;
for (Map.Entry<String, DatanodeStorageReport[]> entry : map.entrySet()) {
DatanodeStorageReport[] dns = entry.getValue();
for (DatanodeStorageReport dn : dns) {
DatanodeInfo dnInfo = dn.getDatanodeInfo();
if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) {
nsId = entry.getKey();
break;
}
}
// Break the loop if already found
if (nsId != null) {
break;
}
}
// Forward to the proper namenode
if (nsId != null) {
RemoteMethod method = new RemoteMethod(
NamenodeProtocol.class, "getBlocks", new Class<?>[]
{DatanodeInfo.class, long.class, long.class, long.class, StorageType.class},
datanode, size, minBlockSize, hotBlockTimeInterval, storageType);
rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
} else {
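// No namespace reported this datanode; complete the async chain with null.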
asyncComplete(null);
}
});
return asyncReturn(BlocksWithLocations.class);
}

/**
* Asynchronously get the current block keys.
*
* @return ExportedBlockKeys containing current block keys
* @throws IOException if there is no namespace available or on other I/O errors.
*/
@Override
public ExportedBlockKeys getBlockKeys() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
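// invokeAtAvailableNsAsync seeds the async result from an available
// namespace; asyncReturn then hands back a typed placeholder. The
// remaining methods in this class follow the same pattern.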
rpcServer.invokeAtAvailableNsAsync(method, ExportedBlockKeys.class);
return asyncReturn(ExportedBlockKeys.class);
}

/**
* Asynchronously get the most recent transaction ID.
*
* @return The most recent transaction ID that has been synced to
* persistent storage, or applied from persistent storage in the
* case of a non-active node.
* @throws IOException if there is no namespace available or on other I/O errors.
*/
@Override
public long getTransactionID() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
rpcServer.invokeAtAvailableNsAsync(method, long.class);
return asyncReturn(Long.class);
}

/**
* Asynchronously get the transaction ID of the most recent checkpoint.
*
* @return The transaction ID of the most recent checkpoint.
* @throws IOException if there is no namespace available or on other I/O errors.
*/
@Override
public long getMostRecentCheckpointTxId() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
rpcServer.invokeAtAvailableNsAsync(method, long.class);
return asyncReturn(Long.class);
}

/**
* Asynchronously get the transaction ID of the most recent checkpoint
* for the given NameNodeFile.
*
* @param nnf the NameNodeFile whose checkpoint transaction ID is requested
* @return The transaction ID of the most recent checkpoint
* for the given NameNodeFile.
* @throws IOException if there is no namespace available or on other I/O errors.
*/
@Override
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getMostRecentNameNodeFileTxId",
new Class<?>[] {NNStorage.NameNodeFile.class}, nnf);
rpcServer.invokeAtAvailableNsAsync(method, long.class);
return asyncReturn(Long.class);
}

/**
* Asynchronously request name-node version and storage information.
*
* @return {@link NamespaceInfo} identifying versions and storage information
* of the name-node.
* @throws IOException if there is no namespace available or on other I/O errors.
*/
@Override
public NamespaceInfo versionRequest() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "versionRequest");
rpcServer.invokeAtAvailableNsAsync(method, NamespaceInfo.class);
return asyncReturn(NamespaceInfo.class);
}
}
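All of the methods above follow the same ARR (asynchronous router RPC) calling convention: an ...Async invocation seeds a pending result bound to the current RPC handler, asyncApply chains further work onto it, and asyncReturn immediately hands back a typed placeholder that the framework completes later. The class below is a minimal, self-contained analogy of that convention using a plain java.util.concurrent.CompletableFuture; the class and method names are illustrative stand-ins, not Hadoop's actual AsyncUtil implementation.

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative analogy only: AsyncUtil really tracks the pending result in
// a thread-local tied to the RPC handler, much like the field below.
public class AsyncConventionSketch {
  private static final ThreadLocal<CompletableFuture<Object>> CUR = new ThreadLocal<>();

  // Analogue of an "...Async" RPC invocation: seeds the chain.
  static void invokeAsync(Object value) {
    CUR.set(CompletableFuture.completedFuture(value));
  }

  // Analogue of asyncApply: chains a function onto the pending result.
  static void apply(Function<Object, Object> fn) {
    CUR.set(CUR.get().thenApply(fn));
  }

  // Analogue of asyncReturn: the caller gets a placeholder right away;
  // the framework delivers the real value when the future completes.
  static <T> T asyncReturn(Class<T> clazz) {
    return null;
  }

  public static void main(String[] args) throws Exception {
    invokeAsync("datanode-uuid-1");                  // e.g. fetch storage reports
    apply(uuid -> "nameservice-of-" + uuid);         // e.g. resolve the namespace
    String placeholder = asyncReturn(String.class);  // returns null immediately
    System.out.println(placeholder + " / " + CUR.get().get());
  }
}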
RouterAsyncUserProtocol.java (new file)
@@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.async;

import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterUserProtocol;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;

/**
* Module that implements all the asynchronous RPC calls in
* {@link RefreshUserMappingsProtocol} and {@link GetUserMappingsProtocol} in the
* {@link RouterRpcServer}.
*/
public class RouterAsyncUserProtocol extends RouterUserProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(RouterAsyncUserProtocol.class);

/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient;

private final ActiveNamenodeResolver namenodeResolver;

public RouterAsyncUserProtocol(RouterRpcServer server) {
super(server);
this.rpcServer = server;
this.rpcClient = this.rpcServer.getRPCClient();
this.namenodeResolver = this.rpcServer.getNamenodeResolver();
}

/**
* Asynchronously refresh user-to-group mappings.
* @throws IOException raised on errors performing I/O.
*/
@Override
public void refreshUserToGroupsMappings() throws IOException {
LOG.debug("Refresh user groups mapping in Router.");
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
if (nss.isEmpty()) {
Groups.getUserToGroupsMappingService().refresh();
asyncComplete(null);
} else {
RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
"refreshUserToGroupsMappings");
rpcClient.invokeConcurrent(nss, method);
}
}

/**
* Asynchronously refresh superuser proxy group list.
* @throws IOException raised on errors performing I/O.
*/
@Override
public void refreshSuperUserGroupsConfiguration() throws IOException {
LOG.debug("Refresh superuser groups configuration in Router.");
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
if (nss.isEmpty()) {
ProxyUsers.refreshSuperUserGroupsConfiguration();
asyncComplete(null);
} else {
RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
"refreshSuperUserGroupsConfiguration");
rpcClient.invokeConcurrent(nss, method);
}
}

/**
* Asynchronously get the groups which are mapped to the given user.
* @param user The user to get the groups for.
* @return The set of groups the user belongs to.
* @throws IOException raised on errors performing I/O.
*/
@Override
public String[] getGroupsForUser(String user) throws IOException {
LOG.debug("Getting groups for user {}", user);
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
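// No remote namespaces: resolve the groups locally and complete
// the async result directly.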
if (nss.isEmpty()) {
asyncComplete(UserGroupInformation.createRemoteUser(user)
.getGroupNames());
} else {
RemoteMethod method = new RemoteMethod(GetUserMappingsProtocol.class,
"getGroupsForUser", new Class<?>[] {String.class}, user);
rpcClient.invokeConcurrent(nss, method, String[].class);
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, String[]>, String[]>)
results -> merge(results, String.class));
}
return asyncReturn(String[].class);
}
}
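getGroupsForUser fans out to every namespace with invokeConcurrent and then merges the per-namespace String[] results into one array via RouterRpcServer.merge. A rough, self-contained sketch of what such a merge amounts to follows; it is illustrative only, and the real merge in RouterRpcServer may differ in ordering and de-duplication details.

import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Sketch: combine per-namespace group arrays into one de-duplicated array.
public class GroupMergeSketch {
  static String[] merge(Map<String, String[]> resultsPerNamespace) {
    Set<String> merged = new LinkedHashSet<>();
    for (String[] groups : resultsPerNamespace.values()) {
      for (String group : groups) {
        merged.add(group);
      }
    }
    return merged.toArray(new String[0]);
  }

  public static void main(String[] args) {
    Map<String, String[]> results = Map.of(
        "ns0", new String[] {"hadoop", "users"},
        "ns1", new String[] {"users", "admins"});
    System.out.println(String.join(",", merge(results)));
  }
}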
package-info.java (new file)
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This package contains classes that facilitate asynchronous operations within the Hadoop
* Distributed File System (HDFS) Federation router. These classes are designed to work with
* the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that
* can improve the performance and responsiveness of HDFS operations.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving

package org.apache.hadoop.hdfs.server.federation.async;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
RouterAdminServer.java (modified)
@@ -21,6 +21,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;

import java.io.IOException;
import java.net.InetSocketAddress;
@@ -627,12 +628,15 @@ private List<String> getDestinationNameServices(
Map<RemoteLocation, HdfsFileStatus> responses =
rpcClient.invokeConcurrent(
locations, method, false, false, HdfsFileStatus.class);
if (rpcServer.isAsync()) {
responses = syncReturn(Map.class);
}
for (RemoteLocation location : locations) {
if (responses.get(location) != null) {
nsIds.add(location.getNameserviceId());
}
}
} catch (IOException ioe) {
} catch (Exception ioe) {
LOG.error("Cannot get location for {}: {}",
src, ioe.getMessage());
}
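The new rpcServer.isAsync() branch above bridges the asynchronous call path back into a synchronous method: in ARR mode invokeConcurrent only seeds a pending result, so syncReturn must block until the response Map actually arrives. This is also why the catch widens from IOException to Exception, since syncReturn is declared to throw Exception. Below is a minimal stand-alone sketch of that sync-over-async bridge; it uses a CompletableFuture stand-in rather than Hadoop's AsyncUtil, and the names are illustrative.

import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch: a synchronous caller blocking on a result that an async-mode
// RPC invocation only seeded, mirroring the syncReturn(Map.class) call above.
public class SyncReturnSketch {
  private static final ThreadLocal<CompletableFuture<Object>> CUR = new ThreadLocal<>();

  // Stand-in for rpcClient.invokeConcurrent(...) in async mode: it does not
  // return the responses, it only seeds the pending result.
  static void invokeConcurrentAsync() {
    CUR.set(CompletableFuture.supplyAsync(() -> Map.of("location1", "status1")));
  }

  // Stand-in for AsyncUtil.syncReturn: block until the result completes.
  @SuppressWarnings("unchecked")
  static <T> T syncReturn(Class<T> clazz) throws Exception {
    return (T) CUR.get().get();
  }

  public static void main(String[] args) throws Exception {
    invokeConcurrentAsync();
    Map<String, String> responses = syncReturn(Map.class);
    System.out.println(responses);
  }
}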