Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,9 @@ message ContainerDataProto {
optional ContainerType containerType = 10 [default = KeyValueContainer];
}

message ContainerIdSetProto {
repeated int64 containerId = 1;
message Container2BCSIDMapProto {
// repeated Container2BCSIDMapEntryProto container2BCSID = 1;
map <int64, int64> container2BCSID = 1;
}

enum ContainerType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,46 @@ public Set<Long> getMissingContainerSet() {
}

/**
* Builds the missing container set by taking a diff total no containers
* actually found and number of containers which actually got created.
* Builds the missing container set by taking a diff between total no
* containers actually found and number of containers which actually
* got created. It also validates the BCSID stored in the snapshot file
* for each container as against what is reported in containerScan.
* This will only be called during the initialization of Datanode Service
* when it still not a part of any write Pipeline.
* @param createdContainerSet ContainerId set persisted in the Ratis snapshot
* @param container2BCSIDMap Map of containerId to BCSID persisted in the
* Ratis snapshot
*/
public void buildMissingContainerSet(Set<Long> createdContainerSet) {
missingContainerSet.addAll(createdContainerSet);
missingContainerSet.removeAll(containerMap.keySet());
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
container2BCSIDMap.entrySet().parallelStream().forEach((mapEntry) -> {
long id = mapEntry.getKey();
if (!containerMap.containsKey(id)) {
LOG.warn("Adding container {} to missing container set.", id);
missingContainerSet.add(id);
} else {
Container container = containerMap.get(id);
long containerBCSID = container.getBlockCommitSequenceId();
long snapshotBCSID = mapEntry.getValue();
if (containerBCSID < snapshotBCSID) {
LOG.warn(
"Marking container {} unhealthy as reported BCSID {} is smaller"
+ " than ratis snapshot recorded value {}", id,
containerBCSID, snapshotBCSID);
// just mark the container unhealthy. Once the DatanodeStateMachine
// thread starts it will send container report to SCM where these
// unhealthy containers would be detected
try {
container.markContainerUnhealthy();
} catch (StorageContainerException sce) {
// The container will still be marked unhealthy in memory even if
// exception occurs. It won't accept any new transactions and will
// be handled by SCM. Eve if dn restarts, it will still be detected
// as unheathy as its BCSID won't change.
LOG.error("Unable to persist unhealthy state for container {}", id);
}
}
}
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ private boolean canIgnoreException(Result result) {
}

@Override
public void buildMissingContainerSet(Set<Long> createdContainerSet) {
containerSet.buildMissingContainerSet(createdContainerSet);
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
containerSet
.buildMissingContainerSetAndValidate(container2BCSIDMap);
}

@Override
Expand Down Expand Up @@ -185,9 +187,9 @@ private ContainerCommandResponseProto dispatchRequest(
cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext == null
|| dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMBINED);
Set<Long> containerIdSet = null;
Map<Long, Long> container2BCSIDMap = null;
if (dispatcherContext != null) {
containerIdSet = dispatcherContext.getCreateContainerSet();
container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
}
if (isWriteCommitStage) {
// check if the container Id exist in the loaded snapshot file. if
Expand All @@ -196,9 +198,10 @@ private ContainerCommandResponseProto dispatchRequest(
// snapshot.
// just add it to the list, and remove it from missing container set
// as it might have been added in the list during "init".
Preconditions.checkNotNull(containerIdSet);
if (!containerIdSet.contains(containerID)) {
containerIdSet.add(containerID);
Preconditions.checkNotNull(container2BCSIDMap);
if (container2BCSIDMap.get(containerID) == null) {
container2BCSIDMap
.put(containerID, container.getBlockCommitSequenceId());
containerSet.getMissingContainerSet().remove(containerID);
}
}
Expand Down Expand Up @@ -228,11 +231,14 @@ private ContainerCommandResponseProto dispatchRequest(
audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(isWriteStage && containerIdSet != null
Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
|| dispatcherContext == null);
if (containerIdSet != null) {
if (container2BCSIDMap != null) {
// adds this container to list of containers created in the pipeline
containerIdSet.add(containerID);
// with initial BCSID recorded as 0.
Preconditions
.checkArgument(!container2BCSIDMap.containsKey(containerID));
container2BCSIDMap.put(containerID, Long.valueOf(0));
}
container = getContainer(containerID);
}
Expand Down Expand Up @@ -313,7 +319,8 @@ private ContainerCommandResponseProto dispatchRequest(
sendCloseContainerActionIfNeeded(container);
}

if(result == Result.SUCCESS) {
if (result == Result.SUCCESS) {
updateBCSID(container, dispatcherContext, cmdType);
audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
} else {
audit(action, eventType, params, AuditEventStatus.FAILURE,
Expand All @@ -329,6 +336,22 @@ private ContainerCommandResponseProto dispatchRequest(
}
}

private void updateBCSID(Container container,
DispatcherContext dispatcherContext, ContainerProtos.Type cmdType) {
if (dispatcherContext != null && (cmdType == ContainerProtos.Type.PutBlock
|| cmdType == ContainerProtos.Type.PutSmallFile)) {
Preconditions.checkNotNull(container);
long bcsID = container.getBlockCommitSequenceId();
long containerId = container.getContainerData().getContainerID();
Map<Long, Long> container2BCSIDMap;
container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
Preconditions.checkNotNull(container2BCSIDMap);
Preconditions.checkArgument(container2BCSIDMap.containsKey(containerId));
// updates the latest BCSID on every putBlock or putSmallFile
// transaction over Ratis.
container2BCSIDMap.computeIfPresent(containerId, (u, v) -> v = bcsID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

computeIfPresent is not needed here, can be replaced with Map#put.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will address in the next patch.

}
}
/**
* Create a container using the input container request.
* @param containerRequest - the container request which requires container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ ContainerReplicaProto getContainerReport()
void updateBlockCommitSequenceId(long blockCommitSequenceId);

/**
* Returns the blockCommitSequenceId.
*/
long getBlockCommitSequenceId();

/**
* check and report the structural integrity of the container.
* @return true if the integrity checks pass
* Scan the container metadata to detect corruption.
*/
boolean scanMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;

import java.util.Set;
import java.util.Map;

/**
* Dispatcher acts as the bridge between the transport layer and
Expand Down Expand Up @@ -62,9 +62,9 @@ void validateContainerCommand(

/**
* finds and builds the missing containers in case of a lost disk etc
* in the ContainerSet.
* in the ContainerSet. It also validates the BCSID of the containers found.
*/
void buildMissingContainerSet(Set<Long> createdContainers);
void buildMissingContainerSetAndValidate(Map<Long, Long> container2BCSIDMap);

/**
* Shutdown Dispatcher services.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerIdSetProto;
Container2BCSIDMapProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
Expand Down Expand Up @@ -88,8 +88,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.io.FileOutputStream;
import java.io.FileInputStream;
Expand Down Expand Up @@ -146,7 +144,7 @@ public class ContainerStateMachine extends BaseStateMachine {
CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;

// keeps track of the containers created per pipeline
private final Set<Long> createContainerSet;
private final Map<Long, Long> container2BCSIDMap;
private ExecutorService[] executors;
private final Map<Long, Long> applyTransactionCompletionMap;
private final Cache<Long, ByteString> stateMachineDataCache;
Expand Down Expand Up @@ -181,7 +179,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
.maximumSize(chunkExecutor.getCorePoolSize()).build();
this.isBlockTokenEnabled = isBlockTokenEnabled;
this.tokenVerifier = tokenVerifier;
this.createContainerSet = new ConcurrentSkipListSet<>();
this.container2BCSIDMap = new ConcurrentHashMap<>();

final int numContainerOpExecutors = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
Expand Down Expand Up @@ -244,14 +242,15 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
// initialize the dispatcher with snapshot so that it build the missing
// container list
try (FileInputStream fin = new FileInputStream(snapshotFile)) {
byte[] containerIds = IOUtils.toByteArray(fin);
ContainerProtos.ContainerIdSetProto proto =
ContainerProtos.ContainerIdSetProto.parseFrom(containerIds);
byte[] container2BCSIDData = IOUtils.toByteArray(fin);
ContainerProtos.Container2BCSIDMapProto proto =
ContainerProtos.Container2BCSIDMapProto
.parseFrom(container2BCSIDData);
// read the created containers list from the snapshot file and add it to
// the createContainerSet here.
// createContainerSet will further grow as and when containers get created
createContainerSet.addAll(proto.getContainerIdList());
dispatcher.buildMissingContainerSet(createContainerSet);
// the container2BCSIDMap here.
// container2BCSIDMap will further grow as and when containers get created
container2BCSIDMap.putAll(proto.getContainer2BCSIDMap());
dispatcher.buildMissingContainerSetAndValidate(container2BCSIDMap);
}
return last.getIndex();
}
Expand All @@ -263,8 +262,9 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
* @throws IOException
*/
public void persistContainerSet(OutputStream out) throws IOException {
ContainerIdSetProto.Builder builder = ContainerIdSetProto.newBuilder();
builder.addAllContainerId(createContainerSet);
Container2BCSIDMapProto.Builder builder =
Container2BCSIDMapProto.newBuilder();
builder.putAllContainer2BCSID(container2BCSIDMap);
// TODO : while snapshot is being taken, deleteContainer call should not
// should not happen. Lock protection will be required if delete
// container happens outside of Ratis.
Expand Down Expand Up @@ -433,7 +433,7 @@ private CompletableFuture<Message> handleWriteChunk(
.setTerm(term)
.setLogIndex(entryIndex)
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setCreateContainerSet(createContainerSet)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
// ensure the write chunk happens asynchronously in writeChunkExecutor pool
// thread.
Expand Down Expand Up @@ -697,8 +697,9 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
builder
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
}
if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
builder.setCreateContainerSet(createContainerSet);
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
|| cmdType == Type.PutBlock) {
builder.setContainer2BCSIDMap(container2BCSIDMap);
}
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
Expand Down Expand Up @@ -811,7 +812,7 @@ public void notifyGroupRemove() {
// Make best effort to quasi-close all the containers on group removal.
// Containers already in terminal state like CLOSED or UNHEALTHY will not
// be affected.
for (Long cid : createContainerSet) {
for (Long cid : container2BCSIDMap.keySet()) {
try {
containerController.markContainerForClose(cid);
containerController.quasiCloseContainer(cid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import java.util.Set;
import java.util.Map;

/**
* DispatcherContext class holds transport protocol specific context info
Expand All @@ -45,15 +45,15 @@ public enum WriteChunkStage {
// the log index in Ratis log to which the request belongs to
private final long logIndex;

private final Set<Long> createContainerSet;
private final Map<Long, Long> container2BCSIDMap;

private DispatcherContext(long term, long index, WriteChunkStage stage,
boolean readFromTmpFile, Set<Long> containerSet) {
boolean readFromTmpFile, Map<Long, Long> container2BCSIDMap) {
this.term = term;
this.logIndex = index;
this.stage = stage;
this.readFromTmpFile = readFromTmpFile;
this.createContainerSet = containerSet;
this.container2BCSIDMap = container2BCSIDMap;
}

public long getLogIndex() {
Expand All @@ -72,8 +72,8 @@ public WriteChunkStage getStage() {
return stage;
}

public Set<Long> getCreateContainerSet() {
return createContainerSet;
public Map<Long, Long> getContainer2BCSIDMap() {
return container2BCSIDMap;
}

/**
Expand All @@ -84,7 +84,7 @@ public static final class Builder {
private boolean readFromTmpFile = false;
private long term;
private long logIndex;
private Set<Long> createContainerSet;
private Map<Long, Long> container2BCSIDMap;

/**
* Sets the WriteChunkStage.
Expand Down Expand Up @@ -131,13 +131,13 @@ public Builder setLogIndex(long index) {
}

/**
* Sets the createContainerSet to contain all the containerIds per
* Sets the container2BCSIDMap to contain all the containerIds per
* RaftGroup.
* @param set createContainerSet
* @param map container2BCSIDMap
* @return Builder
*/
public Builder setCreateContainerSet(Set<Long> set) {
this.createContainerSet = set;
public Builder setContainer2BCSIDMap(Map<Long, Long> map) {
this.container2BCSIDMap = map;
return this;
}
/**
Expand All @@ -147,7 +147,7 @@ public Builder setCreateContainerSet(Set<Long> set) {
*/
public DispatcherContext build() {
return new DispatcherContext(term, logIndex, stage, readFromTmpFile,
createContainerSet);
container2BCSIDMap);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ public void updateBlockCommitSequenceId(long blockCommitSequenceId) {
containerData.updateBlockCommitSequenceId(blockCommitSequenceId);
}

@Override
public long getBlockCommitSequenceId() {
return containerData.getBlockCommitSequenceId();
}


/**
* Returns KeyValueContainerReport for the KeyValueContainer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,5 +282,4 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
public void shutdown() {
BlockUtils.shutdownCache(ContainerCache.getInstance(config));
}

}
}
Loading