Skip to content

Commit dc87785

Browse files
committed
HPCC-34656 Allow BlockCompressed nodes to have configurable compression
- Updated hybrid index compressor to use blockcompressed for leafs - Modified BlockCompressed nodes to make compression configurable - Removed uncompressed pathways from BlockCompressed Signed-off-by: James McMullan James.McMullan@lexisnexis.com
1 parent 979377a commit dc87785

File tree

5 files changed

+123
-235
lines changed

5 files changed

+123
-235
lines changed

system/jhtree/ctfile.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
#define USE_TRAILING_HEADER 0x80 // Real index header node located at end of file
4141
#define HTREE_COMPRESSED_KEY 0x40
4242
#define HTREE_QUICK_COMPRESSED_KEY 0x48
43-
#define KEYBUILD_VERSION 2 // unsigned short. NB: This should upped if a change would make existing keys incompatible with current build.
44-
// We can read indexes at versions 1 or 2
45-
// We build indexes with version set to 1 if they are compatible with version 1 readers, otherwise 2
43+
#define KEYBUILD_VERSION 3 // unsigned short. NB: This should upped if a change would make existing keys incompatible with current build.
44+
// We can read indexes at versions 1, 2 or 3
45+
// We build indexes with version set to 1 if they are compatible with version 1 readers, 2 for inplace indexes, and 3 for hybrid indexes
4646
#define KEYBUILD_MAXLENGTH 0x7FFF
4747

4848
// structure to be read into - NO VIRTUALS.

system/jhtree/jhblockcompressed.cpp

Lines changed: 79 additions & 219 deletions
Original file line numberDiff line numberDiff line change
@@ -83,177 +83,59 @@ int CJHBlockCompressedSearchNode::locateGT(const char * search, unsigned minInde
8383
return a;
8484
}
8585

