Skip to content

HDDS-1555. Disable install snapshot for ContainerStateMachine. #846

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,54 @@

package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;

import org.apache.hadoop.util.Time;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;

/**
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
Expand Down Expand Up @@ -309,10 +311,7 @@ public XceiverClientReply sendCommandAsync(
Time.monotonicNowNanos() - requestTime);
}).thenApply(reply -> {
try {
// we need to handle RaftRetryFailure Exception
RaftRetryFailureException raftRetryFailureException =
reply.getRetryFailureException();
if (raftRetryFailureException != null) {
if (!reply.isSuccess()) {
// in case of raft retry failure, the raft client is
// not able to connect to the leader hence the pipeline
// can not be used but this instance of RaftClient will close
Expand All @@ -324,7 +323,10 @@ public XceiverClientReply sendCommandAsync(
// to SCM as in this case, it is the raft client which is not
// able to connect to leader in the pipeline, though the
// pipeline can still be functional.
throw new CompletionException(raftRetryFailureException);
RaftException exception = reply.getException();
Preconditions.checkNotNull(exception, "Raft reply failure but " +
"no exception propagated.");
throw new CompletionException(exception);
}
ContainerCommandResponseProto response =
ContainerCommandResponseProto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public final class ScmConfigKeys {
"dfs.container.ratis.log.appender.queue.byte-limit";
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
"dfs.container.ratis.log.purge.gap";
// TODO: Set to 1024 once RATIS issue around purge is fixed.
public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
1000000000;
// expiry interval stateMachineData cache entry inside containerStateMachine
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
Expand Down Expand Up @@ -146,7 +151,7 @@ public final class ScmConfigKeys {

public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
"dfs.ratis.snapshot.threshold";
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 10000;
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 100000;

public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
"dfs.ratis.server.failure.duration";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ public final class OzoneConfigKeys {
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT;
public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP;
public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT;
public static final String DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY =
ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY;
public static final TimeDuration
Expand Down
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@
<description>Byte limit for ratis leader's log appender queue.
</description>
</property>
<property>
<name>dfs.container.ratis.log.purge.gap</name>
<value>1000000000</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Purge gap between the last purged commit index
and the current index, when the leader decides to purge its log.
</description>
</property>
<property>
<name>dfs.container.ratis.datanode.storage.dir</name>
<value/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
Expand Down Expand Up @@ -195,12 +195,12 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
throws IOException {
if (snapshot == null) {
TermIndex empty =
TermIndex.newTermIndex(0, RaftServerConstants.INVALID_LOG_INDEX);
TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
LOG.info(
"The snapshot info is null." + "Setting the last applied index to:"
+ empty);
setLastAppliedTermIndex(empty);
return RaftServerConstants.INVALID_LOG_INDEX;
return RaftLog.INVALID_LOG_INDEX;
}

final File snapshotFile = snapshot.getFile().getPath().toFile();
Expand Down Expand Up @@ -243,7 +243,7 @@ public void persistContainerSet(OutputStream out) throws IOException {
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
LOG.info("Taking snapshot at termIndex:" + ti);
if (ti != null && ti.getIndex() != RaftServerConstants.INVALID_LOG_INDEX) {
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
LOG.info("Taking a snapshot to file {}", snapshotFile);
Expand Down Expand Up @@ -651,14 +651,13 @@ private void evictStateMachineCache() {
}

@Override
public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(group, roleInfoProto);
public void notifySlowness(RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(gid, roleInfoProto);
}

@Override
public void notifyExtendedNoLeader(RaftGroup group,
RoleInfoProto roleInfoProto) {
ratisServer.handleNoLeader(group, roleInfoProto);
public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
ratisServer.handleNoLeader(gid, roleInfoProto);
}

@Override
Expand All @@ -667,6 +666,16 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
evictStateMachineCache();
}

@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto,
firstTermIndexInLog);
final CompletableFuture<TermIndex> future = new CompletableFuture<>();
future.complete(firstTermIndexInLog);
return future;
}

@Override
public void close() throws IOException {
evictStateMachineCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
Expand All @@ -66,6 +65,7 @@
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -240,8 +240,9 @@ private RaftProperties newRaftProperties(Configuration conf) {
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.setElementLimit(properties, logQueueNumElements);
RaftServerConfigKeys.Log.setByteLimit(properties, logQueueByteLimit);
RaftServerConfigKeys.Log.setQueueElementLimit(
properties, logQueueNumElements);
RaftServerConfigKeys.Log.setQueueByteLimit(properties, logQueueByteLimit);

int numSyncRetries = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
Expand All @@ -251,8 +252,17 @@ private RaftProperties newRaftProperties(Configuration conf) {
numSyncRetries);

// Enable the StateMachineCaching
RaftServerConfigKeys.Log.StateMachineData
.setCachingEnabled(properties, true);
RaftServerConfigKeys.Log.StateMachineData.setCachingEnabled(
properties, true);

RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
false);

int purgeGap = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT);
RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap);

return properties;
}

Expand Down Expand Up @@ -590,11 +600,32 @@ public List<PipelineID> getPipelineIds() {
return pipelineIDs;
}

void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
handlePipelineFailure(group.getGroupId(), roleInfoProto);
void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
handlePipelineFailure(groupId, roleInfoProto);
}

void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
handlePipelineFailure(group.getGroupId(), roleInfoProto);
void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
handlePipelineFailure(groupId, roleInfoProto);
}

/**
* The fact that the snapshot contents cannot be used to actually catch up
* the follower, it is the reason to initiate close pipeline and
* not install the snapshot. The follower will basically never be able to
* catch up.
*
* @param groupId raft group information
* @param roleInfoProto information about the current node role and
* rpc delay information.
* @param firstTermIndexInLog After the snapshot installation is complete,
* return the last included term index in the snapshot.
*/
void handleInstallSnapshotFromLeader(RaftGroupId groupId,
RoleInfoProto roleInfoProto,
TermIndex firstTermIndexInLog) {
LOG.warn("Install snapshot notification received from Leader with " +
"termIndex: {}, terminating pipeline: {}",
firstTermIndexInLog, groupId);
handlePipelineFailure(groupId, roleInfoProto);
}
}
}
2 changes: 1 addition & 1 deletion hadoop-hdds/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<hdds.version>0.5.0-SNAPSHOT</hdds.version>

<!-- Apache Ratis version -->
<ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
<ratis.version>0.4.0-2337318-SNAPSHOT</ratis.version>

<bouncycastle.version>1.60</bouncycastle.version>

Expand Down
Loading