Skip to content

[Draft] HDFS-13522: Allow routers to use observer namenode without an msync on every read. [Not ready for review] #4523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface AlignmentContext {
void updateResponseState(RpcResponseHeaderProto.Builder header);

/**
* This is the intended client method call to implement to recieve state info
* This is the intended client method call to implement to receive state info
* during RPC response processing.
*
* @param header The RPC response header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ private RpcConstants() {


public static final int INVALID_RETRY_COUNT = -1;
// Special value to indicate client request header has nameserviceStateIds set.
public static final long REQUEST_HEADER_NAMESPACE_STATEIDS_SET = -2L;

/**
* The Rpc-connection header is as follows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ public static class Call implements Schedulable,
private volatile String detailedMetricsName = "";
final int callId; // the client's call id
final int retryCount; // the retry count of the call
long timestampNanos; // time the call was received
private final long timestampNanos; // time the call was received
long responseTimestampNanos; // time the call was served
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
Expand Down Expand Up @@ -1110,6 +1110,10 @@ public void setDeferredResponse(Writable response) {

public void setDeferredError(Throwable t) {
}

/**
 * Returns the value of the {@code timestampNanos} field, which records the
 * time the call was received.
 *
 * @return the stored receive timestamp of this call.
 */
public long getTimestampNanos() {
return timestampNanos;
}
}

