Skip to content

Commit

Permalink
HDFS-13522: Add federated nameservices states to client protocol and …
Browse files Browse the repository at this point in the history
…propagate it between routers and clients.

Fixes apache#4311

Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
  • Loading branch information
simbadzina authored and lgh committed Apr 2, 2024
1 parent 3a1f8c0 commit c927023
Show file tree
Hide file tree
Showing 14 changed files with 426 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,9 @@ public static class Call implements Schedulable,
// the priority level assigned by scheduler, 0 by default
private long clientStateId;
private boolean isCallCoordinated;
// Serialized RouterFederatedStateProto message to
// store last seen states for multiple namespaces.
private ByteString federatedNamespaceState;

Call() {
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
Expand Down Expand Up @@ -871,6 +874,14 @@ public ProcessingDetails getProcessingDetails() {
return processingDetails;
}

public void setFederatedNamespaceState(ByteString federatedNamespaceState) {
this.federatedNamespaceState = federatedNamespaceState;
}

public ByteString getFederatedNamespaceState() {
return this.federatedNamespaceState;
}

@Override
public String toString() {
return "Call#" + callId + " Retry#" + retryCount;
Expand Down Expand Up @@ -2898,6 +2909,9 @@ private void processRpcRequest(RpcRequestHeaderProto header,
stateId = alignmentContext.receiveRequestState(
header, getMaxIdleTime());
call.setClientStateId(stateId);
if (header.hasRouterFederatedState()) {
call.setFederatedNamespaceState(header.getRouterFederatedState());
}
}
} catch (IOException ioe) {
throw new RpcServerException("Processing RPC request caught ", ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.thirdparty.protobuf.ByteString;

/**
* Global State Id context for the client.
Expand All @@ -37,8 +38,17 @@
@InterfaceStability.Evolving
public class ClientGSIContext implements AlignmentContext {

private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);
private final LongAccumulator lastSeenStateId;
private ByteString routerFederatedState;

public ClientGSIContext() {
this(new LongAccumulator(Math::max, Long.MIN_VALUE));
}

public ClientGSIContext(LongAccumulator lastSeenStateId) {
this.lastSeenStateId = lastSeenStateId;
routerFederatedState = null;
}

@Override
public long getLastSeenStateId() {
Expand All @@ -65,16 +75,25 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
* in responses.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
lastSeenStateId.accumulate(header.getStateId());
public synchronized void receiveResponseState(RpcResponseHeaderProto header) {
if (header.hasRouterFederatedState()) {
routerFederatedState = header.getRouterFederatedState();
} else {
lastSeenStateId.accumulate(header.getStateId());
}
}

/**
* Client side implementation for providing state alignment info in requests.
*/
@Override
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
header.setStateId(lastSeenStateId.longValue());
public synchronized void updateRequestState(RpcRequestHeaderProto.Builder header) {
if (lastSeenStateId.get() != Long.MIN_VALUE) {
header.setStateId(lastSeenStateId.get());
}
if (routerFederatedState != null) {
header.setRouterFederatedState(routerFederatedState);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ public static ClientProtocol createProxyWithAlignmentContext(
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (alignmentContext == null) {
alignmentContext = new ClientGSIContext();
}
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public class ConnectionManager {

/** Queue for creating new connections. */
private final BlockingQueue<ConnectionPool> creatorQueue;
/**
* Global federated namespace context for router.
*/
private final RouterStateIdContext routerStateIdContext;
/**
* Map from connection pool ID to namespace.
*/
private final Map<ConnectionPoolId, String> connectionPoolToNamespaceMap;
/** Max size of queue for creating new connections. */
private final int creatorQueueMaxSize;

Expand All @@ -85,15 +93,19 @@ public class ConnectionManager {
/** If the connection manager is running. */
private boolean running = false;

public ConnectionManager(Configuration config) {
this(config, new RouterStateIdContext(config));
}

/**
* Creates a proxy client connection pool manager.
*
* @param config Configuration for the connections.
*/
public ConnectionManager(Configuration config) {
public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) {
this.conf = config;

this.routerStateIdContext = routerStateIdContext;
this.connectionPoolToNamespaceMap = new HashMap<>();
// Configure minimum, maximum and active connection pools
this.maxSize = this.conf.getInt(
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
Expand Down Expand Up @@ -160,6 +172,10 @@ public void close() {
pool.close();
}
this.pools.clear();
for (String nsID: connectionPoolToNamespaceMap.values()) {
routerStateIdContext.removeNamespaceStateId(nsID);
}
connectionPoolToNamespaceMap.clear();
} finally {
writeLock.unlock();
}
Expand All @@ -172,12 +188,12 @@ public void close() {
* @param ugi User group information.
* @param nnAddress Namenode address for the connection.
* @param protocol Protocol for the connection.
* @param nsId Nameservice identity.
* @return Proxy client to connect to nnId as UGI.
* @throws IOException If the connection cannot be obtained.
*/
public ConnectionContext getConnection(UserGroupInformation ugi,
String nnAddress, Class<?> protocol) throws IOException {

String nnAddress, Class<?> protocol, String nsId) throws IOException {
// Check if the manager is shutdown
if (!this.running) {
LOG.error(
Expand Down Expand Up @@ -205,9 +221,13 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
if (pool == null) {
pool = new ConnectionPool(
this.conf, nnAddress, ugi, this.minSize, this.maxSize,
this.minActiveRatio, protocol);
this.minActiveRatio, protocol,
new PoolAlignmentContext(this.routerStateIdContext, nsId));
this.pools.put(connectionId, pool);
this.connectionPoolToNamespaceMap.put(connectionId, nsId);
}
long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -430,6 +450,11 @@ public void run() {
try {
for (ConnectionPoolId poolId : toRemove) {
pools.remove(poolId);
String nsID = connectionPoolToNamespaceMap.get(poolId);
connectionPoolToNamespaceMap.remove(poolId);
if (!connectionPoolToNamespaceMap.values().contains(nsID)) {
routerStateIdContext.removeNamespaceStateId(nsID);
}
}
} finally {
writeLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.net.SocketFactory;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -108,6 +109,8 @@ public class ConnectionPool {

/** Enable using multiple physical socket or not. **/
private final boolean enableMultiSocket;
/** StateID alignment context. */
private final PoolAlignmentContext alignmentContext;

/** Map for the protocols and their protobuf implementations. */
private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
Expand Down Expand Up @@ -138,7 +141,8 @@ private static class ProtoImpl {

protected ConnectionPool(Configuration config, String address,
UserGroupInformation user, int minPoolSize, int maxPoolSize,
float minActiveRatio, Class<?> proto) throws IOException {
float minActiveRatio, Class<?> proto, PoolAlignmentContext alignmentContext)
throws IOException {

this.conf = config;

Expand All @@ -157,6 +161,8 @@ protected ConnectionPool(Configuration config, String address,
RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY,
RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT);

this.alignmentContext = alignmentContext;

// Add minimum connections to the pool
for (int i = 0; i < this.minSize; i++) {
ConnectionContext newConnection = newConnection();
Expand Down Expand Up @@ -211,6 +217,14 @@ public AtomicInteger getClientIndex() {
return this.clientIndex;
}

/**
* Get the alignment context for this pool
* @return Alignment context
*/
public PoolAlignmentContext getPoolAlignmentContext() {
return this.alignmentContext;
}

/**
* Return the next connection round-robin.
*
Expand Down Expand Up @@ -398,7 +412,7 @@ public String getJSON() {
public ConnectionContext newConnection() throws IOException {
return newConnection(this.conf, this.namenodeAddress,
this.ugi, this.protocol, this.enableMultiSocket,
this.socketIndex.incrementAndGet());
this.socketIndex.incrementAndGet(), alignmentContext);
}

/**
Expand All @@ -413,13 +427,15 @@ public ConnectionContext newConnection() throws IOException {
* @param ugi User context.
* @param proto Interface of the protocol.
* @param enableMultiSocket Enable multiple socket or not.
* @param alignmentContext Client alignment context.
* @return proto for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
protected static <T> ConnectionContext newConnection(Configuration conf,
String nnAddress, UserGroupInformation ugi, Class<T> proto,
boolean enableMultiSocket, int socketIndex) throws IOException {
boolean enableMultiSocket, int socketIndex,
AlignmentContext alignmentContext) throws IOException {
if (!PROTO_MAP.containsKey(proto)) {
String msg = "Unsupported protocol for connection to NameNode: "
+ ((proto != null) ? proto.getName() : "null");
Expand Down Expand Up @@ -448,10 +464,11 @@ protected static <T> ConnectionContext newConnection(Configuration conf,
socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf),
defaultPolicy, conf, socketIndex);
proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId,
conf, factory).getProxy();
conf, factory, alignmentContext).getProxy();
} else {
proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null,
alignmentContext).getProxy();
}

T client = newProtoClient(proto, classes, proxy);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;


/**
* An alignment context shared by all connections in a {@link ConnectionPool}.
* There is a distinct connection pool for each [namespace,UGI] pairing.
* <p>
* {@link #sharedGlobalStateId} is a reference to a
* shared {@link LongAccumulator} object in the {@link RouterStateIdContext}.
* {@link #poolLocalStateId} is specific to each PoolAlignmentContext.
* <p>
* The shared {@link #sharedGlobalStateId} is updated only using
* responses from NameNodes, so clients cannot poison it.
* {@link #poolLocalStateId} is used to propagate client observed
* state into NameNode requests. A misbehaving client can poison this but the effect is only
* visible to other clients with the same UGI and accessing the same namespace.
*/
public class PoolAlignmentContext implements AlignmentContext {
private LongAccumulator sharedGlobalStateId;
private LongAccumulator poolLocalStateId;

PoolAlignmentContext(RouterStateIdContext routerStateIdContext, String namespaceId) {
sharedGlobalStateId = routerStateIdContext.getNamespaceStateId(namespaceId);
poolLocalStateId = new LongAccumulator(Math::max, Long.MIN_VALUE);
}

/**
* Client side implementation only receives state alignment info.
* It does not provide state alignment info therefore this does nothing.
*/
@Override
public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {
// Do nothing.
}

/**
* Router updates a globally shared value using response from
* namenodes.
*/
@Override
public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {
sharedGlobalStateId.accumulate(header.getStateId());
}

/**
* Client side implementation for routers to provide state info in requests to
* namenodes.
*/
@Override
public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
long maxStateId = Long.max(poolLocalStateId.get(), sharedGlobalStateId.get());
header.setStateId(maxStateId);
}

/**
* Client side implementation only provides state alignment info in requests.
* Client does not receive RPC requests therefore this does nothing.
*/
@Override
public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold)
throws IOException {
// Do nothing.
return 0;
}

@Override
public long getLastSeenStateId() {
return sharedGlobalStateId.get();
}

@Override
public boolean isCoordinatedCall(String protocolName, String method) {
throw new UnsupportedOperationException(
"Client should not be checking uncoordinated call");
}

public void advanceClientStateId(Long clientStateId) {
poolLocalStateId.accumulate(clientStateId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_STORE_PREFIX + "enable";
public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;

public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE =
FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize";
public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5;

public static final String FEDERATION_STORE_SERIALIZER_CLASS =
FEDERATION_STORE_PREFIX + "serializer";
public static final Class<StateStoreSerializerPBImpl>
Expand Down
Loading

0 comments on commit c927023

Please sign in to comment.