Skip to content

Commit

Permalink
[improvement](status) Change the return type for block_compression (a…
Browse files Browse the repository at this point in the history
…pache#47566)

### What problem does this PR solve?
The previous block_compression.cpp didn't make sense for the type of
Status returned, so I changed it

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:
In the past, a function returned multiple states, perhaps directly
returning one state. I made different treatments according to different
states

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [x] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [x] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
apache/doris-website#1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
  • Loading branch information
lzyy2024 authored Feb 8, 2025
1 parent b0fa458 commit 0b49470
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 39 deletions.
130 changes: 92 additions & 38 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class Lz4BlockCompression : public BlockCompressionCodec {
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len);
return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len);
}
output->size = decompressed_len;
return Status::OK();
Expand Down Expand Up @@ -458,8 +458,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
&input_size, nullptr);
if (LZ4F_isError(lres)) {
decompress_failed = true;
return Status::InvalidArgument("Fail to do LZ4F decompress, res={}",
LZ4F_getErrorName(lres));
return Status::InternalError("Fail to do LZ4F decompress, res={}",
LZ4F_getErrorName(lres));
} else if (input_size != input.size) {
decompress_failed = true;
return Status::InvalidArgument(
Expand Down Expand Up @@ -635,7 +635,10 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len);
return Status::InvalidArgument(
"destination buffer is not large enough or the source stream is detected "
"malformed, fail to do LZ4 decompress, error={}",
decompressed_len);
}
output->size = decompressed_len;
return Status::OK();
Expand Down Expand Up @@ -854,8 +857,12 @@ class ZlibBlockCompression : public BlockCompressionCodec {
Slice s(*output);

auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size);
if (zres != Z_OK) {
return Status::InvalidArgument("Fail to do ZLib compress, error={}", zError(zres));
if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(fmt::format(
"ZLib compression failed due to memory allocationerror.error = {}, res = {} ",
zError(zres), zres)));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres));
}
output->resize(s.size);
return Status::OK();
Expand All @@ -871,9 +878,12 @@ class ZlibBlockCompression : public BlockCompressionCodec {
zstrm.zfree = Z_NULL;
zstrm.opaque = Z_NULL;
auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
if (zres != Z_OK) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(
"Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
}
// we assume that output is e
zstrm.next_out = (Bytef*)output->data();
Expand All @@ -888,16 +898,19 @@ class ZlibBlockCompression : public BlockCompressionCodec {

zres = deflate(&zstrm, flush);
if (zres != Z_OK && zres != Z_STREAM_END) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
}
}

