Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions envoy/upstream/host_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ class HostLbPolicyData {
* Please ensure that the implementation is thread-safe.
*
* @param report supplies the ORCA load report of this upstream host.
* @param stream_info supplies the downstream stream info.
* @param stream_info carries the downstream stream info for in-band reports and is empty for
* out-of-band (OOB) reports.
*/
virtual absl::Status onOrcaLoadReport(const OrcaLoadReport& /*report*/,
const StreamInfo::StreamInfo& /*stream_info*/) {
OptRef<const StreamInfo::StreamInfo> /*stream_info*/) {
return absl::OkStatus();
}
};
Expand Down
1 change: 1 addition & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ envoy_cc_library(
"//source/common/orca:orca_load_metrics_lib",
"//source/common/orca:orca_parser",
"//source/common/stream_info:uint32_accessor_lib",
"//source/common/upstream:host_utility_lib",
"//source/common/upstream:load_balancer_context_base_lib",
"//source/common/upstream:upstream_factory_context_lib",
"//source/extensions/common/proxy_protocol:proxy_protocol_header_lib",
Expand Down
9 changes: 3 additions & 6 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "source/common/router/retry_state_impl.h"
#include "source/common/runtime/runtime_features.h"
#include "source/common/stream_info/uint32_accessor_impl.h"
#include "source/common/upstream/host_utility.h"

#include "absl/container/inlined_vector.h"

Expand Down Expand Up @@ -2565,12 +2566,8 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or

// Inline capacity of 2 covers the typical case of 1-2 LB policies per host.
absl::InlinedVector<Upstream::HostLbPolicyData*, 2> orca_recipients;
for (size_t i = 0; i < upstream_host->lbPolicyDataCount(); ++i) {
auto host_lb_policy_data = upstream_host->lbPolicyDataAt(i);
if (host_lb_policy_data.has_value() && host_lb_policy_data->receivesOrcaLoadReport()) {
orca_recipients.push_back(host_lb_policy_data.ptr());
}
}
Upstream::HostUtility::forEachOrcaLoadReportRecipient(
*upstream_host, [&](Upstream::HostLbPolicyData& data) { orca_recipients.push_back(&data); });

if (!cluster_->lrsReportMetricNames().has_value() && orca_recipients.empty()) {
// If the cluster doesn't have LRS metric names configured then there is no need to
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ envoy_cc_library(
"//envoy/upstream:upstream_interface",
"//source/common/config:well_known_names",
"//source/common/runtime:runtime_lib",
"@abseil-cpp//absl/functional:function_ref",
],
)

Expand Down
10 changes: 10 additions & 0 deletions source/common/upstream/host_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,5 +245,15 @@ void HostUtility::forEachHostMetric(
});
}

void HostUtility::forEachOrcaLoadReportRecipient(
const HostDescription& host, absl::FunctionRef<void(HostLbPolicyData&)> callback) {
for (size_t i = 0; i < host.lbPolicyDataCount(); ++i) {
OptRef<HostLbPolicyData> data = host.lbPolicyDataAt(i);
if (data.has_value() && data->receivesOrcaLoadReport()) {
callback(*data);
}
}
}

} // namespace Upstream
} // namespace Envoy
9 changes: 9 additions & 0 deletions source/common/upstream/host_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "envoy/upstream/load_balancer.h"
#include "envoy/upstream/upstream.h"

#include "absl/functional/function_ref.h"

