Skip to content

Commit

Permalink
[yugabyte#9370] Implement network traffic compression
Browse files Browse the repository at this point in the history
Summary:
This diff implements support for network traffic compression.
There are 2 flags to configure it:
enable_stream_compression - whether we enable compression at all.
stream_compression_algo - algorithm index that should be used for compression:
0 - no compression
1 - gzip

It should be safe to enable compression and set the algorithm to 0.
But since this feature is pretty new, we fully disable compression by default.

Introduced a StreamRefiner for refined streams, ie; encryption / compression.

The following compression related work should be done in follow-up diffs:
1) Add tests for encryption+compression.
2) Add more compression algorithms.
3) Change `StreamRefiner` interface to avoid the extra copy of decompressed data.

Test Plan:
ybd --gtest_filter CompressedStreamTest.*
ybd --gtest_filter TestRpcCompression.*

Reviewers: bogdan

Reviewed By: bogdan

Subscribers: sanketh, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D12328
  • Loading branch information
spolitov committed Jul 26, 2021
1 parent c3048aa commit 3df0149
Show file tree
Hide file tree
Showing 17 changed files with 1,264 additions and 445 deletions.
27 changes: 23 additions & 4 deletions ent/src/yb/server/secure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "yb/fs/fs_manager.h"

#include "yb/rpc/compressed_stream.h"
#include "yb/rpc/messenger.h"
#include "yb/rpc/secure_stream.h"
#include "yb/rpc/tcp_stream.h"
Expand Down Expand Up @@ -53,6 +54,8 @@ DEFINE_string(key_file_pattern, "node.$0.key", "Pattern used for key file");

DEFINE_string(cert_file_pattern, "node.$0.crt", "Pattern used for certificate file");

DEFINE_bool(enable_stream_compression, false, "Whether it is allowed to use stream compression.");