86+
char *CJHBlockCompressedSearchNode::expandBlock(const void *src, size32_t &decompressedSize, CompressionMethod compressionMethod)
87+
{
88+
ICompressHandler * handler = queryCompressHandler(compressionMethod);
89+
if (!handler)
90+
throw makeStringExceptionV(JHTREE_KEY_UNKNOWN_COMPRESSION, "Unknown payload compression method %d", (int)compressionMethod);
91+
92+
const char * options = nullptr;
93+
Owned<IExpander> exp = handler->getExpander(options);
94+
95+
int len=exp->init(src);
96+
if (len==0)
97+
{
98+
decompressedSize = 0;
99+
return NULL;
100+
}
101+
char *outkeys=(char *) allocMem(len);
102+
exp->expand(outkeys);
103+
decompressedSize = len;
104+
return outkeys;
105+
}
106+
86107
void CJHBlockCompressedSearchNode::load(CKeyHdr *_keyHdr, const void *rawData, offset_t _fpos, bool needCopy)
87108
{
88109
CJHSearchNode::load(_keyHdr, rawData, _fpos, needCopy);
89-
keyLen = _keyHdr->getMaxKeyLength();
110+
111+
keyLen = keyHdr->getMaxKeyLength();
90112
keyCompareLen = _keyHdr->getNodeKeyLength();
91-
if (hdr.nodeType == NodeBranch)
92-
keyLen = keyHdr->getNodeKeyLength();
93113
keyRecLen = keyLen + sizeof(offset_t);
94-
char keyType = keyHdr->getKeyType();
114+
95115
const char *keys = ((const char *) rawData) + sizeof(hdr);
96-
if (hdr.nodeType==NodeLeaf)
97-
{
98-
firstSequence = *(unsigned __int64 *) keys;
99-
keys += sizeof(unsigned __int64);
100-
_WINREV(firstSequence);
101-
}
102116

103-
CCycleTimer expansionTimer(isLeaf());
104-
if (hdr.nodeType==NodeLeaf && (keyType & HTREE_COMPRESSED_KEY))
105-
{
106-
inMemorySize = keyHdr->getNodeSize();
107-
if ((keyType & (HTREE_QUICK_COMPRESSED_KEY|HTREE_VARSIZE))==HTREE_QUICK_COMPRESSED_KEY)
108-
keyBuf = nullptr;
109-
else
110-
keyBuf = expandData(keys, inMemorySize);
111-
}
112-
else
113-
{
114-
if (hdr.numKeys)
115-
{
116-
bool handleVariable = keyHdr->isVariable() && isLeaf();
117-
KEYRECSIZE_T workRecLen;
118-
MemoryBuffer keyBufMb;
119-
const char *source = keys;
120-
char *target;
121-
// do first row
122-
if (handleVariable) {
123-
memcpy(&workRecLen, source, sizeof(workRecLen));
124-
_WINREV(workRecLen);
125-
size32_t tmpSz = sizeof(workRecLen) + sizeof(offset_t);
126-
target = (char *)keyBufMb.reserve(tmpSz+workRecLen);
127-
memcpy(target, source, tmpSz);
128-
source += tmpSz;
129-
target += tmpSz;
130-
}
131-
else {
132-
target = (char *)keyBufMb.reserveTruncate(hdr.numKeys * keyRecLen);
133-
workRecLen = keyRecLen - sizeof(offset_t);
134-
memcpy(target, source, sizeof(offset_t));
135-
source += sizeof(offset_t);
136-
target += sizeof(offset_t);
137-
}
138-
139-
// this is where next row gets data from
140-
const char *prev, *next = NULL;
141-
unsigned prevOffset = 0;
142-
if (handleVariable)
143-
prevOffset = target-((char *)keyBufMb.bufferBase());
144-
else
145-
next = target;
146-
147-
unsigned char pack1 = *source++;
148-
assert(0==pack1); // 1st time will be always be 0
149-
KEYRECSIZE_T left = workRecLen;
150-
while (left--) {
151-
*target = *source;
152-
source++;
153-
target++;
154-
}
155-
// do subsequent rows
156-
for (int i = 1; i < hdr.numKeys; i++) {
157-
if (handleVariable) {
158-
memcpy(&workRecLen, source, sizeof(workRecLen));
159-
_WINREV(workRecLen);
160-
target = (char *)keyBufMb.reserve(sizeof(workRecLen)+sizeof(offset_t)+workRecLen);
161-
size32_t tmpSz = sizeof(workRecLen)+sizeof(offset_t);
162-
memcpy(target, source, tmpSz);
163-
target += tmpSz;
164-
source += tmpSz;
165-
}
166-
else
167-
{
168-
memcpy(target, source, sizeof(offset_t));
169-
source += sizeof(offset_t);
170-
target += sizeof(offset_t);
171-
}
172-
pack1 = *source++;
173-
assert(pack1<=workRecLen);
174-
if (handleVariable) {
175-
prev = ((char *)keyBufMb.bufferBase())+prevOffset;
176-
// for next
177-
prevOffset = target-((char *)keyBufMb.bufferBase());
178-
}
179-
else {
180-
prev = next;
181-
next = target;
182-
}
183-
left = workRecLen - pack1;
184-
while (pack1--) {
185-
*target = *prev;
186-
prev++;
187-
target++;
188-
}
189-
while (left--) {
190-
*target = *source;
191-
source++;
192-
target++;
193-
}
194-
}
195-
inMemorySize = keyBufMb.length();
196-
keyBuf = (char *)keyBufMb.detach();
197-
assertex(keyBuf);
198-
}
199-
else {
200-
keyBuf = NULL;
201-
inMemorySize = 0;
202-
if (hdr.nodeType == NodeBranch)
203-
{
204-
if ((hdr.leftSib == 0) && (hdr.rightSib == 0))
205-
{
206-
//Sanity check to catch error where a section of the file has unexpectedly been zeroed.
207-
//which is otherwise tricky to track down.
208-
//This can only legally happen if there is an index with 0 entries
209-
if (keyHdr->getNumRecords() != 0)
210-
throw MakeStringException(0, "Zeroed index node detected at offset %llu", getFpos());
211-
}
212-
}
213-
}
214-
}
117+
firstSequence = *(unsigned __int64 *) keys;
118+
keys += sizeof(unsigned __int64);
119+
_WINREV(firstSequence);
120+
121+
CompressionMethod compressionMethod = *(CompressionMethod*) keys;
122+
keys += sizeof(CompressionMethod);
215123

216-
if (isLeaf())
217-
loadExpandTime = expansionTimer.elapsedNs();
124+
CCycleTimer expansionTimer(true);
125+
keyBuf = expandBlock(keys, inMemorySize, compressionMethod);
126+
loadExpandTime = expansionTimer.elapsedNs();
218127
}
219128

220129
int CJHBlockCompressedSearchNode::compareValueAt(const char *src, unsigned int index) const
221130
{
222131
return memcmp(src, keyBuf + index*keyRecLen + (keyHdr->hasSpecialFileposition() ? sizeof(offset_t) : 0), keyCompareLen);
223132
}
224133

