diff --git a/source/common/orca/BUILD b/source/common/orca/BUILD index 79aa43340804..56d750828fb6 100644 --- a/source/common/orca/BUILD +++ b/source/common/orca/BUILD @@ -13,8 +13,11 @@ envoy_cc_library( srcs = ["orca_parser.cc"], hdrs = ["orca_parser.h"], deps = [ + "//envoy/common:exception_lib", "//envoy/http:header_map_interface", "//source/common/common:base64_lib", + "//source/common/http:header_utility_lib", + "//source/common/protobuf:utility_lib_header", "@com_github_cncf_xds//xds/data/orca/v3:pkg_cc_proto", "@com_github_fmtlib_fmt//:fmtlib", "@com_google_absl//absl/status:statusor", diff --git a/source/common/orca/orca_parser.cc b/source/common/orca/orca_parser.cc index a18061bb256c..2dd29bc7944f 100644 --- a/source/common/orca/orca_parser.cc +++ b/source/common/orca/orca_parser.cc @@ -1,13 +1,27 @@ #include "source/common/orca/orca_parser.h" +#include +#include #include +#include +#include +#include "envoy/common/exception.h" #include "envoy/http/header_map.h" #include "source/common/common/base64.h" #include "source/common/common/fmt.h" +#include "source/common/http/header_utility.h" +#include "source/common/protobuf/utility.h" +#include "absl/container/flat_hash_set.h" +#include "absl/status/status.h" +#include "absl/strings/match.h" +#include "absl/strings/numbers.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/str_split.h" #include "absl/strings/string_view.h" +#include "absl/strings/strip.h" using ::Envoy::Http::HeaderMap; using xds::data::orca::v3::OrcaLoadReport; @@ -17,29 +31,147 @@ namespace Orca { namespace { +const Http::LowerCaseString& endpointLoadMetricsHeader() { + CONSTRUCT_ON_FIRST_USE(Http::LowerCaseString, kEndpointLoadMetricsHeader); +} + const Http::LowerCaseString& endpointLoadMetricsHeaderBin() { CONSTRUCT_ON_FIRST_USE(Http::LowerCaseString, kEndpointLoadMetricsHeaderBin); } +absl::Status tryCopyNamedMetricToOrcaLoadReport(absl::string_view metric_name, double metric_value, + OrcaLoadReport& orca_load_report) { + if (metric_name.empty()) { + return absl::InvalidArgumentError("named metric key is empty."); + } + + orca_load_report.mutable_named_metrics()->insert({std::string(metric_name), metric_value}); + return absl::OkStatus(); +} + +std::vector parseCommaDelimitedHeader(const absl::string_view entry) { + std::vector values; + std::vector tokens = + Envoy::Http::HeaderUtility::parseCommaDelimitedHeader(entry); + values.insert(values.end(), tokens.begin(), tokens.end()); + return values; +} + +absl::Status tryCopyMetricToOrcaLoadReport(absl::string_view metric_name, + absl::string_view metric_value, + OrcaLoadReport& orca_load_report) { + if (metric_name.empty()) { + return absl::InvalidArgumentError("metric names cannot be empty strings"); + } + + if (metric_value.empty()) { + return absl::InvalidArgumentError("metric values cannot be empty strings"); + } + + double value; + if (!absl::SimpleAtod(metric_value, &value)) { + return absl::InvalidArgumentError(fmt::format( + "unable to parse custom backend load metric value({}): {}", metric_name, metric_value)); + } + + if (std::isnan(value)) { + return absl::InvalidArgumentError( + fmt::format("custom backend load metric value({}) cannot be NaN.", metric_name)); + } + + if (std::isinf(value)) { + return absl::InvalidArgumentError( + fmt::format("custom backend load metric value({}) cannot be infinity.", metric_name)); + } + + if (absl::StartsWith(metric_name, kNamedMetricsFieldPrefix)) { + auto metric_name_without_prefix = absl::StripPrefix(metric_name, kNamedMetricsFieldPrefix); + return tryCopyNamedMetricToOrcaLoadReport(metric_name_without_prefix, value, orca_load_report); + } + + if (metric_name == kCpuUtilizationField) { + orca_load_report.set_cpu_utilization(value); + } else if (metric_name == kMemUtilizationField) { + orca_load_report.set_mem_utilization(value); + } else if (metric_name == kApplicationUtilizationField) { + orca_load_report.set_application_utilization(value); + } else if (metric_name == kEpsField) { + orca_load_report.set_eps(value); + } else if (metric_name == kRpsFractionalField) { + orca_load_report.set_rps_fractional(value); + } else { + return absl::InvalidArgumentError(absl::StrCat("unsupported metric name: ", metric_name)); + } + return absl::OkStatus(); +} + +absl::Status tryParseNativeHttpEncoded(const absl::string_view header, + OrcaLoadReport& orca_load_report) { + const std::vector values = parseCommaDelimitedHeader(header); + + // Check for duplicate metric names here because OrcaLoadReport fields are not + // marked as optional and therefore don't differentiate between unset and + // default values. + absl::flat_hash_set metric_names; + for (const auto value : values) { + std::pair entry = + absl::StrSplit(value, absl::MaxSplits(':', 1), absl::SkipWhitespace()); + if (metric_names.contains(entry.first)) { + return absl::AlreadyExistsError( + absl::StrCat(kEndpointLoadMetricsHeader, " contains duplicate metric: ", entry.first)); + } + RETURN_IF_NOT_OK(tryCopyMetricToOrcaLoadReport(entry.first, entry.second, orca_load_report)); + metric_names.insert(entry.first); + } + return absl::OkStatus(); +} + +absl::Status tryParseSerializedBinary(const absl::string_view header, + OrcaLoadReport& orca_load_report) { + if (header.empty()) { + return absl::InvalidArgumentError("ORCA binary header value is empty"); + } + const std::string decoded_value = Envoy::Base64::decode(header); + if (decoded_value.empty()) { + return absl::InvalidArgumentError( + fmt::format("unable to decode ORCA binary header value: {}", header)); + } + if (!orca_load_report.ParseFromString(decoded_value)) { + return absl::InvalidArgumentError( + fmt::format("unable to parse binaryheader to OrcaLoadReport: {}", header)); + } + return absl::OkStatus(); +} + } // namespace absl::StatusOr parseOrcaLoadReportHeaders(const HeaderMap& headers) { OrcaLoadReport load_report; - // Binary protobuf format. + // Binary protobuf format. Legacy header from gRPC implementation. if (const auto header_bin = headers.get(endpointLoadMetricsHeaderBin()); !header_bin.empty()) { const auto header_value = header_bin[0]->value().getStringView(); - if (header_value.empty()) { - return absl::InvalidArgumentError("ORCA binary header value is empty"); - } - const std::string decoded_value = Envoy::Base64::decode(header_value); - if (decoded_value.empty()) { - return absl::InvalidArgumentError( - fmt::format("unable to decode ORCA binary header value: {}", header_value)); - } - if (!load_report.ParseFromString(decoded_value)) { + RETURN_IF_NOT_OK(tryParseSerializedBinary(header_value, load_report)); + } else if (const auto header = headers.get(endpointLoadMetricsHeader()); !header.empty()) { + std::pair split_header = + absl::StrSplit(header[0]->value().getStringView(), absl::MaxSplits(' ', 1)); + + if (split_header.first == kHeaderFormatPrefixBin) { // Binary protobuf format. + RETURN_IF_NOT_OK(tryParseSerializedBinary(split_header.second, load_report)); + } else if (split_header.first == kHeaderFormatPrefixText) { // Native HTTP format. + RETURN_IF_NOT_OK(tryParseNativeHttpEncoded(split_header.second, load_report)); + } else if (split_header.first == kHeaderFormatPrefixJson) { // JSON format. +#if defined(ENVOY_ENABLE_FULL_PROTOS) && defined(ENVOY_ENABLE_YAML) + const std::string json_string = std::string(split_header.second); + bool has_unknown_field = false; + RETURN_IF_ERROR( + Envoy::MessageUtil::loadFromJsonNoThrow(json_string, load_report, has_unknown_field)); +#else + IS_ENVOY_BUG("JSON formatted ORCA header support not implemented for this build"); +#endif // !ENVOY_ENABLE_FULL_PROTOS || !ENVOY_ENABLE_YAML + } else { return absl::InvalidArgumentError( - fmt::format("unable to parse binaryheader to OrcaLoadReport: {}", header_value)); + fmt::format("unsupported ORCA header format: {}", split_header.first)); } } else { return absl::NotFoundError("no ORCA data sent from the backend"); diff --git a/source/common/orca/orca_parser.h b/source/common/orca/orca_parser.h index 86fd23944017..6c6f4552757c 100644 --- a/source/common/orca/orca_parser.h +++ b/source/common/orca/orca_parser.h @@ -8,11 +8,24 @@ namespace Envoy { namespace Orca { -// Header used to send ORCA load metrics from the backend. +// Headers used to send ORCA load metrics from the backend. +static constexpr absl::string_view kEndpointLoadMetricsHeader = "endpoint-load-metrics"; static constexpr absl::string_view kEndpointLoadMetricsHeaderBin = "endpoint-load-metrics-bin"; +// Prefix used to determine format expected in kEndpointLoadMetricsHeader. +static constexpr absl::string_view kHeaderFormatPrefixBin = "BIN"; +static constexpr absl::string_view kHeaderFormatPrefixJson = "JSON"; +static constexpr absl::string_view kHeaderFormatPrefixText = "TEXT"; +// The following fields are the names of the metrics tracked in the ORCA load +// report proto. +static constexpr absl::string_view kApplicationUtilizationField = "application_utilization"; +static constexpr absl::string_view kCpuUtilizationField = "cpu_utilization"; +static constexpr absl::string_view kMemUtilizationField = "mem_utilization"; +static constexpr absl::string_view kEpsField = "eps"; +static constexpr absl::string_view kRpsFractionalField = "rps_fractional"; +static constexpr absl::string_view kNamedMetricsFieldPrefix = "named_metrics."; // Parses ORCA load metrics from a header map into an OrcaLoadReport proto. -// Supports serialized binary formats. +// Supports native HTTP, JSON and serialized binary formats. absl::StatusOr parseOrcaLoadReportHeaders(const Envoy::Http::HeaderMap& headers); } // namespace Orca diff --git a/test/common/orca/orca_parser_test.cc b/test/common/orca/orca_parser_test.cc index 6b56f72d55aa..b86fcc7d31cc 100644 --- a/test/common/orca/orca_parser_test.cc +++ b/test/common/orca/orca_parser_test.cc @@ -1,4 +1,4 @@ -#include +#include #include "source/common/common/base64.h" #include "source/common/orca/orca_parser.h" @@ -7,12 +7,25 @@ #include "test/test_common/utility.h" #include "absl/status/status.h" +#include "absl/strings/str_cat.h" #include "xds/data/orca/v3/orca_load_report.pb.h" namespace Envoy { namespace Orca { namespace { +const std::string formattedHeaderPrefixText() { + CONSTRUCT_ON_FIRST_USE(std::string, absl::StrCat(kHeaderFormatPrefixText, " ")); +} + +const std::string formattedHeaderPrefixJson() { + CONSTRUCT_ON_FIRST_USE(std::string, absl::StrCat(kHeaderFormatPrefixJson, " ")); +} + +const std::string formattedHeaderPrefixBin() { + CONSTRUCT_ON_FIRST_USE(std::string, absl::StrCat(kHeaderFormatPrefixBin, " ")); +} + // Returns an example OrcaLoadReport proto with all fields populated. static xds::data::orca::v3::OrcaLoadReport exampleOrcaLoadReport() { xds::data::orca::v3::OrcaLoadReport orca_load_report; @@ -42,7 +55,157 @@ TEST(OrcaParserUtilTest, MissingOrcaHeaders) { StatusHelpers::HasStatus(absl::NotFoundError("no ORCA data sent from the backend"))); } -TEST(OrcaParserUtilTest, BinaryHeader) { +TEST(OrcaParserUtilTest, InvalidOrcaHeaderPrefix) { + // Verify that error is returned when unknown/invalid prefix is found in ORCA + // header value. + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), "BAD random-value"}}; + EXPECT_THAT( + parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::InvalidArgumentError("unsupported ORCA header format: BAD"))); +} + +TEST(OrcaParserUtilTest, EmptyOrcaHeader) { + Http::TestRequestHeaderMapImpl headers{{std::string(kEndpointLoadMetricsHeader), ""}}; + EXPECT_THAT( + parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::InvalidArgumentError("unsupported ORCA header format: "))); +} + +TEST(OrcaParserUtilTest, NativeHttpEncodedHeader) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixText(), + "cpu_utilization:0.7,application_utilization:0.8,mem_utilization:0.9," + "rps_fractional:1000,eps:2," + "named_metrics.foo:123,named_metrics.bar:0.2")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport()))); +} + +TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderIncorrectFieldType) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixText(), "cpu_utilization:\"0.7\"")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus( + absl::InvalidArgumentError("unable to parse custom backend load metric " + "value(cpu_utilization): \"0.7\""))); +} + +TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderNanMetricValue) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixText(), + "cpu_utilization:", std::numeric_limits::quiet_NaN())}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::InvalidArgumentError( + "custom backend load metric value(cpu_utilization) cannot be NaN."))); +} + +TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderInfinityMetricValue) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixText(), + "cpu_utilization:", std::numeric_limits::infinity())}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::InvalidArgumentError( + "custom backend load metric value(cpu_utilization) cannot be " + "infinity."))); +} + +TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderContainsDuplicateMetric) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixText(), "cpu_utilization:0.7,cpu_utilization:0.8")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::AlreadyExistsError(absl::StrCat( + kEndpointLoadMetricsHeader, " contains duplicate metric: cpu_utilization")))); +} + +TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderUnsupportedMetric) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixText(), "cpu_utilization:0.7,unsupported_metric:0.8")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus( + absl::InvalidArgumentError("unsupported metric name: unsupported_metric"))); +} + +TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderContainsDuplicateNamedMetric) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat( + formattedHeaderPrefixText(), + "named_metrics.foo:123,named_metrics.duplicate:123,named_metrics.duplicate:0.2")}}; + EXPECT_THAT( + parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::AlreadyExistsError(absl::StrCat( + kEndpointLoadMetricsHeader, " contains duplicate metric: named_metrics.duplicate")))); +} + +TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderContainsEmptyNamedMetricKey) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixText(), "named_metrics.:123")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::InvalidArgumentError("named metric key is empty."))); +} + +TEST(OrcaParserUtilTest, InvalidNativeHttpEncodedHeader) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixText(), "not-a-list-of-key-value-pairs")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus( + absl::InvalidArgumentError("metric values cannot be empty strings"))); +} + +TEST(OrcaParserUtilTest, JsonHeader) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixJson(), + "{\"cpu_utilization\": 0.7, \"application_utilization\": 0.8, " + "\"mem_utilization\": 0.9, \"rps_fractional\": 1000, \"eps\": 2, " + "\"named_metrics\": {\"foo\": 123,\"bar\": 0.2}}")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport()))); +} + +TEST(OrcaParserUtilTest, InvalidJsonHeader) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixJson(), "JSON not-a-valid-json-string")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::StatusCode::kInvalidArgument, + testing::HasSubstr("invalid JSON"))); +} + +TEST(OrcaParserUtilTest, JsonHeaderUnknownField) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixJson(), + "{\"cpu_utilization\": 0.7, \"application_utilization\": 0.8, " + "\"mem_utilization\": 0.9, \"rps_fractional\": 1000, \"eps\": 2, " + "\"unknown_field\": 2," + "\"named_metrics\": {\"foo\": 123,\"bar\": 0.2}}")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::StatusCode::kInvalidArgument, + testing::HasSubstr("invalid JSON"))); +} + +TEST(OrcaParserUtilTest, JsonHeaderIncorrectFieldType) { + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixJson(), "{\"cpu_utilization\": \"0.7\"")}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::HasStatus(absl::StatusCode::kInvalidArgument, + testing::HasSubstr("invalid JSON"))); +} + +TEST(OrcaParserUtilTest, LegacyBinaryHeader) { + // Verify processing of headers sent in legacy ORCA header inherited from gRPC + // implementation works as intended. const std::string proto_string = TestUtility::getProtobufBinaryStringFromMessage(exampleOrcaLoadReport()); const auto orca_load_report_header_bin = @@ -53,6 +216,20 @@ TEST(OrcaParserUtilTest, BinaryHeader) { StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport()))); } +TEST(OrcaParserUtilTest, BinaryHeader) { + // Verify serialized binary header processing when using default ORCA header + // and appropriate format prefix in the header value. + const std::string proto_string = + TestUtility::getProtobufBinaryStringFromMessage(exampleOrcaLoadReport()); + const auto orca_load_report_header_bin = + Envoy::Base64::encode(proto_string.c_str(), proto_string.length()); + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), + absl::StrCat(formattedHeaderPrefixBin(), orca_load_report_header_bin)}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport()))); +} + TEST(OrcaParserUtilTest, InvalidBinaryHeader) { const std::string proto_string = TestUtility::getProtobufBinaryStringFromMessage(exampleOrcaLoadReport()); @@ -84,6 +261,20 @@ TEST(OrcaParserUtilTest, EmptyBinaryHeader) { testing::HasSubstr("ORCA binary header value is empty"))); } +TEST(OrcaParserUtilTest, BinHeaderPrecedence) { + // Verifies that the order of precedence (binary proto over native http + // format) is observed when multiple ORCA headers are sent from the backend. + const std::string proto_string = + TestUtility::getProtobufBinaryStringFromMessage(exampleOrcaLoadReport()); + const auto orca_load_report_header_bin = + Envoy::Base64::encode(proto_string.c_str(), proto_string.length()); + Http::TestRequestHeaderMapImpl headers{ + {std::string(kEndpointLoadMetricsHeader), "cpu_utilization:0.7"}, + {std::string(kEndpointLoadMetricsHeaderBin), orca_load_report_header_bin}}; + EXPECT_THAT(parseOrcaLoadReportHeaders(headers), + StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport()))); +} + } // namespace } // namespace Orca } // namespace Envoy