Skip to content

Commit 61e8d2a

Browse files
committed
Fixed issues discovered during PR review
1 parent b148348 commit 61e8d2a

File tree

9 files changed

+43
-79
lines changed

9 files changed

+43
-79
lines changed

clickhouse/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
SET ( clickhouse-cpp-lib-src
2-
base/coded.cpp
32
base/compressed.cpp
43
base/input.cpp
54
base/output.cpp

clickhouse/base/coded.cpp

Lines changed: 0 additions & 30 deletions
This file was deleted.

clickhouse/base/coded.h

Lines changed: 0 additions & 10 deletions
This file was deleted.

clickhouse/base/compressed.cpp

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
#include <stdexcept>
88
#include <system_error>
99

10-
#include <iostream>
11-
1210
namespace {
13-
static const size_t HEADER_SIZE = 9;
14-
static const size_t EXTRA_PREALLOCATE_COMPRESS_BUFFER = 15;
15-
static const uint8_t COMPRESSION_METHOD = 0x82;
16-
#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB
11+
constexpr size_t HEADER_SIZE = 9;
12+
// see DB::CompressionMethodByte::LZ4 from src/Compression/CompressionInfo.h of ClickHouse project
13+
constexpr uint8_t COMPRESSION_METHOD = 0x82;
14+
// Documentation says that compression is faster when output buffer is larger than LZ4_compressBound estimation.
15+
constexpr size_t EXTRA_COMPRESS_BUFFER_SIZE = 4096;
16+
constexpr size_t DBMS_MAX_COMPRESSED_SIZE = 0x40000000ULL; // 1GB
1717
}
1818

1919
namespace clickhouse {
@@ -30,7 +30,7 @@ CompressedInput::~CompressedInput() {
3030
#else
3131
if (!std::uncaught_exceptions()) {
3232
#endif
33-
throw std::runtime_error("some data was not readed");
33+
throw std::runtime_error("some data was not read");
3434
}
3535
}
3636
}
@@ -59,8 +59,7 @@ bool CompressedInput::Decompress() {
5959
}
6060

6161
if (method != COMPRESSION_METHOD) {
62-
throw std::runtime_error("unsupported compression method " +
63-
std::to_string(int(method)));
62+
throw std::runtime_error("unsupported compression method " + std::to_string(int(method)));
6463
} else {
6564
if (!WireFormat::ReadFixed(input_, &compressed)) {
6665
return false;
@@ -105,24 +104,27 @@ bool CompressedInput::Decompress() {
105104

106105

107106
CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size)
108-
: destination_(destination),
109-
max_compressed_chunk_size_(max_compressed_chunk_size)
107+
: destination_(destination)
108+
, max_compressed_chunk_size_(max_compressed_chunk_size)
110109
{
110+
PreallocateCompressBuffer(max_compressed_chunk_size);
111111
}
112112

113113
CompressedOutput::~CompressedOutput() {
114-
Flush();
114+
Flush();
115115
}
116116

117117
size_t CompressedOutput::DoWrite(const void* data, size_t len) {
118118
const size_t original_len = len;
119-
const size_t max_chunk_size = max_compressed_chunk_size_ ? max_compressed_chunk_size_ : len;
119+
// what if len > max_compressed_chunk_size_ ?
120+
const size_t max_chunk_size = max_compressed_chunk_size_ > 0 ? max_compressed_chunk_size_ : len;
121+
if (len > max_compressed_chunk_size_) {
122+
PreallocateCompressBuffer(len);
123+
}
120124

121-
while (len > 0)
122-
{
125+
while (len > 0) {
123126
auto to_compress = std::min(len, max_chunk_size);
124-
if (!Compress(data, to_compress))
125-
break;
127+
Compress(data, to_compress);
126128

127129
len -= to_compress;
128130
data = reinterpret_cast<const char*>(data) + to_compress;
@@ -135,16 +137,15 @@ void CompressedOutput::DoFlush() {
135137
destination_->Flush();
136138
}
137139

138-
bool CompressedOutput::Compress(const void * data, size_t len) {
139-
140-
const size_t expected_out_size = LZ4_compressBound(len);
141-
compressed_buffer_.resize(std::max(compressed_buffer_.size(), expected_out_size + HEADER_SIZE + EXTRA_PREALLOCATE_COMPRESS_BUFFER));
142-
143-
const int compressed_size = LZ4_compress_default(
140+
void CompressedOutput::Compress(const void * data, size_t len) {
141+
const auto compressed_size = LZ4_compress_default(
144142
(const char*)data,
145143
(char*)compressed_buffer_.data() + HEADER_SIZE,
146144
len,
147145
compressed_buffer_.size() - HEADER_SIZE);
146+
if (compressed_size <= 0)
147+
throw std::runtime_error("Failed to compress chunk of " + std::to_string(len) + " bytes, "
148+
"LZ4 error: " + std::to_string(compressed_size));
148149

149150
{
150151
auto header = compressed_buffer_.data();
@@ -160,7 +161,14 @@ bool CompressedOutput::Compress(const void * data, size_t len) {
160161
WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
161162

162163
destination_->Flush();
163-
return true;
164+
}
165+
166+
void CompressedOutput::PreallocateCompressBuffer(size_t input_size) {
167+
const auto estimated_compressed_buffer_size = LZ4_compressBound(input_size);
168+
if (estimated_compressed_buffer_size <= 0)
169+
throw std::runtime_error("Failed to estimate compressed buffer size, LZ4 error: " + std::to_string(estimated_compressed_buffer_size));
170+
171+
compressed_buffer_.resize(estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE);
164172
}
165173

166174
}

clickhouse/base/compressed.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace clickhouse {
88

99
class CompressedInput : public ZeroCopyInput {
1010
public:
11-
CompressedInput(InputStream* input);
11+
CompressedInput(InputStream* input);
1212
~CompressedInput();
1313

1414
protected:
@@ -31,13 +31,15 @@ class CompressedOutput : public OutputStream {
3131
protected:
3232
size_t DoWrite(const void* data, size_t len) override;
3333
void DoFlush() override;
34-
bool Compress(const void * data, size_t len);
3534

35+
private:
36+
void Compress(const void * data, size_t len);
37+
void PreallocateCompressBuffer(size_t input_size);
3638

3739
private:
3840
OutputStream * destination_;
41+
const size_t max_compressed_chunk_size_;
3942
Buffer compressed_buffer_;
40-
size_t max_compressed_chunk_size_;
4143
};
4244

4345
}

clickhouse/base/sslsocket.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class SSLSocketInput : public InputStream {
6464
bool Skip(size_t /*bytes*/) override {
6565
return false;
6666
}
67+
6768
protected:
6869
size_t DoRead(void* buf, size_t len) override;
6970

clickhouse/base/wire_format.cpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include <stdexcept>
77

88
namespace {
9-
static const int MAX_VARINT_BYTES = 10;
9+
constexpr int MAX_VARINT_BYTES = 10;
1010
}
1111

1212
namespace clickhouse {
@@ -22,12 +22,7 @@ bool WireFormat::ReadAll(InputStream * input, void* buf, size_t len) {
2222
len -= read_previously;
2323
}
2424

25-
// true if all was read successfully
2625
return !len;
27-
// if (len) {
28-
// throw std::runtime_error("Failed to read " + std::to_string(original_len)
29-
// + " bytes, only read " + std::to_string(original_len - len));
30-
// }
3126
}
3227

3328
void WireFormat::WriteAll(OutputStream* output, const void* buf, size_t len) {
@@ -52,7 +47,7 @@ bool WireFormat::ReadVarint64(InputStream* input, uint64_t* value) {
5247
*value = 0;
5348

5449
for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
55-
uint8_t byte;
50+
uint8_t byte = 0;
5651

5752
if (!input->ReadByte(&byte)) {
5853
return false;
@@ -90,7 +85,7 @@ void WireFormat::WriteVarint64(OutputStream* output, uint64_t value) {
9085
}
9186

9287
bool WireFormat::SkipString(InputStream* input) {
93-
uint64_t len;
88+
uint64_t len = 0;
9489

9590
if (ReadVarint64(input, &len)) {
9691
if (len > 0x00FFFFFFULL)

clickhouse/base/wire_format.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ inline bool WireFormat::ReadFixed(InputStream* input, T* value) {
3535
}
3636

3737
inline bool WireFormat::ReadString(InputStream* input, std::string* value) {
38-
uint64_t len;
39-
38+
uint64_t len = 0;
4039
if (ReadVarint64(input, &len)) {
4140
if (len > 0x00FFFFFFULL) {
4241
return false;

ut/stream_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ TEST(CodedStreamCase, Varint64) {
1616

1717
{
1818
ArrayInput input(buf.data(), buf.size());
19-
uint64_t value;
19+
uint64_t value = 0;
2020
ASSERT_TRUE(WireFormat::ReadVarint64(&input, &value));
2121
ASSERT_EQ(value, 18446744071965638648ULL);
2222
}

0 commit comments

Comments
 (0)