/** A RPC extended call queued for handling. */
Expand Down Expand Up @@ -1191,7 +1195,7 @@ public Void run() throws Exception {

try {
value = call(
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
} catch (Throwable e) {
populateResponseParamsOnError(e, responseParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
optional RPCTraceInfoProto traceInfo = 6; // tracing info
optional RPCCallerContextProto callerContext = 7; // call context
optional int64 stateId = 8; // The last seen Global State ID
map<string, int64> nameserviceStateIds = 9; // Last seen state IDs for multiple nameservices.
}


Expand Down Expand Up @@ -157,6 +158,7 @@ message RpcResponseHeaderProto {
optional bytes clientId = 7; // Globally unique client ID
optional sint32 retryCount = 8 [default = -1];
optional int64 stateId = 9; // The last written Global State ID
map<string, int64> nameserviceStateIds = 10; // Last seen state IDs for multiple nameservices.
}

message RpcSaslProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,20 @@
@InterfaceStability.Evolving
public class ClientGSIContext implements AlignmentContext {

private static Long STATEID_DEFAULT_VALUE = Long.MIN_VALUE;
private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);
new LongAccumulator(Math::max, STATEID_DEFAULT_VALUE);
private FederatedGSIContext federatedGSIContext = new FederatedGSIContext();

/**
 * Returns the current value held by the {@code lastSeenStateId} accumulator.
 *
 * @return the last seen state ID recorded by this context.
 */
@Override
public long getLastSeenStateId() {
return lastSeenStateId.get();
}

/**
 * Feeds the given state ID into the {@code lastSeenStateId} accumulator.
 * The accumulator is created with {@code Math::max}, so a value lower than
 * the one already held does not change it.
 *
 * @param stateId state ID to record; it is unboxed before being accumulated,
 *                so it must not be null.
 */
public void updateLastSeenStateID(Long stateId) {
lastSeenStateId.accumulate(stateId);
}

@Override
public boolean isCoordinatedCall(String protocolName, String method) {
throw new UnsupportedOperationException(
Expand All @@ -66,6 +72,7 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
// Hand the response header to the federated context so it can record the
// per-nameservice state IDs the header carries.
federatedGSIContext.updateStateUsingResponseHeader(header);
// Fold the header's single stateId into this context's own accumulator.
lastSeenStateId.accumulate(header.getStateId());
}

Expand All @@ -74,7 +81,10 @@ public void receiveResponseState(RpcResponseHeaderProto header) {
*/
@Override
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
header.setStateId(lastSeenStateId.longValue());
if (lastSeenStateId.longValue() != STATEID_DEFAULT_VALUE) {
header.setStateId(lastSeenStateId.longValue());
}
federatedGSIContext.setRequestHeaderState(header);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;


/**
 * Keeps one {@link ClientGSIContext} per nameservice ID so that the last seen
 * state ID of several nameservices can be tracked side by side and copied
 * to or from the {@code nameserviceStateIds} map of RPC headers.
 *
 * <p>The contexts are held in a {@link ConcurrentHashMap}; a context is
 * created the first time its nameservice ID is referenced.
 */
public class FederatedGSIContext {
  /** Nameservice ID to the context holding that nameservice's state ID. */
  private final Map<String, ClientGSIContext> gsiContextMap = new ConcurrentHashMap<>();

  /**
   * Records every nameservice state ID carried by an RPC request header.
   *
   * @param header request header whose nameserviceStateIds map is read.
   */
  public void updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
    header.getNameserviceStateIdsMap().forEach(this::updateNameserviceState);
  }

  /**
   * Records every nameservice state ID carried by an RPC response header.
   *
   * @param header response header whose nameserviceStateIds map is read.
   */
  public void updateStateUsingResponseHeader(RpcHeaderProtos.RpcResponseHeaderProto header) {
    header.getNameserviceStateIdsMap().forEach(this::updateNameserviceState);
  }

  /**
   * Records a state ID for one nameservice, creating the nameservice's
   * context if this is the first time the nameservice is seen.
   *
   * @param nsId nameservice identifier; must not be null.
   * @param stateId state ID to pass to the nameservice's context; must not be null.
   */
  public void updateNameserviceState(String nsId, Long stateId) {
    getOrCreateContext(nsId).updateLastSeenStateID(stateId);
  }

  /**
   * Copies the last seen state ID of every known nameservice into the
   * nameserviceStateIds map of a request header.
   *
   * @param headerBuilder request header builder to populate.
   */
  public void setRequestHeaderState(RpcHeaderProtos.RpcRequestHeaderProto.Builder headerBuilder) {
    gsiContextMap.forEach((nsId, context) ->
        headerBuilder.putNameserviceStateIds(nsId, context.getLastSeenStateId()));
  }

  /**
   * Copies the last seen state ID of every known nameservice into the
   * nameserviceStateIds map of a response header.
   *
   * @param headerBuilder response header builder to populate.
   */
  public void setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder) {
    gsiContextMap.forEach((nsId, context) ->
        headerBuilder.putNameserviceStateIds(nsId, context.getLastSeenStateId()));
  }

  /**
   * Returns the alignment context of one nameservice, creating it on first use.
   *
   * @param nsId nameservice identifier; must not be null.
   * @return the context tracking the given nameservice.
   */
  public AlignmentContext getNameserviceAlignmentContext(String nsId) {
    return getOrCreateContext(nsId);
  }

  /**
   * Looks up the context of a nameservice, registering a new one if absent.
   *
   * <p>computeIfAbsent only builds a ClientGSIContext when the key is really
   * missing. The previous putIfAbsent form allocated a fresh context (and the
   * FederatedGSIContext it owns) on every call, even when one already existed.
   */
  private ClientGSIContext getOrCreateContext(String nsId) {
    return gsiContextMap.computeIfAbsent(nsId, key -> new ClientGSIContext());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ public static ClientProtocol createProxyWithAlignmentContext(
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (alignmentContext == null) {
alignmentContext = new ClientGSIContext();
}
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public interface FederationRPCMBean {

long getProxyOps();

long getActiveProxyOps();

long getObserverProxyOps();

double getProxyAvg();

long getProcessingOps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
Expand Down Expand Up @@ -49,7 +50,10 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private MutableRate proxy;
@Metric("Number of operations the Router proxied to a Namenode")
private MutableCounterLong proxyOp;

@Metric("Number of operations the Router proxied to a Active Namenode")
private MutableCounterLong activeProxyOp;
@Metric("Number of operations the Router proxied to a Observer Namenode")
private MutableCounterLong observerProxyOp;
@Metric("Number of operations to hit a standby NN")
private MutableCounterLong proxyOpFailureStandby;
@Metric("Number of operations to fail to reach NN")
Expand Down Expand Up @@ -256,9 +260,15 @@ public String getAsyncCallerPool() {
* Add the time to proxy an operation from the moment the Router sends it to
* the Namenode until it replied.
* @param time Proxy time of an operation in nanoseconds.
* @param state NameNode state. May be null.
*/
public void addProxyTime(long time) {
public void addProxyTime(long time, FederationNamenodeServiceState state) {
proxy.add(time);
if(FederationNamenodeServiceState.ACTIVE == state) {
activeProxyOp.incr();
} else if (FederationNamenodeServiceState.OBSERVER == state) {
observerProxyOp.incr();
}
proxyOp.incr();
}

Expand All @@ -272,6 +282,16 @@ public long getProxyOps() {
return proxyOp.value();
}

/**
 * Returns the current value of the {@code activeProxyOp} counter.
 *
 * @return the value of the counter of operations proxied to an Active Namenode.
 */
@Override
public long getActiveProxyOps() {
return activeProxyOp.value();
}

/**
 * Returns the current value of the {@code observerProxyOp} counter.
 *
 * @return the value of the counter of operations proxied to an Observer Namenode.
 */
@Override
public long getObserverProxyOps() {
return observerProxyOp.value();
}

/**
* Add the time to process a request in the Router from the time we receive
* the call until we send it to the Namenode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
Expand Down Expand Up @@ -147,12 +148,13 @@ public long proxyOp() {
}

@Override
public void proxyOpComplete(boolean success, String nsId) {
public void proxyOpComplete(boolean success, String nsId,
FederationNamenodeServiceState state) {
if (success) {
long proxyTime = getProxyTime();
if (proxyTime >= 0) {
if (metrics != null) {
metrics.addProxyTime(proxyTime);
metrics.addProxyTime(proxyTime, state);
}
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ private List<MembershipState> getActiveNamenodeRegistrations()
// Fetch the most recent namenode registration
String nsId = nsInfo.getNameserviceId();
List<? extends FederationNamenodeContext> nns =
namenodeResolver.getNamenodesForNameserviceId(nsId);
namenodeResolver.getNamenodesForNameserviceId(nsId, false);
if (nns != null) {
FederationNamenodeContext nn = nns.get(0);
if (nn instanceof MembershipState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@
@InterfaceStability.Evolving
public interface ActiveNamenodeResolver {

/**
* Report a failed, unavailable NN address for a nameservice or blockPool.
*
* @param ns Nameservice identifier.
* @param failedAddress The address of the namenode that failed to respond to the command.
*
* @throws IOException If the state store cannot be accessed.
*/
void updateUnavailableNamenode(
String ns, InetSocketAddress failedAddress) throws IOException;

/**
* Report a successful, active NN address for a nameservice or blockPool.
*
Expand All @@ -56,27 +67,38 @@ void updateActiveNamenode(

/**
* Returns a prioritized list of the most recent cached registration entries
* for a single nameservice ID.
* Returns an empty list if none are found. Returns entries in preference of:
* for a single nameservice ID. Returns an empty list if none are found.
* When observerRead is false, returns entries in order of preference:
* <ul>
* <li>The most recent ACTIVE NN
* <li>The most recent OBSERVER NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
*
* When observerRead is true, returns entries in order of preference:
* <ul>
* <li>The most recent OBSERVER NN
* <li>The most recent ACTIVE NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
*
* @param nameserviceId Nameservice identifier.
* @param observerRead Observer read case, observer NN will be ranked first
* @return Prioritized list of namenode contexts.
* @throws IOException If the state store cannot be accessed.
*/
List<? extends FederationNamenodeContext>
getNamenodesForNameserviceId(String nameserviceId) throws IOException;
List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
String nameserviceId, boolean observerRead) throws IOException;

/**
* Returns a prioritized list of the most recent cached registration entries
* for a single block pool ID.
* Returns an empty list if none are found. Returns entries in preference of:
* <ul>
* <li>The most recent ACTIVE NN
* <li>The most recent OBSERVER NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
Expand Down
Loading