Skip to content

Commit

Permalink
KUDU-2305: Limit sidecars to INT_MAX and fortify socket code
Browse files Browse the repository at this point in the history
Inspection of the code revealed some other local variables
that could overflow with large messages. This patch takes
two approaches to eliminate the issues.

First, it limits the total size of the messages by limiting
the total size of the sidecars to INT_MAX. The total size
of the protobuf and header components of the message
should be considerably smaller, so limiting the sidecars
to INT_MAX eliminates messages that are larger than UINT_MAX.
This also means that the sidecar offsets, which are unsigned
32-bit integers, are also safe. Given that
FLAGS_rpc_max_message_size is limited to INT_MAX at startup,
the receiver would reject any message this large anyway.
This also helps with the networking codepath, as any given
sidecar will have a size less than INT_MAX, so every Slice
that interacts with Writev() is shorter than INT_MAX.

Second, even with sidecars limited to INT_MAX, the headers
and protobuf parts of the messages mean that certain messages
could still exceed INT_MAX. This patch changes some of the sockets
codepath to tolerate iovec's that reference more than INT_MAX
bytes total. Specifically, it changes Writev()'s nwritten bytes
to an int64_t for both TlsSocket and Socket. TlsSocket works
because it is sending each Slice individually. The first change
limited any given Slice to INT_MAX, so each individual Write()
should not be impacted. For Socket, Writev() uses sendmsg(). It
should do partial network sends to handle this case. Any Write()
call specifies its size with a 32-bit integer, and that will
not be impacted by this patch.

Testing:
 - Modified TestRpcSidecarLimits() to verify that sidecars are
   limited to INT_MAX bytes.
 - Added a test mode to TestRpcSidecarLimits() where it
   overrides rpc_max_message_size and sends the maximal
   message. This verifies that the client send codepath
   can handle the maximal message.

Reviewed-on: http://gerrit.cloudera.org:8080/9601
Reviewed-by: Todd Lipcon <todd@apache.org>
Tested-by: Todd Lipcon <todd@apache.org>

Changes from Kudu version:
 - Updated declaration of FLAGS_rpc_max_message_size
   in rpc-mgr.cc and added a warning not to set it
   larger than INT_MAX.

Change-Id: I469feff940fdd07e1e407c9df49de79ed303151e
Reviewed-on: http://gerrit.cloudera.org:8080/9748
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Impala Public Jenkins
  • Loading branch information
joemcdonnell authored and Impala Public Jenkins committed Mar 22, 2018
1 parent 4e4cbc6 commit 942de72
Show file tree
Hide file tree
Showing 17 changed files with 135 additions and 38 deletions.
23 changes: 17 additions & 6 deletions be/src/kudu/rpc/inbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,17 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response,
ResponseHeader resp_hdr;
resp_hdr.set_call_id(header_.call_id());
resp_hdr.set_is_error(!is_success);
uint32_t absolute_sidecar_offset = protobuf_msg_size;
int32_t sidecar_byte_size = 0;
for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
resp_hdr.add_sidecar_offsets(absolute_sidecar_offset);
absolute_sidecar_offset += car->AsSlice().size();
resp_hdr.add_sidecar_offsets(sidecar_byte_size + protobuf_msg_size);
int32_t sidecar_bytes = car->AsSlice().size();
DCHECK_LE(sidecar_byte_size, TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes);
sidecar_byte_size += sidecar_bytes;
}

int additional_size = absolute_sidecar_offset - protobuf_msg_size;
serialization::SerializeMessage(response, &response_msg_buf_,
additional_size, true);
int main_msg_size = additional_size + response_msg_buf_.size();
sidecar_byte_size, true);
int64_t main_msg_size = sidecar_byte_size + response_msg_buf_.size();
serialization::SerializeHeader(resp_hdr, main_msg_size,
&response_hdr_buf_);
}
Expand Down Expand Up @@ -198,7 +199,17 @@ Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
if (outbound_sidecars_.size() > TransferLimits::kMaxSidecars) {
return Status::ServiceUnavailable("All available sidecars already used");
}
int64_t sidecar_bytes = car->AsSlice().size();
if (outbound_sidecars_total_bytes_ >
TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
static_cast<int64_t>(outbound_sidecars_total_bytes_) + sidecar_bytes,
TransferLimits::kMaxTotalSidecarBytes));
}

