Skip to content

Commit 1a089cd

Browse files
HBASE-28485 Re-use ZstdDecompressCtx/ZstdCompressCtx for performance (apache#5797)
Co-authored-by: Charles Connell <cconnell@hubspot.com> Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
1 parent bf836a9 commit 1a089cd

File tree

2 files changed

+26
-13
lines changed

2 files changed

+26
-13
lines changed

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.io.compress.zstd;
1919

2020
import com.github.luben.zstd.Zstd;
21+
import com.github.luben.zstd.ZstdCompressCtx;
2122
import com.github.luben.zstd.ZstdDictCompress;
2223
import java.io.IOException;
2324
import java.nio.ByteBuffer;
@@ -39,16 +40,20 @@ public class ZstdCompressor implements CanReinit, Compressor {
3940
protected long bytesRead, bytesWritten;
4041
protected int dictId;
4142
protected ZstdDictCompress dict;
43+
protected ZstdCompressCtx ctx;
4244

4345
ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
4446
this.level = level;
4547
this.bufferSize = bufferSize;
4648
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
4749
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
4850
this.outBuf.position(bufferSize);
51+
this.ctx = new ZstdCompressCtx();
52+
this.ctx.setLevel(level);
4953
if (dictionary != null) {
5054
this.dictId = ZstdCodec.getDictionaryId(dictionary);
5155
this.dict = new ZstdDictCompress(dictionary, level);
56+
this.ctx.loadDict(this.dict);
5257
}
5358
}
5459

@@ -79,12 +84,7 @@ public int compress(final byte[] b, final int off, final int len) throws IOExcep
7984
} else {
8085
outBuf.clear();
8186
}
82-
int written;
83-
if (dict != null) {
84-
written = Zstd.compress(outBuf, inBuf, dict);
85-
} else {
86-
written = Zstd.compress(outBuf, inBuf, level);
87-
}
87+
int written = ctx.compress(outBuf, inBuf);
8888
bytesWritten += written;
8989
inBuf.clear();
9090
finished = true;
@@ -170,6 +170,14 @@ public void reset() {
170170
bytesWritten = 0;
171171
finish = false;
172172
finished = false;
173+
ctx.reset();
174+
ctx.setLevel(level);
175+
if (dict != null) {
176+
ctx.loadDict(dict);
177+
} else {
178+
// loadDict((byte[]) accepts null to clear the dictionary
179+
ctx.loadDict((byte[]) null);
180+
}
173181
}
174182

175183
@Override

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.compress.zstd;
1919

20-
import com.github.luben.zstd.Zstd;
20+
import com.github.luben.zstd.ZstdDecompressCtx;
2121
import com.github.luben.zstd.ZstdDictDecompress;
2222
import java.io.IOException;
2323
import java.nio.ByteBuffer;
@@ -39,15 +39,18 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
3939
protected boolean finished;
4040
protected int dictId;
4141
protected ZstdDictDecompress dict;
42+
protected ZstdDecompressCtx ctx;
4243

4344
ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
4445
this.bufferSize = bufferSize;
4546
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
4647
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
4748
this.outBuf.position(bufferSize);
49+
this.ctx = new ZstdDecompressCtx();
4850
if (dictionary != null) {
4951
this.dictId = ZstdCodec.getDictionaryId(dictionary);
5052
this.dict = new ZstdDictDecompress(dictionary);
53+
this.ctx.loadDict(this.dict);
5154
}
5255
}
5356

@@ -67,12 +70,7 @@ public int decompress(final byte[] b, final int off, final int len) throws IOExc
6770
int remaining = inBuf.remaining();
6871
inLen -= remaining;
6972
outBuf.clear();
70-
int written;
71-
if (dict != null) {
72-
written = Zstd.decompress(outBuf, inBuf, dict);
73-
} else {
74-
written = Zstd.decompress(outBuf, inBuf);
75-
}
73+
int written = ctx.decompress(outBuf, inBuf);
7674
inBuf.clear();
7775
outBuf.flip();
7876
int n = Math.min(written, len);
@@ -109,6 +107,13 @@ public void reset() {
109107
outBuf.clear();
110108
outBuf.position(outBuf.capacity());
111109
finished = false;
110+
ctx.reset();
111+
if (dict != null) {
112+
ctx.loadDict(dict);
113+
} else {
114+
// loadDict((byte[]) accepts null to clear the dictionary
115+
ctx.loadDict((byte[]) null);
116+
}
112117
}
113118

114119
@Override

0 commit comments

Comments
 (0)