Skip to content

[HDDS-1200] Add support for checksum verification in data scrubber #1154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,12 @@ public final class HddsConfigKeys {
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
public static final String HDDS_SCM_SAFEMODE_ENABLED =
"hdds.scm.safemode.enabled";
public static final String HDDS_CONTAINERSCRUB_ENABLED =
"hdds.containerscrub.enabled";
public static final boolean HDDS_CONTAINERSCRUB_ENABLED_DEFAULT = false;

public static final boolean HDDS_SCM_SAFEMODE_ENABLED_DEFAULT = true;
public static final String HDDS_SCM_SAFEMODE_MIN_DATANODE =
"hdds.scm.safemode.min.datanode";
public static final int HDDS_SCM_SAFEMODE_MIN_DATANODE_DEFAULT = 1;


public static final String
HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT =
"hdds.scm.wait.time.after.safemode.exit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ public class ChecksumData {
private List<ByteString> checksums;

/**
 * Constructs a ChecksumData with the given type and chunk size and an
 * initially empty (mutable) list of checksum values.
 *
 * @param checksumType type of the checksum algorithm
 * @param bytesPerChecksum number of data bytes covered by each checksum
 */
public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) {
this(checksumType, bytesPerChecksum, Lists.newArrayList());
}

/**
 * Constructs a ChecksumData with a pre-computed list of checksum values.
 *
 * @param checksumType type of the checksum algorithm
 * @param bytesPerChecksum number of data bytes covered by each checksum
 * @param checksums checksum values; NOTE(review): the list reference is
 *                  stored directly, not defensively copied — callers must
 *                  not mutate it afterwards
 */