outbound_sidecars_.emplace_back(std::move(car));
outbound_sidecars_total_bytes_ += sidecar_bytes;
DCHECK_GE(outbound_sidecars_total_bytes_, 0);
*idx = outbound_sidecars_.size() - 1;
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/kudu/rpc/inbound_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ class InboundCall {
// after serialization of the protobuf. See rpc/rpc_sidecar.h for more info.
std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;

// Total size of sidecars in outbound_sidecars_. This is limited to a maximum
// of TransferLimits::kMaxTotalSidecarBytes.
int32_t outbound_sidecars_total_bytes_ = 0;

// Inbound sidecars from the request. The slices are views onto transfer_. There are as
// many slices as header_.sidecar_offsets_size().
Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars];
Expand Down
4 changes: 3 additions & 1 deletion be/src/kudu/rpc/outbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ void OutboundCall::SetRequestPayload(const Message& req,
sidecar_byte_size_ = 0;
for (const unique_ptr<RpcSidecar>& car: sidecars_) {
header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
sidecar_byte_size_ += car->AsSlice().size();
int32_t sidecar_bytes = car->AsSlice().size();
DCHECK_LE(sidecar_byte_size_, TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes);
sidecar_byte_size_ += sidecar_bytes;
}

serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true);
Expand Down
3 changes: 2 additions & 1 deletion be/src/kudu/rpc/outbound_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ class OutboundCall {
std::vector<std::unique_ptr<RpcSidecar>> sidecars_;

// Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
int64_t sidecar_byte_size_ = -1;
// This cannot exceed TransferLimits::kMaxTotalSidecarBytes.
int32_t sidecar_byte_size_ = -1;

// True if cancellation was requested on this call.
bool cancellation_requested_;
Expand Down
42 changes: 37 additions & 5 deletions be/src/kudu/rpc/rpc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include "kudu/rpc/rpc-test-base.h"

#include <limits.h>
#include <limits>
#include <memory>
#include <string>
#include <unistd.h>
Expand Down Expand Up @@ -668,8 +668,42 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
CHECK(!controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx).ok());
}

// Construct a string to use as a maximal payload in following tests
string max_string(TransferLimits::kMaxTotalSidecarBytes, 'a');

{
// Test that the payload may not exceed --rpc_max_message_size.
// Test that limit on the total size of sidecars is respected. The maximal payload
// reaches the limit exactly.
RpcController controller;
int idx;
ASSERT_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(max_string)), &idx));

// Trying to add another byte will fail.
int dummy = 0;
string s2(1, 'b');
Status max_sidecar_status =
controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &dummy);
ASSERT_FALSE(max_sidecar_status.ok());
ASSERT_STR_MATCHES(max_sidecar_status.ToString(), "Total size of sidecars");
}

// Test two cases:
// 1) The RPC has maximal size and exceeds rpc_max_message_size. This tests the
// functionality of rpc_max_message_size. The server will close the connection
// immediately.
// 2) The RPC has maximal size, but rpc_max_message_size has been set to a higher
// value. This tests the client's ability to send the maximal message.
// The server will reject the message after it has been transferred.
// This test is disabled for TSAN due to high memory requirements.
std::vector<int64_t> rpc_max_message_values;
rpc_max_message_values.push_back(FLAGS_rpc_max_message_size);
#ifndef THREAD_SANITIZER
rpc_max_message_values.push_back(std::numeric_limits<int64_t>::max());
#endif
for (int64_t rpc_max_message_size_val : rpc_max_message_values) {
// Set rpc_max_message_size
FLAGS_rpc_max_message_size = rpc_max_message_size_val;

// Set up server.
Sockaddr server_addr;
bool enable_ssl = GetParam();
Expand All @@ -683,10 +717,8 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
RpcController controller;
// KUDU-2305: Test with a maximal payload to verify that the implementation
// can handle the limits.
string s;
s.resize(INT_MAX, 'a');
int idx;
CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
ASSERT_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(max_string)), &idx));

PushTwoStringsRequestPB request;
request.set_sidecar1_idx(idx);
Expand Down
15 changes: 14 additions & 1 deletion be/src/kudu/rpc/rpc_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@

#include <glog/logging.h>

#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/outbound_call.h"
#include "kudu/rpc/rpc_header.pb.h"

using std::unique_ptr;

using strings::Substitute;
namespace kudu {
namespace rpc {

Expand All @@ -51,6 +52,7 @@ void RpcController::Swap(RpcController* other) {
}

std::swap(outbound_sidecars_, other->outbound_sidecars_);
std::swap(outbound_sidecars_total_bytes_, other->outbound_sidecars_total_bytes_);
std::swap(timeout_, other->timeout_);
std::swap(credentials_policy_, other->credentials_policy_);
std::swap(call_, other->call_);
Expand All @@ -65,6 +67,7 @@ void RpcController::Reset() {
required_server_features_.clear();
credentials_policy_ = CredentialsPolicy::ANY_CREDENTIALS;
messenger_ = nullptr;
outbound_sidecars_total_bytes_ = 0;
}

bool RpcController::finished() const {
Expand Down Expand Up @@ -137,7 +140,17 @@ Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
return Status::RuntimeError("All available sidecars already used");
}
int64_t sidecar_bytes = car->AsSlice().size();
if (outbound_sidecars_total_bytes_ >
TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
static_cast<int64_t>(outbound_sidecars_total_bytes_) + sidecar_bytes,
TransferLimits::kMaxTotalSidecarBytes));
}

