Skip to content

Commit

Permalink
upstream: implement UpstreamLocalityStats.loadMetricStats (envoyproxy…
Browse files Browse the repository at this point in the history
…#18534)

Implements a simple API allowing extensions to populate UpstreamLocalityStats.loadMetricStats in load reports, stored as a flat hash map of keys and values that may vary at runtime. Individual stats are accumulated by calling add(), which combines stats with the same name; the aggregated stats are retrieved by calling latch(), which also clears the current load metrics. The load metrics are latched and read by the load stats reporter.

Note: this implementation puts a copy of the stat name into every host that receives a copy of that metric. This can be improved by putting a single copy of the stat name into a thread-local key->index map and using the index as the key to the stat map instead.

Risk Level: low
Testing: CI
Docs Changes: n/a
Release Notes: n/a
Platform Specific Features: n/a

Signed-off-by: Eugene Chan <eugenechan@google.com>
Signed-off-by: Josh Perry <josh.perry@mx.com>
  • Loading branch information
pianiststickman authored and Josh Perry committed Feb 13, 2022
1 parent 5adb2bc commit 86c5a3c
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 7 deletions.
33 changes: 33 additions & 0 deletions envoy/upstream/host_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,34 @@ struct HostStats {
}
};

/**
* Weakly-named load metrics to be reported as EndpointLoadMetricStats. Individual stats are
* accumulated by calling add(), which combines stats with the same name. The aggregated stats are
* retrieved by calling latch(), which also clears the current load metrics.
*
* "Weakly-named" means the metric names are plain strings supplied at runtime; they do not have
* to be registered or known ahead of time.
*/
class LoadMetricStats {
public:
virtual ~LoadMetricStats() = default;

// Aggregated state of a single named metric since the last latch().
struct Stat {
// Number of add() calls that were folded into this entry.
uint64_t num_requests_with_metric = 0;
// Sum of the values passed to those add() calls.
double total_metric_value = 0.0;
};

// Metric name -> aggregated stat.
using StatMap = absl::flat_hash_map<std::string, Stat>;
// Owning handle to a latched StatMap.
using StatMapPtr = std::unique_ptr<StatMap>;

// Adds the given stat to the map. If the stat already exists in the map, then the stat is
// combined with the existing map entry by incrementing num_requests_with_metric and summing the
// total_metric_value fields. Otherwise, the stat is added with the provided value to the map,
// which retains all entries until the next call to latch(). This allows metrics to be added
// whose keys may not necessarily be known at startup time.
//
// @param key the metric name.
// @param value the metric value contributed by one request.
virtual void add(const absl::string_view key, double value) PURE;

// Returns an owning pointer to the current load metrics and clears the map.
//
// Callers should be prepared for a null pointer: LoadMetricStatsImpl returns null when add() has
// not been called since the previous latch().
virtual StatMapPtr latch() PURE;
};

class ClusterInfo;

/**
Expand Down Expand Up @@ -131,6 +159,11 @@ class HostDescription {
*/
virtual HostStats& stats() const PURE;

/**
* @return custom stats for multi-dimensional load balancing.
*/
virtual LoadMetricStats& loadMetricStats() const PURE;