public ChecksumData(ChecksumType checksumType, int bytesPerChecksum,
    List<ByteString> checksums) {
  this.type = checksumType;
  this.bytesPerChecksum = bytesPerChecksum;
  // Removed dead store: the field was first assigned a fresh empty list
  // and then immediately overwritten with the parameter.
  this.checksums = checksums;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;

import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
Expand Down Expand Up @@ -153,9 +155,27 @@ ContainerReplicaProto getContainerReport()
void updateBlockCommitSequenceId(long blockCommitSequenceId);

/**
* Scan the container metadata (structural integrity checks) to detect
* corruption.
*
* @return true if the metadata integrity checks pass, false otherwise
*/
boolean scanMetaData();

/**
* Return whether the container data should be checksum verified to detect
* corruption. The result depends upon the current state of the container
* (e.g. if a container is accepting writes, it may not be a good idea to
* perform checksum verification to avoid concurrency issues).
*/
boolean shouldScanData();

/**
* Perform checksum verification for the container data.
*
* @param throttler A reference of {@link DataTransferThrottler} used to
* perform I/O bandwidth throttling
* @param canceler A reference of {@link Canceler} used to cancel the
* I/O bandwidth throttling (e.g. for shutdown purpose).
* @return true if the checksum verification succeeds,
* false otherwise
*/
boolean check();
boolean scanData(DataTransferThrottler throttler, Canceler canceler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
Expand Down Expand Up @@ -671,52 +673,33 @@ public File getContainerDBFile() {
.getContainerID() + OzoneConsts.DN_CONTAINER_DB);
}

/**
* run integrity checks on the Container metadata.
*/
public boolean check() {
ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK;
/**
 * Scan the container metadata to detect corruption.
 * Delegates to {@link KeyValueContainerCheck#fastCheck()}.
 *
 * @return true if the metadata integrity checks pass, false otherwise
 */
@Override
public boolean scanMetaData() {
  long containerId = containerData.getContainerID();
  KeyValueContainerCheck checker =
      new KeyValueContainerCheck(containerData.getMetadataPath(), config,
          containerId);
  return checker.fastCheck();
}

switch (containerData.getState()) {
case OPEN:
level = ContainerCheckLevel.FAST_CHECK;
LOG.info("Doing Fast integrity checks for Container ID : {},"
+ " because it is OPEN", containerId);
break;
case CLOSING:
level = ContainerCheckLevel.FAST_CHECK;
LOG.info("Doing Fast integrity checks for Container ID : {},"
+ " because it is CLOSING", containerId);
break;
case CLOSED:
case QUASI_CLOSED:
level = ContainerCheckLevel.FULL_CHECK;
LOG.debug("Doing Full integrity checks for Container ID : {},"
+ " because it is in {} state", containerId,
containerData.getState());
break;
default:
break;
}
/**
 * Data scanning is only performed once the container has stopped accepting
 * writes, i.e. when it is CLOSED or QUASI_CLOSED.
 */
@Override
public boolean shouldScanData() {
  switch (containerData.getState()) {
  case CLOSED:
  case QUASI_CLOSED:
    return true;
  default:
    return false;
  }
}

if (level == ContainerCheckLevel.NO_CHECK) {
LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
return true;
/**
 * Perform checksum verification for the container data by running a full
 * check via {@link KeyValueContainerCheck#fullCheck}.
 *
 * @param throttler used to throttle the scan's I/O bandwidth
 * @param canceler used to cancel an in-progress scan (e.g. on shutdown)
 * @return true if the checksum verification succeeds, false otherwise
 * @throws IllegalStateException if the container is in a state where data
 *         scanning is not allowed (see {@link #shouldScanData()})
 */
@Override
public boolean scanData(DataTransferThrottler throttler, Canceler canceler) {
  if (!shouldScanData()) {
    throw new IllegalStateException("The checksum verification can not be" +
        " done for container in state "
        + containerData.getState());
  }

  long containerId = containerData.getContainerID();
  KeyValueContainerCheck checker =
      new KeyValueContainerCheck(containerData.getMetadataPath(), config,
          containerId);

  // Removed diff residue: an orphaned switch over an undefined `level`
  // variable left over from the old check() implementation.
  return checker.fullCheck(throttler, canceler);
}

private enum ContainerCheckLevel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
Expand All @@ -30,12 +35,15 @@
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.io.InputStream;
import java.util.Arrays;

import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,13 +109,13 @@ public boolean fastCheck() {
*
* @return true : integrity checks pass, false : otherwise.
*/
public boolean fullCheck() {
public boolean fullCheck(DataTransferThrottler throttler, Canceler canceler) {
boolean valid = false;

try {
valid = fastCheck();
if (valid) {
checkBlockDB();
scanData(throttler, canceler);
}
} catch (IOException e) {
handleCorruption(e);
Expand Down Expand Up @@ -194,7 +202,8 @@ private void checkContainerFile() throws IOException {
}
}

private void checkBlockDB() throws IOException {
private void scanData(DataTransferThrottler throttler, Canceler canceler)
throws IOException {
/**
* Check the integrity of the DB inside each container.
* In Scope:
Expand All @@ -220,43 +229,67 @@ private void checkBlockDB() throws IOException {
throw new IOException(dbFileErrorMsg);
}


onDiskContainerData.setDbFile(dbFile);
try(ReferenceCountedDB db =
BlockUtils.getDB(onDiskContainerData, checkConfig)) {
iterateBlockDB(db);
}
}
BlockUtils.getDB(onDiskContainerData, checkConfig);
KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
new File(onDiskContainerData.getContainerPath()))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please run this in a profiler mode -- and make sure there are no memory leaks in this code path. Nothing to do with your patch at all. Just that we have found some issues here earlier. Just run it under something like VisualVM and see if we release all memory when get out of the loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private void iterateBlockDB(ReferenceCountedDB db)
throws IOException {
Preconditions.checkState(db != null);

// get "normal" keys from the Block DB
try(KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
new File(onDiskContainerData.getContainerPath()))) {

// ensure there is a chunk file for each key in the DB
while (kvIter.hasNext()) {
while(kvIter.hasNext()) {
BlockData block = kvIter.nextBlock();

List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
File chunkFile;
chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
for(ContainerProtos.ChunkInfo chunk : block.getChunks()) {
File chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
ChunkInfo.getFromProtoBuf(chunk));

if (!chunkFile.exists()) {
// concurrent mutation in Block DB? lookup the block again.
byte[] bdata = db.getStore().get(
Longs.toByteArray(block.getBlockID().getLocalID()));
if (bdata == null) {
LOG.trace("concurrency with delete, ignoring deleted block");
break; // skip to next block from kvIter
} else {
String errorStr = "Missing chunk file "
+ chunkFile.getAbsolutePath();
throw new IOException(errorStr);
if (bdata != null) {
throw new IOException("Missing chunk file "
+ chunkFile.getAbsolutePath());
}
} else if (chunk.getChecksumData().getType()
!= ContainerProtos.ChecksumType.NONE){
int length = chunk.getChecksumData().getChecksumsList().size();
ChecksumData cData = new ChecksumData(
chunk.getChecksumData().getType(),
chunk.getChecksumData().getBytesPerChecksum(),
chunk.getChecksumData().getChecksumsList());
long bytesRead = 0;
byte[] buffer = new byte[cData.getBytesPerChecksum()];
try (InputStream fs = new FileInputStream(chunkFile)) {
int i = 0, v = 0;
for (; i < length; i++) {
v = fs.read(buffer);
if (v == -1) {
break;
}
bytesRead += v;
throttler.throttle(v, canceler);
Checksum cal = new Checksum(cData.getChecksumType(),
cData.getBytesPerChecksum());
ByteString expected = cData.getChecksums().get(i);
ByteString actual = cal.computeChecksum(buffer)
.getChecksums().get(0);
if (!Arrays.equals(expected.toByteArray(),
actual.toByteArray())) {
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s len=%d expected" +
" checksum %s actual checksum %s for block %s",
chunk.getChunkName(), chunk.getLen(),
Arrays.toString(expected.toByteArray()),
Arrays.toString(actual.toByteArray()),
block.getBlockID()));
}

}
if (v == -1 && i < length) {
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s expected length=%d"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be a good idea to log the blockId as well in the exception msg.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

+ " actual length=%d for block %s",
chunk.getChunkName(),
chunk.getLen(), bytesRead, block.getBlockID()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ private static ChunkManager createChunkManager(Configuration config,

if (!persist) {
boolean scrubber = config.getBoolean(
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED,
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT);
"hdds.containerscrub.enabled",
false);
if (scrubber) {
// Data Scrubber needs to be disabled for non-persistent chunks.
LOG.warn("Failed to set " + HDDS_CONTAINER_PERSISTDATA + " to false."
+ " Please set " + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED
+ " Please set hdds.containerscrub.enabled"
+ " also to false to enable non-persistent containers.");
persist = true;
}
Expand Down
Loading