Skip to content

Commit 8df3212

Browse files
committed
HBASE-27621 Also clear the Dictionary when resetting when reading compressed WAL file (#5016)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org> (cherry picked from commit 833b10e)
1 parent af6b63a commit 8df3212

File tree

7 files changed

+308
-9
lines changed

7 files changed

+308
-9
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,15 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
106106
throws IOException {
107107
int endOffset = offset + length;
108108
while (offset < endOffset) {
109-
byte status = (byte) src.read();
109+
byte status = StreamUtils.readByte(src);
110110
if (status == Dictionary.NOT_IN_DICTIONARY) {
111111
int tagLen = StreamUtils.readRawVarint32(src);
112112
offset = Bytes.putAsShort(dest, offset, tagLen);
113113
IOUtils.readFully(src, dest, offset, tagLen);
114114
tagDict.addEntry(dest, offset, tagLen);
115115
offset += tagLen;
116116
} else {
117-
short dictIdx = StreamUtils.toShort(status, (byte) src.read());
117+
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
118118
byte[] entry = tagDict.getEntry(dictIdx);
119119
if (entry == null) {
120120
throw new IOException("Missing dictionary entry for index " + dictIdx);

hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.util;
1919

20+
import java.io.EOFException;
2021
import java.io.IOException;
2122
import java.io.InputStream;
2223
import java.io.OutputStream;
@@ -206,6 +207,22 @@ public static Pair<Integer, Integer> readRawVarint32(ByteBuffer input, int offse
206207
return new Pair<>(result, newOffset - offset);
207208
}
208209

210+
/**
211+
* Read a byte from the given stream using the read method, and throw EOFException if it returns
212+
* -1, like the implementation in {@code DataInputStream}.
213+
* <p/>
214+
* This is useful because casting the return value of the read method into byte directly will make us
215+
* lose the ability to distinguish reading a byte whose value is -1 from reaching EOF, as
216+
* casting int -1 to byte also returns -1.
217+
*/
218+
public static byte readByte(InputStream in) throws IOException {
219+
int r = in.read();
220+
if (r < 0) {
221+
throw new EOFException();
222+
}
223+
return (byte) r;
224+
}
225+
209226
public static short toShort(byte hi, byte lo) {
210227
short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
211228
Preconditions.checkArgument(s >= 0);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException {
108108
// if this isn't in the dictionary, we need to add to the dictionary.
109109
byte[] arr = new byte[length];
110110
in.readFully(arr);
111-
if (dict != null) dict.addEntry(arr, 0, length);
111+
if (dict != null) {
112+
dict.addEntry(arr, 0, length);
113+
}
112114
return arr;
113115
} else {
114116
// Status here is the higher-order byte of index of the dictionary entry

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public class ProtobufLogReader extends ReaderBase {
9595
// cell codec classname
9696
private String codecClsName = null;
9797

98+
// a flag indicating whether we need to reset the compression context when seeking back
99+
private boolean resetCompression;
100+
98101
@InterfaceAudience.Private
99102
public long trailerSize() {
100103
if (trailerPresent) {
@@ -157,6 +160,9 @@ public long getPosition() throws IOException {
157160
@Override
158161
public void reset() throws IOException {
159162
String clsName = initInternal(null, false);
163+
if (resetCompression) {
164+
resetCompression();
165+
}
160166
initAfterCompression(clsName); // We need a new decoder (at least).
161167
}
162168

@@ -339,6 +345,8 @@ protected boolean readNext(Entry entry) throws IOException {
339345
WALKey.Builder builder = WALKey.newBuilder();
340346
long size = 0;
341347
boolean resetPosition = false;
348+
// by default, we should reset the compression when seeking back after reading something
349+
resetCompression = true;
342350
try {
343351
long available = -1;
344352
try {
@@ -350,6 +358,14 @@ protected boolean readNext(Entry entry) throws IOException {
350358
// available may be < 0 on local fs for instance. If so, can't depend on it.
351359
available = this.inputStream.available();
352360
if (available > 0 && available < size) {
361+
// if we quit here, we have just read the length, no actual data yet, which means we
362+
// haven't put anything into the compression dictionary yet, so when seeking back to the
363+
// last good position, we do not need to reset compression context.
364+
// This is very useful for saving the extra effort of reconstructing the compression
365+
// dictionary, where we need to read from the beginning instead of just seek to the
366+
// position, as DFSInputStream implements the available method, so in most cases we will
367+
// reach here if there is not enough data.
368+
resetCompression = false;
353369
throw new EOFException("Available stream not enough for edit, "
354370
+ "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
355371
+ size + " at offset = " + this.inputStream.getPos());

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
4444
* Compression context to use reading. Can be null if no compression.
4545
*/
4646
protected CompressionContext compressionContext = null;
47-
protected boolean emptyCompressionContext = true;
47+
private boolean emptyCompressionContext = true;
4848

4949
/**
5050
* Default constructor.
@@ -121,6 +121,17 @@ public void seek(long pos) throws IOException {
121121
seekOnFs(pos);
122122
}
123123

124+
/**
125+
* Clear the {@link ReaderBase#compressionContext}, and also set {@link #emptyCompressionContext}
126+
* to true, so when seeking, we will try to skip to the position and reconstruct the dictionary.
127+
*/
128+
protected final void resetCompression() {
129+
if (compressionContext != null) {
130+
compressionContext.clear();
131+
emptyCompressionContext = true;
132+
}
133+
}
134+
124135
/**
125136
* Initializes the log reader with a particular stream (may be null). Reader assumes ownership of
126137
* the stream if not null and may use it. Called once.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,18 +197,20 @@ public byte[] uncompress(ByteString data, Enum dictIndex) {
197197

198198
private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
199199
InputStream in = bs.newInput();
200-
byte status = (byte) in.read();
200+
byte status = StreamUtils.readByte(in);
201201
if (status == Dictionary.NOT_IN_DICTIONARY) {
202202
byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
203203
int bytesRead = in.read(arr);
204204
if (bytesRead != arr.length) {
205205
throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
206206
}
207-
if (dict != null) dict.addEntry(arr, 0, arr.length);
207+
if (dict != null) {
208+
dict.addEntry(arr, 0, arr.length);
209+
}
208210
return arr;
209211
} else {
210212
// Status here is the higher-order byte of index of the dictionary entry.
211-
short dictIdx = StreamUtils.toShort(status, (byte) in.read());
213+
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
212214
byte[] entry = dict.getEntry(dictIdx);
213215
if (entry == null) {
214216
throw new IOException("Missing dictionary entry for index " + dictIdx);
@@ -322,7 +324,7 @@ protected Cell parseCell() throws IOException {
322324
}
323325

324326
private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
325-
byte status = (byte) in.read();
327+
byte status = StreamUtils.readByte(in);
326328
if (status == Dictionary.NOT_IN_DICTIONARY) {
327329
// status byte indicating that data to be read is not in dictionary.
328330
// if this isn't in the dictionary, we need to add to the dictionary.
@@ -332,7 +334,7 @@ private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOExcep
332334
return length;
333335
} else {
334336
// the status byte also acts as the higher order byte of the dictionary entry.
335-
short dictIdx = StreamUtils.toShort(status, (byte) in.read());
337+
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
336338
byte[] entry = dict.getEntry(dictIdx);
337339
if (entry == null) {
338340
throw new IOException("Missing dictionary entry for index " + dictIdx);

0 commit comments

Comments
 (0)