namespace yb {
namespace server {
namespace {
Expand Down Expand Up @@ -87,13 +90,28 @@ Result<std::unique_ptr<rpc::SecureContext>> SetupSecureContext(
return SetupSecureContext(std::string(), root_dir, name, type, builder);
}

void ApplyCompressedStream(
rpc::MessengerBuilder* builder, const rpc::StreamFactoryPtr lower_layer_factory) {
if (!FLAGS_enable_stream_compression) {
return;
}
builder->SetListenProtocol(rpc::CompressedStreamProtocol());
auto parent_mem_tracker = builder->last_used_parent_mem_tracker();
auto buffer_tracker = MemTracker::FindOrCreateTracker(
-1, "Compressed Read Buffer", parent_mem_tracker);
builder->AddStreamFactory(
rpc::CompressedStreamProtocol(),
rpc::CompressedStreamFactory(std::move(lower_layer_factory), buffer_tracker));
}

Result<std::unique_ptr<rpc::SecureContext>> SetupSecureContext(
const std::string& cert_dir, const std::string& root_dir, const std::string& name,
SecureContextType type, rpc::MessengerBuilder* builder) {
auto use = type == SecureContextType::kInternal ? FLAGS_use_node_to_node_encryption
: FLAGS_use_client_to_server_encryption;
if (!use) {
return std::unique_ptr<rpc::SecureContext>();
ApplyCompressedStream(builder, rpc::TcpStream::Factory());
return nullptr;
}

std::string dir;
Expand Down Expand Up @@ -157,10 +175,11 @@ void ApplySecureContext(const rpc::SecureContext* context, rpc::MessengerBuilder
auto buffer_tracker = MemTracker::FindOrCreateTracker(
-1, "Encrypted Read Buffer", parent_mem_tracker);

auto secure_stream_factory = rpc::SecureStreamFactory(
rpc::TcpStream::Factory(), buffer_tracker, context);
builder->SetListenProtocol(rpc::SecureStreamProtocol());
builder->AddStreamFactory(
rpc::SecureStreamProtocol(),
rpc::SecureStreamFactory(rpc::TcpStream::Factory(), buffer_tracker, context));
builder->AddStreamFactory(rpc::SecureStreamProtocol(), secure_stream_factory);
ApplyCompressedStream(builder, secure_stream_factory);
}

} // namespace server
Expand Down
1 change: 1 addition & 0 deletions src/yb/integration-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ ADD_YB_TEST(tablet-split-itest)

# Not sure if we really need RUN_SERIAL here as this might not be a resource-intensive test.
ADD_YB_TEST(compaction-test)
ADD_YB_TEST(compressed_stream-test)
ADD_YB_TEST(logging-test)
ADD_YB_TEST(master_replication-itest)
ADD_YB_TEST(master_sysnamespace-itest)
Expand Down
123 changes: 123 additions & 0 deletions src/yb/integration-tests/compressed_stream-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/client/ql-dml-test-base.h"
#include "yb/client/session.h"
#include "yb/client/table_handle.h"

#include "yb/common/ql_value.h"

#include "yb/rpc/compressed_stream.h"
#include "yb/rpc/messenger.h"
#include "yb/rpc/tcp_stream.h"

#include "yb/server/secure.h"

#include "yb/util/size_literals.h"
#include "yb/util/env_util.h"

#include "yb/yql/cql/ql/util/errcodes.h"
#include "yb/yql/cql/ql/util/statement_result.h"

using namespace std::literals;

DECLARE_int32(stream_compression_algo);
DECLARE_bool(enable_stream_compression);

namespace yb {

class CompressedStreamTest : public client::KeyValueTableTest<MiniCluster> {
public:
void SetUp() override {
FLAGS_enable_stream_compression = true;
FLAGS_stream_compression_algo = 1;
KeyValueTableTest::SetUp();
}

CHECKED_STATUS CreateClient() override {
auto host = "127.0.0.52";
client_ = VERIFY_RESULT(DoCreateClient(host, host));
return Status::OK();
}

Result<std::unique_ptr<client::YBClient>> DoCreateClient(
const std::string& name, const std::string& host) {
rpc::MessengerBuilder messenger_builder("test_client");
messenger_builder.SetListenProtocol(rpc::CompressedStreamProtocol());
messenger_builder.AddStreamFactory(
rpc::CompressedStreamProtocol(),
CompressedStreamFactory(rpc::TcpStream::Factory(), MemTracker::GetRootTracker()));
auto messenger = VERIFY_RESULT(messenger_builder.Build());
messenger->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress(host)));
return cluster_->CreateClient(std::move(messenger));
}

void TestSimpleOps();
};

void CompressedStreamTest::TestSimpleOps() {
CreateTable(client::Transactional::kFalse);

const int32_t kKey = 1;
const int32_t kValue = 2;

{
auto session = NewSession();
auto op = ASSERT_RESULT(WriteRow(session, kKey, kValue));
ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK);
}

{
auto value = ASSERT_RESULT(SelectRow(NewSession(), kKey));
ASSERT_EQ(kValue, value);
}
}

TEST_F(CompressedStreamTest, Simple) {
TestSimpleOps();
}

TEST_F(CompressedStreamTest, BigWrite) {
client::YBSchemaBuilder builder;
builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull();
builder.AddColumn(kValueColumn)->Type(STRING);

ASSERT_OK(table_.Create(client::kTableName, 1, client_.get(), &builder));

const int32_t kKey = 1;
const std::string kValue(64_KB, 'X');

auto session = NewSession();
{
const auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
auto* const req = op->mutable_request();
QLAddInt32HashValue(req, kKey);
table_.AddStringColumnValue(req, kValueColumn, kValue);
ASSERT_OK(session->ApplyAndFlush(op));
ASSERT_OK(CheckOp(op.get()));
}

{
const auto op = table_.NewReadOp();
auto* const req = op->mutable_request();
QLAddInt32HashValue(req, kKey);
table_.AddColumns({kValueColumn}, req);
ASSERT_OK(session->ApplyAndFlush(op));
ASSERT_OK(CheckOp(op.get()));
auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock();
ASSERT_EQ(rowblock->row_count(), 1);
ASSERT_EQ(kValue, rowblock->row(0).column(0).string_value());
}
}

} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/rpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ set(YRPC_SRCS
acceptor.cc
binary_call_parser.cc
circular_read_buffer.cc
compressed_stream.cc
connection.cc
connection_context.cc
growable_buffer.cc
Expand All @@ -72,6 +73,7 @@ set(YRPC_SRCS
poller.cc
proxy.cc
reactor.cc
refined_stream.cc
remote_method.cc
rpc.cc
rpc_context.cc
Expand Down
Loading

0 comments on commit 3df0149

Please sign in to comment.