namespace Envoy {
namespace Upstream {

Expand Down Expand Up @@ -60,6 +62,13 @@ class HostUtility {
forEachHostMetric(const ClusterManager& cluster_manager,
const std::function<void(Stats::PrimitiveCounterSnapshot&& metric)>& counter_cb,
const std::function<void(Stats::PrimitiveGaugeSnapshot&& metric)>& gauge_cb);

/**
* Invokes `callback(HostLbPolicyData&)` for each load-balancing-policy data entry on `host` whose
* receivesOrcaLoadReport() is true.
*/
static void forEachOrcaLoadReportRecipient(const HostDescription& host,
absl::FunctionRef<void(HostLbPolicyData&)> callback);
};

} // namespace Upstream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ ClientSideWeightedRoundRobinLoadBalancer::ClientSideWeightedRoundRobinLoadBalanc
orca_oob_manager_ =
std::make_unique<Extensions::LoadBalancingPolicies::Common::ProdOrcaOobManager>(
typed_lb_config->oob_manager_config, priority_set,
typed_lb_config->main_thread_dispatcher_, random, cluster_info.statsScope(),
orca_weight_manager_->reportHandler());
typed_lb_config->main_thread_dispatcher_, random, cluster_info.statsScope());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ class ClientSideWeightedRoundRobinLoadBalancer : public Upstream::ThreadAwareLoa
orca_weight_manager_;

