Skip to content

Commit da17f18

Browse files
committed
support_compression: code cleanup
1 parent 7b92ea0 commit da17f18

File tree

2 files changed

+77
-246
lines changed

2 files changed

+77
-246
lines changed

driver/lz4_inflating_input_stream.cpp

Lines changed: 36 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22
#include "Poco/Exception.h"
33

44
#include "exception.h"
5-
#include <csignal>
65

76
#include "driver.h"
87

9-
LZ4InflatingStreamBuf::LZ4InflatingStreamBuf(std::istream& istr)
10-
:Poco::BufferedStreamBuf(INFLATE_BUFFER_SIZE /* 4 * 1024 * 1024 */ /* STREAM_BUFFER_SIZE */, std::ios::in),
11-
pIstr(&istr) {
8+
LZ4InflatingStreamBuf::LZ4InflatingStreamBuf(std::istream & istr)
9+
: Poco::BufferedStreamBuf(INFLATE_BUFFER_SIZE, std::ios::in), pIstr(&istr) {
1210
LOG("LZ4InflatingInputStream ctor");
1311
size_t ret = LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION);
1412

@@ -18,170 +16,50 @@ LZ4InflatingStreamBuf::LZ4InflatingStreamBuf(std::istream& istr)
1816
compressedBuffer.resize(INFLATE_BUFFER_SIZE);
1917
}
2018

