diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 24a3167b3db2dd..b87e54c194dd3b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -25,6 +25,8 @@ import org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec; import org.apache.hadoop.crypto.OpensslSm4CtrCryptoCodec; +import java.util.concurrent.TimeUnit; + /** * This class contains constants for configuration keys used * in the common code. @@ -1077,6 +1079,13 @@ public class CommonConfigurationKeysPublic { "ipc.server.metrics.update.runner.interval"; public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000; + public static final String IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE = + "ipc.server.observer.stable.rpc.enable"; + public static final boolean IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT = false; + public static final String IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL = + "ipc.server.observer.stable.rpc.interval"; + public static final long IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT = TimeUnit.SECONDS.toNanos(15); + /** * @see * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 53497e9707807f..65d024c2ce5cae 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -494,6 +494,8 @@ protected ResponseBuffer initialValue() { private int socketSendBufferSize; private final int maxDataLength; private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm + private 
final boolean rpcStableEnable; + private final long rpcStableInterval; volatile private boolean running = true; // true while server runs private CallQueueManager callQueue; @@ -976,6 +978,7 @@ public static class Call implements Schedulable, // Serialized RouterFederatedStateProto message to // store last seen states for multiple namespaces. private ByteString federatedNamespaceState; + private boolean isStable; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -1009,6 +1012,15 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2, this.callerContext = callerContext; this.clientStateId = Long.MIN_VALUE; this.isCallCoordinated = false; + this.isStable = false; + } + + public boolean isStable() { + return isStable; + } + + public void setStable(boolean stable) { + isStable = stable; } /** @@ -1243,6 +1255,9 @@ public Void run() throws Exception { ResponseParams responseParams = new ResponseParams(); try { + if (isStable()) { + throw new ObserverRetryOnActiveException("The rpc call in observer is stable."); + } value = call( rpcKind, connection.protocolName, rpcRequest, getTimestampNanos()); } catch (Throwable e) { @@ -3177,10 +3192,14 @@ public void run() { * In case of Observer, it handles only reads, which are * commutative. 
*/ - // Re-queue the call and continue - requeueCall(call); - call = null; - continue; + if (rpcStableEnable && startTimeNanos - call.timestampNanos > rpcStableInterval) { + call.setStable(true); + } else { + // Re-queue the call and continue + requeueCall(call); + call = null; + continue; + } } LOG.debug("{}: {} for RpcKind {}.", Thread.currentThread().getName(), call, call.rpcKind); CurCall.set(call); @@ -3341,6 +3360,17 @@ protected Server(String bindAddress, int port, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); + this.rpcStableEnable = + conf.getBoolean(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE, + CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT); + if (this.rpcStableEnable) { + this.rpcStableInterval = + conf.getTimeDuration(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL, + CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT, TimeUnit.NANOSECONDS); + } else { + this.rpcStableInterval = -1; + } + // Setup appropriate callqueue final String prefix = getQueueClassPrefix(); this.callQueue = new CallQueueManager<>( diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 5a5171056d0488..1172c38347719f 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -3834,6 +3834,23 @@ The switch to turn S3A auditing on or off. + + ipc.server.observer.stable.rpc.enable + false + + Whether to enable observer stable RPC. If enabled, when the Observer NameNode's state id + keeps lagging behind, an ObserverRetryOnActiveException is thrown once + ipc.server.observer.stable.rpc.interval is reached, so the client retries against the + Active NameNode. This default must stay consistent with + IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT in CommonConfigurationKeysPublic. + + + + + ipc.server.observer.stable.rpc.interval + 15s + + Maximum time a read RPC may wait on the Observer NameNode for its state id to catch up + before the call is failed back to the Active. Accepts time-unit suffixes (ns, ms, s, m, h); + NOTE(review): a bare number is interpreted as nanoseconds by the server, so always specify a unit. 
+ + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index a293cb4d17c47e..36e92f385dd82a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; @@ -59,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; @@ -106,6 +111,9 @@ public static void startUpCluster() throws Exception { // Observer and immediately try to read from it. 
conf.setTimeDuration( OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); + conf.setBoolean(IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE, true); + conf.setTimeDuration(IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL, + TimeUnit.SECONDS.toNanos(10), TimeUnit.NANOSECONDS); qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true); dfsCluster = qjmhaCluster.getDfsCluster(); } @@ -170,6 +178,34 @@ public void testObserverRequeue() throws Exception { } } + @Test + public void testObserverStableRpc() throws Exception { + FSNamesystem observerFsNS = dfsCluster.getNamesystem(2); + try { + // Stop EditlogTailer of Observer NameNode. + observerFsNS.getEditLogTailer().stop(); + + Path tmpTestPath = new Path("/TestObserverStableRpc"); + dfs.create(tmpTestPath, (short)1).close(); + assertSentTo(0); + // This operation will be blocked in ObserverNameNode + // until observer rpc stable time is reached + // and then throw ObserverRetryOnActiveException and client will retry to Active NN + FileStatus fileStatus = dfs.getFileStatus(tmpTestPath); + assertSentTo(0); + assertNotNull(fileStatus); + + observerFsNS.getEditLogTailer().doTailEdits(); + fileStatus = dfs.getFileStatus(tmpTestPath); + assertSentTo(2); + assertNotNull(fileStatus); + } finally { + EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf); + observerFsNS.setEditLogTailerForTests(editLogTailer); + editLogTailer.start(); + } + } + @Test public void testNoActiveToObserver() throws Exception { try {