From c92702339650adcae84d95b8f6df4b47a5a6a01f Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Wed, 7 Sep 2022 10:54:41 -0700 Subject: [PATCH] HDFS-13522: Add federated nameservices states to client protocol and propagate it between routers and clients. Fixes #4311 Signed-off-by: Owen O'Malley --- .../java/org/apache/hadoop/ipc/Server.java | 14 ++ .../apache/hadoop/hdfs/ClientGSIContext.java | 31 +++- .../hadoop/hdfs/NameNodeProxiesClient.java | 3 + .../federation/router/ConnectionManager.java | 35 +++- .../federation/router/ConnectionPool.java | 27 ++- .../router/PoolAlignmentContext.java | 103 +++++++++++ .../federation/router/RBFConfigKeys.java | 4 + .../federation/router/RouterRpcClient.java | 9 +- .../federation/router/RouterRpcServer.java | 10 +- .../router/RouterStateIdContext.java | 168 ++++++++++++++++++ .../src/main/resources/hdfs-rbf-default.xml | 11 ++ .../federation/FederationTestUtils.java | 3 +- .../router/TestConnectionManager.java | 36 ++-- .../router/TestRouterFederatedState.java | 25 +-- 14 files changed, 426 insertions(+), 53 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 6e112209599114..bad8aa00b2a718 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -814,6 +814,9 @@ public static class Call implements Schedulable, // the priority level assigned by scheduler, 0 by default private long clientStateId; private boolean isCallCoordinated; + // Serialized 
RouterFederatedStateProto message to + // store last seen states for multiple namespaces. + private ByteString federatedNamespaceState; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -871,6 +874,14 @@ public ProcessingDetails getProcessingDetails() { return processingDetails; } + public void setFederatedNamespaceState(ByteString federatedNamespaceState) { + this.federatedNamespaceState = federatedNamespaceState; + } + + public ByteString getFederatedNamespaceState() { + return this.federatedNamespaceState; + } + @Override public String toString() { return "Call#" + callId + " Retry#" + retryCount; @@ -2898,6 +2909,9 @@ private void processRpcRequest(RpcRequestHeaderProto header, stateId = alignmentContext.receiveRequestState( header, getMaxIdleTime()); call.setClientStateId(stateId); + if (header.hasRouterFederatedState()) { + call.setFederatedNamespaceState(header.getRouterFederatedState()); + } } } catch (IOException ioe) { throw new RpcServerException("Processing RPC request caught ", ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 4de969642d5747..bcbb4b96c2aeb9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.concurrent.atomic.LongAccumulator; +import org.apache.hadoop.thirdparty.protobuf.ByteString; /** * Global State Id context for the client. 
@@ -37,8 +38,17 @@ @InterfaceStability.Evolving public class ClientGSIContext implements AlignmentContext { - private final LongAccumulator lastSeenStateId = - new LongAccumulator(Math::max, Long.MIN_VALUE); + private final LongAccumulator lastSeenStateId; + private ByteString routerFederatedState; + + public ClientGSIContext() { + this(new LongAccumulator(Math::max, Long.MIN_VALUE)); + } + + public ClientGSIContext(LongAccumulator lastSeenStateId) { + this.lastSeenStateId = lastSeenStateId; + routerFederatedState = null; + } @Override public long getLastSeenStateId() { @@ -65,16 +75,25 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) { * in responses. */ @Override - public void receiveResponseState(RpcResponseHeaderProto header) { - lastSeenStateId.accumulate(header.getStateId()); + public synchronized void receiveResponseState(RpcResponseHeaderProto header) { + if (header.hasRouterFederatedState()) { + routerFederatedState = header.getRouterFederatedState(); + } else { + lastSeenStateId.accumulate(header.getStateId()); + } } /** * Client side implementation for providing state alignment info in requests. 
*/ @Override - public void updateRequestState(RpcRequestHeaderProto.Builder header) { - header.setStateId(lastSeenStateId.longValue()); + public synchronized void updateRequestState(RpcRequestHeaderProto.Builder header) { + if (lastSeenStateId.get() != Long.MIN_VALUE) { + header.setStateId(lastSeenStateId.get()); + } + if (routerFederatedState != null) { + header.setRouterFederatedState(routerFederatedState); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index 3725fc21590c57..967e0c0f111cdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -349,6 +349,9 @@ public static ClientProtocol createProxyWithAlignmentContext( boolean withRetries, AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) throws IOException { + if (alignmentContext == null) { + alignmentContext = new ClientGSIContext(); + } RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 258dfbfab144bd..31abeacf715313 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -73,6 +73,14 @@ public class ConnectionManager { /** Queue for creating new connections. 
*/ private final BlockingQueue creatorQueue; + /** + * Global federated namespace context for router. + */ + private final RouterStateIdContext routerStateIdContext; + /** + * Map from connection pool ID to namespace. + */ + private final Map connectionPoolToNamespaceMap; /** Max size of queue for creating new connections. */ private final int creatorQueueMaxSize; @@ -85,15 +93,19 @@ public class ConnectionManager { /** If the connection manager is running. */ private boolean running = false; + public ConnectionManager(Configuration config) { + this(config, new RouterStateIdContext(config)); + } /** * Creates a proxy client connection pool manager. * * @param config Configuration for the connections. */ - public ConnectionManager(Configuration config) { + public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) { this.conf = config; - + this.routerStateIdContext = routerStateIdContext; + this.connectionPoolToNamespaceMap = new HashMap<>(); // Configure minimum, maximum and active connection pools this.maxSize = this.conf.getInt( RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, @@ -160,6 +172,10 @@ public void close() { pool.close(); } this.pools.clear(); + for (String nsID: connectionPoolToNamespaceMap.values()) { + routerStateIdContext.removeNamespaceStateId(nsID); + } + connectionPoolToNamespaceMap.clear(); } finally { writeLock.unlock(); } @@ -172,12 +188,12 @@ public void close() { * @param ugi User group information. * @param nnAddress Namenode address for the connection. * @param protocol Protocol for the connection. + * @param nsId Nameservice identity. * @return Proxy client to connect to nnId as UGI. * @throws IOException If the connection cannot be obtained. 
*/ public ConnectionContext getConnection(UserGroupInformation ugi, - String nnAddress, Class protocol) throws IOException { - + String nnAddress, Class protocol, String nsId) throws IOException { // Check if the manager is shutdown if (!this.running) { LOG.error( @@ -205,9 +221,13 @@ public ConnectionContext getConnection(UserGroupInformation ugi, if (pool == null) { pool = new ConnectionPool( this.conf, nnAddress, ugi, this.minSize, this.maxSize, - this.minActiveRatio, protocol); + this.minActiveRatio, protocol, + new PoolAlignmentContext(this.routerStateIdContext, nsId)); this.pools.put(connectionId, pool); + this.connectionPoolToNamespaceMap.put(connectionId, nsId); } + long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId); + pool.getPoolAlignmentContext().advanceClientStateId(clientStateId); } finally { writeLock.unlock(); } @@ -430,6 +450,11 @@ public void run() { try { for (ConnectionPoolId poolId : toRemove) { pools.remove(poolId); + String nsID = connectionPoolToNamespaceMap.get(poolId); + connectionPoolToNamespaceMap.remove(poolId); + if (!connectionPoolToNamespaceMap.values().contains(nsID)) { + routerStateIdContext.removeNamespaceStateId(nsID); + } } } finally { writeLock.unlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index 826a01eb7d85b2..9c20d13ea0f4ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -32,6 +32,7 @@ import javax.net.SocketFactory; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.ipc.AlignmentContext; import 
org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -108,6 +109,8 @@ public class ConnectionPool { /** Enable using multiple physical socket or not. **/ private final boolean enableMultiSocket; + /** StateID alignment context. */ + private final PoolAlignmentContext alignmentContext; /** Map for the protocols and their protobuf implementations. */ private final static Map, ProtoImpl> PROTO_MAP = new HashMap<>(); @@ -138,7 +141,8 @@ private static class ProtoImpl { protected ConnectionPool(Configuration config, String address, UserGroupInformation user, int minPoolSize, int maxPoolSize, - float minActiveRatio, Class proto) throws IOException { + float minActiveRatio, Class proto, PoolAlignmentContext alignmentContext) + throws IOException { this.conf = config; @@ -157,6 +161,8 @@ protected ConnectionPool(Configuration config, String address, RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY, RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT); + this.alignmentContext = alignmentContext; + // Add minimum connections to the pool for (int i = 0; i < this.minSize; i++) { ConnectionContext newConnection = newConnection(); @@ -211,6 +217,14 @@ public AtomicInteger getClientIndex() { return this.clientIndex; } + /** + * Get the alignment context for this pool + * @return Alignment context + */ + public PoolAlignmentContext getPoolAlignmentContext() { + return this.alignmentContext; + } + /** * Return the next connection round-robin. 
* @@ -398,7 +412,7 @@ public String getJSON() { public ConnectionContext newConnection() throws IOException { return newConnection(this.conf, this.namenodeAddress, this.ugi, this.protocol, this.enableMultiSocket, - this.socketIndex.incrementAndGet()); + this.socketIndex.incrementAndGet(), alignmentContext); } /** @@ -413,13 +427,15 @@ public ConnectionContext newConnection() throws IOException { * @param ugi User context. * @param proto Interface of the protocol. * @param enableMultiSocket Enable multiple socket or not. + * @param alignmentContext Client alignment context. * @return proto for the target ClientProtocol that contains the user's * security context. * @throws IOException If it cannot be created. */ protected static ConnectionContext newConnection(Configuration conf, String nnAddress, UserGroupInformation ugi, Class proto, - boolean enableMultiSocket, int socketIndex) throws IOException { + boolean enableMultiSocket, int socketIndex, + AlignmentContext alignmentContext) throws IOException { if (!PROTO_MAP.containsKey(proto)) { String msg = "Unsupported protocol for connection to NameNode: " + ((proto != null) ? 
proto.getName() : "null"); @@ -448,10 +464,11 @@ protected static ConnectionContext newConnection(Configuration conf, socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf), defaultPolicy, conf, socketIndex); proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId, - conf, factory).getProxy(); + conf, factory, alignmentContext).getProxy(); } else { proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi, - conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null, + alignmentContext).getProxy(); } T client = newProtoClient(proto, classes, proxy); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java new file mode 100644 index 00000000000000..571f41c4d542c3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.concurrent.atomic.LongAccumulator; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; + + +/** + * An alignment context shared by all connections in a {@link ConnectionPool}. + * There is a distinct connection pool for each [namespace,UGI] pairing. + *

<p> + * {@link #sharedGlobalStateId} is a reference to a + * shared {@link LongAccumulator} object in the {@link RouterStateIdContext}. + * {@link #poolLocalStateId} is specific to each PoolAlignmentContext. + * <p>

+ * The shared {@link #sharedGlobalStateId} is updated only using + * responses from NameNodes, so clients cannot poison it. + * {@link #poolLocalStateId} is used to propagate client observed + * state into NameNode requests. A misbehaving client can poison this but the effect is only + * visible to other clients with the same UGI and accessing the same namespace. + */ +public class PoolAlignmentContext implements AlignmentContext { + private LongAccumulator sharedGlobalStateId; + private LongAccumulator poolLocalStateId; + + PoolAlignmentContext(RouterStateIdContext routerStateIdContext, String namespaceId) { + sharedGlobalStateId = routerStateIdContext.getNamespaceStateId(namespaceId); + poolLocalStateId = new LongAccumulator(Math::max, Long.MIN_VALUE); + } + + /** + * Client side implementation only receives state alignment info. + * It does not provide state alignment info therefore this does nothing. + */ + @Override + public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) { + // Do nothing. + } + + /** + * Router updates a globally shared value using response from + * namenodes. + */ + @Override + public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) { + sharedGlobalStateId.accumulate(header.getStateId()); + } + + /** + * Client side implementation for routers to provide state info in requests to + * namenodes. + */ + @Override + public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) { + long maxStateId = Long.max(poolLocalStateId.get(), sharedGlobalStateId.get()); + header.setStateId(maxStateId); + } + + /** + * Client side implementation only provides state alignment info in requests. + * Client does not receive RPC requests therefore this does nothing. + */ + @Override + public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) + throws IOException { + // Do nothing. 
+ return 0; + } + + @Override + public long getLastSeenStateId() { + return sharedGlobalStateId.get(); + } + + @Override + public boolean isCoordinatedCall(String protocolName, String method) { + throw new UnsupportedOperationException( + "Client should not be checking uncoordinated call"); + } + + public void advanceClientStateId(Long clientStateId) { + poolLocalStateId.accumulate(clientStateId); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 4eabcdaf5f2106..ed6d8103cc1c58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -180,6 +180,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_STORE_PREFIX + "enable"; public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE = + FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize"; + public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5; + public static final String FEDERATION_STORE_SERIALIZER_CLASS = FEDERATION_STORE_PREFIX + "serializer"; public static final Class diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index b007d791561504..247b5bd6b61239 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -168,7 +168,8 @@ public class RouterRpcClient { * @param monitor Optional performance monitor. */ public RouterRpcClient(Configuration conf, Router router, - ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { + ActiveNamenodeResolver resolver, RouterRpcMonitor monitor, + RouterStateIdContext routerStateIdContext) { this.router = router; this.namenodeResolver = resolver; @@ -177,7 +178,7 @@ public RouterRpcClient(Configuration conf, Router router, this.contextFieldSeparator = clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY, HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT); - this.connectionManager = new ConnectionManager(clientConf); + this.connectionManager = new ConnectionManager(clientConf, routerStateIdContext); this.connectionManager.start(); this.routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf); @@ -471,7 +472,7 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, ugi.getUserName(), routerUser); } connection = this.connectionManager.getConnection( - connUGI, rpcAddress, proto); + connUGI, rpcAddress, proto, nsId); LOG.debug("User {} NN {} is using connection {}", ugi.getUserName(), rpcAddress, connection); } catch (Exception ex) { @@ -1872,7 +1873,7 @@ public Long getAcceptedPermitForNsUser(String nsUser) { } /** * Refreshes/changes the fairness policy controller implementation if possible - * and returns the controller class name + * and returns the controller class name. 
* @param conf Configuration * @return New controller class name if successfully refreshed, else old controller class name */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c261315ad8c925..92f5dd8e9f4093 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -222,18 +222,18 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, /** * Construct a router RPC server. * - * @param configuration HDFS Configuration. + * @param conf HDFS Configuration. * @param router A router using this RPC server. * @param nnResolver The NN resolver instance to determine active NNs in HA. * @param fileResolver File resolver to resolve file paths to subclusters. * @throws IOException If the RPC server could not be created. 
*/ - public RouterRpcServer(Configuration configuration, Router router, + public RouterRpcServer(Configuration conf, Router router, ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver) throws IOException { super(RouterRpcServer.class.getName()); - this.conf = configuration; + this.conf = conf; this.router = router; this.namenodeResolver = nnResolver; this.subclusterResolver = fileResolver; @@ -291,6 +291,7 @@ public RouterRpcServer(Configuration configuration, Router router, // Create security manager this.securityManager = new RouterSecurityManager(this.conf); + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); this.rpcServer = new RPC.Builder(this.conf) .setProtocol(ClientNamenodeProtocolPB.class) @@ -301,6 +302,7 @@ public RouterRpcServer(Configuration configuration, Router router, .setnumReaders(readerCount) .setQueueSizePerHandler(handlerQueueSize) .setVerbose(false) + .setAlignmentContext(routerStateIdContext) .setSecretManager(this.securityManager.getSecretManager()) .build(); @@ -354,7 +356,7 @@ public RouterRpcServer(Configuration configuration, Router router, // Create the client this.rpcClient = new RouterRpcClient(this.conf, this.router, - this.namenodeResolver, this.rpcMonitor); + this.namenodeResolver, this.rpcMonitor, routerStateIdContext); // Initialize modules this.quotaCall = new Quota(this.router, this); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java new file mode 100644 index 00000000000000..9d2b75b0b552b6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.HashSet; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAccumulator; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; +import org.apache.hadoop.thirdparty.protobuf.ByteString; +import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; + + +/** + * This is the router implementation to hold the state Ids for all + * namespaces. This object is only updated by responses from NameNodes. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class RouterStateIdContext implements AlignmentContext { + + private final HashSet coordinatedMethods; + /** + * Collection of last-seen namespace state Ids for a set of namespaces. + * Each value is globally shared by all outgoing connections to a particular namespace, + * so updates should only be performed using reliable responses from NameNodes. + */ + private final ConcurrentHashMap namespaceIdMap; + // Size limit for the map of state Ids to send to clients. + private final int maxSizeOfFederatedStateToPropagate; + + RouterStateIdContext(Configuration conf) { + this.coordinatedMethods = new HashSet<>(); + // For now, only ClientProtocol methods can be coordinated, so only checking + // against ClientProtocol. + for (Method method : ClientProtocol.class.getDeclaredMethods()) { + if (method.isAnnotationPresent(ReadOnly.class) + && method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) { + coordinatedMethods.add(method.getName()); + } + } + + namespaceIdMap = new ConcurrentHashMap<>(); + + maxSizeOfFederatedStateToPropagate = + conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, + RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT); + } + + /** + * Adds the {@link #namespaceIdMap} to the response header that will be sent to a client. 
+ */ + public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) { + if (namespaceIdMap.isEmpty()) { + return; + } + HdfsServerFederationProtos.RouterFederatedStateProto.Builder federatedStateBuilder = + HdfsServerFederationProtos.RouterFederatedStateProto.newBuilder(); + namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get())); + headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString()); + } + + public LongAccumulator getNamespaceStateId(String nsId) { + return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE)); + } + + public void removeNamespaceStateId(String nsId) { + namespaceIdMap.remove(nsId); + } + + /** + * Utility function to parse routerFederatedState field in RPC headers. + */ + public static Map getRouterFederatedStateMap(ByteString byteString) { + if (byteString != null) { + HdfsServerFederationProtos.RouterFederatedStateProto federatedState; + try { + federatedState = HdfsServerFederationProtos.RouterFederatedStateProto.parseFrom(byteString); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + return federatedState.getNamespaceStateIdsMap(); + } else { + return Collections.emptyMap(); + } + } + + public static long getClientStateIdFromCurrentCall(String nsId) { + Long clientStateID = Long.MIN_VALUE; + Server.Call call = Server.getCurCall().get(); + if (call != null) { + ByteString callFederatedNamespaceState = call.getFederatedNamespaceState(); + if (callFederatedNamespaceState != null) { + Map clientFederatedStateIds = getRouterFederatedStateMap(callFederatedNamespaceState); + clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE); + } + } + return clientStateID; + } + + @Override + public void updateResponseState(RpcResponseHeaderProto.Builder header) { + if (namespaceIdMap.size() <= maxSizeOfFederatedStateToPropagate) { + setResponseHeaderState(header); + } + } + + @Override 
+ public void receiveResponseState(RpcResponseHeaderProto header) { + // Do nothing. + } + + @Override + public void updateRequestState(RpcRequestHeaderProto.Builder header) { + // Do nothing. + } + + /** + * Routers do not update their state using information from clients + * to avoid clients interfering with one another. + */ + @Override + public long receiveRequestState(RpcRequestHeaderProto header, + long clientWaitTime) throws RetriableException { + // Do nothing. + return 0; + } + + @Override + public long getLastSeenStateId() { + return 0; + } + + @Override + public boolean isCoordinatedCall(String protocolName, String methodName) { + return protocolName.equals(ClientProtocol.class.getCanonicalName()) + && coordinatedMethods.contains(methodName); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 34cb12d01767de..e9250cddfb9fae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -725,4 +725,15 @@ The maximum time to wait for a permit. + + + dfs.federation.router.observer.federated.state.propagation.maxsize + 5 + + The maximum size of the federated state to send in the RPC header. Sending the federated + state removes the need to msync on every read call, but at the expense of having a larger + header. The cost tradeoff between the larger header and always msync'ing depends on the number + of namespaces in use and the latency of the msync requests. 
+ + diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index 2017a45de1299f..31889fc5bd0d4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -398,7 +398,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { throw new IOException("Simulate connectionManager throw IOException"); } }).when(spyConnectionManager).getConnection( - any(UserGroupInformation.class), any(String.class), any(Class.class)); + any(UserGroupInformation.class), any(String.class), any(Class.class), + any(String.class)); Whitebox.setInternalState(rpcClient, "connectionManager", spyConnectionManager); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index f1cf6ab0d64b6c..bca542c34a115b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -81,15 +81,15 @@ public void shutdown() { public void testCleanup() throws Exception { Map poolMap = connManager.getPools(); - ConnectionPool pool1 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); + ConnectionPool pool1 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1, + 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool1, 9, 4); poolMap.put( new 
ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), pool1); - ConnectionPool pool2 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class); + ConnectionPool pool2 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER2, + 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool2, 10, 10); poolMap.put( new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class), @@ -111,8 +111,8 @@ public void testCleanup() throws Exception { checkPoolConnections(TEST_USER2, 10, 10); // Make sure the number of connections doesn't go below minSize - ConnectionPool pool3 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class); + ConnectionPool pool3 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER3, + 2, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool3, 8, 0); poolMap.put( new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class), @@ -140,7 +140,7 @@ public void testGetConnectionWithConcurrency() throws Exception { ConnectionPool pool = new ConnectionPool( copyConf, TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, - ClientProtocol.class); + ClientProtocol.class, null); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), pool); @@ -174,8 +174,8 @@ public void testGetConnectionWithConcurrency() throws Exception { public void testConnectionCreatorWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. 
ConnectionPool badPool = new ConnectionPool( - conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, - ClientProtocol.class); + conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, + ClientProtocol.class, null); BlockingQueue queue = new ArrayBlockingQueue<>(1); queue.add(badPool); ConnectionManager.ConnectionCreator connectionCreator = @@ -201,7 +201,7 @@ public void testGetConnectionWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. ConnectionPool badPool = new ConnectionPool( conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, - ClientProtocol.class); + ClientProtocol.class, null); } @Test @@ -210,8 +210,8 @@ public void testGetConnection() throws Exception { final int totalConns = 10; int activeConns = 5; - ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); + ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1, + 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), @@ -235,8 +235,8 @@ public void testGetConnection() throws Exception { @Test public void testValidClientIndex() throws Exception { - ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class); + ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1, + 2, 2, 0.5f, ClientProtocol.class, null); for(int i = -3; i <= 3; i++) { pool.getClientIndex().set(i); ConnectionContext conn = pool.getConnection(); @@ -251,8 +251,8 @@ public void getGetConnectionNamenodeProtocol() throws Exception { final int totalConns = 10; int activeConns = 5; - ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class); + ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1, + 0, 10, 0.5f, 
NamenodeProtocol.class, null); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId( @@ -325,7 +325,7 @@ private void testConnectionCleanup(float ratio, int totalConns, // Create one new connection pool tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, - NamenodeProtocol.class); + NamenodeProtocol.class, "ns0"); Map poolMap = tmpConnManager.getPools(); ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1, @@ -356,6 +356,6 @@ public void testUnsupportedProtoExceptionMsg() throws Exception { "Unsupported protocol for connection to NameNode: " + TestConnectionManager.class.getName(), () -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1, - TestConnectionManager.class, false, 0)); + TestConnectionManager.class, false, 0, null)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java index 60ab4b2c0bc480..2bc8cfc21b2304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.federation.router; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.ipc.AlignmentContext; @@ -38,16 +37,22 @@ public class TestRouterFederatedState { @Test public void testRpcRouterFederatedState() throws InvalidProtocolBufferException { byte[] uuid = ClientId.getClientId(); - Map expectedStateIds = new HashMap() {{ - put("namespace1", 11L ); - put("namespace2", 22L); - }}; + Map expectedStateIds = new HashMap() { + { + put("namespace1", 11L); + put("namespace2", 22L); + } + }; 
AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds); RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0, - RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext); + RPC.RpcKind.RPC_PROTOCOL_BUFFER, + RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, + 0, + RpcConstants.INVALID_RETRY_COUNT, + uuid, + alignmentContext); Map stateIdsFromHeader = RouterFederatedStateProto.parseFrom( @@ -59,9 +64,9 @@ public void testRpcRouterFederatedState() throws InvalidProtocolBufferException private static class AlignmentContextWithRouterState implements AlignmentContext { - Map routerFederatedState; + private Map routerFederatedState; - public AlignmentContextWithRouterState(Map namespaceStates) { + AlignmentContextWithRouterState(Map namespaceStates) { this.routerFederatedState = namespaceStates; } @@ -82,7 +87,7 @@ public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder h public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {} @Override - public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException { + public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) { return 0; }