HBASE-27053 IOException during caching of uncompressed block to the block cache #4610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged: 5 commits, Jul 16, 2022
Changes from all commits
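Before the diff, a minimal, hypothetical sketch (plain Java, not HBase code) of the failure mode behind this PR: allocateBufferForUnpacking used to reserve room for checksum bytes that unpack() never filled in, so when the allocator hands back a previously used ("dirty") pooled buffer, the reserved tail keeps whatever junk was already there and two reads of the same block no longer compare byte-for-byte.

// Hypothetical illustration only -- the names and sizes here are invented for the sketch.
import java.util.Arrays;

public class StaleChecksumTailSketch {
  /** Stand-in for a pooled allocator that returns previously used ("dirty") memory. */
  static byte[] allocateDirty(int size, byte leftoverJunk) {
    byte[] buf = new byte[size];
    Arrays.fill(buf, leftoverJunk); // pretend this is whatever the last user left behind
    return buf;
  }

  /** Old behaviour: capacity includes checksum room, but only the data is ever written. */
  static byte[] unpack(byte[] uncompressedData, int reservedChecksumBytes, byte leftoverJunk) {
    byte[] buf = allocateDirty(uncompressedData.length + reservedChecksumBytes, leftoverJunk);
    System.arraycopy(uncompressedData, 0, buf, 0, uncompressedData.length);
    return buf; // the tail still holds the allocator's junk
  }

  public static void main(String[] args) {
    byte[] data = "identical block contents".getBytes();
    byte[] first = unpack(data, 4, (byte) 0x11);  // first read: one dirty buffer
    byte[] second = unpack(data, 4, (byte) 0x22); // second read: a different dirty buffer
    System.out.println(Arrays.equals(first, second)); // false, even though the block is the same
  }
}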
==== changed file ====
@@ -493,18 +493,8 @@ private void overwriteHeader() {
* @return the buffer with header skipped and checksum omitted.
*/
public ByteBuff getBufferWithoutHeader() {
return this.getBufferWithoutHeader(false);
}

/**
* Returns a buffer that does not include the header or checksum.
* @param withChecksum to indicate whether include the checksum or not.
* @return the buffer with header skipped and checksum omitted.
*/
public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
ByteBuff dup = getBufferReadOnly();
int delta = withChecksum ? 0 : totalChecksumBytes();
return dup.position(headerSize()).limit(buf.limit() - delta).slice();
return dup.position(headerSize()).slice();
}

/**
@@ -568,19 +558,21 @@ void sanityCheck() throws IOException {
sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
}

int cksumBytes = totalChecksumBytes();
int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
if (dup.limit() != expectedBufLimit) {
throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
if (dup.limit() != onDiskDataSizeWithHeader) {
throw new AssertionError(
"Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
}

// We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
// block's header, so there are two sensible values for buffer capacity.
int hdrSize = headerSize();
dup.rewind();
if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
if (
dup.remaining() != onDiskDataSizeWithHeader
&& dup.remaining() != onDiskDataSizeWithHeader + hdrSize
) {
throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
+ expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
+ onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
}
}

Expand Down Expand Up @@ -641,12 +633,13 @@ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException
? reader.getBlockDecodingContext()
: reader.getDefaultBlockDecodingContext();
// Create a duplicated buffer without the header part.
int headerSize = this.headerSize();
ByteBuff dup = this.buf.duplicate();
dup.position(this.headerSize());
dup.position(headerSize);
dup = dup.slice();
// Decode the dup into unpacked#buf
ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
succ = true;
return unpacked;
} finally {
@@ -661,9 +654,8 @@ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException
* buffer. Does not change header fields. Reserve room to keep checksum bytes too.
*/
private ByteBuff allocateBufferForUnpacking() {
int cksumBytes = totalChecksumBytes();
int headerSize = headerSize();
int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;

ByteBuff source = buf.duplicate();
ByteBuff newBuf = allocator.allocate(capacityNeeded);
@@ -682,9 +674,8 @@ private ByteBuff allocateBufferForUnpacking() {
* calculated heuristic, not tracked attribute of the block.
*/
public boolean isUnpacked() {
final int cksumBytes = totalChecksumBytes();
final int headerSize = headerSize();
final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
final int bufCapacity = buf.remaining();
return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
}
@@ -1709,6 +1700,9 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
return null;
}
// remove checksum from buffer now that it's verified
int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
curBlock.limit(sizeWithoutChecksum);
long duration = EnvironmentEdgeManager.currentTime() - startTime;
if (updateMetrics) {
HFile.updateReadLatency(duration, pread);
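A simplified sketch of the truncation added to readBlockDataInternal above, using java.nio.ByteBuffer instead of HBase's ByteBuff. The header offset constant stands in for Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX; its value here is an assumption for the sketch, not taken from the HBase source.

import java.nio.ByteBuffer;

final class DropVerifiedChecksumSketch {
  // Assumed offset of the onDiskDataSizeWithHeader field inside the block header.
  static final int ON_DISK_DATA_SIZE_WITH_HEADER_OFFSET = 29;

  // Once the checksum has been validated, shrink the buffer's limit so the trailing
  // checksum bytes are invisible to everything downstream (unpacking, block cache).
  static ByteBuffer dropChecksum(ByteBuffer curBlock) {
    int sizeWithoutChecksum = curBlock.getInt(ON_DISK_DATA_SIZE_WITH_HEADER_OFFSET);
    curBlock.limit(sizeWithoutChecksum);
    return curBlock;
  }
}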
==== changed file ====
@@ -149,11 +149,13 @@ public void testVerifyCheckSum() throws IOException {
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
assertTrue(!b.isSharedMem());

ByteBuff bufferWithChecksum = getBufferWithChecksum(b);

// verify SingleByteBuff checksum.
verifySBBCheckSum(b.getBufferReadOnly());
verifySBBCheckSum(bufferWithChecksum);

// verify MultiByteBuff checksum.
verifyMBBCheckSum(b.getBufferReadOnly());
verifyMBBCheckSum(bufferWithChecksum);

ByteBuff data = b.getBufferWithoutHeader();
for (int i = 0; i < intCount; i++) {
@@ -169,6 +171,28 @@
}
}

/**
* HFileBlock buffer does not include checksum because it is discarded after verifying upon
* reading from disk. We artificially add a checksum onto the buffer for use in testing that
* ChecksumUtil.validateChecksum works for SingleByteBuff and MultiByteBuff in
* {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)}
*/
private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException {
ByteBuff buf = block.getBufferReadOnly();

int numBytes =
(int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum());
byte[] checksum = new byte[numBytes];
ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0,
block.getHFileContext().getChecksumType(), block.getBytesPerChecksum());

ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes);
bufWithChecksum.put(buf.array(), 0, buf.limit());
bufWithChecksum.put(checksum);

return bufWithChecksum.rewind();
}

/**
* Introduce checksum failures and check that we can still read the data
*/
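For the getBufferWithChecksum helper in the hunk above, a worked sketch of how many checksum bytes it appends, assuming each chunk is covered by a 4-byte CRC (the usual CRC32/CRC32C case). This mirrors what ChecksumUtil.numBytes is expected to return and is written out here only for illustration.

final class ChecksumSizingSketch {
  static long checksumBytes(long dataSize, int bytesPerChecksum) {
    long chunks = (dataSize + bytesPerChecksum - 1) / bytesPerChecksum; // ceiling division
    return chunks * 4L; // one 4-byte checksum per chunk
  }

  public static void main(String[] args) {
    // e.g. a 64 KiB buffer with a 16 KiB bytesPerChecksum needs 4 chunks * 4 bytes = 16 bytes.
    System.out.println(checksumBytes(64 * 1024, 16 * 1024));
  }
}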
==== changed file ====
@@ -652,14 +652,13 @@ protected void testPreviousOffsetInternals() throws IOException {
// verifies that the unpacked value read back off disk matches the unpacked value
// generated before writing to disk.
HFileBlock newBlock = b.unpack(meta, hbr);
// b's buffer has header + data + checksum while
// expectedContents have header + data only
// neither b's unpacked nor the expectedContents have checksum.
// they should be identical
ByteBuff bufRead = newBlock.getBufferReadOnly();
ByteBuffer bufExpected = expectedContents.get(i);
byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()];
bufRead.get(tmp, 0, tmp.length);
boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(),
bufExpected.arrayOffset(), bufExpected.limit()) == 0;
byte[] bytesRead = bufRead.toBytes();
boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length,
bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0;
String wrongBytesMsg = "";

if (!bytesAreCorrect) {
@@ -669,8 +668,7 @@
+ pread + ", cacheOnWrite=" + cacheOnWrite + "):\n";
wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n"
+ Bytes.toStringBinary(bufRead.array(), bufRead.arrayOffset(),
Math.min(32 + 10, bufRead.limit()));
+ Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length));
if (detailedLogging) {
LOG.warn(
"expected header" + HFileBlock.toStringHeader(new SingleByteBuff(bufExpected))
==== changed file ====
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,11 +33,13 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -71,7 +76,62 @@ public void setUp() throws Exception {
Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
allocator = ByteBuffAllocator.create(conf, true);
}

/**
* It's important that if you read and unpack the same HFileBlock twice, it results in an
* identical buffer each time. Otherwise we end up with validation failures in block cache, since
* contents may not match if the same block is cached twice. See
* https://issues.apache.org/jira/browse/HBASE-27053
*/
@Test
public void itUnpacksIdenticallyEachTime() throws IOException {
Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
int totalSize = createTestBlock(path);

// Allocate a bunch of random buffers, so we can be sure that unpack will only have "dirty"
// buffers to choose from when allocating itself.
Random random = new Random();
byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
List<ByteBuff> buffs = new ArrayList<>();
for (int i = 0; i < 10; i++) {
ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
random.nextBytes(temp);
buff.put(temp);
buffs.add(buff);
}

buffs.forEach(ByteBuff::release);

// read the same block twice. we should expect the underlying buffer below to
// be identical each time
HFileBlockWrapper blockOne = readBlock(path, totalSize);
HFileBlockWrapper blockTwo = readBlock(path, totalSize);

// first check size fields
assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
blockTwo.original.getOnDiskSizeWithHeader());
assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
blockTwo.original.getUncompressedSizeWithoutHeader());

// next check packed buffers
assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
blockTwo.original.getBufferWithoutHeader(),
blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());

// now check unpacked buffers. prior to HBASE-27053, this would fail because
// the unpacked buffer would include extra space for checksums at the end that was not written.
// so the checksum space would be filled with random junk when re-using pooled buffers.
assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
blockTwo.unpacked.getBufferWithoutHeader(),
blockOne.original.getUncompressedSizeWithoutHeader());
}

private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
assertEquals(expectedSize, bufferOne.limit());
assertEquals(expectedSize, bufferTwo.limit());
assertEquals(0,
ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
}

/**
@@ -83,53 +143,75 @@
*/
@Test
public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
HFileContext meta =
new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
.withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();

Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
int totalSize;
try (FSDataOutputStream os = fs.create(path)) {
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, meta);
hbw.startWriting(BlockType.DATA);
writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
hbw.writeHeaderAndData(os);
totalSize = hbw.getOnDiskSizeWithHeader();
assertTrue(
"expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
totalSize < MIN_ALLOCATION_SIZE);
int totalSize = createTestBlock(path);
HFileBlockWrapper blockFromHFile = readBlock(path, totalSize);

assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.original.isUnpacked());
assertFalse("expected hfile block to NOT use shared memory",
blockFromHFile.original.isSharedMem());

assertTrue(
"expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader()
+ " to be less than " + MIN_ALLOCATION_SIZE,
blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
assertTrue(
"expected generated block uncompressed size "
+ blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than "
+ MIN_ALLOCATION_SIZE,
blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);

assertTrue("expected unpacked block to be unpacked", blockFromHFile.unpacked.isUnpacked());
assertTrue("expected unpacked block to use shared memory",
blockFromHFile.unpacked.isSharedMem());
}

private final static class HFileBlockWrapper {
private final HFileBlock original;
private final HFileBlock unpacked;

private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) {
this.original = original;
this.unpacked = unpacked;
}
}

private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException {
try (FSDataInputStream is = fs.open(path)) {
meta =
HFileContext meta =
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
.withIncludesMvcc(false).withIncludesTags(false).build();
ReaderContext context =
new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator, conf);
hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, conf);
HFileBlock.FSReaderImpl hbr =
new HFileBlock.FSReaderImpl(context, meta, allocator, TEST_UTIL.getConfiguration());
hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, TEST_UTIL.getConfiguration());
hbr.setIncludesMemStoreTS(false);
HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
blockFromHFile.sanityCheck();
assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.isUnpacked());
assertFalse("expected hfile block to NOT use shared memory", blockFromHFile.isSharedMem());
return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr));
}
}

private int createTestBlock(Path path) throws IOException {
HFileContext meta =
new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
.withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();

int totalSize;
try (FSDataOutputStream os = fs.create(path)) {
HFileBlock.Writer hbw =
new HFileBlock.Writer(TEST_UTIL.getConfiguration(), NoOpDataBlockEncoder.INSTANCE, meta);
hbw.startWriting(BlockType.DATA);
writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
hbw.writeHeaderAndData(os);
totalSize = hbw.getOnDiskSizeWithHeader();
assertTrue(
"expected generated block size " + blockFromHFile.getOnDiskSizeWithHeader()
+ " to be less than " + MIN_ALLOCATION_SIZE,
blockFromHFile.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
assertTrue(
"expected generated block uncompressed size "
+ blockFromHFile.getUncompressedSizeWithoutHeader() + " to be more than "
+ MIN_ALLOCATION_SIZE,
blockFromHFile.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);

HFileBlock blockUnpacked = blockFromHFile.unpack(meta, hbr);
assertTrue("expected unpacked block to be unpacked", blockUnpacked.isUnpacked());
assertTrue("expected unpacked block to use shared memory", blockUnpacked.isSharedMem());
"expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
totalSize < MIN_ALLOCATION_SIZE);
}
return totalSize;
}

static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
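Finally, a hypothetical illustration (not the actual BlockCache code) of why two non-identical reads of the same block surface as the IOException in the PR title: if the cache already holds the block and a newly read copy does not match it byte-for-byte, the addition can be rejected.

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CacheMismatchSketch {
  private static final Map<String, byte[]> CACHE = new ConcurrentHashMap<>();

  // Reject a second copy of a block whose bytes differ from the already cached copy.
  static void cacheBlock(String key, byte[] content) throws IOException {
    byte[] existing = CACHE.putIfAbsent(key, content);
    if (existing != null && !Arrays.equals(existing, content)) {
      throw new IOException("Cached block contents differ for " + key);
    }
  }
}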