21-
LZ4InflatingStreamBuf::~LZ4InflatingStreamBuf()
22-
{
19+
LZ4InflatingStreamBuf::~LZ4InflatingStreamBuf() {
20+
/// seems Ok with repeatable calls, so no extra care needed
2321
LZ4F_freeDecompressionContext(dctx);
2422
}
2523

26-
#if 0
27-
int LZ4InflatingInputStream::readFromDevice(char* buffer, std::streamsize length)
28-
{
29-
if (_eof || !_pIstr) return 0;
30-
31-
if (/*_zstr.*/avail_in == 0)
32-
{
33-
int n = 0;
34-
if (_pIstr->good())
35-
{
36-
_pIstr->read(_buffer, INFLATE_BUFFER_SIZE);
37-
n = static_cast<int>(_pIstr->gcount());
38-
}
39-
/*_zstr.*/next_in = (unsigned char*) _buffer;
40-
/*_zstr.*/avail_in = n;
41-
}
42-
/*_zstr.*/next_out = (unsigned char*) buffer;
43-
/*_zstr.*/avail_out = static_cast<unsigned>(length);
44-
45-
size_t ret;
46-
47-
48-
for (;;)
49-
{
50-
// int rc = inflate(&_zstr, Z_NO_FLUSH);
51-
size_t bytes_read = avail_in;
52-
size_t bytes_written = avail_out;
53-
54-
55-
ret = LZ4F_decompress(dctx, next_out, &bytes_written, next_in, &bytes_read, /* LZ4F_decompressOptions_t */ nullptr);
56-
57-
if (LZ4F_isError(ret))
58-
throw Poco::IOException(std::string("LZ4 decompression failed. LZ4F version:") + std::to_string(LZ4F_VERSION) + "Error: " + LZ4F_getErrorName(ret));
59-
60-
if (bytes_written == 0 || /* _pIstr->good() || */ !_pIstr->eof())
61-
{
62-
63-
// if (rc == Z_DATA_ERROR && !_check)
64-
// {
65-
// if (/*_zstr.*/avail_in == 0)
66-
// {
67-
// if (_pIstr->good())
68-
// rc = Z_OK;
69-
// else
70-
// rc = Z_STREAM_END;
71-
// }
72-
// }
73-
// if (rc == Z_STREAM_END)
74-
// {
75-
_eof = true;
76-
return static_cast<int>(length) - /*_zstr.*/avail_out;
77-
}
78-
// if (rc != Z_OK) throw IOException(zError(rc));
79-
if (/*_zstr.*/avail_out == 0)
80-
return static_cast<int>(length);
81-
if (/*_zstr.*/avail_in == 0)
82-
{
83-
int n = 0;
84-
if (_pIstr->good())
85-
{
86-
_pIstr->read(_buffer, INFLATE_BUFFER_SIZE);
87-
n = static_cast<int>(_pIstr->gcount());
88-
}
89-
if (n > 0)
90-
{
91-
/*_zstr.*/next_in = (unsigned char*) _buffer;
92-
/*_zstr.*/avail_in = n;
93-
}
94-
else return static_cast<int>(length) - /*_zstr.*/avail_out;
95-
}
96-
}
97-
}
98-
#endif
99-
100-
int LZ4InflatingStreamBuf::readFromDevice(char *dst_buffer,
101-
std::streamsize length) {
102-
LOG("LZ4InflatingInputStream::readFromDevice");
103-
104-
if (!bytes_left) {
105-
if (!pIstr->good()) {
106-
LOG("LZ4InflatingInputStream::readFromDevice EOF or error (stream is not "
107-
"good)");
108-
return -1; // EOF or error
24+
int LZ4InflatingStreamBuf::readFromDevice(char * dst_buffer, std::streamsize length) {
25+
LOG("LZ4InflatingInputStream::readFromDevice");
26+
27+
if (!bytes_left) {
28+
if (!pIstr->good()) {
29+
LOG("LZ4InflatingInputStream::readFromDevice EOF or error (stream is not "
30+
"good)");
31+
return -1; // EOF or error
32+
}
33+
pIstr->read(compressedBuffer.data(), static_cast<std::streamsize>(compressedBuffer.size()));
34+
bytes_left = pIstr->gcount();
35+
src_buffer = compressedBuffer.data();
36+
LOG("LZ4InflatingInputStream::readFromDevice: bytes_left=" << bytes_left);
10937
}
110-
pIstr->read(compressedBuffer.data(),
111-
static_cast<std::streamsize>(compressedBuffer.size()));
112-
bytes_left = pIstr->gcount();
113-
src_buffer = compressedBuffer.data();
114-
LOG("LZ4InflatingInputStream::readFromDevice: bytes_left=" << bytes_left);
115-
}
116-
117-
size_t bytesRead = bytes_left;
118-
LOG("LZ4InflatingInputStream::readFromDevice: shift="
119-
<< src_buffer - compressedBuffer.data() << ", bytesRead=" << bytesRead);
120-
121-
// size_t in_available = pIstr->gcount();
122-
123-
if (bytesRead <= 0) // negative ??
124-
{
125-
LOG("LZ4InflatingInputStream::readFromDevice: No more compressed data");
126-
return 0; // No more compressed data
127-
}
128-
129-
// Decompress the data into 'buffer'
130-
// Note: LZ4 requires you to know or guess the max decompressed size.
131-
// Here we assume 'length' is enough for decompressed data.
132-
133-
// int decompressedSize = LZ4_decompress_safe(compressedBuffer.data(), buffer,
134-
// static_cast<int>(bytesRead), static_cast<int>(length));
135-
size_t decompressedSize = length;
136-
137-
auto ret = LZ4F_decompress(dctx, dst_buffer, &decompressedSize, src_buffer,
138-
&bytesRead, nullptr);
139-
LOG("decompressedSize=" << decompressedSize << ", ret=" << ret
140-
<< ", bytesRead=" << bytesRead);
141-
142-
if (LZ4F_isError(ret)) {
143-
LOG("throwing error " << std::string(
144-
"LZ4 decompression failed. LZ4F version:") +
145-
std::to_string(LZ4F_VERSION) +
146-
"Error: " + LZ4F_getErrorName(ret));
147-
148-
throw Poco::IOException(
149-
std::string("LZ4 decompression failed. LZ4F version:") +
150-
std::to_string(LZ4F_VERSION) + "Error: " + LZ4F_getErrorName(ret));
151-
}
152-
153-
bytes_left -= bytesRead;
154-
src_buffer += bytesRead;
155-
156-
return decompressedSize;
157-
}
15838

159-
LZ4InflatingInputStream::LZ4InflatingInputStream(std::istream& istr):
160-
std::istream(&_buf),
161-
LZ4InflatingIOS(istr)
162-
{
163-
}
39+
size_t bytesRead = bytes_left;
16440

41+
if (!bytesRead) {
42+
LOG("LZ4InflatingInputStream::readFromDevice: No more compressed data");
43+
return 0; // No more compressed data
44+
}
16545

166-
LZ4InflatingInputStream::~LZ4InflatingInputStream()
167-
{
168-
}
169-
46+
size_t decompressedSize = length;
17047

171-
void LZ4InflatingInputStream::reset()
172-
{
173-
_buf.reset();
174-
clear();
175-
}
48+
auto ret = LZ4F_decompress(dctx, dst_buffer, &decompressedSize, src_buffer, &bytesRead, nullptr);
49+
LOG("decompressedSize=" << decompressedSize << ", ret=" << ret << ", bytesRead=" << bytesRead);
17650

177-
LZ4InflatingIOS::LZ4InflatingIOS(std::istream& istr):
178-
_buf(istr)
179-
{
180-
poco_ios_init(&_buf);
181-
}
51+
if (LZ4F_isError(ret)) {
52+
// LOG("throwing error " << std::string(
53+
// "LZ4 decompression failed. LZ4F version:") +
54+
// std::to_string(LZ4F_VERSION) +
55+
// "Error: " + LZ4F_getErrorName(ret));
18256

57+
throw Poco::IOException(
58+
std::string("LZ4 decompression failed. LZ4F version:") + std::to_string(LZ4F_VERSION) + "Error: " + LZ4F_getErrorName(ret));
59+
}
18360

61+
bytes_left -= bytesRead;
62+
src_buffer += bytesRead;
18463

185-
LZ4InflatingIOS::~LZ4InflatingIOS()
186-
{
64+
return decompressedSize;
18765
}
Lines changed: 41 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,117 +1,70 @@
11
#pragma once
22

3-
#include "Poco/BufferedStreamBuf.h"
43
#include <istream>
4+
#include "Poco/BufferedStreamBuf.h"
55

66
#include <lz4.h>
77
#include <lz4frame.h>
88

9-
class LZ4InflatingStreamBuf : public Poco::BufferedStreamBuf
10-
/// This stream decompresses all data passing through it
11-
/// using zlib's inflate algorithm.
12-
/// Example:
13-
/// std::ifstream istr("data.gz", std::ios::binary);
14-
/// InflatingInputStream inflater(istr, InflatingStreamBuf::STREAM_GZIP);
15-
/// std::string data;
16-
/// inflater >> data;
17-
///
18-
/// The underlying input stream can contain more than one gzip/deflate stream.
19-
/// After a gzip/deflate stream has been processed, reset() can be called
20-
/// to inflate the next stream.
21-
{
9+
class LZ4InflatingStreamBuf : public Poco::BufferedStreamBuf {
2210
public:
23-
LZ4InflatingStreamBuf(const LZ4InflatingStreamBuf &) = delete;
24-
LZ4InflatingStreamBuf(LZ4InflatingStreamBuf &&) = delete;
25-
LZ4InflatingStreamBuf &operator=(const LZ4InflatingStreamBuf &) = delete;
26-
LZ4InflatingStreamBuf &operator=(LZ4InflatingStreamBuf &&) = delete;
27-
LZ4InflatingStreamBuf(std::istream &istr);
28-
/// Creates an InflatingInputStream for expanding the compressed data read
29-
/// from the given input stream.
30-
31-
~LZ4InflatingStreamBuf();
32-
/// Destroys the InflatingInputStream.
11+
LZ4InflatingStreamBuf(const LZ4InflatingStreamBuf &) = delete;
12+
LZ4InflatingStreamBuf(LZ4InflatingStreamBuf &&) = delete;
13+
LZ4InflatingStreamBuf & operator=(const LZ4InflatingStreamBuf &) = delete;
14+
LZ4InflatingStreamBuf & operator=(LZ4InflatingStreamBuf &&) = delete;
15+
LZ4InflatingStreamBuf(std::istream & istr);
3316

34-
// int close();
17+
~LZ4InflatingStreamBuf();
3518

36-
/// Finishes up the stream.
37-
///
38-
/// Must be called when inflating to an output stream.
19+
void reset() { }
3920

40-
void reset() {
41-
}
42-
43-
/// Resets the stream buffer.
4421
protected:
4522
int readFromDevice(char * buffer, std::streamsize length);
46-
// int writeToDevice(const char * buffer, std::streamsize length);
47-
// int sync();
4823

4924
private:
50-
std::istream* pIstr;
25+
std::istream * pIstr;
26+
27+
/// place to keep incoming data
5128
std::vector<char> compressedBuffer;
52-
enum
53-
{
54-
STREAM_BUFFER_SIZE = 1024,
55-
INFLATE_BUFFER_SIZE = 32768
29+
char * src_buffer;
30+
enum {
31+
/// proper buffer sizes are not required from functional standpoint, but should speed up processing
32+
STREAM_BUFFER_SIZE = (256 * 1024) + 4096, // CH uses 256k frames
33+
INFLATE_BUFFER_SIZE = 256 * 1024 * 4 // lets estimate ratio is 4
5634
};
57-
LZ4F_dctx* dctx;
58-
// char* _buffer;
59-
// std::istream* _pIstr;
60-
// std::ostream* _pOstr;
61-
// // z_stream _zstr;
62-
// bool _eof;
63-
// bool _check;
64-
65-
// uint16_t avail_in; /* number of bytes available at next_in */
66-
// uint32_t total_in; /* total number of input bytes read so far */
67-
68-
// unsigned char *next_out; /* next output byte will go here */
69-
// unsigned char *next_in; /* next output byte will go here */
70-
// uint16_t avail_out; /* remaining free space at next_out */
71-
// uint32_t total_out; /* total number of bytes output so far */
72-
size_t bytes_left = 0;
73-
char * src_buffer;
35+
LZ4F_dctx * dctx;
36+
size_t bytes_left = 0;
7437
};
7538

7639

77-
class LZ4InflatingIOS: public virtual std::ios
78-
/// The base class for InflatingOutputStream and InflatingInputStream.
79-
///
80-
/// This class is needed to ensure the correct initialization
81-
/// order of the stream buffer and base classes.
82-
{
40+
/// Theses classes are needed to ensure the correct initialization
41+
/// order of the stream buffer and base classes.
42+
/// Exacltly the same as Poco::InflatingIOS and Poco:InflatingInputStream
43+
template <typename StreamBufType>
44+
class GenericInflatingIOS : public virtual std::ios {
8345
public:
84-
LZ4InflatingIOS(std::istream& istr);
85-
/// Creates an LZ4InflatingIOS for expanding the compressed data read from
86-
/// the given input stream.
87-
///
88-
/// Please refer to the zlib documentation of inflateInit2() for a description
89-
/// of the windowBits parameter.
46+
GenericInflatingIOS(std::istream & istr) : buf(istr) {
47+
poco_ios_init(&buf);
48+
}
9049

91-
~LZ4InflatingIOS();
92-
/// Destroys the LZ4InflatingIOS.
9350

94-
LZ4InflatingStreamBuf* rdbuf();
95-
/// Returns a pointer to the underlying stream buffer.
51+
~GenericInflatingIOS() {};
9652

9753
protected:
98-
LZ4InflatingStreamBuf _buf;
54+
StreamBufType buf;
9955
};
10056

101-
class LZ4InflatingInputStream: public std::istream, public LZ4InflatingIOS
102-
{
57+
58+
using LZ4InflatingIOS = GenericInflatingIOS<LZ4InflatingStreamBuf>;
59+
60+
61+
template <typename IOSType>
62+
class GenericInflatingInputStream : public std::istream, public IOSType {
10363
public:
104-
LZ4InflatingInputStream(std::istream& istr);
105-
/// Creates an InflatingInputStreamLZ4 for expanding the compressed data read from
106-
/// the given input stream.
107-
///
108-
/// Please refer to the zlib documentation of inflateInit2() for a description
109-
/// of the windowBits parameter.
110-
111-
~LZ4InflatingInputStream();
112-
/// Destroys the InflatingInputStreamLZ4.
113-
114-
void reset();
115-
/// Resets the zlib machinery so that another zlib stream can be read from
116-
/// the same underlying input stream.
64+
using IOSType::buf;
65+
GenericInflatingInputStream(std::istream & istr) : std::istream(&buf), IOSType(istr) { }
66+
67+
~GenericInflatingInputStream() { }
11768
};
69+
70+
using LZ4InflatingInputStream = GenericInflatingInputStream<LZ4InflatingIOS>;

0 commit comments

Comments
 (0)