/**
* @return the locality of the host (deployment specific). This will be the default instance if
* unknown.
Expand Down
38 changes: 31 additions & 7 deletions source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,36 @@ void LoadStatsReporter::sendLoadStatsRequest() {
if (cluster.info()->edsServiceName().has_value()) {
cluster_stats->set_cluster_service_name(cluster.info()->edsServiceName().value());
}
for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
for (const HostSetPtr& host_set : cluster.prioritySet().hostSetsPerPriority()) {
ENVOY_LOG(trace, "Load report locality count {}", host_set->hostsPerLocality().get().size());
for (auto& hosts : host_set->hostsPerLocality().get()) {
for (const HostVector& hosts : host_set->hostsPerLocality().get()) {
ASSERT(!hosts.empty());
uint64_t rq_success = 0;
uint64_t rq_error = 0;
uint64_t rq_active = 0;
uint64_t rq_issued = 0;
for (const auto& host : hosts) {
rq_success += host->stats().rq_success_.latch();
rq_error += host->stats().rq_error_.latch();
rq_active += host->stats().rq_active_.value();
rq_issued += host->stats().rq_total_.latch();
LoadMetricStats::StatMap load_metrics;
for (const HostSharedPtr& host : hosts) {
uint64_t host_rq_success = host->stats().rq_success_.latch();
uint64_t host_rq_error = host->stats().rq_error_.latch();
uint64_t host_rq_active = host->stats().rq_active_.value();
uint64_t host_rq_issued = host->stats().rq_total_.latch();
rq_success += host_rq_success;
rq_error += host_rq_error;
rq_active += host_rq_active;
rq_issued += host_rq_issued;
if (host_rq_success + host_rq_error + host_rq_active != 0) {
const std::unique_ptr<LoadMetricStats::StatMap> latched_stats =
host->loadMetricStats().latch();
if (latched_stats != nullptr) {
for (const auto& metric : *latched_stats) {
const std::string& name = metric.first;
LoadMetricStats::Stat& stat = load_metrics[name];
stat.num_requests_with_metric += metric.second.num_requests_with_metric;
stat.total_metric_value += metric.second.total_metric_value;
}
}
}
}
if (rq_success + rq_error + rq_active != 0) {
auto* locality_stats = cluster_stats->add_upstream_locality_stats();
Expand All @@ -94,6 +111,13 @@ void LoadStatsReporter::sendLoadStatsRequest() {
locality_stats->set_total_error_requests(rq_error);
locality_stats->set_total_requests_in_progress(rq_active);
locality_stats->set_total_issued_requests(rq_issued);
for (const auto& metric : load_metrics) {
auto* load_metric_stats = locality_stats->add_load_metric_stats();
load_metric_stats->set_metric_name(metric.first);
load_metric_stats->set_num_requests_finished_with_metric(
metric.second.num_requests_with_metric);
load_metric_stats->set_total_metric_value(metric.second.total_metric_value);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/logical_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class RealHostDescription : public HostDescription {
return logical_host_->outlierDetector();
}
// Forwards to the wrapped logical host.
HostStats& stats() const override { return logical_host_->stats(); }
// Forwards to the wrapped logical host, so load metrics added through this description are
// recorded on the logical host's LoadMetricStats.
LoadMetricStats& loadMetricStats() const override { return logical_host_->loadMetricStats(); }
// Forwards to the wrapped logical host.
const std::string& hostnameForHealthChecks() const override {
return logical_host_->hostnameForHealthChecks();
}
Expand Down
21 changes: 21 additions & 0 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,27 @@ HostVector filterHosts(const absl::node_hash_set<HostSharedPtr>& hosts,

} // namespace

// TODO(pianiststickman): this implementation takes a lock on the hot path and puts a copy of the
// stat name into every host that receives a copy of that metric. This can be improved by putting
// a single copy of the stat name into a thread-local key->index map and using the index as the
// key to the stat map instead, which would also allow the lock to be avoided.
// Folds one observation of the metric named `key` into the pending stat map: bumps the entry's
// request count by one and adds `value` to its running total. The whole update happens while
// holding mu_.
void LoadMetricStatsImpl::add(const absl::string_view key, double value) {
  absl::MutexLock lock(&mu_);
  // The map is created on demand; it is absent until the first add() after a latch().
  if (!map_) {
    map_ = std::make_unique<StatMap>();
  }
  // operator[] value-initializes a new entry (count 0, total 0.0) when the key is not present.
  Stat& entry = (*map_)[key];
  entry.num_requests_with_metric += 1;
  entry.total_metric_value += value;
}

// Hands the accumulated stat map to the caller and leaves this object with no map, so the next
// add() starts from scratch. Returns null when nothing was added since the previous latch().
LoadMetricStats::StatMapPtr LoadMetricStatsImpl::latch() {
  absl::MutexLock lock(&mu_);
  StatMapPtr snapshot;
  // Swapping with an empty pointer both extracts the current map and resets map_ to null.
  snapshot.swap(map_);
  return snapshot;
}

HostDescriptionImpl::HostDescriptionImpl(
ClusterInfoConstSharedPtr cluster, const std::string& hostname,
Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr metadata,
Expand Down
15 changes: 15 additions & 0 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ class HealthCheckHostMonitorNullImpl : public HealthCheckHostMonitor {
// Null implementation: the notification is ignored.
void setUnhealthy(UnhealthyType) override {}
};

/**
* Implementation of LoadMetricStats.
*
* The pending metrics live in a single heap-allocated StatMap guarded by a mutex. The map pointer
* is null until the first add() and becomes null again after every latch().
*/
class LoadMetricStatsImpl : public LoadMetricStats {
public:
// LoadMetricStats
void add(const absl::string_view key, double value) override;
StatMapPtr latch() override;

private:
// Guards map_; taken by both add() and latch().
absl::Mutex mu_;
// Metrics accumulated since the last latch(), or null if there are none.
StatMapPtr map_ ABSL_GUARDED_BY(mu_);
};

