Skip to content

Commit

Permalink
[yugabyte#9372] Add RPC call metrics
Browse files Browse the repository at this point in the history
Summary:
This diff adds 4 metric counters for each RPC method:

1) proxy_request_bytes_<SERVICE>_<METHOD>,
2) proxy_response_bytes_<SERVICE>_<METHOD>,
3) service_request_bytes_<SERVICE>_<METHOD>,
4) service_response_bytes_<SERVICE>_<METHOD>.

Test Plan: ybd --gtest_filter RpcStubTest.TrafficMetrics

Reviewers: bogdan, timur

Reviewed By: timur

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D12348
  • Loading branch information
spolitov committed Jul 26, 2021
1 parent cc27d98 commit c3048aa
Show file tree
Hide file tree
Showing 30 changed files with 571 additions and 289 deletions.
32 changes: 29 additions & 3 deletions src/yb/rpc/inbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "yb/rpc/rpc_introspection.pb.h"
#include "yb/rpc/rpc_metrics.h"
#include "yb/rpc/serialization.h"
#include "yb/rpc/service_if.h"
#include "yb/rpc/service_pool.h"

#include "yb/util/debug/trace_event.h"
Expand Down Expand Up @@ -158,13 +159,14 @@ MonoDelta InboundCall::GetTimeInQueue() const {
return timing_.time_handled.GetDeltaSince(timing_.time_received);
}

void InboundCall::RecordHandlingCompleted(scoped_refptr<Histogram> handler_run_time) {
void InboundCall::RecordHandlingCompleted() {
// Protect against multiple calls.
LOG_IF_WITH_PREFIX(DFATAL, timing_.time_completed.Initialized()) << "Already marked as completed";
timing_.time_completed = MonoTime::Now();
VLOG_WITH_PREFIX(4) << "Completed handling";
if (handler_run_time) {
handler_run_time->Increment((timing_.time_completed - timing_.time_handled).ToMicroseconds());
if (rpc_method_metrics_ && rpc_method_metrics_->handler_latency) {
rpc_method_metrics_->handler_latency->Increment(
(timing_.time_completed - timing_.time_handled).ToMicroseconds());
}
}

Expand Down Expand Up @@ -224,5 +226,29 @@ void InboundCall::InboundCallTask::Done(const Status& status) {
}
}

// Attaches the per-method metrics to this call and immediately accounts the size of the
// already received request in the request_bytes counter, when that counter is configured.
// The call keeps only a raw pointer to `value`, so the referenced metrics object must
// outlive this call.
void InboundCall::SetRpcMethodMetrics(std::reference_wrapper<const RpcMethodMetrics> value) {
  const RpcMethodMetrics& metrics = value.get();
  rpc_method_metrics_ = &metrics;

  // Nothing to account when the method has no request size counter.
  const auto& request_counter = metrics.request_bytes;
  if (!request_counter) {
    return;
  }

  // Skip the counter update for empty requests.
  const auto received_bytes = request_data_.size();
  if (!received_bytes) {
    return;
  }
  request_counter->IncrementBy(received_bytes);
}

// Serializes the response by delegating to DoSerialize() and, when a response_bytes counter
// is configured for the method, increments it by the total size of the buffers that
// DoSerialize() appended to `output`.
//
// output - destination vector; buffers already present in it before the call are not counted.
void InboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) {
  // Remember where the newly appended buffers start.
  const size_t old_size = output->size();
  DoSerialize(output);
  if (rpc_method_metrics_ && rpc_method_metrics_->response_bytes) {
    // Accumulate in size_t: `auto response_size = 0` deduces int, which silently narrows the
    // size_t buffer sizes and can overflow for large responses.
    size_t response_size = 0;
    for (size_t i = old_size; i != output->size(); ++i) {
      response_size += (*output)[i].size();
    }
    // Avoid touching the counter when nothing was serialized.
    if (response_size) {
      rpc_method_metrics_->response_bytes->IncrementBy(response_size);
    }
  }
}

} // namespace rpc
} // namespace yb
15 changes: 9 additions & 6 deletions src/yb/rpc/inbound_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ class Trace;