225-
//This critical section will be shared by all legacy nodes, but it is only used when recording events, so the potential contention is not a problem.
226-
static CriticalSection payloadExpandCs;
227-
228134
bool CJHBlockCompressedSearchNode::fetchPayload(unsigned int index, char *dst, PayloadReference & activePayload) const
229135
{
230136
if (index >= hdr.numKeys) return false;
231137
if (!dst) return true;
232138

233-
bool recording = recordingEvents();
234-
if (recording)
235-
{
236-
std::shared_ptr<byte []> sharedPayload;
237-
238-
{
239-
CriticalBlock block(payloadExpandCs);
240-
241-
sharedPayload = expandedPayload.lock();
242-
if (!sharedPayload)
243-
{
244-
//Allocate a dummy payload so we can track whether it is hit or not
245-
sharedPayload = std::shared_ptr<byte []>(new byte[1]);
246-
expandedPayload = sharedPayload;
247-
}
248-
}
249-
250-
queryRecorder().recordIndexPayload(keyHdr->getKeyId(), getFpos(), 0, getMemSize());
251-
252-
//Ensure the payload stays alive for the duration of this call, and is likely preserved until
253-
//the next call. Always replacing is as efficient as conditional - since we are using a move operator.
254-
activePayload.data = std::move(sharedPayload);
255-
}
256-
257139
const char * p = keyBuf + index*keyRecLen;
258140
if (keyHdr->hasSpecialFileposition())
259141
{
@@ -339,8 +221,10 @@ void CJHBlockCompressedSearchNode::dump(FILE *out, int length, unsigned rowCount
339221

340222
//=========================================================================================================
341223

342-
CBlockCompressedWriteNode::CBlockCompressedWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode) : CWriteNode(_fpos, _keyHdr, isLeafNode)
224+
CBlockCompressedWriteNode::CBlockCompressedWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode, const CBlockCompressedBuildContext& ctx) :
225+
CWriteNode(_fpos, _keyHdr, isLeafNode), context(ctx)
343226
{
227+
hdr.compressionType = BlockCompression;
344228
keyLen = keyHdr->getMaxKeyLength();
345229
if (!isLeafNode)
346230
{
@@ -357,66 +241,32 @@ CBlockCompressedWriteNode::~CBlockCompressedWriteNode()
357241

358242
bool CBlockCompressedWriteNode::add(offset_t pos, const void *indata, size32_t insize, unsigned __int64 sequence)
359243
{
360-
char keyType = keyHdr->getKeyType();
361-
if (isLeaf() && !hdr.numKeys)
244+
if (hdr.numKeys == 0)
362245
{
363246
unsigned __int64 rsequence = sequence;
364247
_WINREV(rsequence);
365248
memcpy(keyPtr, &rsequence, sizeof(rsequence));
366249
keyPtr += sizeof(rsequence);
367250
hdr.keyBytes += sizeof(rsequence);
368-
}
369-
if (isLeaf() && (keyType & HTREE_COMPRESSED_KEY))
370-
{
371-
// For HTREE_COMPRESSED_KEY hdr.keyBytes is only updated on the first row (above),
372-
// and then in the finalize() call after the compressor has been closed.
373-
if (0 == hdr.numKeys)
374-
{
375-
bool isVariable = keyHdr->isVariable();
376-
//Adjust the fixed key size to include the fileposition field which is written by writekey.
377-
size32_t fixedKeySize = isVariable ? 0 : keyLen + sizeof(offset_t);
378-
bool rowCompressed = (keyType&HTREE_QUICK_COMPRESSED_KEY)==HTREE_QUICK_COMPRESSED_KEY;
379-
lzwcomp.open(keyPtr, maxBytes-hdr.keyBytes, isVariable, rowCompressed, fixedKeySize);
380-
}
381-
if (0xffff == hdr.numKeys || 0 == lzwcomp.writekey(pos, (const char *)indata, insize))
382-
return false;
383-
}
384-
else
385-
{
386-
if (0xffff == hdr.numKeys)
387-
return false;
388-
assertex (indata);
389-
//assertex(insize==keyLen);
390-
const void *data;
391-
int size;
392-
393-
char *result = (char *) alloca(insize+1); // Gets bigger if no leading common!
394-
size = compressValue((const char *) indata, insize, result);
395-
data = result;
396-
397-
int bytes = sizeof(pos) + size;
398-
if (keyHdr->isVariable())
399-
bytes += sizeof(KEYRECSIZE_T);
400-
if (hdr.keyBytes + bytes >= maxBytes) // probably could be '>' (loses byte)
401-
return false;
402-
403-
if (keyHdr->isVariable() && isLeaf())
404-
{
405-
KEYRECSIZE_T _insize = insize;
406-
_WINREV(_insize);
407-
memcpy(keyPtr, &_insize, sizeof(_insize));
408-
keyPtr += sizeof(_insize);
409-
}
410-
_WINREV(pos);
411-
memcpy(keyPtr, &pos, sizeof(pos));
412-
keyPtr += sizeof(pos);
413-
memcpy(keyPtr, data, size);
414-
keyPtr += size;
415-
hdr.keyBytes += bytes;
251+
252+
memcpy(keyPtr, &context.compressionMethod, sizeof(context.compressionMethod));
253+
keyPtr += sizeof(context.compressionMethod);
254+
hdr.keyBytes += sizeof(context.compressionMethod);
255+
256+
//Adjust the fixed key size to include the fileposition field which is written by writekey.
257+
bool isVariable = keyHdr->isVariable();
258+
size32_t fixedKeySize = isVariable ? 0 : keyLen + sizeof(offset_t);
259+
260+
ICompressHandler * handler = queryCompressHandler(context.compressionMethod);
261+
compressor.open(keyPtr, maxBytes-hdr.keyBytes, handler, context.compressionOptions, isVariable, fixedKeySize);
416262
}
417263

264+
if (0xffff == hdr.numKeys || 0 == compressor.writekey(pos, (const char *)indata, insize))
265+
return false;
266+
418267
if (insize>keyLen)
419268
throw MakeStringException(0, "key+payload (%u) exceeds max length (%u)", insize, keyLen);
269+
420270
memcpy(lastKeyValue, indata, insize);
421271
lastSequence = sequence;
422272
hdr.numKeys++;
@@ -426,28 +276,38 @@ bool CBlockCompressedWriteNode::add(offset_t pos, const void *indata, size32_t i
426276

427277
void CBlockCompressedWriteNode::finalize()
428278
{
429-
if (isLeaf() && (keyHdr->getKeyType() & HTREE_COMPRESSED_KEY))
430-
{
431-
lzwcomp.close();
432-
if (hdr.numKeys)
433-
hdr.keyBytes = lzwcomp.buflen() + sizeof(unsigned __int64); // rsequence
434-
}
279+
compressor.close();
280+
if (hdr.numKeys)
281+
hdr.keyBytes = compressor.buflen() + sizeof(unsigned __int64) + sizeof(CompressionMethod); // rsequence
435282
}
436283

437-
size32_t CBlockCompressedWriteNode::compressValue(const char *keyData, size32_t size, char *result)
284+
BlockCompressedIndexCompressor::BlockCompressedIndexCompressor(unsigned keyedSize, IHThorIndexWriteArg *helper, const char* options)
438285
{
439-
unsigned int pack = 0;
440-
if (hdr.numKeys)
286+
CompressionMethod compressionMethod = COMPRESS_METHOD_ZSTDS;
287+
StringBuffer compressionOptions;
288+
289+
auto processOption = [this] (const char * option, const char * value)
441290
{
442-
for (; pack<size && pack<255; pack++)
291+
CompressionMethod method = translateToCompMethod(option, COMPRESS_METHOD_NONE);
292+
if (method != COMPRESS_METHOD_NONE)
443293
{
444-
if (keyData[pack] != lastKeyValue[pack])
445-
break;
294+
context.compressionMethod = method;
295+
if (!streq(value, "1"))
296+
context.compressionOptions.append(',').append(value);
446297
}
447-
}
298+
else if (strieq(option, "compression"))
299+
{
300+
context.compressionMethod = translateToCompMethod(value, COMPRESS_METHOD_ZSTDS);
301+
}
302+
else if (strieq(option, "compressopt"))
303+
{
304+
context.compressionOptions.append(',').append(value);
305+
}
306+
};
448307

449-
result[0] = pack;
450-
memcpy(&result[1], keyData+pack, size-pack);
451-
return size-pack+1;
452-
}
308+
processOptionString(options, processOption);
453309

310+
context.compressionHandler = queryCompressHandler(compressionMethod);
311+
if (!context.compressionHandler)
312+
throw MakeStringException(0, "Unknown compression method %d", (int)compressionMethod);
313+
}

0 commit comments

Comments
 (0)