Skip to content

HDFS-13522. RBF: Support observer node from Router-Based Federation #3005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ public static class Call implements Schedulable,
private volatile String detailedMetricsName = "";
final int callId; // the client's call id
final int retryCount; // the retry count of the call
long timestampNanos; // time the call was received
final long timestampNanos; // time the call was received
long responseTimestampNanos; // time the call was served
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
Expand Down Expand Up @@ -959,6 +959,10 @@ public void setDeferredResponse(Writable response) {

public void setDeferredError(Throwable t) {
}

public long getTimestampNanos() {
return timestampNanos;
}
}

/** A RPC extended call queued for handling. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ public class ClientGSIContext implements AlignmentContext {
private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);

public void disableObserverRead() {
if(lastSeenStateId.get() > -1L) {
throw new IllegalStateException(
"Can't disable observer read after communicate.");
}
lastSeenStateId.accumulate(-1L);
}

@Override
public long getLastSeenStateId() {
return lastSeenStateId.get();
Expand All @@ -66,6 +74,10 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
if(lastSeenStateId.get() == -1L){
//Observer read is disabled
return;
}
lastSeenStateId.accumulate(header.getStateId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,18 @@ public static ClientProtocol createProxyWithAlignmentContext(
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (!conf.getBoolean(HdfsClientConfigKeys.DFS_OBSERVER_READ_ENABLE,
HdfsClientConfigKeys.DFS_OBSERVER_READ_ENABLE_DEFAULT)) {
//Disabled observer read
if (alignmentContext == null) {
alignmentContext = new ClientGSIContext();
}
if (alignmentContext instanceof ClientGSIContext) {
((ClientGSIContext) alignmentContext).disableObserverRead();
LOG.info("Observer read is disabled in client");
}
}

RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public interface HdfsClientConfigKeys {
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020;
String DFS_OBSERVER_READ_ENABLE = "dfs.observer.read.enable";
boolean DFS_OBSERVER_READ_ENABLE_DEFAULT = true;
String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
"dfs.namenode.kerberos.principal";
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public interface FederationRPCMBean {

long getProxyOps();

long getActiveProxyOps();

long getObserverProxyOps();

double getProxyAvg();

long getProcessingOps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
Expand Down Expand Up @@ -49,7 +50,10 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private MutableRate proxy;
@Metric("Number of operations the Router proxied to a Namenode")
private MutableCounterLong proxyOp;

@Metric("Number of operations the Router proxied to a Active Namenode")
private MutableCounterLong activeProxyOp;
@Metric("Number of operations the Router proxied to a Observer Namenode")
private MutableCounterLong observerProxyOp;
@Metric("Number of operations to hit a standby NN")
private MutableCounterLong proxyOpFailureStandby;
@Metric("Number of operations to fail to reach NN")
Expand Down Expand Up @@ -232,9 +236,15 @@ public String getAsyncCallerPool() {
* Add the time to proxy an operation from the moment the Router sends it to
* the Namenode until it replied.
* @param time Proxy time of an operation in nanoseconds.
* @param state NameNode state. Maybe null
*/
public void addProxyTime(long time) {
public void addProxyTime(long time, FederationNamenodeServiceState state) {
proxy.add(time);
if(FederationNamenodeServiceState.ACTIVE == state) {
activeProxyOp.incr();
} else if (FederationNamenodeServiceState.OBSERVER == state) {
observerProxyOp.incr();
}
proxyOp.incr();
}

Expand All @@ -248,6 +258,16 @@ public long getProxyOps() {
return proxyOp.value();
}

@Override
public long getActiveProxyOps() {
return activeProxyOp.value();
}

@Override
public long getObserverProxyOps() {
return observerProxyOp.value();
}

/**
* Add the time to process a request in the Router from the time we receive
* the call until we send it to the Namenode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.management.StandardMBean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
Expand Down Expand Up @@ -136,11 +137,12 @@ public long proxyOp() {
}

@Override
public void proxyOpComplete(boolean success) {
public void proxyOpComplete(boolean success,
FederationNamenodeServiceState state) {
if (success) {
long proxyTime = getProxyTime();
if (metrics != null && proxyTime >= 0) {
metrics.addProxyTime(proxyTime);
metrics.addProxyTime(proxyTime, state);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ private List<MembershipState> getActiveNamenodeRegistrations()
// Fetch the most recent namenode registration
String nsId = nsInfo.getNameserviceId();
List<? extends FederationNamenodeContext> nns =
namenodeResolver.getNamenodesForNameserviceId(nsId);
namenodeResolver.getNamenodesForNameserviceId(nsId, false);
if (nns != null) {
FederationNamenodeContext nn = nns.get(0);
if (nn instanceof MembershipState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@
@InterfaceStability.Evolving
public interface ActiveNamenodeResolver {

/**
* Report a failed, unavailable NN address for a nameservice or blockPool.
*
* @param ns Nameservice identifier.
* @param failedAddress The address the failed responded to the command.
*
* @throws IOException If the state store cannot be accessed.
*/
void updateUnavailableNamenode(
String ns, InetSocketAddress failedAddress) throws IOException;

/**
* Report a successful, active NN address for a nameservice or blockPool.
*
Expand All @@ -56,27 +67,38 @@ void updateActiveNamenode(

/**
* Returns a prioritized list of the most recent cached registration entries
* for a single nameservice ID.
* Returns an empty list if none are found. Returns entries in preference of:
* for a single nameservice ID. Returns an empty list if none are found.
* In the case of not observerRead Returns entries in preference of :
* <ul>
* <li>The most recent ACTIVE NN
* <li>The most recent OBSERVER NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
*
* In the case of observerRead Returns entries in preference of :
* <ul>
* <li>The most recent OBSERVER NN
* <li>The most recent ACTIVE NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
*
* @param nameserviceId Nameservice identifier.
* @param observerRead Observer read case, observer NN will be ranked first
* @return Prioritized list of namenode contexts.
* @throws IOException If the state store cannot be accessed.
*/
List<? extends FederationNamenodeContext>
getNamenodesForNameserviceId(String nameserviceId) throws IOException;
List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
String nameserviceId, boolean observerRead) throws IOException;

/**
* Returns a prioritized list of the most recent cached registration entries
* for a single block pool ID.
* Returns an empty list if none are found. Returns entries in preference of:
* <ul>
* <li>The most recent ACTIVE NN
* <li>The most recent OBSERVER NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
Expand Down
Loading