Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream: implement UpstreamLocalityStats.loadMetricStats #18534

Merged
merged 8 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions envoy/upstream/host_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,48 @@ struct HostStats {
}
};

/**
* Weakly-named load metrics to be reported as EndpointLoadMetricStats. Individual stats are
* accumulated by calling add(), which combines stats with the same name. The aggregated stats are
* retrieved by calling latch(), which also clears the current load metrics.
*/
class LoadMetricStats {
pianiststickman marked this conversation as resolved.
Show resolved Hide resolved
public:
struct Stat {
uint64_t num_requests_with_metric = 0;
double total_metric_value = 0.0;
};

using StatsMap = absl::flat_hash_map<std::string, Stat>;
pianiststickman marked this conversation as resolved.
Show resolved Hide resolved

LoadMetricStats() : map_(std::make_unique<StatsMap>()) {}
pianiststickman marked this conversation as resolved.
Show resolved Hide resolved

// Adds the given stat to the map. If the stat already exists in the map, then the stat is
// combined with the existing map entry by incrementing num_requests_with_metric and summing the
// total_metric_value fields.
void add(const std::string& key, double value) {
absl::MutexLock lock(&mu_);
Stat& stat = (*map_)[key];
pianiststickman marked this conversation as resolved.
Show resolved Hide resolved
++stat.num_requests_with_metric;
stat.total_metric_value += value;
}

// Returns an owning pointer to the current load metrics and clears the map.
std::unique_ptr<StatsMap> latch() {
absl::MutexLock lock(&mu_);
std::unique_ptr<StatsMap> latched = std::move(map_);
map_ = std::make_unique<StatsMap>();
return latched;
}

private:
absl::Mutex mu_;
std::unique_ptr<StatsMap> map_ ABSL_GUARDED_BY(mu_);
pianiststickman marked this conversation as resolved.
Show resolved Hide resolved

LoadMetricStats(const LoadMetricStats&) = delete;
LoadMetricStats& operator=(const LoadMetricStats&) = delete;
};

class ClusterInfo;

/**
Expand Down Expand Up @@ -131,6 +173,11 @@ class HostDescription {
*/
virtual HostStats& stats() const PURE;

/**
* @return custom stats for multi-dimensional load balancing.
*/
virtual LoadMetricStats& loadMetricStats() const PURE;