output->resize(zstrm.total_out);
zres = deflateEnd(&zstrm);
if (zres != Z_OK) {
return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
zError(zres), zres);
if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres),
zres);
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
zError(zres), zres);
}
return Status::OK();
}
Expand All @@ -906,8 +919,13 @@ class ZlibBlockCompression : public BlockCompressionCodec {
size_t input_size = input.size;
auto zres =
::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
if (zres != Z_OK) {
if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres));
} else if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}",
zError(zres)));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres));
}
return Status::OK();
}
Expand All @@ -932,8 +950,14 @@ class Bzip2BlockCompression : public BlockCompressionCodec {
uint32_t size = output->size();
auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data,
input.size, 9, 0, 0);
if (bzres != BZ_OK) {
return Status::InternalError("Fail to do Bzip2 compress, ret={}", bzres);
if (bzres == BZ_MEM_ERROR) {
throw Exception(
Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres));
} else if (bzres == BZ_PARAM_ERROR) {
return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres);
} else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
bzres != BZ_STREAM_END && bzres != BZ_OK) {
return Status::InternalError("Failed to init bz2. status code: {}", bzres);
}
output->resize(size);
return Status::OK();
Expand All @@ -947,7 +971,12 @@ class Bzip2BlockCompression : public BlockCompressionCodec {
bz_stream bzstrm;
bzero(&bzstrm, sizeof(bzstrm));
int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
if (bzres != BZ_OK) {
if (bzres == BZ_PARAM_ERROR) {
return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
} else if (bzres == BZ_MEM_ERROR) {
throw Exception(
Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres));
} else if (bzres != BZ_OK) {
return Status::InternalError("Failed to init bz2. status code: {}", bzres);
}
// we assume that output is e
Expand All @@ -962,15 +991,20 @@ class Bzip2BlockCompression : public BlockCompressionCodec {
int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;

bzres = BZ2_bzCompress(&bzstrm, flush);
if (bzres != BZ_OK && bzres != BZ_STREAM_END) {
return Status::InternalError("Fail to do bzip2 stream compress, res={}", bzres);
if (bzres == BZ_PARAM_ERROR) {
return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
} else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
bzres != BZ_STREAM_END && bzres != BZ_OK) {
return Status::InternalError("Failed to init bz2. status code: {}", bzres);
}
}

size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32;
output->resize(total_out);
bzres = BZ2_bzCompressEnd(&bzstrm);
if (bzres != BZ_OK) {
if (bzres == BZ_PARAM_ERROR) {
return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
} else if (bzres != BZ_OK) {
return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
}
return Status::OK();
Expand Down Expand Up @@ -1102,14 +1136,14 @@ class ZstdBlockCompression : public BlockCompressionCodec {

if (ZSTD_isError(ret)) {
compress_failed = true;
return Status::InvalidArgument("ZSTD_compressStream2 error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
return Status::InternalError("ZSTD_compressStream2 error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}

// ret is ZSTD hint for needed output buffer size
if (ret > 0 && out_buf.pos == out_buf.size) {
compress_failed = true;
return Status::InvalidArgument("ZSTD_compressStream2 output buffer full");
return Status::InternalError("ZSTD_compressStream2 output buffer full");
}

finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
Expand Down Expand Up @@ -1146,8 +1180,8 @@ class ZstdBlockCompression : public BlockCompressionCodec {
input.size);
if (ZSTD_isError(ret)) {
decompress_failed = true;
return Status::InvalidArgument("ZSTD_decompressDCtx error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
return Status::InternalError("ZSTD_decompressDCtx error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}

// set decompressed size for caller
Expand Down Expand Up @@ -1239,8 +1273,12 @@ class GzipBlockCompression : public ZlibBlockCompression {
int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
8, Z_DEFAULT_STRATEGY);

if (zres != Z_OK) {
return Status::InvalidArgument("Fail to init zlib compress");
if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(
"Fail to init ZLib compress, error={}, res={}", zError(zres), zres));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to init ZLib compress, error={}, res={}",
zError(zres), zres);
}

z_strm.next_in = (Bytef*)input.get_data();
Expand All @@ -1250,14 +1288,16 @@ class GzipBlockCompression : public ZlibBlockCompression {

zres = deflate(&z_strm, Z_FINISH);
if (zres != Z_OK && zres != Z_STREAM_END) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
}

output->resize(z_strm.total_out);
zres = deflateEnd(&z_strm);
if (zres != Z_OK) {
if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to end zlib compress");
} else if (zres != Z_OK) {
return Status::InternalError("Fail to end zlib compress");
}
return Status::OK();
}
Expand All @@ -1273,10 +1313,14 @@ class GzipBlockCompression : public ZlibBlockCompression {
zstrm.opaque = Z_NULL;
auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
8, Z_DEFAULT_STRATEGY);
if (zres != Z_OK) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(
"Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}",
zError(zres), zres);
}

// we assume that output is e
zstrm.next_out = (Bytef*)output->data();
zstrm.avail_out = output->size();
Expand All @@ -1290,16 +1334,19 @@ class GzipBlockCompression : public ZlibBlockCompression {

zres = deflate(&zstrm, flush);
if (zres != Z_OK && zres != Z_STREAM_END) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
}
}

output->resize(zstrm.total_out);
zres = deflateEnd(&zstrm);
if (zres != Z_OK) {
if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
zError(zres), zres);
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
zError(zres), zres);
}
return Status::OK();
}
Expand All @@ -1312,7 +1359,7 @@ class GzipBlockCompression : public ZlibBlockCompression {

int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
if (ret != Z_OK) {
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
return Status::InternalError("Fail to init ZLib decompress, error={}, res={}",
zError(ret), ret);
}

Expand All @@ -1327,6 +1374,13 @@ class GzipBlockCompression : public ZlibBlockCompression {
ret = inflate(&z_strm, Z_FINISH);
if (ret != Z_OK && ret != Z_STREAM_END) {
(void)inflateEnd(&z_strm);
if (ret == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(
"Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret));
} else if (ret == Z_DATA_ERROR) {
return Status::InvalidArgument(
"Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret);
}
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(ret), ret);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/functions/function_compress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class FunctionCompress : public IFunction {
}

// Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK
auto st = compression_codec->compress(data, &compressed_str);
RETURN_IF_ERROR(compression_codec->compress(data, &compressed_str));
col_data.resize(col_data.size() + 4 + compressed_str.size());

std::memcpy(col_data.data() + idx, &length, sizeof(length));
Expand Down

0 comments on commit 0b49470

Please sign in to comment.