/**
* Implementation of Upstream::HostDescription.
*/
Expand Down Expand Up @@ -133,6 +146,7 @@ class HostDescriptionImpl : virtual public HostDescription,
return *null_outlier_detector;
}
// Returns the host's stats_ member, which is declared mutable so it can be handed out as a
// non-const reference from this const accessor.
HostStats& stats() const override { return stats_; }
// Returns the host's load_metric_stats_ member, which is declared mutable so metrics can be
// added through a const host.
LoadMetricStats& loadMetricStats() const override { return load_metric_stats_; }
// Returns the stored health_checks_hostname_.
const std::string& hostnameForHealthChecks() const override { return health_checks_hostname_; }
// Returns the stored hostname_.
const std::string& hostname() const override { return hostname_; }
// Returns a copy of the shared pointer held in address_.
Network::Address::InstanceConstSharedPtr address() const override { return address_; }
Expand Down Expand Up @@ -185,6 +199,7 @@ class HostDescriptionImpl : virtual public HostDescription,
const envoy::config::core::v3::Locality locality_;
Stats::StatNameDynamicStorage locality_zone_stat_name_;
mutable HostStats stats_;
mutable LoadMetricStatsImpl load_metric_stats_;
Outlier::DetectorHostMonitorPtr outlier_detector_;
HealthCheckHostMonitorPtr health_checker_;
std::atomic<uint32_t> priority_;
Expand Down
1 change: 1 addition & 0 deletions test/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ envoy_cc_test(
deps = [
"//source/common/stats:stats_lib",
"//source/common/upstream:load_stats_reporter_lib",
"//test/common/upstream:utility_lib",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/local_info:local_info_mocks",
Expand Down
116 changes: 116 additions & 0 deletions test/common/upstream/load_stats_reporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "source/common/upstream/load_stats_reporter.h"

#include "test/common/upstream/utility.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/grpc/mocks.h"
#include "test/mocks/local_info/mocks.h"
Expand Down Expand Up @@ -221,6 +222,121 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) {
response_timer_cb_();
}

// Creates a NiceMock host whose hostname() and locality() report the given values.
//
// The mock stores its own copies of both arguments (ReturnRefOfCopy). ReturnRef would keep a
// reference to the caller's object instead, which dangles for `hostname` whenever the caller
// passes a string literal: the temporary std::string bound to the parameter is destroyed as soon
// as the call expression finishes. Copying `locality` as well means the returned host does not
// depend on the lifetime of anything owned by the caller.
//
// @param hostname value returned by the mock's hostname().
// @param locality value returned by the mock's locality().
// @return the newly created mock host.
HostSharedPtr makeTestHost(const std::string& hostname,
                           const ::envoy::config::core::v3::Locality& locality) {
  auto host = std::make_shared<NiceMock<::Envoy::Upstream::MockHost>>();
  ON_CALL(*host, hostname()).WillByDefault(::testing::ReturnRefOfCopy(hostname));
  ON_CALL(*host, locality()).WillByDefault(::testing::ReturnRefOfCopy(locality));
  return host;
}

// Simulates one successful request on `host`: increments rq_success_ and records "metric_a" with
// value `a`. "metric_b", "metric_c" and "metric_d" are recorded only when the corresponding
// argument is non-zero, so a zero argument means "this request did not report that metric".
void addStats(const HostSharedPtr& host, double a, double b = 0, double c = 0, double d = 0) {
  host->stats().rq_success_.inc();
  host->loadMetricStats().add("metric_a", a);

  // Optional metrics, in the order they are recorded.
  const char* const optional_names[] = {"metric_b", "metric_c", "metric_d"};
  const double optional_values[] = {b, c, d};
  for (int i = 0; i < 3; ++i) {
    if (optional_values[i] == 0) {
      continue;
    }
    host->loadMetricStats().add(optional_names[i], optional_values[i]);
  }
}

// Appends one load_metric_stats entry to `stats`, filled in with the given metric name, finished
// request count and total metric value. Used to build the expected load report.
void addStatExpectation(envoy::config::endpoint::v3::UpstreamLocalityStats* stats,
                        const std::string& metric_name, int num_requests_with_metric,
                        double total_metric_value) {
  auto& expected_entry = *stats->add_load_metric_stats();
  expected_entry.set_metric_name(metric_name);
  expected_entry.set_num_requests_finished_with_metric(num_requests_with_metric);
  expected_entry.set_total_metric_value(total_metric_value);
}

// Validate that per-locality metrics are aggregated across hosts and included in the load report.
// The test sends two reports: the first covers metrics from three hosts in two localities, the
// second checks that previously reported metrics were cleared and only new traffic shows up.
TEST_F(LoadStatsReporterTest, UpstreamLocalityStats) {
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage({});
createLoadStatsReporter();
time_system_.setMonotonicTime(std::chrono::microseconds(3));

// Set up some load metrics
NiceMock<MockClusterMockPrioritySet> cluster;
MockHostSet& host_set_ = *cluster.prioritySet().getMockHostSet(0);

// Two localities: "mars" holds host0 and host1, "jupiter" holds host2.
::envoy::config::core::v3::Locality locality0, locality1;
locality0.set_region("mars");
locality1.set_region("jupiter");
HostSharedPtr host0 = makeTestHost("host0", locality0), host1 = makeTestHost("host1", locality0),
host2 = makeTestHost("host2", locality1);
host_set_.hosts_per_locality_ = makeHostsPerLocality({{host0, host1}, {host2}});

// Each addStats() call is one successful request. host0 receives two requests, host1 and host2
// one each; a zero argument means that request did not report the metric.
addStats(host0, 0.11111, 1.0);
addStats(host0, 0.33333, 0, 3.14159);
addStats(host1, 0.44444, 0.12345);
addStats(host2, 10.01, 0, 20.02, 30.03);

cluster.info_->eds_service_name_ = "bar";
MockClusterManager::ClusterInfoMaps cluster_info{{{"foo", cluster}}, {}};
ON_CALL(cm_, clusters()).WillByDefault(Return(cluster_info));
deliverLoadStatsResponse({"foo"});
// First stats report on timer tick.
time_system_.setMonotonicTime(std::chrono::microseconds(4));
{
envoy::config::endpoint::v3::ClusterStats expected_cluster_stats;
expected_cluster_stats.set_cluster_name("foo");
expected_cluster_stats.set_cluster_service_name("bar");
expected_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(1));

// "mars" sums host0 and host1: metric_a = 0.11111 + 0.33333 + 0.44444 over 3 requests,
// metric_b = 1.0 + 0.12345 over 2 requests, metric_c = 3.14159 over 1 request.
// NOTE(review): the entries are listed in name order here, while the reporter collects them
// through a hash map; presumably expectSendMessage compares without depending on the order of
// repeated fields - confirm.
auto expected_locality0_stats = expected_cluster_stats.add_upstream_locality_stats();
expected_locality0_stats->mutable_locality()->set_region("mars");
expected_locality0_stats->set_total_successful_requests(3);
addStatExpectation(expected_locality0_stats, "metric_a", 3, 0.88888);
addStatExpectation(expected_locality0_stats, "metric_b", 2, 1.12345);
addStatExpectation(expected_locality0_stats, "metric_c", 1, 3.14159);

// "jupiter" contains only host2, so its values are reported unchanged.
auto expected_locality1_stats = expected_cluster_stats.add_upstream_locality_stats();
expected_locality1_stats->mutable_locality()->set_region("jupiter");
expected_locality1_stats->set_total_successful_requests(1);
addStatExpectation(expected_locality1_stats, "metric_a", 1, 10.01);
addStatExpectation(expected_locality1_stats, "metric_c", 1, 20.02);
addStatExpectation(expected_locality1_stats, "metric_d", 1, 30.03);

expectSendMessage({expected_cluster_stats});
}
EXPECT_CALL(*response_timer_, enableTimer(std::chrono::milliseconds(42000), _));
response_timer_cb_();

// Traffic between previous request and next response. Previous latched metrics are cleared.
host1->stats().rq_success_.inc();
host1->loadMetricStats().add("metric_a", 1.41421);
host1->loadMetricStats().add("metric_e", 2.71828);

time_system_.setMonotonicTime(std::chrono::microseconds(6));
deliverLoadStatsResponse({"foo"});
// Second stats report on timer tick.
time_system_.setMonotonicTime(std::chrono::microseconds(28));
{
envoy::config::endpoint::v3::ClusterStats expected_cluster_stats;
expected_cluster_stats.set_cluster_name("foo");
expected_cluster_stats.set_cluster_service_name("bar");
expected_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(24));

// Only the single new request on host1 is expected; metric_b and metric_c from the first report
// must not reappear.
auto expected_locality0_stats = expected_cluster_stats.add_upstream_locality_stats();
expected_locality0_stats->mutable_locality()->set_region("mars");
expected_locality0_stats->set_total_successful_requests(1);
addStatExpectation(expected_locality0_stats, "metric_a", 1, 1.41421);
addStatExpectation(expected_locality0_stats, "metric_e", 1, 2.71828);

// No stats for locality 1 since there was no traffic to it.
expectSendMessage({expected_cluster_stats});
}
EXPECT_CALL(*response_timer_, enableTimer(std::chrono::milliseconds(42000), _));
response_timer_cb_();
}

