Commit eb88819

Author: Hrishikesh Gadre (committed)
[HDDS-1200] Add support for checksum verification in data scrubber
1 parent 3c11716 · commit eb88819

File tree: 15 files changed, +542 −265 lines


hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java

Lines changed: 1 addition & 4 deletions
@@ -65,15 +65,12 @@ public final class HddsConfigKeys {
   public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
   public static final String HDDS_SCM_SAFEMODE_ENABLED =
       "hdds.scm.safemode.enabled";
-  public static final String HDDS_CONTAINERSCRUB_ENABLED =
-      "hdds.containerscrub.enabled";
-  public static final boolean HDDS_CONTAINERSCRUB_ENABLED_DEFAULT = false;
+
   public static final boolean HDDS_SCM_SAFEMODE_ENABLED_DEFAULT = true;
   public static final String HDDS_SCM_SAFEMODE_MIN_DATANODE =
       "hdds.scm.safemode.min.datanode";
   public static final int HDDS_SCM_SAFEMODE_MIN_DATANODE_DEFAULT = 1;
 
-
   public static final String
       HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT =
       "hdds.scm.wait.time.after.safemode.exit";

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java

Lines changed: 6 additions & 1 deletion
@@ -40,9 +40,14 @@ public class ChecksumData {
   private List<ByteString> checksums;
 
   public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) {
+    this(checksumType, bytesPerChecksum, Lists.newArrayList());
+  }
+
+  public ChecksumData(ChecksumType checksumType, int bytesPerChecksum,
+      List<ByteString> checksums) {
    this.type = checksumType;
    this.bytesPerChecksum = bytesPerChecksum;
-    this.checksums = Lists.newArrayList();
+    this.checksums = checksums;
   }
 
   /**
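
The new three-argument constructor lets a caller wrap an already-computed checksum list instead of always starting from an empty one. A minimal sketch of that pattern, mirroring how the scrubber uses it in KeyValueContainerCheck below (the helper name fromChunkProto is illustrative, not part of this change):

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.common.ChecksumData;

// Rebuild ChecksumData from the checksums already stored in a chunk's protobuf.
static ChecksumData fromChunkProto(ContainerProtos.ChunkInfo chunk) {
  return new ChecksumData(
      chunk.getChecksumData().getType(),
      chunk.getChecksumData().getBytesPerChecksum(),
      chunk.getChecksumData().getChecksumsList());
}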

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java

Lines changed: 23 additions & 3 deletions
@@ -30,6 +30,8 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -153,9 +155,27 @@ ContainerReplicaProto getContainerReport()
   void updateBlockCommitSequenceId(long blockCommitSequenceId);
 
   /**
-   * check and report the structural integrity of the container.
-   * @return true if the integrity checks pass
+   * Scan the container metadata to detect corruption.
+   */
+  boolean scanMetaData();
+
+  /**
+   * Return if the container data should be checksum verified to detect
+   * corruption. The result depends upon the current state of the container
+   * (e.g. if a container is accepting writes, it may not be a good idea to
+   * perform checksum verification to avoid concurrency issues).
+   */
+  boolean shouldScanData();
+
+  /**
+   * Perform checksum verification for the container data.
+   *
+   * @param throttler A reference of {@link DataTransferThrottler} used to
+   *                  perform I/O bandwidth throttling
+   * @param canceler A reference of {@link Canceler} used to cancel the
+   *                 I/O bandwidth throttling (e.g. for shutdown purpose).
+   * @return true if the checksum verification succeeds
   * false otherwise
   */
-  boolean check();
+  boolean scanData(DataTransferThrottler throttler, Canceler canceler);
 }
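
Taken together, the new methods split the old check() into a cheap metadata pass (scanMetaData) and an optional, throttled data pass (scanData) guarded by shouldScanData. A rough caller-side sketch, not part of this commit; the method name scrubOne and the 1 MB/s bandwidth are assumptions:

import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.container.common.interfaces.Container;

// Sketch: how a scrubber might drive the new Container methods.
static boolean scrubOne(Container container) {
  DataTransferThrottler throttler =
      new DataTransferThrottler(1024 * 1024); // assumed 1 MB/s limit
  Canceler canceler = new Canceler();

  // The metadata scan is cheap and always allowed.
  if (!container.scanMetaData()) {
    return false;
  }
  // Checksum-verify the data only when the container state permits it
  // (CLOSED / QUASI_CLOSED in the KeyValueContainer implementation).
  if (container.shouldScanData()) {
    return container.scanData(throttler, canceler);
  }
  return true;
}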

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java

Lines changed: 20 additions & 37 deletions
@@ -39,6 +39,8 @@
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -671,52 +673,33 @@ public File getContainerDBFile() {
         .getContainerID() + OzoneConsts.DN_CONTAINER_DB);
   }
 
-  /**
-   * run integrity checks on the Container metadata.
-   */
-  public boolean check() {
-    ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK;
+  public boolean scanMetaData() {
     long containerId = containerData.getContainerID();
+    KeyValueContainerCheck checker =
+        new KeyValueContainerCheck(containerData.getMetadataPath(), config,
+            containerId);
+    return checker.fastCheck();
+  }
 
-    switch (containerData.getState()) {
-    case OPEN:
-      level = ContainerCheckLevel.FAST_CHECK;
-      LOG.info("Doing Fast integrity checks for Container ID : {},"
-          + " because it is OPEN", containerId);
-      break;
-    case CLOSING:
-      level = ContainerCheckLevel.FAST_CHECK;
-      LOG.info("Doing Fast integrity checks for Container ID : {},"
-          + " because it is CLOSING", containerId);
-      break;
-    case CLOSED:
-    case QUASI_CLOSED:
-      level = ContainerCheckLevel.FULL_CHECK;
-      LOG.debug("Doing Full integrity checks for Container ID : {},"
-          + " because it is in {} state", containerId,
-          containerData.getState());
-      break;
-    default:
-      break;
-    }
+  @Override
+  public boolean shouldScanData() {
+    return containerData.getState() == ContainerDataProto.State.CLOSED
+        || containerData.getState() == ContainerDataProto.State.QUASI_CLOSED;
+  }
 
-    if (level == ContainerCheckLevel.NO_CHECK) {
-      LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
-      return true;
+  public boolean scanData(DataTransferThrottler throttler, Canceler canceler) {
+    if (!shouldScanData()) {
+      throw new IllegalStateException("The checksum verification can not be" +
+          " done for container in state "
+          + containerData.getState());
    }
 
+    long containerId = containerData.getContainerID();
    KeyValueContainerCheck checker =
        new KeyValueContainerCheck(containerData.getMetadataPath(), config,
            containerId);
 
-    switch (level) {
-    case FAST_CHECK:
-      return checker.fastCheck();
-    case FULL_CHECK:
-      return checker.fullCheck();
-    default:
-      return true;
-    }
+    return checker.fullCheck(throttler, canceler);
   }
 
   private enum ContainerCheckLevel {
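
The data scan is deliberately restricted by state: scanData() refuses to run unless shouldScanData() holds, since checksum-verifying a container that is still accepting writes could race with in-flight chunk writes. A test-style sketch of that contract (createContainer(...) is a hypothetical fixture, not part of this commit):

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;

// createContainer(...) stands in for whatever fixture builds a
// KeyValueContainer in the requested state.
static void demonstrateScanContract() {
  KeyValueContainer closed = createContainer(ContainerDataProto.State.CLOSED);
  // Allowed: CLOSED passes shouldScanData(), so the full checksum scan runs.
  boolean healthy = closed.scanData(
      new DataTransferThrottler(1024 * 1024), new Canceler());

  KeyValueContainer open = createContainer(ContainerDataProto.State.OPEN);
  boolean metadataOk = open.scanMetaData(); // metadata scan is always allowed
  // open.scanData(...) would throw IllegalStateException, because OPEN
  // containers fail the shouldScanData() check.
}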

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java

Lines changed: 66 additions & 33 deletions
@@ -22,6 +22,11 @@
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -30,12 +35,15 @@
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.List;
+import java.io.InputStream;
+import java.util.Arrays;
 
-import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,13 +109,13 @@ public boolean fastCheck() {
    *
    * @return true : integrity checks pass, false : otherwise.
    */
-  public boolean fullCheck() {
+  public boolean fullCheck(DataTransferThrottler throttler, Canceler canceler) {
    boolean valid = false;
 
    try {
      valid = fastCheck();
      if (valid) {
-        checkBlockDB();
+        scanData(throttler, canceler);
      }
    } catch (IOException e) {
      handleCorruption(e);
@@ -194,7 +202,8 @@ private void checkContainerFile() throws IOException {
    }
   }
 
-  private void checkBlockDB() throws IOException {
+  private void scanData(DataTransferThrottler throttler, Canceler canceler)
+      throws IOException {
    /**
     * Check the integrity of the DB inside each container.
     * In Scope:
@@ -220,43 +229,67 @@ private void checkBlockDB() throws IOException {
      throw new IOException(dbFileErrorMsg);
    }
 
-
    onDiskContainerData.setDbFile(dbFile);
    try(ReferenceCountedDB db =
-        BlockUtils.getDB(onDiskContainerData, checkConfig)) {
-      iterateBlockDB(db);
-    }
-  }
+            BlockUtils.getDB(onDiskContainerData, checkConfig);
+        KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
+            new File(onDiskContainerData.getContainerPath()))) {
 
-  private void iterateBlockDB(ReferenceCountedDB db)
-      throws IOException {
-    Preconditions.checkState(db != null);
-
-    // get "normal" keys from the Block DB
-    try(KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
-        new File(onDiskContainerData.getContainerPath()))) {
-
-      // ensure there is a chunk file for each key in the DB
-      while (kvIter.hasNext()) {
+      while(kvIter.hasNext()) {
        BlockData block = kvIter.nextBlock();
-
-        List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
-        for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
-          File chunkFile;
-          chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
+        for(ContainerProtos.ChunkInfo chunk : block.getChunks()) {
+          File chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
              ChunkInfo.getFromProtoBuf(chunk));
-
          if (!chunkFile.exists()) {
            // concurrent mutation in Block DB? lookup the block again.
            byte[] bdata = db.getStore().get(
                Longs.toByteArray(block.getBlockID().getLocalID()));
-            if (bdata == null) {
-              LOG.trace("concurrency with delete, ignoring deleted block");
-              break; // skip to next block from kvIter
-            } else {
-              String errorStr = "Missing chunk file "
-                  + chunkFile.getAbsolutePath();
-              throw new IOException(errorStr);
+            if (bdata != null) {
+              throw new IOException("Missing chunk file "
+                  + chunkFile.getAbsolutePath());
+            }
+          } else if (chunk.getChecksumData().getType()
+              != ContainerProtos.ChecksumType.NONE){
+            int length = chunk.getChecksumData().getChecksumsList().size();
+            ChecksumData cData = new ChecksumData(
+                chunk.getChecksumData().getType(),
+                chunk.getChecksumData().getBytesPerChecksum(),
+                chunk.getChecksumData().getChecksumsList());
+            long bytesRead = 0;
+            byte[] buffer = new byte[cData.getBytesPerChecksum()];
+            try (InputStream fs = new FileInputStream(chunkFile)) {
+              int i = 0, v = 0;
+              for (; i < length; i++) {
+                v = fs.read(buffer);
+                if (v == -1) {
+                  break;
+                }
+                bytesRead += v;
+                throttler.throttle(v, canceler);
+                Checksum cal = new Checksum(cData.getChecksumType(),
+                    cData.getBytesPerChecksum());
+                ByteString expected = cData.getChecksums().get(i);
+                ByteString actual = cal.computeChecksum(buffer)
+                    .getChecksums().get(0);
+                if (!Arrays.equals(expected.toByteArray(),
+                    actual.toByteArray())) {
+                  throw new OzoneChecksumException(String
+                      .format("Inconsistent read for chunk=%s len=%d expected" +
+                          " checksum %s actual checksum %s for block %s",
+                          chunk.getChunkName(), chunk.getLen(),
+                          Arrays.toString(expected.toByteArray()),
+                          Arrays.toString(actual.toByteArray()),
+                          block.getBlockID()));
+                }
+
+              }
+              if (v == -1 && i < length) {
+                throw new OzoneChecksumException(String
+                    .format("Inconsistent read for chunk=%s expected length=%d"
+                        + " actual length=%d for block %s",
+                        chunk.getChunkName(),
+                        chunk.getLen(), bytesRead, block.getBlockID()));
+              }
            }
          }
        }
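
The heart of the new data scan is the per-buffer check: read bytesPerChecksum bytes from the chunk file, recompute the checksum with the same algorithm, and compare it to the stored value. A stripped-down sketch of that single step (the helper name verifyBuffer is illustrative; the surrounding read loop, throttling, and length check are shown in the diff above):

import java.util.Arrays;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

// Returns true when the stored checksum matches the one recomputed
// from the buffer that was just read from disk.
static boolean verifyBuffer(byte[] buffer, ByteString expected,
    ChecksumType type, int bytesPerChecksum) throws OzoneChecksumException {
  Checksum checksum = new Checksum(type, bytesPerChecksum);
  ByteString actual = checksum.computeChecksum(buffer).getChecksums().get(0);
  return Arrays.equals(expected.toByteArray(), actual.toByteArray());
}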

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerFactory.java

Lines changed: 3 additions & 3 deletions
@@ -65,12 +65,12 @@ private static ChunkManager createChunkManager(Configuration config,
 
    if (!persist) {
      boolean scrubber = config.getBoolean(
-          HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED,
-          HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT);
+          "hdds.containerscrub.enabled",
+          false);
      if (scrubber) {
        // Data Scrubber needs to be disabled for non-persistent chunks.
        LOG.warn("Failed to set " + HDDS_CONTAINER_PERSISTDATA + " to false."
-            + " Please set " + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED
+            + " Please set hdds.containerscrub.enabled"
            + " also to false to enable non-persistent containers.");
        persist = true;
      }
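
With the HDDS_CONTAINERSCRUB_ENABLED constants removed from HddsConfigKeys, the factory now reads the property by its literal name, and the scrubber stays off by default. A minimal sketch of turning it on programmatically (e.g. in a test); on a real cluster the property would normally be set in ozone-site.xml instead:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;

// Enable the container scrubber; the key matches the literal used above.
OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean("hdds.containerscrub.enabled", true);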