/**
* @return the locality of the host (deployment specific). This will be the default instance if
* unknown.
Expand Down
22 changes: 19 additions & 3 deletions source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,28 @@ void LoadStatsReporter::sendLoadStatsRequest() {
if (cluster.info()->edsServiceName().has_value()) {
cluster_stats->set_cluster_service_name(cluster.info()->edsServiceName().value());
}
for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
for (const HostSetPtr& host_set : cluster.prioritySet().hostSetsPerPriority()) {
ENVOY_LOG(trace, "Load report locality count {}", host_set->hostsPerLocality().get().size());
for (auto& hosts : host_set->hostsPerLocality().get()) {
for (const HostVector& hosts : host_set->hostsPerLocality().get()) {
ASSERT(!hosts.empty());
uint64_t rq_success = 0;
uint64_t rq_error = 0;
uint64_t rq_active = 0;
uint64_t rq_issued = 0;
for (const auto& host : hosts) {
LoadMetricStats::StatsMap load_metrics;
pianiststickman marked this conversation as resolved.
Show resolved Hide resolved
for (const HostSharedPtr& host : hosts) {
rq_success += host->stats().rq_success_.latch();
rq_error += host->stats().rq_error_.latch();
rq_active += host->stats().rq_active_.value();
rq_issued += host->stats().rq_total_.latch();
const std::unique_ptr<LoadMetricStats::StatsMap> latched_stats =
pianiststickman marked this conversation as resolved.
Show resolved Hide resolved
host->loadMetricStats().latch();
for (const auto& metric : *latched_stats) {
const std::string& name = metric.first;
LoadMetricStats::Stat& stat = load_metrics[name];
stat.num_requests_with_metric += metric.second.num_requests_with_metric;
stat.total_metric_value += metric.second.total_metric_value;
}
}
if (rq_success + rq_error + rq_active != 0) {
auto* locality_stats = cluster_stats->add_upstream_locality_stats();
Expand All @@ -94,6 +103,13 @@ void LoadStatsReporter::sendLoadStatsRequest() {
locality_stats->set_total_error_requests(rq_error);
locality_stats->set_total_requests_in_progress(rq_active);
locality_stats->set_total_issued_requests(rq_issued);
for (const auto& metric : load_metrics) {
auto* load_metric_stats = locality_stats->add_load_metric_stats();
load_metric_stats->set_metric_name(metric.first);
load_metric_stats->set_num_requests_finished_with_metric(
metric.second.num_requests_with_metric);
load_metric_stats->set_total_metric_value(metric.second.total_metric_value);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/logical_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class RealHostDescription : public HostDescription {
return logical_host_->outlierDetector();
}
HostStats& stats() const override { return logical_host_->stats(); }
LoadMetricStats& loadMetricStats() const override { return logical_host_->loadMetricStats(); }
const std::string& hostnameForHealthChecks() const override {
return logical_host_->hostnameForHealthChecks();
}
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class HostDescriptionImpl : virtual public HostDescription,
return *null_outlier_detector;
}
HostStats& stats() const override { return stats_; }
LoadMetricStats& loadMetricStats() const override { return load_metric_stats_; }
const std::string& hostnameForHealthChecks() const override { return health_checks_hostname_; }
const std::string& hostname() const override { return hostname_; }
Network::Address::InstanceConstSharedPtr address() const override { return address_; }
Expand Down Expand Up @@ -185,6 +186,7 @@ class HostDescriptionImpl : virtual public HostDescription,
const envoy::config::core::v3::Locality locality_;
Stats::StatNameDynamicStorage locality_zone_stat_name_;
mutable HostStats stats_;
mutable LoadMetricStats load_metric_stats_;
Outlier::DetectorHostMonitorPtr outlier_detector_;
HealthCheckHostMonitorPtr health_checker_;
std::atomic<uint32_t> priority_;
Expand Down
1 change: 1 addition & 0 deletions test/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ envoy_cc_test(
deps = [
"//source/common/stats:stats_lib",
"//source/common/upstream:load_stats_reporter_lib",
"//test/common/upstream:utility_lib",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/local_info:local_info_mocks",
Expand Down
129 changes: 129 additions & 0 deletions test/common/upstream/load_stats_reporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "source/common/upstream/load_stats_reporter.h"

#include "test/common/upstream/utility.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/grpc/mocks.h"
#include "test/mocks/local_info/mocks.h"
Expand Down Expand Up @@ -221,6 +222,134 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) {
response_timer_cb_();
}

HostSharedPtr makeTestHost(const std::string& hostname,
const ::envoy::config::core::v3::Locality& locality) {
const auto host = std::make_shared<NiceMock<::Envoy::Upstream::MockHost>>();
ON_CALL(*host, hostname()).WillByDefault(::testing::ReturnRef(hostname));
ON_CALL(*host, locality()).WillByDefault(::testing::ReturnRef(locality));
return host;
}

// Validate that per-locality metrics are aggregated across hosts and included in the load report.
TEST_F(LoadStatsReporterTest, UpstreamLocalityStats) {
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage({});
createLoadStatsReporter();
time_system_.setMonotonicTime(std::chrono::microseconds(3));

// Set up some load metrics
NiceMock<MockClusterMockPrioritySet> cluster;
MockHostSet& host_set_ = *cluster.prioritySet().getMockHostSet(0);

::envoy::config::core::v3::Locality locality0, locality1;
locality0.set_region("mars");
locality1.set_region("jupiter");
HostSharedPtr host0 = makeTestHost("host0", locality0), host1 = makeTestHost("host1", locality0),
host2 = makeTestHost("host2", locality1);
host_set_.hosts_per_locality_ = makeHostsPerLocality({{host0, host1}, {host2}});

host0->stats().rq_success_.inc();
host0->loadMetricStats().add("metric_a", 0.11111);
pianiststickman marked this conversation as resolved.
Show resolved Hide resolved
host0->loadMetricStats().add("metric_b", 1.0);

host0->stats().rq_success_.inc();
host0->loadMetricStats().add("metric_a", 0.33333);
host0->loadMetricStats().add("metric_c", 3.14159);

host1->stats().rq_success_.inc();
host1->loadMetricStats().add("metric_a", 0.44444);
host1->loadMetricStats().add("metric_b", 0.12345);

host2->stats().rq_success_.inc();
host2->loadMetricStats().add("metric_a", 10.01);
host2->loadMetricStats().add("metric_c", 20.02);
host2->loadMetricStats().add("metric_d", 30.03);

cluster.info_->eds_service_name_ = "bar";
MockClusterManager::ClusterInfoMaps cluster_info{{{"foo", cluster}}, {}};
ON_CALL(cm_, clusters()).WillByDefault(Return(cluster_info));
deliverLoadStatsResponse({"foo"});
// First stats report on timer tick.
time_system_.setMonotonicTime(std::chrono::microseconds(4));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using advanceTimeWait with a delta, rather than just setting the time to an absolute time.

{
envoy::config::endpoint::v3::ClusterStats expected_cluster_stats;
expected_cluster_stats.set_cluster_name("foo");
expected_cluster_stats.set_cluster_service_name("bar");
expected_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(1));

auto expected_locality0_stats = expected_cluster_stats.add_upstream_locality_stats();
expected_locality0_stats->mutable_locality()->set_region("mars");
expected_locality0_stats->set_total_successful_requests(3);
auto locality0_metric_a = expected_locality0_stats->add_load_metric_stats();
locality0_metric_a->set_metric_name("metric_a");
locality0_metric_a->set_num_requests_finished_with_metric(3);
locality0_metric_a->set_total_metric_value(0.88888);
auto locality0_metric_b = expected_locality0_stats->add_load_metric_stats();
locality0_metric_b->set_metric_name("metric_b");
locality0_metric_b->set_num_requests_finished_with_metric(2);
locality0_metric_b->set_total_metric_value(1.12345);
auto locality0_metric_c = expected_locality0_stats->add_load_metric_stats();
locality0_metric_c->set_metric_name("metric_c");
locality0_metric_c->set_num_requests_finished_with_metric(1);
locality0_metric_c->set_total_metric_value(3.14159);

auto expected_locality1_stats = expected_cluster_stats.add_upstream_locality_stats();
expected_locality1_stats->mutable_locality()->set_region("jupiter");
expected_locality1_stats->set_total_successful_requests(1);
auto locality1_metric_a = expected_locality1_stats->add_load_metric_stats();
locality1_metric_a->set_metric_name("metric_a");
locality1_metric_a->set_num_requests_finished_with_metric(1);
locality1_metric_a->set_total_metric_value(10.01);
auto locality1_metric_c = expected_locality1_stats->add_load_metric_stats();
locality1_metric_c->set_metric_name("metric_c");
locality1_metric_c->set_num_requests_finished_with_metric(1);
locality1_metric_c->set_total_metric_value(20.02);
auto locality1_metric_d = expected_locality1_stats->add_load_metric_stats();
locality1_metric_d->set_metric_name("metric_d");
locality1_metric_d->set_num_requests_finished_with_metric(1);
locality1_metric_d->set_total_metric_value(30.03);

expectSendMessage({expected_cluster_stats});
}
EXPECT_CALL(*response_timer_, enableTimer(std::chrono::milliseconds(42000), _));
response_timer_cb_();

// Traffic between previous request and next response. Previous latched metrics are cleared.
host1->stats().rq_success_.inc();
host1->loadMetricStats().add("metric_a", 1.41421);
host1->loadMetricStats().add("metric_e", 2.71828);

time_system_.setMonotonicTime(std::chrono::microseconds(6));
deliverLoadStatsResponse({"foo"});
// Second stats report on timer tick.
time_system_.setMonotonicTime(std::chrono::microseconds(28));
{
envoy::config::endpoint::v3::ClusterStats expected_cluster_stats;
expected_cluster_stats.set_cluster_name("foo");
expected_cluster_stats.set_cluster_service_name("bar");
expected_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(24));

auto expected_locality0_stats = expected_cluster_stats.add_upstream_locality_stats();
expected_locality0_stats->mutable_locality()->set_region("mars");
expected_locality0_stats->set_total_successful_requests(1);
auto locality0_metric_a = expected_locality0_stats->add_load_metric_stats();
locality0_metric_a->set_metric_name("metric_a");
locality0_metric_a->set_num_requests_finished_with_metric(1);
locality0_metric_a->set_total_metric_value(1.41421);
auto locality0_metric_e = expected_locality0_stats->add_load_metric_stats();
locality0_metric_e->set_metric_name("metric_e");
locality0_metric_e->set_num_requests_finished_with_metric(1);
locality0_metric_e->set_total_metric_value(2.71828);

// No stats for locality 1 since there was no traffic to it.
expectSendMessage({expected_cluster_stats});
}
EXPECT_CALL(*response_timer_, enableTimer(std::chrono::milliseconds(42000), _));
response_timer_cb_();
}

// Validate that the client can recover from a remote stream closure via retry.
TEST_F(LoadStatsReporterTest, RemoteStreamClose) {
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/upstream/host.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ MockHostDescription::MockHostDescription()
ON_CALL(*this, address()).WillByDefault(Return(address_));
ON_CALL(*this, outlierDetector()).WillByDefault(ReturnRef(outlier_detector_));
ON_CALL(*this, stats()).WillByDefault(ReturnRef(stats_));
ON_CALL(*this, loadMetricStats()).WillByDefault(ReturnRef(load_metric_stats_));
ON_CALL(*this, locality()).WillByDefault(ReturnRef(locality_));
ON_CALL(*this, cluster()).WillByDefault(ReturnRef(cluster_));
ON_CALL(*this, healthChecker()).WillByDefault(ReturnRef(health_checker_));
Expand All @@ -49,6 +50,7 @@ MockHost::MockHost() : socket_factory_(new testing::NiceMock<Network::MockTransp
ON_CALL(*this, cluster()).WillByDefault(ReturnRef(cluster_));
ON_CALL(*this, outlierDetector()).WillByDefault(ReturnRef(outlier_detector_));
ON_CALL(*this, stats()).WillByDefault(ReturnRef(stats_));
ON_CALL(*this, loadMetricStats()).WillByDefault(ReturnRef(load_metric_stats_));
ON_CALL(*this, warmed()).WillByDefault(Return(true));
ON_CALL(*this, transportSocketFactory()).WillByDefault(ReturnRef(*socket_factory_));
}
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/upstream/host.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class MockHostDescription : public HostDescription {
MOCK_METHOD(const std::string&, hostname, (), (const));
MOCK_METHOD(Network::TransportSocketFactory&, transportSocketFactory, (), (const));
MOCK_METHOD(HostStats&, stats, (), (const));
MOCK_METHOD(LoadMetricStats&, loadMetricStats, (), (const));
MOCK_METHOD(const envoy::config::core::v3::Locality&, locality, (), (const));
MOCK_METHOD(uint32_t, priority, (), (const));
MOCK_METHOD(void, priority, (uint32_t));
Expand All @@ -115,6 +116,7 @@ class MockHostDescription : public HostDescription {
Network::TransportSocketFactoryPtr socket_factory_;
testing::NiceMock<MockClusterInfo> cluster_;
HostStats stats_;
LoadMetricStats load_metric_stats_;
envoy::config::core::v3::Locality locality_;
mutable Stats::TestUtil::TestSymbolTable symbol_table_;
mutable std::unique_ptr<Stats::StatNameManagedStorage> locality_zone_stat_name_;
Expand Down Expand Up @@ -189,6 +191,7 @@ class MockHost : public Host {
MOCK_METHOD(void, setHealthChecker_, (HealthCheckHostMonitorPtr & health_checker));
MOCK_METHOD(void, setOutlierDetector_, (Outlier::DetectorHostMonitorPtr & outlier_detector));
MOCK_METHOD(HostStats&, stats, (), (const));
MOCK_METHOD(LoadMetricStats&, loadMetricStats, (), (const));
MOCK_METHOD(uint32_t, weight, (), (const));
MOCK_METHOD(void, weight, (uint32_t new_weight));
MOCK_METHOD(bool, used, (), (const));
Expand All @@ -203,6 +206,7 @@ class MockHost : public Host {
Network::TransportSocketFactoryPtr socket_factory_;
testing::NiceMock<Outlier::MockDetectorHostMonitor> outlier_detector_;
HostStats stats_;
LoadMetricStats load_metric_stats_;
mutable Stats::TestUtil::TestSymbolTable symbol_table_;
mutable std::unique_ptr<Stats::StatNameManagedStorage> locality_zone_stat_name_;
};
Expand Down