Skip to content

Commit

Permalink
[SBN READ] Observer should throw ObserverRetryOnActiveException if st…
Browse files Browse the repository at this point in the history
…ateid is always delayed with Active Namenode for a period of time
  • Loading branch information
lgh committed Dec 25, 2023
1 parent b4fed58 commit 982fa77
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec;
import org.apache.hadoop.crypto.OpensslSm4CtrCryptoCodec;

import java.util.concurrent.TimeUnit;

/**
* This class contains constants for configuration keys used
* in the common code.
Expand Down Expand Up @@ -1077,6 +1079,13 @@ public class CommonConfigurationKeysPublic {
"ipc.server.metrics.update.runner.interval";
public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000;

public static final String IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE =
"ipc.server.observer.stable.rpc.enable";
public static final boolean IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT = false;
public static final String IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL =
"ipc.server.observer.stable.rpc.interval";
public static final long IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT = TimeUnit.SECONDS.toNanos(15);

/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ protected ResponseBuffer initialValue() {
private int socketSendBufferSize;
private final int maxDataLength;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private final boolean rpcStableEnable;
private final long rpcStableInterval;

volatile private boolean running = true; // true while server runs
private CallQueueManager<Call> callQueue;
Expand Down Expand Up @@ -976,6 +978,7 @@ public static class Call implements Schedulable,
// Serialized RouterFederatedStateProto message to
// store last seen states for multiple namespaces.
private ByteString federatedNamespaceState;
private boolean isStable;

Call() {
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
Expand Down Expand Up @@ -1009,6 +1012,15 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
this.callerContext = callerContext;
this.clientStateId = Long.MIN_VALUE;
this.isCallCoordinated = false;
this.isStable = false;
}

public boolean isStable() {
return isStable;
}

public void setStable(boolean stable) {
isStable = stable;
}

/**
Expand Down Expand Up @@ -1243,6 +1255,9 @@ public Void run() throws Exception {
ResponseParams responseParams = new ResponseParams();

try {
if (isStable()) {
throw new ObserverRetryOnActiveException("The rpc call in observer is stable.");
}
value = call(
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
} catch (Throwable e) {
Expand Down Expand Up @@ -3177,10 +3192,14 @@ public void run() {
* In case of Observer, it handles only reads, which are
* commutative.
*/
// Re-queue the call and continue
requeueCall(call);
call = null;
continue;
if (rpcStableEnable && startTimeNanos - call.timestampNanos > rpcStableInterval) {
call.setStable(true);
} else {
// Re-queue the call and continue
requeueCall(call);
call = null;
continue;
}
}
LOG.debug("{}: {} for RpcKind {}.", Thread.currentThread().getName(), call, call.rpcKind);
CurCall.set(call);
Expand Down Expand Up @@ -3341,6 +3360,17 @@ protected Server(String bindAddress, int port,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);

this.rpcStableEnable =
conf.getBoolean(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE,
CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT);
if (this.rpcStableEnable) {
this.rpcStableInterval =
conf.getTimeDuration(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL,
CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT, TimeUnit.NANOSECONDS);
} else {
this.rpcStableInterval = -1;
}

// Setup appropriate callqueue
final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3834,6 +3834,23 @@ The switch to turn S3A auditing on or off.
</description>
</property>

<property>
<name>ipc.server.observer.stable.rpc.enable</name>
<value>true</value>
<description>
Whether to enable observer stable rpc. If enable when Observer NN's stateid is always
delayed it will thrown ObserverRetryOnActiveException after ipc.server.observer.stable.rpc.interval
is reached.
</description>
</property>

<property>
<name>ipc.server.observer.stable.rpc.interval</name>
<value>15</value>
<description>
Times observer rpc is stable in seconds.
</description>
</property>

<!-- YARN registry -->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
Expand Down Expand Up @@ -59,6 +63,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
Expand Down Expand Up @@ -106,6 +111,9 @@ public static void startUpCluster() throws Exception {
// Observer and immediately try to read from it.
conf.setTimeDuration(
OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
conf.setBoolean(IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE, true);
conf.setTimeDuration(IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL,
TimeUnit.SECONDS.toNanos(10), TimeUnit.NANOSECONDS);
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true);
dfsCluster = qjmhaCluster.getDfsCluster();
}
Expand Down Expand Up @@ -170,6 +178,34 @@ public void testObserverRequeue() throws Exception {
}
}

@Test
public void testObserverStableRpc() throws Exception {
FSNamesystem observerFsNS = dfsCluster.getNamesystem(2);
try {
// Stop EditlogTailer of Observer NameNode.
observerFsNS.getEditLogTailer().stop();

Path tmpTestPath = new Path("/TestObserverStableRpc");
dfs.create(tmpTestPath, (short)1).close();
assertSentTo(0);
// This operation will be blocked in ObserverNameNode
// until observer rpc stable time is reached
// and then throw ObserverRetryOnActiveException and client will retry to Active NN
FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
assertSentTo(0);
assertNotNull(fileStatus);

observerFsNS.getEditLogTailer().doTailEdits();
fileStatus = dfs.getFileStatus(tmpTestPath);
assertSentTo(2);
assertNotNull(fileStatus);
} finally {
EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf);
observerFsNS.setEditLogTailerForTests(editLogTailer);
editLogTailer.start();
}
}

@Test
public void testNoActiveToObserver() throws Exception {
try {
Expand Down

0 comments on commit 982fa77

Please sign in to comment.