// Validate that the client can recover from a remote stream closure via retry.
TEST_F(LoadStatsReporterTest, RemoteStreamClose) {
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/upstream/host.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ MockHostDescription::MockHostDescription()
ON_CALL(*this, address()).WillByDefault(Return(address_));
ON_CALL(*this, outlierDetector()).WillByDefault(ReturnRef(outlier_detector_));
ON_CALL(*this, stats()).WillByDefault(ReturnRef(stats_));
ON_CALL(*this, loadMetricStats()).WillByDefault(ReturnRef(load_metric_stats_));
ON_CALL(*this, locality()).WillByDefault(ReturnRef(locality_));
ON_CALL(*this, cluster()).WillByDefault(ReturnRef(cluster_));
ON_CALL(*this, healthChecker()).WillByDefault(ReturnRef(health_checker_));
Expand All @@ -49,6 +50,7 @@ MockHost::MockHost() : socket_factory_(new testing::NiceMock<Network::MockTransp
ON_CALL(*this, cluster()).WillByDefault(ReturnRef(cluster_));
ON_CALL(*this, outlierDetector()).WillByDefault(ReturnRef(outlier_detector_));
ON_CALL(*this, stats()).WillByDefault(ReturnRef(stats_));
ON_CALL(*this, loadMetricStats()).WillByDefault(ReturnRef(load_metric_stats_));
ON_CALL(*this, warmed()).WillByDefault(Return(true));
ON_CALL(*this, transportSocketFactory()).WillByDefault(ReturnRef(*socket_factory_));
}
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/upstream/host.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class MockHostDescription : public HostDescription {
MOCK_METHOD(const std::string&, hostname, (), (const));
MOCK_METHOD(Network::TransportSocketFactory&, transportSocketFactory, (), (const));
MOCK_METHOD(HostStats&, stats, (), (const));
MOCK_METHOD(LoadMetricStats&, loadMetricStats, (), (const));
MOCK_METHOD(const envoy::config::core::v3::Locality&, locality, (), (const));
MOCK_METHOD(uint32_t, priority, (), (const));
MOCK_METHOD(void, priority, (uint32_t));
Expand All @@ -115,6 +116,7 @@ class MockHostDescription : public HostDescription {
Network::TransportSocketFactoryPtr socket_factory_;
testing::NiceMock<MockClusterInfo> cluster_;
HostStats stats_;
LoadMetricStatsImpl load_metric_stats_;
envoy::config::core::v3::Locality locality_;
mutable Stats::TestUtil::TestSymbolTable symbol_table_;
mutable std::unique_ptr<Stats::StatNameManagedStorage> locality_zone_stat_name_;
Expand Down Expand Up @@ -189,6 +191,7 @@ class MockHost : public Host {
MOCK_METHOD(void, setHealthChecker_, (HealthCheckHostMonitorPtr & health_checker));
MOCK_METHOD(void, setOutlierDetector_, (Outlier::DetectorHostMonitorPtr & outlier_detector));
MOCK_METHOD(HostStats&, stats, (), (const));
MOCK_METHOD(LoadMetricStats&, loadMetricStats, (), (const));
MOCK_METHOD(uint32_t, weight, (), (const));
MOCK_METHOD(void, weight, (uint32_t new_weight));
MOCK_METHOD(bool, used, (), (const));
Expand All @@ -203,6 +206,7 @@ class MockHost : public Host {
Network::TransportSocketFactoryPtr socket_factory_;
testing::NiceMock<Outlier::MockDetectorHostMonitor> outlier_detector_;
HostStats stats_;
LoadMetricStatsImpl load_metric_stats_;
mutable Stats::TestUtil::TestSymbolTable symbol_table_;
mutable std::unique_ptr<Stats::StatNameManagedStorage> locality_zone_stat_name_;
};
Expand Down

0 comments on commit 86c5a3c

Please sign in to comment.