// ORCA out-of-band manager. Constructed only when enable_oob_load_report is true; null
// otherwise. Shares the OrcaWeightManager's report handler so OOB reports feed the same
// per-host atomics as in-band reports.
// otherwise. Delivers decoded OOB reports through each host's HostLbPolicyData::onOrcaLoadReport,
// so policies receive OOB and in-band reports through the same callback path.
std::unique_ptr<Extensions::LoadBalancingPolicies::Common::OrcaOobManager> orca_oob_manager_;
};

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/load_balancing_policies/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ envoy_cc_library(
srcs = ["orca_oob_manager.cc"],
hdrs = ["orca_oob_manager.h"],
deps = [
":orca_weight_manager_lib",
"//envoy/common:random_generator_interface",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
Expand All @@ -91,6 +90,7 @@ envoy_cc_library(
"//source/common/http:utility_lib",
"//source/common/network:transport_socket_options_lib",
"//source/common/network:utility_lib",
"//source/common/upstream:host_utility_lib",
"@abseil-cpp//absl/container:node_hash_map",
"@abseil-cpp//absl/status",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "source/common/network/transport_socket_options_impl.h"
#include "source/common/network/utility.h"
#include "source/common/protobuf/protobuf.h"
#include "source/common/upstream/host_utility.h"

#include "xds/service/orca/v3/orca.pb.h"

Expand Down Expand Up @@ -71,11 +72,9 @@ void applyOrcaOobConnectionOverrides(
OrcaOobManager::OrcaOobManager(OrcaOobManagerConfig config,
const Upstream::PrioritySet& priority_set,
Event::Dispatcher& dispatcher, Random::RandomGenerator& random,
Stats::Scope& stats_scope,
OrcaLoadReportHandlerSharedPtr report_handler)
Stats::Scope& stats_scope)
: dispatcher_(dispatcher), random_(random), config_(sanitizeConfig(std::move(config))),
priority_set_(priority_set), report_handler_(std::move(report_handler)),
oob_stats_(generateOrcaOobStats(stats_scope)) {}
priority_set_(priority_set), oob_stats_(generateOrcaOobStats(stats_scope)) {}

OrcaOobManager::~OrcaOobManager() {
for (auto& [host, session] : oob_sessions_) {
Expand Down Expand Up @@ -385,14 +384,18 @@ void OrcaOobManager::OobSession::onReport(const xds::data::orca::v3::OrcaLoadRep
parent_.oob_stats_.reports_received_.inc();
backoff_->reset();
inactivity_timer_->enableTimer(parent_.config_.reporting_period * kInactivityWatchdogMultiplier);
auto data_opt = host_->typedLbPolicyData<OrcaHostLbPolicyData>();
if (!data_opt.has_value()) {
parent_.oob_stats_.report_errors_.inc();
return;
}
const absl::Status status =
parent_.report_handler_->updateClientSideDataFromOrcaLoadReport(report, *data_opt);
if (!status.ok()) {
// Deliver to every ORCA-interested HostLbPolicyData (empty stream_info; OOB has no downstream
// stream). No recipient, or any recipient rejecting, counts as a report error.
bool had_recipient = false;
bool apply_failed = false;
Upstream::HostUtility::forEachOrcaLoadReportRecipient(
*host_, [&](Upstream::HostLbPolicyData& data) {
had_recipient = true;
if (!data.onOrcaLoadReport(report, std::nullopt).ok()) {
apply_failed = true;
}
});
if (!had_recipient || apply_failed) {
parent_.oob_stats_.report_errors_.inc();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "source/common/grpc/codec.h"
#include "source/common/http/codec_client.h"
#include "source/common/http/response_decoder_impl_base.h"
#include "source/extensions/load_balancing_policies/common/orca_weight_manager.h"

#include "absl/container/node_hash_map.h"
#include "absl/status/status.h"
Expand Down Expand Up @@ -65,20 +64,21 @@ void applyOrcaOobConnectionOverrides(
* Cluster-level manager owning per-host ORCA OOB streams. Modeled on
* source/extensions/health_checkers/grpc/health_checker_impl.h: per-host nested OobSession holds
* a CodecClient and Http stream callbacks; the manager reacts to PrioritySet::addMemberUpdateCb
* to add/remove sessions. All activity runs on the supplied dispatcher's thread; the only
* cross-thread surface is OrcaHostLbPolicyData atomics (workers read; this manager writes via the
* shared OrcaLoadReportHandler).
* to add/remove sessions. All activity runs on the supplied dispatcher's thread. Each decoded
* report is delivered to the host's `HostLbPolicyData` via `onOrcaLoadReport`, the same entry point
* as the in-band request path.
*/
class OrcaOobManager : protected Logger::Loggable<Logger::Id::upstream> {
public:
OrcaOobManager(OrcaOobManagerConfig config, const Upstream::PrioritySet& priority_set,
Event::Dispatcher& dispatcher, Random::RandomGenerator& random,
Stats::Scope& stats_scope, OrcaLoadReportHandlerSharedPtr report_handler);
Stats::Scope& stats_scope);
virtual ~OrcaOobManager();

// Iterate priority set, open a session per existing host, register member-update callback.
// Init order constraint: caller must invoke OrcaWeightManager::initialize() FIRST so the
// OrcaHostLbPolicyData attachment exists before sessions decode their first report.
// Init order: recipients' per-host data must be attached before the first report is decoded
// (for ClientSideWeightedRoundRobin, call OrcaWeightManager::initialize() first); a report that
// finds no recipient is counted as a report error.
absl::Status initialize();

protected:
Expand Down Expand Up @@ -173,7 +173,6 @@ class OrcaOobManager : protected Logger::Loggable<Logger::Id::upstream> {

const OrcaOobManagerConfig config_;
const Upstream::PrioritySet& priority_set_;
OrcaLoadReportHandlerSharedPtr report_handler_;
Envoy::Common::CallbackHandlePtr member_update_cb_;
// node_hash_map for pointer/reference stability across rehash.
absl::node_hash_map<Upstream::HostConstSharedPtr, OobSessionPtr> oob_sessions_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ absl::Status OrcaLoadReportHandler::updateClientSideDataFromOrcaLoadReport(
}

absl::Status OrcaHostLbPolicyData::onOrcaLoadReport(const Upstream::OrcaLoadReport& report,
const StreamInfo::StreamInfo&) {
OptRef<const StreamInfo::StreamInfo>) {
ASSERT(report_handler_ != nullptr);
return report_handler_->updateClientSideDataFromOrcaLoadReport(report, *this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct OrcaHostLbPolicyData : public Envoy::Upstream::HostLbPolicyData {

bool receivesOrcaLoadReport() const override { return true; }
absl::Status onOrcaLoadReport(const Upstream::OrcaLoadReport& report,
const StreamInfo::StreamInfo& stream_info) override;
OptRef<const StreamInfo::StreamInfo> stream_info) override;

// Update the weight and timestamps for first and last update time.
void updateWeightNow(uint32_t weight, const MonotonicTime& now) {
Expand Down
9 changes: 5 additions & 4 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8448,7 +8448,7 @@ class TestOrcaLoadReportLbData : public Upstream::HostLbPolicyData {
public:
bool receivesOrcaLoadReport() const override { return true; }
MOCK_METHOD(absl::Status, onOrcaLoadReport,
(const Upstream::OrcaLoadReport&, const StreamInfo::StreamInfo&), (override));
(const Upstream::OrcaLoadReport&, OptRef<const StreamInfo::StreamInfo>), (override));
};

TEST_F(RouterTest, OrcaLoadReportCallbacks) {
Expand All @@ -8472,7 +8472,7 @@ TEST_F(RouterTest, OrcaLoadReportCallbacks) {
xds::data::orca::v3::OrcaLoadReport received_orca_load_report;
EXPECT_CALL(*host_lb_policy_data_raw_ptr, onOrcaLoadReport(_, _))
.WillOnce(Invoke([&](const xds::data::orca::v3::OrcaLoadReport& orca_load_report,
const StreamInfo::StreamInfo&) {
OptRef<const StreamInfo::StreamInfo>) {
received_orca_load_report = orca_load_report;
return absl::OkStatus();
}));
Expand Down Expand Up @@ -8526,7 +8526,7 @@ TEST_F(RouterTest, OrcaLoadReportCallbackReturnsError) {
xds::data::orca::v3::OrcaLoadReport received_orca_load_report;
EXPECT_CALL(*host_lb_policy_data_raw_ptr, onOrcaLoadReport(_, _))
.WillOnce(Invoke([&](const xds::data::orca::v3::OrcaLoadReport& orca_load_report,
const StreamInfo::StreamInfo&) {
OptRef<const StreamInfo::StreamInfo>) {
received_orca_load_report = orca_load_report;
// Return an error that gets logged by router filter.
return absl::InvalidArgumentError("Unexpected ORCA load Report");
Expand Down Expand Up @@ -8628,7 +8628,8 @@ TEST_F(RouterTest, OrcaLoadReportSkipsEntriesNotInterestedInOrca) {
public:
bool receivesOrcaLoadReport() const override { return false; }
MOCK_METHOD(absl::Status, onOrcaLoadReport,
(const Upstream::OrcaLoadReport&, const StreamInfo::StreamInfo&), (override));
(const Upstream::OrcaLoadReport&, OptRef<const StreamInfo::StreamInfo>),
(override));
};

auto non_orca_data = std::make_unique<NonOrcaLbData>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ using ::testing::AtLeast;
using ::testing::NiceMock;
using ::testing::Return;

// Minimal HostLbPolicyData that records delivered reports; proves the OOB manager delivers to any
// HostLbPolicyData (not just OrcaHostLbPolicyData) via the standard onOrcaLoadReport entry point.
class RecordingLbPolicyData : public Upstream::HostLbPolicyData {
public:
explicit RecordingLbPolicyData(absl::Status status = absl::OkStatus())
: status_(std::move(status)) {}
bool receivesOrcaLoadReport() const override { return true; }
absl::Status onOrcaLoadReport(const Upstream::OrcaLoadReport& report,
OptRef<const StreamInfo::StreamInfo>) override {
++calls_;
last_util_ = report.application_utilization();
return status_;
}
int calls_ = 0;
double last_util_ = -1.0;
absl::Status status_;
};

// MOCK_METHOD returns a raw Http::CodecClient*; createCodecClient wraps in unique_ptr to
// transfer ownership to OobSession.
class TestOrcaOobManager : public OrcaOobManager {
Expand Down Expand Up @@ -90,7 +108,7 @@ class OrcaOobManagerLifecycleTest : public testing::Test, public Event::TestUsin

std::unique_ptr<TestOrcaOobManager> makeManager(const OrcaOobManagerConfig& config) {
return std::make_unique<TestOrcaOobManager>(config, priority_set_, dispatcher_, random_,
*stats_store_.rootScope(), report_handler_);
*stats_store_.rootScope());
}

uint64_t activeOobSessions() {
Expand Down Expand Up @@ -505,9 +523,9 @@ TEST_F(OrcaOobManagerWireTest, GoAwayOtherIsImmediateTransient) {
}

TEST_F(OrcaOobManagerWireTest, ReportWithoutLbPolicyDataIncrementsReportErrors) {
// Host has no OrcaHostLbPolicyData attached (would be done by OrcaWeightManager
// in production; this test simulates the init-order race the architecture
// documents as v1-acceptable). onReport increments report_errors and bails.
// A report that finds no ORCA-interested recipient on the host is counted as a report error
// (e.g. an init-order gap where interested data is not yet attached). reports_received still
// bumps before the delivery attempt.
auto manager = makeManager();
ASSERT_OK(manager->initialize());

Expand All @@ -525,8 +543,70 @@ TEST_F(OrcaOobManagerWireTest, ReportWithoutLbPolicyDataIncrementsReportErrors)
report.set_application_utilization(0.5);
report.set_rps_fractional(1000);
respondReport(*attempt, report);
EXPECT_EQ(oobCounter("reports_received"), 1);
EXPECT_EQ(oobCounter("report_errors"), 1);

EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1));
manager.reset();
}

TEST_F(OrcaOobManagerWireTest, ReportDeliveredToArbitraryLbPolicyData) {
// The manager is policy-agnostic: any HostLbPolicyData attached to the host (not just
// OrcaHostLbPolicyData or the weight manager) receives every decoded report via the standard
// onOrcaLoadReport entry point.
auto manager = makeManager();
ASSERT_OK(manager->initialize());

auto* attempt_timer = installAttemptTimer();
auto host = makeWiredHost();
auto data = std::make_unique<RecordingLbPolicyData>();
auto* data_ptr = data.get();
host->addLbPolicyData(std::move(data));
priority_set_.runUpdateCallbacks(0, {host}, {});

auto attempt = makeAttempt();
wireConnectionFor(host, *attempt);
expectCreateCodecClient(*manager, *attempt);
attempt_timer->invokeCallback();

respondHeadersOk(*attempt);
xds::data::orca::v3::OrcaLoadReport report;
report.set_application_utilization(0.75);
respondReport(*attempt, report);

EXPECT_EQ(data_ptr->calls_, 1);
EXPECT_DOUBLE_EQ(data_ptr->last_util_, 0.75);
EXPECT_EQ(oobCounter("reports_received"), 1);
EXPECT_EQ(oobCounter("report_errors"), 0);

EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1));
manager.reset();
}

TEST_F(OrcaOobManagerWireTest, LbPolicyDataErrorIncrementsReportErrors) {
// Any non-OK onOrcaLoadReport return is counted as a report_error; the manager does not
// inspect the reason.
auto manager = makeManager();
ASSERT_OK(manager->initialize());

auto* attempt_timer = installAttemptTimer();
auto host = makeWiredHost();
host->addLbPolicyData(
std::make_unique<RecordingLbPolicyData>(absl::InternalError("sink rejected report")));
priority_set_.runUpdateCallbacks(0, {host}, {});

auto attempt = makeAttempt();
wireConnectionFor(host, *attempt);
expectCreateCodecClient(*manager, *attempt);
attempt_timer->invokeCallback();

respondHeadersOk(*attempt);
xds::data::orca::v3::OrcaLoadReport report;
report.set_application_utilization(0.5);
respondReport(*attempt, report);

EXPECT_EQ(oobCounter("reports_received"), 1);
EXPECT_EQ(oobCounter("report_errors"), 1);
EXPECT_EQ(oobCounter("reports_received"), 1); // counter still bumps before the data check

EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1));
manager.reset();
Expand Down
Loading