Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
*.swp
bazel-*
compile_commands.json
build
settings.json
76 changes: 66 additions & 10 deletions muduo/net/ZlibStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,70 @@ class ZlibInputStream : noncopyable
{
finish();
}
int zlibErrorCode() const { return zerror_; }
bool write(StringPiece buf)
{
if (zerror_ != Z_OK && zerror_ != Z_STREAM_END)
return false;

assert(zstream_.next_in == NULL && zstream_.avail_in == 0);
void* in = const_cast<char*>(buf.data());
zstream_.next_in = static_cast<Bytef*>(in);
zstream_.avail_in = buf.size();
while (zstream_.avail_in > 0 && zerror_ == Z_OK)
{
zerror_ = decompress(Z_NO_FLUSH);
}
if (zstream_.avail_in == 0)
{
assert(static_cast<const void*>(zstream_.next_in) == buf.end());
zstream_.next_in = NULL;
}
return zerror_ == Z_OK || zerror_ == Z_STREAM_END;
}

// compress input as much as possible, not guarantee consuming all data.
bool write(Buffer* input)
{
if (zerror_ != Z_OK && zerror_ != Z_STREAM_END)
return false;

bool write(StringPiece buf);
bool write(Buffer* input);
bool finish();
// inflateEnd(&zstream_);
void* in = const_cast<char*>(input->peek());
zstream_.next_in = static_cast<Bytef*>(in);
zstream_.avail_in = static_cast<int>(input->readableBytes());
if (zstream_.avail_in > 0 && zerror_ == Z_OK)
{
zerror_ = decompress(Z_NO_FLUSH);
}
input->retrieve(input->readableBytes() - zstream_.avail_in);
return zerror_ == Z_OK || zerror_ == Z_STREAM_END;
}
bool finish()
{
if (zerror_ != Z_OK && zerror_ != Z_STREAM_END) {
return false;
}
zerror_ = inflateEnd(&zstream_);
bool ok = (zerror_ == Z_OK || zerror_ == Z_STREAM_END);
zerror_ = Z_STREAM_END;
return ok;
}

private:
int decompress(int flush);
int decompress(int flush)
{
int buffer_size = 1024;
output_->ensureWritableBytes(buffer_size);
zstream_.next_out = reinterpret_cast<Bytef*>(output_->beginWrite());
zstream_.avail_out = static_cast<int>(output_->writableBytes());
int error = ::inflate(&zstream_, flush);
output_->hasWritten(output_->writableBytes() - zstream_.avail_out);
if (output_->writableBytes() == 0 && buffer_size < 65536)
{
buffer_size *= 2;
}
return error;
}

Buffer* output_;
z_stream zstream_;
Expand Down Expand Up @@ -69,7 +125,7 @@ class ZlibOutputStream : noncopyable

bool write(StringPiece buf)
{
if (zerror_ != Z_OK)
if (zerror_ != Z_OK && zerror_ != Z_STREAM_END)
return false;

assert(zstream_.next_in == NULL && zstream_.avail_in == 0);
Expand All @@ -85,13 +141,13 @@ class ZlibOutputStream : noncopyable
assert(static_cast<const void*>(zstream_.next_in) == buf.end());
zstream_.next_in = NULL;
}
return zerror_ == Z_OK;
return zerror_ == Z_OK || zerror_ == Z_STREAM_END;
}

// compress input as much as possible, not guarantee consuming all data.
bool write(Buffer* input)
{
if (zerror_ != Z_OK)
if (zerror_ != Z_OK && zerror_ != Z_STREAM_END)
return false;

void* in = const_cast<char*>(input->peek());
Expand All @@ -102,12 +158,12 @@ class ZlibOutputStream : noncopyable
zerror_ = compress(Z_NO_FLUSH);
}
input->retrieve(input->readableBytes() - zstream_.avail_in);
return zerror_ == Z_OK;
return zerror_ == Z_OK || zerror_ == Z_STREAM_END;
}

bool finish()
{
if (zerror_ != Z_OK)
if (zerror_ != Z_OK && zerror_ != Z_STREAM_END)
return false;

while (zerror_ == Z_OK)
Expand Down
53 changes: 49 additions & 4 deletions muduo/net/tests/ZlibStream_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,55 @@ BOOST_AUTO_TEST_CASE(testZlibOutputStream5)
{
BOOST_CHECK(stream.write(input));
}
printf("bufsiz %d\n", stream.internalOutputBufferSize());
LOG_INFO << "total_in " << stream.inputBytes();
LOG_INFO << "total_out " << stream.outputBytes();
stream.finish();
printf("total %zd\n", output.readableBytes());
BOOST_CHECK_EQUAL(stream.zlibErrorCode(), Z_STREAM_END);
}


BOOST_AUTO_TEST_CASE(testZlibInputStream) {
muduo::net::Buffer input;// zlib_data
muduo::net::Buffer output;// data
{
muduo::net::ZlibOutputStream outStream(&input);
outStream.write("Hello, Zlib!");
outStream.finish();
}
muduo::net::ZlibInputStream inStream(&output);
std::string result;
while (input.readableBytes() > 0 && (inStream.zlibErrorCode() == Z_OK || inStream.zlibErrorCode() == Z_NEED_DICT)) {
BOOST_CHECK(inStream.write(&input));
}
inStream.finish();
BOOST_CHECK_EQUAL(inStream.zlibErrorCode(), Z_STREAM_END);
}

BOOST_AUTO_TEST_CASE(testZlibInputStreamEmpty) {
muduo::net::Buffer input;
muduo::net::ZlibInputStream inStream(&input);
muduo::net::Buffer output;
BOOST_CHECK(inStream.write(&output));
BOOST_CHECK_EQUAL(output.readableBytes(), 0);
}

BOOST_AUTO_TEST_CASE(testZlibInputStreamLargeData) {
muduo::net::Buffer input;
std::string largeData;
for (int i = 0; i < 1000; ++i) {
largeData += "This is a test for large data compression and decompression. ";
}

{
muduo::net::ZlibOutputStream outStream(&input);
outStream.write(largeData);
outStream.finish();
}

muduo::net::Buffer output;
muduo::net::ZlibInputStream inStream(&output);
std::string result;
while (input.readableBytes() > 0 && (inStream.zlibErrorCode() == Z_OK || inStream.zlibErrorCode() == Z_NEED_DICT)) {
BOOST_CHECK(inStream.write(&input));
}
result.append(output.peek(), output.readableBytes());
BOOST_CHECK_EQUAL(result, largeData);
}