namespace rpc {

class DumpRunningRpcsRequestPB;
class RpcCallInProgressPB;
class RpcCallDetailsPB;
class CQLCallDetailsPB;

struct InboundCallTiming {
MonoTime time_received; // Time the call was first accepted.
MonoTime time_handled; // Time the call handler was kicked off.
Expand Down Expand Up @@ -106,6 +101,8 @@ class InboundCall : public RpcCall, public MPSCQueueEntry<InboundCall> {
CallProcessedListener call_processed_listener);
virtual ~InboundCall();

void SetRpcMethodMetrics(std::reference_wrapper<const RpcMethodMetrics> value);

// Return the serialized request parameter protobuf.
const Slice &serialized_request() const {
return serialized_request_;
Expand Down Expand Up @@ -134,7 +131,7 @@ class InboundCall : public RpcCall, public MPSCQueueEntry<InboundCall> {
// Updates the Histogram with time elapsed since the call was started,
// and should only be called once on a given instance.
// Not thread-safe. Should only be called by the current "owner" thread.
void RecordHandlingCompleted(scoped_refptr<Histogram> handler_run_time);
void RecordHandlingCompleted();

// Return true if the deadline set by the client has already elapsed.
// In this case, the server may stop processing the call, since the
Expand All @@ -146,6 +143,8 @@ class InboundCall : public RpcCall, public MPSCQueueEntry<InboundCall> {
// If the client did not specify a deadline, returns MonoTime::Max().
virtual CoarseTimePoint GetClientDeadline() const = 0;

virtual void DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) = 0;

// Returns the time spent in the service queue -- from the time the call was received, until
// it gets handled.
MonoDelta GetTimeInQueue() const;
Expand Down Expand Up @@ -197,6 +196,8 @@ class InboundCall : public RpcCall, public MPSCQueueEntry<InboundCall> {

size_t DynamicMemoryUsage() const override;

void Serialize(boost::container::small_vector_base<RefCntBuffer>* output) override final;

const CallData& request_data() const { return request_data_; }

protected:
Expand Down Expand Up @@ -229,6 +230,8 @@ class InboundCall : public RpcCall, public MPSCQueueEntry<InboundCall> {

std::atomic<bool> responded_{false};

const RpcMethodMetrics* rpc_method_metrics_ = nullptr;

private:
// The connection on which this inbound call arrived. Can be null for LocalYBInboundCall.
ConnectionPtr conn_ = nullptr;
Expand Down
7 changes: 4 additions & 3 deletions src/yb/rpc/local_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ LocalOutboundCall::LocalOutboundCall(
const shared_ptr<OutboundCallMetrics>& outbound_call_metrics,
google::protobuf::Message* response_storage, RpcController* controller,
RpcMetrics* rpc_metrics, ResponseCallback callback)
: OutboundCall(remote_method, outbound_call_metrics, response_storage, controller, rpc_metrics,
std::move(callback), nullptr /* callback_thread_pool */) {
: OutboundCall(remote_method, outbound_call_metrics, /* method_metrics= */ nullptr,
response_storage, controller, rpc_metrics, std::move(callback),
/* callback_thread_pool= */ nullptr) {
}

Status LocalOutboundCall::SetRequestParam(
Expand All @@ -46,7 +47,7 @@ const std::shared_ptr<LocalYBInboundCall>& LocalOutboundCall::CreateLocalInbound
DCHECK(inbound_call_.get() == nullptr);
const MonoDelta timeout = controller()->timeout();
const CoarseTimePoint deadline =
timeout.Initialized() ? ToCoarse(start_) + timeout : CoarseTimePoint::max();
timeout.Initialized() ? start_ + timeout : CoarseTimePoint::max();
auto outbound_call = std::static_pointer_cast<LocalOutboundCall>(shared_from(this));
inbound_call_ = InboundCall::Create<LocalYBInboundCall>(
&rpc_metrics(), remote_method(), outbound_call, deadline);
Expand Down
3 changes: 2 additions & 1 deletion src/yb/rpc/mt-rpc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ TEST_F(MultiThreadedRpcTest, MemoryLimit) {
RpcController controller;
controller.set_timeout(500ms);
auto status = proxy->SyncRequest(
CalculatorServiceMethods::EchoMethod(), req, &resp, &controller);
CalculatorServiceMethods::EchoMethod(), /* method_metrics= */ nullptr, req, &resp,
&controller);
if (big_call) {
ASSERT_NOK(status);
} else {
Expand Down
40 changes: 23 additions & 17 deletions src/yb/rpc/outbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,14 @@ void InvokeCallbackTask::Done(const Status& status) {

OutboundCall::OutboundCall(const RemoteMethod* remote_method,
const std::shared_ptr<OutboundCallMetrics>& outbound_call_metrics,
std::shared_ptr<const OutboundMethodMetrics> method_metrics,
google::protobuf::Message* response_storage,
RpcController* controller,
RpcMetrics* rpc_metrics,
ResponseCallback callback,
ThreadPool* callback_thread_pool)
: hostname_(&kEmptyString),
start_(MonoTime::Now()),
start_(CoarseMonoClock::Now()),
controller_(DCHECK_NOTNULL(controller)),
response_(DCHECK_NOTNULL(response_storage)),
call_id_(NextCallId()),
Expand All @@ -157,7 +158,8 @@ OutboundCall::OutboundCall(const RemoteMethod* remote_method,
callback_thread_pool_(callback_thread_pool),
trace_(new Trace),
outbound_call_metrics_(outbound_call_metrics),
rpc_metrics_(rpc_metrics) {
rpc_metrics_(rpc_metrics),
method_metrics_(std::move(method_metrics)) {
// Avoid expensive conn_id.ToString() in production.
TRACE_TO_WITH_TIME(trace_, start_, "Outbound Call initiated.");

Expand All @@ -179,8 +181,7 @@ OutboundCall::~OutboundCall() {

if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
LOG(INFO) << ToString() << " took "
<< MonoTime::Now().GetDeltaSince(start_).ToMicroseconds()
<< "us. Trace:";
<< MonoDelta(CoarseMonoClock::Now() - start_).ToMicroseconds() << "us. Trace:";
trace_->Dump(&LOG(INFO), true);
}

Expand Down Expand Up @@ -248,11 +249,12 @@ Status OutboundCall::SetRequestParam(
buffer_consumption_ = ScopedTrackedConsumption(mem_tracker, buffer_.size());
}

return SerializeMessage(message,
&buffer_,
/* additional_size */ 0,
/* use_cached_size */ true,
header_size);
RETURN_NOT_OK(SerializeMessage(
message, &buffer_, /* additional_size */ 0, /* use_cached_size */ true, header_size));
if (method_metrics_ && method_metrics_->request_bytes) {
method_metrics_->request_bytes->IncrementBy(buffer_.size());
}
return Status::OK();
}

Status OutboundCall::status() const {
Expand Down Expand Up @@ -371,16 +373,20 @@ void OutboundCall::InvokeCallbackSync() {
void OutboundCall::SetResponse(CallResponse&& resp) {
DCHECK(!IsFinished());

auto now = MonoTime::Now();
auto now = CoarseMonoClock::Now();
TRACE_TO_WITH_TIME(trace_, now, "Response received.");
// Track time taken to be responded.

if (outbound_call_metrics_) {
outbound_call_metrics_->time_to_response->Increment(now.GetDeltaSince(start_).ToMicroseconds());
outbound_call_metrics_->time_to_response->Increment(MonoDelta(now - start_).ToMicroseconds());
}
call_response_ = std::move(resp);
Slice r(call_response_.serialized_response());

if (method_metrics_ && method_metrics_->response_bytes) {
method_metrics_->response_bytes->IncrementBy(r.size());
}

if (call_response_.is_success()) {
// TODO: here we're deserializing the call response within the reactor thread,
// which isn't great, since it would block processing of other RPCs in parallel.
Expand Down Expand Up @@ -410,20 +416,20 @@ void OutboundCall::SetResponse(CallResponse&& resp) {
}

void OutboundCall::SetQueued() {
auto end_time = MonoTime::Now();
auto end_time = CoarseMonoClock::Now();
// Track time taken to be queued.
if (outbound_call_metrics_) {
outbound_call_metrics_->queue_time->Increment(end_time.GetDeltaSince(start_).ToMicroseconds());
outbound_call_metrics_->queue_time->Increment(MonoDelta(end_time - start_).ToMicroseconds());
}
SetState(ON_OUTBOUND_QUEUE);
TRACE_TO_WITH_TIME(trace_, end_time, "Queued.");
}

void OutboundCall::SetSent() {
auto end_time = MonoTime::Now();
auto end_time = CoarseMonoClock::Now();
// Track time taken to be sent
if (outbound_call_metrics_) {
outbound_call_metrics_->send_time->Increment(end_time.GetDeltaSince(start_).ToMicroseconds());
outbound_call_metrics_->send_time->Increment(MonoDelta(end_time - start_).ToMicroseconds());
}
SetState(SENT);
TRACE_TO_WITH_TIME(trace_, end_time, "Call Sent.");
Expand All @@ -435,7 +441,7 @@ void OutboundCall::SetFinished() {
// Track time taken to be responded.
if (outbound_call_metrics_) {
outbound_call_metrics_->time_to_response->Increment(
MonoTime::Now().GetDeltaSince(start_).ToMicroseconds());
MonoDelta(CoarseMonoClock::Now() - start_).ToMicroseconds());
}
if (SetState(FINISHED_SUCCESS)) {
InvokeCallback();
Expand Down Expand Up @@ -516,7 +522,7 @@ bool OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
// is only used for dumping the PB and not to send the RPC over the wire.
return false;
}
resp->set_elapsed_millis(MonoTime::Now().GetDeltaSince(start_).ToMilliseconds());
resp->set_elapsed_millis(MonoDelta(CoarseMonoClock::Now() - start_).ToMilliseconds());
resp->set_state(state_value);
if (req.include_traces() && trace_) {
resp->set_trace_buffer(trace_->DumpToString(true));
Expand Down
11 changes: 10 additions & 1 deletion src/yb/rpc/outbound_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ class InvokeCallbackTask : public rpc::ThreadPoolTask {
OutboundCallPtr call_;
};

// Per-method traffic counters used by OutboundCall.
// Either counter may be unset; OutboundCall checks each one for null before incrementing it.
struct OutboundMethodMetrics {
// Incremented by the size of the call's serialized request buffer.
scoped_refptr<Counter> request_bytes;
// Incremented by the size of the serialized response received for the call.
scoped_refptr<Counter> response_bytes;
};

// Tracks the status of a call on the client side.
//
// This is an internal-facing class -- clients interact with the
Expand All @@ -217,11 +222,13 @@ class OutboundCall : public RpcCall {
public:
OutboundCall(const RemoteMethod* remote_method,
const std::shared_ptr<OutboundCallMetrics>& outbound_call_metrics,
std::shared_ptr<const OutboundMethodMetrics> method_metrics,
google::protobuf::Message* response_storage,
RpcController* controller,
RpcMetrics* rpc_metrics,
ResponseCallback callback,
ThreadPool* callback_thread_pool);

virtual ~OutboundCall();

// Serialize the given request PB into this call's internal storage.
Expand Down Expand Up @@ -329,7 +336,7 @@ class OutboundCall : public RpcCall {

ConnectionId conn_id_;
const std::string* hostname_;
MonoTime start_;
CoarseTimePoint start_;
RpcController* controller_;
// Pointer for the protobuf where the response should be written.
// Can be used only while callback_ object is alive.
Expand Down Expand Up @@ -400,6 +407,8 @@ class OutboundCall : public RpcCall {

Status thread_pool_failure_;

std::shared_ptr<const OutboundMethodMetrics> method_metrics_;

DISALLOW_COPY_AND_ASSIGN(OutboundCall);
};

Expand Down
Loading

0 comments on commit c3048aa

Please sign in to comment.