11#include  " compressed.h" 
22#include  " wire_format.h" 
3+ #include  " output.h" 
34
45#include  < cityhash/city.h> 
56#include  < lz4/lz4.h> 
67#include  < stdexcept> 
78#include  < system_error> 
89
9- #define  DBMS_MAX_COMPRESSED_SIZE     0x40000000ULL    //  1GB
10+ namespace  {
11+ constexpr  size_t  HEADER_SIZE = 9 ;
12+ //  see DB::CompressionMethodByte::LZ4 from src/Compression/CompressionInfo.h of ClickHouse project
13+ constexpr  uint8_t  COMPRESSION_METHOD = 0x82 ;
14+ //  Documentation says that compression is faster when output buffer is larger than LZ4_compressBound estimation.
15+ constexpr  size_t  EXTRA_COMPRESS_BUFFER_SIZE = 4096 ;
16+ constexpr  size_t  DBMS_MAX_COMPRESSED_SIZE = 0x40000000ULL ;   //  1GB
17+ }
1018
1119namespace  clickhouse  {
1220
13- CompressedInput::CompressedInput (CodedInputStream * input)
21+ CompressedInput::CompressedInput (InputStream * input)
1422    : input_(input)
1523{
1624}
@@ -22,7 +30,7 @@ CompressedInput::~CompressedInput() {
2230#else 
2331        if  (!std::uncaught_exceptions ()) {
2432#endif
25-             throw  std::runtime_error (" some data was not readed "  );
33+             throw  std::runtime_error (" some data was not read "  );
2634        }
2735    }
2836}
@@ -50,9 +58,8 @@ bool CompressedInput::Decompress() {
5058        return  false ;
5159    }
5260
53-     if  (method != 0x82 ) {
54-         throw  std::runtime_error (" unsupported compression method "   +
55-                                  std::to_string (int (method)));
61+     if  (method != COMPRESSION_METHOD) {
62+         throw  std::runtime_error (" unsupported compression method "   + std::to_string (int (method)));
5663    } else  {
5764        if  (!WireFormat::ReadFixed (input_, &compressed)) {
5865            return  false ;
@@ -75,7 +82,7 @@ bool CompressedInput::Decompress() {
7582            out.Write (&original,   sizeof (original));
7683        }
7784
78-         if  (!WireFormat::ReadBytes (input_, tmp.data () + 9 , compressed - 9 )) {
85+         if  (!WireFormat::ReadBytes (input_, tmp.data () + HEADER_SIZE , compressed - HEADER_SIZE )) {
7986            return  false ;
8087        } else  {
8188            if  (hash != CityHash128 ((const  char *)tmp.data (), compressed)) {
@@ -85,7 +92,7 @@ bool CompressedInput::Decompress() {
8592
8693        data_ = Buffer (original);
8794
88-         if  (LZ4_decompress_safe ((const  char *)tmp.data () + 9 , (char *)data_.data (), compressed - 9 , original) < 0 ) {
95+         if  (LZ4_decompress_safe ((const  char *)tmp.data () + HEADER_SIZE , (char *)data_.data (), compressed - HEADER_SIZE , original) < 0 ) {
8996            throw  std::runtime_error (" can't decompress data"  );
9097        } else  {
9198            mem_.Reset (data_.data (), original);
@@ -95,4 +102,73 @@ bool CompressedInput::Decompress() {
95102    return  true ;
96103}
97104
105+ 
106+ CompressedOutput::CompressedOutput (OutputStream * destination, size_t  max_compressed_chunk_size)
107+     : destination_ (destination)
108+     , max_compressed_chunk_size_ (max_compressed_chunk_size)
109+ {
110+     PreallocateCompressBuffer (max_compressed_chunk_size);
111+ }
112+ 
113+ CompressedOutput::~CompressedOutput () {
114+     Flush ();
115+ }
116+ 
117+ size_t  CompressedOutput::DoWrite (const  void * data, size_t  len) {
118+     const  size_t  original_len = len;
119+     //  what if len > max_compressed_chunk_size_ ?
120+     const  size_t  max_chunk_size = max_compressed_chunk_size_ > 0  ? max_compressed_chunk_size_ : len;
121+     if  (max_chunk_size > max_compressed_chunk_size_) {
122+         PreallocateCompressBuffer (len);
123+     }
124+ 
125+     while  (len > 0 ) {
126+         auto  to_compress = std::min (len, max_chunk_size);
127+         Compress (data, to_compress);
128+ 
129+         len -= to_compress;
130+         data = reinterpret_cast <const  char *>(data) + to_compress;
131+     }
132+ 
133+     return  original_len - len;
134+ }
135+ 
136+ void  CompressedOutput::DoFlush () {
137+     destination_->Flush ();
138+ }
139+ 
140+ void  CompressedOutput::Compress (const  void  * data, size_t  len) {
141+     const  auto  compressed_size = LZ4_compress_default (
142+             (const  char *)data,
143+             (char *)compressed_buffer_.data () + HEADER_SIZE,
144+             len,
145+             compressed_buffer_.size () - HEADER_SIZE);
146+     if  (compressed_size <= 0 )
147+         throw  std::runtime_error (" Failed to compress chunk of "   + std::to_string (len) + "  bytes, " 
148+                 " LZ4 error: "   + std::to_string (compressed_size));
149+ 
150+     {
151+         auto  header = compressed_buffer_.data ();
152+         WriteUnaligned (header, COMPRESSION_METHOD);
153+         //  Compressed data size with header
154+         WriteUnaligned (header + 1 , static_cast <uint32_t >(compressed_size + HEADER_SIZE));
155+         //  Original data size
156+         WriteUnaligned (header + 5 , static_cast <uint32_t >(len));
157+     }
158+ 
159+     WireFormat::WriteFixed (destination_, CityHash128 (
160+         (const  char *)compressed_buffer_.data (), compressed_size + HEADER_SIZE));
161+     WireFormat::WriteBytes (destination_, compressed_buffer_.data (), compressed_size + HEADER_SIZE);
162+ 
163+     destination_->Flush ();
164+ }
165+ 
166+ void  CompressedOutput::PreallocateCompressBuffer (size_t  input_size) {
167+     const  auto  estimated_compressed_buffer_size = LZ4_compressBound (input_size);
168+     if  (estimated_compressed_buffer_size <= 0 )
169+         throw  std::runtime_error (" Failed to estimate compressed buffer size, LZ4 error: "   + std::to_string (estimated_compressed_buffer_size));
170+ 
171+     compressed_buffer_.resize (estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE);
172+ }
173+ 
98174}
0 commit comments