outbound_sidecars_.emplace_back(std::move(car));
outbound_sidecars_total_bytes_ += sidecar_bytes;
DCHECK_GE(outbound_sidecars_total_bytes_, 0);
*idx = outbound_sidecars_.size() - 1;
return Status::OK();
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/kudu/rpc/rpc_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ class RpcController {

// Adds a sidecar to the outbound request. The index of the sidecar is written to
// 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added
// to this request.
// to this request. Also returns an error if the total size of all sidecars would
// exceed TransferLimits::kMaxTotalSidecarBytes.
Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);

// Cancel the call associated with the RpcController. This function should only be
Expand Down Expand Up @@ -269,6 +270,10 @@ class RpcController {

std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;

// Total size of sidecars in outbound_sidecars_. This is limited to a maximum
// of TransferLimits::kMaxTotalSidecarBytes.
int32_t outbound_sidecars_total_bytes_ = 0;

DISALLOW_COPY_AND_ASSIGN(RpcController);
};

Expand Down
8 changes: 7 additions & 1 deletion be/src/kudu/rpc/rpc_sidecar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@ Status RpcSidecar::ParseSidecars(
int last = offsets.size() - 1;
if (last >= TransferLimits::kMaxSidecars) {
return Status::Corruption(strings::Substitute(
"Received $0 additional payload slices, expected at most %d",
"Received $0 additional payload slices, expected at most $1",
last, TransferLimits::kMaxSidecars));
}

if (buffer.size() > TransferLimits::kMaxTotalSidecarBytes) {
return Status::Corruption(strings::Substitute(
"Received $0 payload bytes, expected at most $1",
buffer.size(), TransferLimits::kMaxTotalSidecarBytes));
}

for (int i = 0; i < last; ++i) {
int64_t cur_offset = offsets.Get(i);
int64_t next_offset = offsets.Get(i + 1);
Expand Down
13 changes: 12 additions & 1 deletion be/src/kudu/rpc/serialization.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include "kudu/util/slice.h"
#include "kudu/util/status.h"

DECLARE_int32(rpc_max_message_size);
DECLARE_int64(rpc_max_message_size);

using google::protobuf::MessageLite;
using google::protobuf::io::CodedInputStream;
Expand All @@ -49,6 +49,7 @@ enum {

void SerializeMessage(const MessageLite& message, faststring* param_buf,
int additional_size, bool use_cached_size) {
DCHECK_GE(additional_size, 0);
int pb_size = use_cached_size ? message.GetCachedSize() : message.ByteSize();
DCHECK_EQ(message.ByteSize(), pb_size);
// Use 8-byte integers to avoid overflowing when additional_size approaches INT_MAX.
Expand All @@ -57,6 +58,10 @@ void SerializeMessage(const MessageLite& message, faststring* param_buf,
int64_t size_with_delim = static_cast<int64_t>(pb_size) +
static_cast<int64_t>(CodedOutputStream::VarintSize32(recorded_size));
int64_t total_size = size_with_delim + static_cast<int64_t>(additional_size);
// The message format relies on an unsigned 32-bit integer to express the size, so
// the message must not exceed this size. Since additional_size is limited to INT_MAX,
// this is a safe limitation.
CHECK_LE(total_size, std::numeric_limits<uint32_t>::max());

if (total_size > FLAGS_rpc_max_message_size) {
LOG(WARNING) << Substitute("Serialized $0 ($1 bytes) is larger than the maximum configured "
Expand Down Expand Up @@ -116,6 +121,12 @@ Status ParseMessage(const Slice& buf,
DCHECK_EQ(total_len, buf.size() - kMsgLengthPrefixLength)
<< "Got mis-sized buffer: " << KUDU_REDACT(buf.ToDebugString());

if (total_len > std::numeric_limits<int32_t>::max()) {
return Status::Corruption(Substitute("Invalid packet: message had a length of $0, "
"but we only support messages up to $1 bytes\n",
total_len, std::numeric_limits<int32_t>::max()));
}

CodedInputStream in(buf.data(), buf.size());
// Protobuf enforces a 64MB total bytes limit on CodedInputStream by default.
// Override this default with the actual size of the buffer to allow messages
Expand Down
20 changes: 14 additions & 6 deletions be/src/kudu/rpc/transfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,22 @@
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"

DEFINE_int32_hidden(rpc_max_message_size, (50 * 1024 * 1024),
DEFINE_int64_hidden(rpc_max_message_size, (50 * 1024 * 1024),
"The maximum size of a message that any RPC that the server will accept. "
"Must be at least 1MB.");
TAG_FLAG(rpc_max_message_size, advanced);
TAG_FLAG(rpc_max_message_size, runtime);

static bool ValidateMaxMessageSize(const char* flagname, int32_t value) {
static bool ValidateMaxMessageSize(const char* flagname, int64_t value) {
if (value < 1 * 1024 * 1024) {
LOG(ERROR) << flagname << " must be at least 1MB.";
return false;
}
if (value > std::numeric_limits<int32_t>::max()) {
LOG(ERROR) << flagname << " must be less than "
<< std::numeric_limits<int32_t>::max() << " bytes.";
}

return true;
}
static bool dummy = google::RegisterFlagValidator(
Expand Down Expand Up @@ -116,9 +121,12 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
// receive message body
int32_t nread;

// If total_length_ > INT_MAX, then it would exceed the maximum rpc_max_message_size
// and exit above. Hence, it is safe to use int32_t here.
int32_t rem = total_length_ - cur_offset_;
// Socket::Recv() handles at most INT_MAX at a time, so cap the remainder at
// INT_MAX. The message will be split across multiple Recv() calls.
// Note that this is only needed when rpc_max_message_size > INT_MAX, which is
// currently only used for unit tests.
int32_t rem = std::min(total_length_ - cur_offset_,
static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
cur_offset_ += nread;
Expand Down Expand Up @@ -200,7 +208,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
}
}

int32_t written;
int64_t written;
Status status = socket.Writev(iovec, n_iovecs, &written);
RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);

Expand Down
5 changes: 3 additions & 2 deletions be/src/kudu/rpc/transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"

DECLARE_int32(rpc_max_message_size);
DECLARE_int64(rpc_max_message_size);

namespace google {
namespace protobuf {
Expand All @@ -51,7 +51,8 @@ class TransferLimits {
public:
enum {
kMaxSidecars = 10,
kMaxPayloadSlices = kMaxSidecars + 2 // (header + msg)
kMaxPayloadSlices = kMaxSidecars + 2, // (header + msg)
kMaxTotalSidecarBytes = INT_MAX
};

DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
Expand Down
14 changes: 7 additions & 7 deletions be/src/kudu/security/tls_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,29 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
return Status::OK();
}

Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritten) {
Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int64_t *nwritten) {
SCOPED_OPENSSL_NO_PENDING_ERRORS;
CHECK(ssl_);
int32_t total_written = 0;
*nwritten = 0;
// Allows packets to be aggresively be accumulated before sending.
RETURN_NOT_OK(SetTcpCork(1));
Status write_status = Status::OK();
for (int i = 0; i < iov_len; ++i) {
int32_t frame_size = iov[i].iov_len;
int32_t bytes_written;
// Don't return before unsetting TCP_CORK.
write_status = Write(static_cast<uint8_t*>(iov[i].iov_base), frame_size, nwritten);
write_status = Write(static_cast<uint8_t*>(iov[i].iov_base), frame_size, &bytes_written);
if (!write_status.ok()) break;

// nwritten should have the correct amount written.
total_written += *nwritten;
if (*nwritten < frame_size) break;
*nwritten += bytes_written;
if (bytes_written < frame_size) break;
}
RETURN_NOT_OK(SetTcpCork(0));
*nwritten = total_written;
// If we did manage to write something, but not everything, due to a temporary socket
// error, then we should still return an OK status indicating a successful _partial_
// write.
if (total_written > 0 && Socket::IsTemporarySocketError(write_status.posix_code())) {
if (*nwritten > 0 && Socket::IsTemporarySocketError(write_status.posix_code())) {
return Status::OK();
}
return write_status;
Expand Down
2 changes: 1 addition & 1 deletion be/src/kudu/security/tls_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TlsSocket : public Socket {

Status Writev(const struct ::iovec *iov,
int iov_len,
int32_t *nwritten) override WARN_UNUSED_RESULT;
int64_t *nwritten) override WARN_UNUSED_RESULT;

Status Recv(uint8_t *buf, int32_t amt, int32_t *nread) override WARN_UNUSED_RESULT;

Expand Down
Loading

0 comments on commit 942de72

Please sign in to comment.