diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index 9874208aabdf..ebcc53d774d4 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -28,7 +28,6 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // **Current Implementation Status:** // All options and processing modes are implemented except for the following: // -// * Dynamic metadata in responses from the external processor is ignored. // * "async mode" is not implemented. // The filter communicates with an external gRPC service called an "external processor" @@ -97,7 +96,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` object in a namespace matching the filter // name. // -// [#next-free-field: 16] +// [#next-free-field: 17] message ExternalProcessor { // Configuration for the gRPC service that the filter will communicate with. // The filter supports both the "Envoy" and "Google" gRPC clients. @@ -203,6 +202,35 @@ message ExternalProcessor { // Instead, the stream to the external processor will be closed. There will be no // more external processing for this stream from now on. bool disable_immediate_response = 15; + + // Options related to the sending and receiving of dynamic metadata. + MetadataOptions metadata_options = 16; +} + +// The MetadataOptions structure defines options for the sending and receiving of +// dynamic metadata. Specifically, which namespaces to send to the server, whether +// metadata returned by the server may be written, and how that metadata may be written. +message MetadataOptions { + message MetadataNamespaces { + // Specifies a list of metadata namespaces whose values, if present, + // will be passed to the ext_proc service as an opaque *protobuf::Struct*. + repeated string untyped = 1; + + // Specifies a list of metadata namespaces whose values, if present, + // will be passed to the ext_proc service as a *protobuf::Any*. This allows + // envoy and the external processing server to share the protobuf message + // definition for safe parsing. + repeated string typed = 2; + } + + // Describes which typed or untyped dynamic metadata namespaces to forward to + // the external processing server. + MetadataNamespaces forwarding_namespaces = 1; + + // Describes which typed or untyped dynamic metadata namespaces to accept from + // the external processing server. Set to empty or leave unset to disallow writing + // any received dynamic metadata. Receiving of typed metadata is not supported. + MetadataNamespaces receiving_namespaces = 2; } // The HeaderForwardingRules structure specifies what headers are @@ -245,7 +273,7 @@ message ExtProcPerRoute { } // Overrides that may be set on a per-route basis -// [#next-free-field: 6] +// [#next-free-field: 7] message ExtProcOverrides { // Set a different processing mode for this route than the default. ProcessingMode processing_mode = 1; @@ -266,4 +294,11 @@ message ExtProcOverrides { // Set a different gRPC service for this route than the default. config.core.v3.GrpcService grpc_service = 5; + + // Options related to the sending and receiving of dynamic metadata. + // Lists of forwarding and receiving namespaces will be overridden in their entirety, + // meaning the most-specific config that specifies this override will be the final + // config used. It is the prerogative of the control plane to ensure this + // most-specific config contains the correct final overrides. + MetadataOptions metadata_options = 6; } diff --git a/api/envoy/service/ext_proc/v3/external_processor.proto b/api/envoy/service/ext_proc/v3/external_processor.proto index 50fba503f846..0813bdf6d724 100644 --- a/api/envoy/service/ext_proc/v3/external_processor.proto +++ b/api/envoy/service/ext_proc/v3/external_processor.proto @@ -56,7 +56,7 @@ service ExternalProcessor { // This represents the different types of messages that Envoy can send // to an external processing server. -// [#next-free-field: 8] +// [#next-free-field: 9] message ProcessingRequest { // Specify whether the filter that sent this request is running in synchronous // or asynchronous mode. The choice of synchronous or asynchronous mode @@ -115,6 +115,9 @@ message ProcessingRequest { // in the filter configuration. HttpTrailers response_trailers = 7; } + + // Dynamic metadata associated with the request. + config.core.v3.Metadata metadata_context = 8; } // For every ProcessingRequest received by the server with the ``async_mode`` field @@ -158,9 +161,9 @@ message ProcessingResponse { ImmediateResponse immediate_response = 7; } - // [#not-implemented-hide:] - // Optional metadata that will be emitted as dynamic metadata to be consumed by the next - // filter. This metadata will be placed in the namespace ``envoy.filters.http.ext_proc``. + // Optional metadata that will be emitted as dynamic metadata to be consumed by + // following filters. This metadata will be placed in the namespace(s) specified by the top-level + // field name(s) of the struct. google.protobuf.Struct dynamic_metadata = 8; // Override how parts of the HTTP request and response are processed diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 3c5d29ced017..4efd550cd6be 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -28,5 +28,13 @@ new_features: and :ref:`response_attributes ` config APIs to enable sending and receiving attributes to/from the external processing server. +- area: ext_proc + change: | + added + :ref:`metadata_options ` + config API to enable sending and receiving metadata from/to the external processing server. Both typed and untyped dynamic + metadata may be sent to the server. If + :ref:`receiving_namespaces ` + is defined, returned metadata may be written to the specified allowed namespaces. deprecated: diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 2a3ac43d314c..4b41dbdc57a2 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -115,6 +115,49 @@ ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_ : encoding_processor_grpc_calls_; } +std::vector +FilterConfigPerRoute::initNamespaces(const Protobuf::RepeatedPtrField& ns) { + if (ns.empty()) { + return {}; + } + + std::vector namespaces; + for (const auto& single_ns : ns) { + namespaces.emplace_back(single_ns); + } + return namespaces; +} + +absl::optional> +FilterConfigPerRoute::initUntypedForwardingNamespaces(const ExtProcPerRoute& config) { + if (!config.has_overrides() || !config.overrides().has_metadata_options() || + !config.overrides().metadata_options().has_forwarding_namespaces()) { + return absl::nullopt; + } + + return {initNamespaces(config.overrides().metadata_options().forwarding_namespaces().untyped())}; +} + +absl::optional> +FilterConfigPerRoute::initTypedForwardingNamespaces(const ExtProcPerRoute& config) { + if (!config.has_overrides() || !config.overrides().has_metadata_options() || + !config.overrides().metadata_options().has_forwarding_namespaces()) { + return absl::nullopt; + } + + return {initNamespaces(config.overrides().metadata_options().forwarding_namespaces().typed())}; +} + +absl::optional> +FilterConfigPerRoute::initUntypedReceivingNamespaces(const ExtProcPerRoute& config) { + if (!config.has_overrides() || !config.overrides().has_metadata_options() || + !config.overrides().metadata_options().has_receiving_namespaces()) { + return absl::nullopt; + } + + return {initNamespaces(config.overrides().metadata_options().receiving_namespaces().untyped())}; +} + absl::optional FilterConfigPerRoute::initProcessingMode(const ExtProcPerRoute& config) { if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) { @@ -142,14 +185,26 @@ FilterConfigPerRoute::mergeProcessingMode(const FilterConfigPerRoute& less_speci FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config) : disabled_(config.disabled()), processing_mode_(initProcessingMode(config)), - grpc_service_(initGrpcService(config)) {} + grpc_service_(initGrpcService(config)), + untyped_forwarding_namespaces_(initUntypedForwardingNamespaces(config)), + typed_forwarding_namespaces_(initTypedForwardingNamespaces(config)), + untyped_receiving_namespaces_(initUntypedReceivingNamespaces(config)) {} FilterConfigPerRoute::FilterConfigPerRoute(const FilterConfigPerRoute& less_specific, const FilterConfigPerRoute& more_specific) : disabled_(more_specific.disabled()), processing_mode_(mergeProcessingMode(less_specific, more_specific)), grpc_service_(more_specific.grpcService().has_value() ? more_specific.grpcService() - : less_specific.grpcService()) {} + : less_specific.grpcService()), + untyped_forwarding_namespaces_(more_specific.untypedForwardingMetadataNamespaces().has_value() + ? more_specific.untypedForwardingMetadataNamespaces() + : less_specific.untypedForwardingMetadataNamespaces()), + typed_forwarding_namespaces_(more_specific.typedForwardingMetadataNamespaces().has_value() + ? more_specific.typedForwardingMetadataNamespaces() + : less_specific.typedForwardingMetadataNamespaces()), + untyped_receiving_namespaces_(more_specific.untypedReceivingMetadataNamespaces().has_value() + ? more_specific.untypedReceivingMetadataNamespaces() + : less_specific.untypedReceivingMetadataNamespaces()) {} void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { Http::PassThroughFilter::setDecoderFilterCallbacks(callbacks); @@ -236,6 +291,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.setHeaders(&headers); state.setHasNoBody(end_stream); ProcessingRequest req; + addDynamicMetadata(state, req); auto* headers_req = state.mutableHeaders(req); MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(), *headers_req->mutable_headers()); @@ -265,8 +321,8 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st if (config_->expressionManager().hasRequestExpr()) { auto activation_ptr = Filters::Common::Expr::createActivation( - &config_->expressionManager().localInfo(), decoding_state_.streamInfo(), &headers, - nullptr, nullptr); + &config_->expressionManager().localInfo(), decoding_state_.callbacks()->streamInfo(), + &headers, nullptr, nullptr); proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr); } @@ -553,8 +609,8 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s if (config_->expressionManager().hasResponseExpr()) { auto activation_ptr = Filters::Common::Expr::createActivation( - &config_->expressionManager().localInfo(), encoding_state_.streamInfo(), nullptr, - &headers, nullptr); + &config_->expressionManager().localInfo(), encoding_state_.callbacks()->streamInfo(), + nullptr, &headers, nullptr); proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr); } @@ -594,6 +650,7 @@ ProcessingRequest Filter::setupBodyChunk(ProcessorState& state, const Buffer::In bool end_stream) { ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stream {}", data.length(), end_stream); ProcessingRequest req; + addDynamicMetadata(state, req); auto* body_req = state.mutableBody(req); body_req->set_end_of_stream(end_stream); body_req->set_body(data.toString()); @@ -610,6 +667,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) { ProcessingRequest req; + addDynamicMetadata(state, req); auto* trailers_req = state.mutableTrailers(req); MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(), *trailers_req->mutable_trailers()); @@ -663,6 +721,93 @@ void Filter::onNewTimeout(const ProtobufWkt::Duration& override_message_timeout) stats_.override_message_timeout_received_.inc(); } +void Filter::addDynamicMetadata(const ProcessorState& state, ProcessingRequest& req) { + // get the callbacks from the ProcessorState. This will be the appropriate + // callbacks for the current state of the filter + auto* cb = state.callbacks(); + envoy::config::core::v3::Metadata forwarding_metadata; + + // If metadata_context_namespaces is specified, pass matching filter metadata to the ext_proc + // service. If metadata key is set in both the connection and request metadata then the value + // will be the request metadata value. The metadata will only be searched for the callbacks + // corresponding to the traffic direction at the time of the external processing request. + const auto& request_metadata = cb->streamInfo().dynamicMetadata().filter_metadata(); + for (const auto& context_key : state.untypedForwardingMetadataNamespaces()) { + if (const auto metadata_it = request_metadata.find(context_key); + metadata_it != request_metadata.end()) { + (*forwarding_metadata.mutable_filter_metadata())[metadata_it->first] = metadata_it->second; + } else if (cb->connection().has_value()) { + const auto& connection_metadata = + cb->connection().value().get().streamInfo().dynamicMetadata().filter_metadata(); + if (const auto metadata_it = connection_metadata.find(context_key); + metadata_it != connection_metadata.end()) { + (*forwarding_metadata.mutable_filter_metadata())[metadata_it->first] = metadata_it->second; + } + } + } + + // If typed_metadata_context_namespaces is specified, pass matching typed filter metadata to the + // ext_proc service. If metadata key is set in both the connection and request metadata then + // the value will be the request metadata value. The metadata will only be searched for the + // callbacks corresponding to the traffic direction at the time of the external processing + // request. + const auto& request_typed_metadata = cb->streamInfo().dynamicMetadata().typed_filter_metadata(); + for (const auto& context_key : state.typedForwardingMetadataNamespaces()) { + if (const auto metadata_it = request_typed_metadata.find(context_key); + metadata_it != request_typed_metadata.end()) { + (*forwarding_metadata.mutable_typed_filter_metadata())[metadata_it->first] = + metadata_it->second; + } else if (cb->connection().has_value()) { + const auto& connection_typed_metadata = + cb->connection().value().get().streamInfo().dynamicMetadata().typed_filter_metadata(); + if (const auto metadata_it = connection_typed_metadata.find(context_key); + metadata_it != connection_typed_metadata.end()) { + (*forwarding_metadata.mutable_typed_filter_metadata())[metadata_it->first] = + metadata_it->second; + } + } + } + + *req.mutable_metadata_context() = forwarding_metadata; +} + +void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, + const ProcessingResponse& response) { + if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) { + if (response.has_dynamic_metadata()) { + ENVOY_LOG(debug, "processing response included dynamic metadata, but no receiving " + "namespaces are configured."); + } + return; + } + + auto response_metadata = response.dynamic_metadata().fields(); + auto receiving_namespaces = state.untypedReceivingMetadataNamespaces(); + for (const auto& context_key : response_metadata) { + bool found_allowed_namespace = false; + if (auto metadata_it = + std::find(receiving_namespaces.begin(), receiving_namespaces.end(), context_key.first); + metadata_it != receiving_namespaces.end()) { + cb->streamInfo().setDynamicMetadata(context_key.first, + response_metadata.at(context_key.first).struct_value()); + found_allowed_namespace = true; + } + if (!found_allowed_namespace) { + ENVOY_LOG(debug, + "processing response included dynamic metadata for namespace not " + "configured for receiving: {}", + context_key.first); + } + } +} + +void Filter::setEncoderDynamicMetadata(const ProcessingResponse& response) { + setDynamicMetadata(encoder_callbacks_, encoding_state_, response); +} +void Filter::setDecoderDynamicMetadata(const ProcessingResponse& response) { + setDynamicMetadata(decoder_callbacks_, decoding_state_, response); +} + void Filter::onReceiveMessage(std::unique_ptr&& r) { if (processing_complete_) { ENVOY_LOG(debug, "Ignoring stream message received after processing complete"); @@ -693,21 +838,27 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { absl::Status processing_status; switch (response->response_case()) { case ProcessingResponse::ResponseCase::kRequestHeaders: + setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleHeadersResponse(response->request_headers()); break; case ProcessingResponse::ResponseCase::kResponseHeaders: + setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleHeadersResponse(response->response_headers()); break; case ProcessingResponse::ResponseCase::kRequestBody: + setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleBodyResponse(response->request_body()); break; case ProcessingResponse::ResponseCase::kResponseBody: + setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleBodyResponse(response->response_body()); break; case ProcessingResponse::ResponseCase::kRequestTrailers: + setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleTrailersResponse(response->request_trailers()); break; case ProcessingResponse::ResponseCase::kResponseTrailers: + setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleTrailersResponse(response->response_trailers()); break; case ProcessingResponse::ResponseCase::kImmediateResponse: @@ -717,6 +868,7 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { processing_status = absl::FailedPreconditionError("unhandled immediate response due to config disabled it"); } else { + setDecoderDynamicMetadata(*response); // We won't be sending anything more to the stream after we // receive this message. ENVOY_LOG(debug, "Sending immediate response"); @@ -906,7 +1058,7 @@ void Filter::mergePerRouteConfig() { ENVOY_LOG_MISC(debug, "Failed to retrieve the correct type of route specific filter config"); return; } - if (!merged_config) { + if (!merged_config.has_value()) { merged_config.emplace(*typed_cfg); } else { merged_config.emplace(FilterConfigPerRoute(merged_config.value(), *typed_cfg)); @@ -936,6 +1088,34 @@ void Filter::mergePerRouteConfig() { grpc_service_ = *merged_config->grpcService(); config_with_hash_key_.setConfig(*merged_config->grpcService()); } + + // For metadata namespaces, we only override the existing value if we have a + // value from our merged config. We indicate a lack of value from the merged + // config with absl::nullopt + + if (merged_config->untypedForwardingMetadataNamespaces().has_value()) { + untyped_forwarding_namespaces_ = merged_config->untypedForwardingMetadataNamespaces().value(); + ENVOY_LOG(trace, + "Setting new untyped forwarding metadata namespaces from per-route configuration"); + decoding_state_.setUntypedForwardingMetadataNamespaces(untyped_forwarding_namespaces_); + encoding_state_.setUntypedForwardingMetadataNamespaces(untyped_forwarding_namespaces_); + } + + if (merged_config->typedForwardingMetadataNamespaces().has_value()) { + typed_forwarding_namespaces_ = merged_config->typedForwardingMetadataNamespaces().value(); + ENVOY_LOG(trace, + "Setting new typed forwarding metadata namespaces from per-route configuration"); + decoding_state_.setTypedForwardingMetadataNamespaces(typed_forwarding_namespaces_); + encoding_state_.setTypedForwardingMetadataNamespaces(typed_forwarding_namespaces_); + } + + if (merged_config->untypedReceivingMetadataNamespaces().has_value()) { + untyped_receiving_namespaces_ = merged_config->untypedReceivingMetadataNamespaces().value(); + ENVOY_LOG(trace, + "Setting new untyped receiving metadata namespaces from per-route configuration"); + decoding_state_.setUntypedReceivingMetadataNamespaces(untyped_receiving_namespaces_); + encoding_state_.setUntypedReceivingMetadataNamespaces(untyped_receiving_namespaces_); + } } std::string responseCaseToString(const ProcessingResponse::ResponseCase response_case) { diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 49954e6ac1e7..78d97cbe9bab 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -140,6 +140,15 @@ class FilterConfig { disable_immediate_response_(config.disable_immediate_response()), allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())), disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())), + untyped_forwarding_namespaces_( + config.metadata_options().forwarding_namespaces().untyped().begin(), + config.metadata_options().forwarding_namespaces().untyped().end()), + typed_forwarding_namespaces_( + config.metadata_options().forwarding_namespaces().typed().begin(), + config.metadata_options().forwarding_namespaces().typed().end()), + untyped_receiving_namespaces_( + config.metadata_options().receiving_namespaces().untyped().begin(), + config.metadata_options().receiving_namespaces().untyped().end()), expression_manager_(builder, local_info, config.request_attributes(), config.response_attributes()) {} @@ -173,6 +182,18 @@ class FilterConfig { const ExpressionManager& expressionManager() const { return expression_manager_; } + const std::vector& untypedForwardingMetadataNamespaces() const { + return untyped_forwarding_namespaces_; + } + + const std::vector& typedForwardingMetadataNamespaces() const { + return typed_forwarding_namespaces_; + } + + const std::vector& untypedReceivingMetadataNamespaces() const { + return untyped_receiving_namespaces_; + } + private: ExtProcFilterStats generateStats(const std::string& prefix, const std::string& filter_stats_prefix, Stats::Scope& scope) { @@ -198,6 +219,10 @@ class FilterConfig { // Empty disallowed_header_ means disallow nothing, i.e, allow all. const std::vector disallowed_headers_; + const std::vector untyped_forwarding_namespaces_; + const std::vector typed_forwarding_namespaces_; + const std::vector untyped_receiving_namespaces_; + const ExpressionManager expression_manager_; }; @@ -223,6 +248,17 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig { return grpc_service_; } + const absl::optional>& + untypedForwardingMetadataNamespaces() const { + return untyped_forwarding_namespaces_; + } + const absl::optional>& typedForwardingMetadataNamespaces() const { + return typed_forwarding_namespaces_; + } + const absl::optional>& untypedReceivingMetadataNamespaces() const { + return untyped_receiving_namespaces_; + } + private: absl::optional initProcessingMode(const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); @@ -230,6 +266,17 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig { absl::optional initGrpcService(const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); + std::vector initNamespaces(const Protobuf::RepeatedPtrField& ns); + + absl::optional> initUntypedForwardingNamespaces( + const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); + + absl::optional> initTypedForwardingNamespaces( + const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); + + absl::optional> initUntypedReceivingNamespaces( + const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); + absl::optional mergeProcessingMode(const FilterConfigPerRoute& less_specific, const FilterConfigPerRoute& more_specific); @@ -238,6 +285,10 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig { const absl::optional processing_mode_; const absl::optional grpc_service_; + + const absl::optional> untyped_forwarding_namespaces_; + const absl::optional> typed_forwarding_namespaces_; + const absl::optional> untyped_receiving_namespaces_; }; class Filter : public Logger::Loggable, @@ -260,8 +311,14 @@ class Filter : public Logger::Loggable, const envoy::config::core::v3::GrpcService& grpc_service) : config_(config), client_(std::move(client)), stats_(config->stats()), grpc_service_(grpc_service), config_with_hash_key_(grpc_service), - decoding_state_(*this, config->processingMode()), - encoding_state_(*this, config->processingMode()) {} + decoding_state_(*this, config->processingMode(), + config->untypedForwardingMetadataNamespaces(), + config->typedForwardingMetadataNamespaces(), + config->untypedReceivingMetadataNamespaces()), + encoding_state_(*this, config->processingMode(), + config->untypedForwardingMetadataNamespaces(), + config->typedForwardingMetadataNamespaces(), + config->untypedReceivingMetadataNamespaces()) {} const FilterConfig& config() const { return *config_; } @@ -303,6 +360,9 @@ class Filter : public Logger::Loggable, encoding_state_.callbackState() == ProcessorState::CallbackState::HeadersCallback); } + const ProcessorState& encodingState() { return encoding_state_; } + const ProcessorState& decodingState() { return decoding_state_; } + private: void mergePerRouteConfig(); StreamOpenState openStream(); @@ -320,6 +380,12 @@ class Filter : public Logger::Loggable, std::pair sendStreamChunk(ProcessorState& state); Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream); Http::FilterTrailersStatus onTrailers(ProcessorState& state, Http::HeaderMap& trailers); + void setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, + const envoy::service::ext_proc::v3::ProcessingResponse& response); + void setEncoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response); + void setDecoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response); + void addDynamicMetadata(const ProcessorState& state, + envoy::service::ext_proc::v3::ProcessingRequest& req); const FilterConfigSharedPtr config_; const ExternalProcessorClientPtr client_; @@ -347,6 +413,10 @@ class Filter : public Logger::Loggable, // Set to true when the mergePerRouteConfig() method has been called. bool route_config_merged_ = false; + + std::vector untyped_forwarding_namespaces_{}; + std::vector typed_forwarding_namespaces_{}; + std::vector untyped_receiving_namespaces_{}; }; extern std::string responseCaseToString( diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index c6c50fdecbbb..a3ebe27617f0 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -73,10 +73,16 @@ class ProcessorState : public Logger::Loggable { }; explicit ProcessorState(Filter& filter, - envoy::config::core::v3::TrafficDirection traffic_direction) + envoy::config::core::v3::TrafficDirection traffic_direction, + const std::vector& untyped_forwarding_namespaces, + const std::vector& typed_forwarding_namespaces, + const std::vector& untyped_receiving_namespaces) : filter_(filter), watermark_requested_(false), paused_(false), no_body_(false), complete_body_available_(false), trailers_available_(false), body_replaced_(false), - partial_body_processed_(false), traffic_direction_(traffic_direction) {} + partial_body_processed_(false), traffic_direction_(traffic_direction), + untyped_forwarding_namespaces_(&untyped_forwarding_namespaces), + typed_forwarding_namespaces_(&typed_forwarding_namespaces), + untyped_receiving_namespaces_(&untyped_receiving_namespaces) {} ProcessorState(const ProcessorState&) = delete; virtual ~ProcessorState() = default; ProcessorState& operator=(const ProcessorState&) = delete; @@ -100,6 +106,28 @@ class ProcessorState : public Logger::Loggable { virtual void setProcessingMode( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) PURE; + + const std::vector& untypedForwardingMetadataNamespaces() const { + return *untyped_forwarding_namespaces_; + }; + void setUntypedForwardingMetadataNamespaces(const std::vector& ns) { + untyped_forwarding_namespaces_ = &ns; + }; + + const std::vector& typedForwardingMetadataNamespaces() const { + return *typed_forwarding_namespaces_; + }; + void setTypedForwardingMetadataNamespaces(const std::vector& ns) { + typed_forwarding_namespaces_ = &ns; + }; + + const std::vector& untypedReceivingMetadataNamespaces() const { + return *untyped_receiving_namespaces_; + }; + void setUntypedReceivingMetadataNamespaces(const std::vector& ns) { + untyped_receiving_namespaces_ = &ns; + }; + bool sendHeaders() const { return send_headers_; } bool sendTrailers() const { return send_trailers_; } envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode bodyMode() const { @@ -172,7 +200,7 @@ class ProcessorState : public Logger::Loggable { virtual envoy::service::ext_proc::v3::HttpTrailers* mutableTrailers(envoy::service::ext_proc::v3::ProcessingRequest& request) const PURE; - virtual StreamInfo::StreamInfo& streamInfo() PURE; + virtual Http::StreamFilterCallbacks* callbacks() const PURE; protected: void setBodyMode( @@ -218,6 +246,10 @@ class ProcessorState : public Logger::Loggable { absl::optional call_start_time_ = absl::nullopt; const envoy::config::core::v3::TrafficDirection traffic_direction_; + const std::vector* untyped_forwarding_namespaces_{}; + const std::vector* typed_forwarding_namespaces_{}; + const std::vector* untyped_receiving_namespaces_{}; + private: virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} }; @@ -225,8 +257,13 @@ class ProcessorState : public Logger::Loggable { class DecodingProcessorState : public ProcessorState { public: explicit DecodingProcessorState( - Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) - : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::INBOUND) { + Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode, + const std::vector& untyped_forwarding_namespaces, + const std::vector& typed_forwarding_namespaces, + const std::vector& untyped_receiving_namespaces) + : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::INBOUND, + untyped_forwarding_namespaces, typed_forwarding_namespaces, + untyped_receiving_namespaces) { setProcessingModeInternal(mode); } DecodingProcessorState(const DecodingProcessorState&) = delete; @@ -285,7 +322,7 @@ class DecodingProcessorState : public ProcessorState { void requestWatermark() override; void clearWatermark() override; - StreamInfo::StreamInfo& streamInfo() override { return decoder_callbacks_->streamInfo(); } + Http::StreamFilterCallbacks* callbacks() const override { return decoder_callbacks_; } private: void setProcessingModeInternal( @@ -300,8 +337,13 @@ class DecodingProcessorState : public ProcessorState { class EncodingProcessorState : public ProcessorState { public: explicit EncodingProcessorState( - Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) - : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::OUTBOUND) { + Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode, + const std::vector& untyped_forwarding_namespaces, + const std::vector& typed_forwarding_namespaces, + const std::vector& untyped_receiving_namespaces) + : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::OUTBOUND, + untyped_forwarding_namespaces, typed_forwarding_namespaces, + untyped_receiving_namespaces) { setProcessingModeInternal(mode); } EncodingProcessorState(const EncodingProcessorState&) = delete; @@ -360,7 +402,7 @@ class EncodingProcessorState : public ProcessorState { void requestWatermark() override; void clearWatermark() override; - StreamInfo::StreamInfo& streamInfo() override { return encoder_callbacks_->streamInfo(); } + Http::StreamFilterCallbacks* callbacks() const override { return encoder_callbacks_; } private: void setProcessingModeInternal( diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index 0f00ed80ee5b..9d511fbd9d8a 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -58,6 +58,7 @@ envoy_extension_cc_test( "//test/mocks/runtime:runtime_mocks", "//test/mocks/server:factory_context_mocks", "//test/mocks/server:overload_manager_mocks", + "//test/proto:helloworld_proto_cc_proto", "//test/test_common:test_runtime_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", @@ -136,7 +137,11 @@ envoy_extension_cc_test( "-DUSE_CEL_PARSER", ], }), - extension_names = ["envoy.filters.http.ext_proc"], + extension_names = [ + "envoy.filters.http.ext_proc", + # TODO(jbohanon) use a test filter here instead of production filter + "envoy.filters.http.set_metadata", + ], shard_count = 4, tags = [ "cpu:3", @@ -146,11 +151,15 @@ envoy_extension_cc_test( ":logging_test_filter_lib", ":utils_lib", "//source/extensions/filters/http/ext_proc:config", + "//source/extensions/filters/http/set_metadata:config", "//test/common/http:common_lib", "//test/integration:http_integration_lib", + "//test/integration/filters:stream_info_to_headers_filter_lib", + "//test/proto:helloworld_proto_cc_proto", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/http/set_metadata/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", ], ) diff --git a/test/extensions/filters/http/ext_proc/config_test.cc b/test/extensions/filters/http/ext_proc/config_test.cc index 47e36558a260..fb982aee9b25 100644 --- a/test/extensions/filters/http/ext_proc/config_test.cc +++ b/test/extensions/filters/http/ext_proc/config_test.cc @@ -36,6 +36,15 @@ TEST(HttpExtProcConfigTest, CorrectConfig) { response_trailer_mode: send filter_metadata: hello: "world" + metadata_options: + forwarding_namespaces: + typed: + - ns1 + untyped: + - ns2 + receiving_namespaces: + untyped: + - ns2 )EOF"; ExternalProcessingFilterConfig factory; @@ -73,6 +82,15 @@ TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { response_trailer_mode: send filter_metadata: hello: "world" + metadata_options: + forwarding_namespaces: + typed: + - ns1 + untyped: + - ns2 + receiving_namespaces: + untyped: + - ns2 )EOF"; ExternalProcessingFilterConfig factory; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 18cbbc77fc5d..d0becb1f2bab 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -1,6 +1,8 @@ #include +#include #include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" +#include "envoy/extensions/filters/http/set_metadata/v3/set_metadata.pb.h" #include "envoy/network/address.h" #include "envoy/service/ext_proc/v3/external_processor.pb.h" @@ -48,6 +50,7 @@ struct ConfigOptions { bool valid_grpc_server = true; bool add_logging_filter = false; bool http1_codec = false; + bool add_metadata = false; }; // These tests exercise the ext_proc filter through Envoy's integration test @@ -116,6 +119,38 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_); config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter)); + // Add set_metadata filter to inject dynamic metadata used for testing + if (config_option.add_metadata) { + envoy::config::listener::v3::Filter set_metadata_filter; + std::string set_metadata_filter_name = "envoy.filters.http.set_metadata"; + set_metadata_filter.set_name(set_metadata_filter_name); + + envoy::extensions::filters::http::set_metadata::v3::Config set_metadata_config; + auto* untyped_md = set_metadata_config.add_metadata(); + untyped_md->set_metadata_namespace("forwarding_ns_untyped"); + untyped_md->set_allow_overwrite(true); + ProtobufWkt::Struct test_md_val; + (*test_md_val.mutable_fields())["foo"].set_string_value("value from set_metadata"); + (*untyped_md->mutable_value()) = test_md_val; + + auto* typed_md = set_metadata_config.add_metadata(); + typed_md->set_metadata_namespace("forwarding_ns_typed"); + typed_md->set_allow_overwrite(true); + envoy::extensions::filters::http::set_metadata::v3::Metadata typed_md_to_stuff; + typed_md_to_stuff.set_metadata_namespace("typed_value from set_metadata"); + typed_md->mutable_typed_value()->PackFrom(typed_md_to_stuff); + + set_metadata_filter.mutable_typed_config()->PackFrom(set_metadata_config); + config_helper_.prependFilter( + MessageUtil::getJsonStringFromMessageOrError(set_metadata_filter)); + + // Add filter that dumps streamInfo into headers so we can check our receiving + // namespaces + config_helper_.prependFilter(fmt::format(R"EOF( + name: stream-info-to-headers-filter + )EOF")); + } + // Add logging test filter only in Envoy gRPC mode. // gRPC side stream logging is only supported in Envoy gRPC mode at the moment. if (clientType() == Grpc::ClientType::EnvoyGrpc && config_option.add_logging_filter && @@ -249,6 +284,26 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); } + void processGenericMessage( + FakeUpstream& grpc_upstream, bool first_message, + absl::optional> cb) { + ProcessingRequest request; + if (first_message) { + ASSERT_TRUE(grpc_upstream.waitForHttpConnection(*dispatcher_, processor_connection_)); + ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); + } + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); + ASSERT_TRUE(request.has_request_headers()); + if (first_message) { + processor_stream_->startGrpcStream(); + } + ProcessingResponse response; + const bool sendReply = !cb || (*cb)(request, response); + if (sendReply) { + processor_stream_->sendGrpcMessage(response); + } + } + void processRequestHeadersMessage( FakeUpstream& grpc_upstream, bool first_message, absl::optional> cb) { @@ -3307,6 +3362,72 @@ TEST_P(ExtProcIntegrationTest, SendBodyBufferedPartialWithTrailer) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadata) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + auto* md_opts = proto_config_.mutable_metadata_options(); + md_opts->mutable_forwarding_namespaces()->add_untyped("forwarding_ns_untyped"); + md_opts->mutable_forwarding_namespaces()->add_typed("forwarding_ns_typed"); + md_opts->mutable_receiving_namespaces()->add_untyped("receiving_ns_untyped"); + + ConfigOptions config_option = {}; + config_option.add_metadata = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + + auto response = sendDownstreamRequest(absl::nullopt); + + ProtobufWkt::Struct test_md_struct; + (*test_md_struct.mutable_fields())["foo"].set_string_value("value from ext_proc"); + + ProtobufWkt::Value md_val; + *(md_val.mutable_struct_value()) = test_md_struct; + + processGenericMessage( + *grpc_upstreams_[0], true, [md_val](const ProcessingRequest& req, ProcessingResponse& resp) { + // Verify the processing request contains the untyped metadata we injected. + EXPECT_TRUE(req.metadata_context().filter_metadata().contains("forwarding_ns_untyped")); + const ProtobufWkt::Struct& fwd_metadata = + req.metadata_context().filter_metadata().at("forwarding_ns_untyped"); + EXPECT_EQ(1, fwd_metadata.fields_size()); + EXPECT_TRUE(fwd_metadata.fields().contains("foo")); + EXPECT_EQ("value from set_metadata", fwd_metadata.fields().at("foo").string_value()); + + // Verify the processing request contains the typed metadata we injected. + EXPECT_TRUE(req.metadata_context().typed_filter_metadata().contains("forwarding_ns_typed")); + const ProtobufWkt::Any& fwd_typed_metadata = + req.metadata_context().typed_filter_metadata().at("forwarding_ns_typed"); + EXPECT_EQ("type.googleapis.com/envoy.extensions.filters.http.set_metadata.v3.Metadata", + fwd_typed_metadata.type_url()); + envoy::extensions::filters::http::set_metadata::v3::Metadata typed_md_from_req; + fwd_typed_metadata.UnpackTo(&typed_md_from_req); + EXPECT_EQ("typed_value from set_metadata", typed_md_from_req.metadata_namespace()); + + // Spoof the response to contain receiving metadata. + HeadersResponse headers_resp; + (*resp.mutable_request_headers()) = headers_resp; + auto mut_md_fields = resp.mutable_dynamic_metadata()->mutable_fields(); + (*mut_md_fields).emplace("receiving_ns_untyped", md_val); + + return true; + }); + + handleUpstreamRequest(); + + ASSERT_TRUE(response->waitForEndStream()); + ASSERT_TRUE(response->complete()); + + // Verify the response received contains the headers from dynamic metadata we expect. + ASSERT_FALSE((*response).headers().empty()); + auto md_header_result = + (*response).headers().get(Http::LowerCaseString("receiving_ns_untyped.foo")); + ASSERT_EQ(1, md_header_result.size()); + EXPECT_EQ("value from ext_proc", md_header_result[0]->value().getStringView()); + + verifyDownstreamResponse(*response, 200); +} + #if defined(USE_CEL_PARSER) // Test the filter using the default configuration by connecting to // an ext_proc server that responds to the request_headers message diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 7854a763dbac..907f52aef5c6 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -98,6 +98,16 @@ class HttpFilterTest : public testing::Test { EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_)); + EXPECT_CALL(encoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_)); + EXPECT_CALL(stream_info_, dynamicMetadata()).WillRepeatedly(ReturnRef(dynamic_metadata_)); + EXPECT_CALL(stream_info_, setDynamicMetadata(_, _)) + .Times(AnyNumber()) + .WillRepeatedly(Invoke(this, &HttpFilterTest::doSetDynamicMetadata)); + + EXPECT_CALL(decoder_callbacks_, connection()) + .WillRepeatedly(Return(OptRef{connection_})); + EXPECT_CALL(encoder_callbacks_, connection()) + .WillRepeatedly(Return(OptRef{connection_})); // Pointing dispatcher_.time_system_ to a SimulatedTimeSystem object. test_time_ = new Envoy::Event::SimulatedTimeSystem(); @@ -173,8 +183,6 @@ class HttpFilterTest : public testing::Test { if (final_expected_grpc_service_.has_value()) { EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), config_with_hash_key.config())); - std::cout << final_expected_grpc_service_.value().DebugString(); - std::cout << config_with_hash_key.config().DebugString(); } stream_callbacks_ = &callbacks; @@ -190,6 +198,10 @@ class HttpFilterTest : public testing::Test { return stream; } + void doSetDynamicMetadata(const std::string& ns, const ProtobufWkt::Struct& val) { + (*dynamic_metadata_.mutable_filter_metadata())[ns] = val; + }; + void doSend(ProcessingRequest&& request, Unused) { last_request_ = std::move(request); } bool doSendClose() { return !server_closed_stream_; } @@ -549,7 +561,7 @@ class HttpFilterTest : public testing::Test { ExternalProcessorCallbacks* stream_callbacks_ = nullptr; ProcessingRequest last_request_; bool server_closed_stream_ = false; - NiceMock stats_store_; + testing::NiceMock stats_store_; FilterConfigSharedPtr config_; std::shared_ptr filter_; testing::NiceMock dispatcher_; @@ -565,6 +577,8 @@ class HttpFilterTest : public testing::Test { std::vector timers_; TestScopedRuntime scoped_runtime_; Envoy::Event::SimulatedTimeSystem* test_time_; + envoy::config::core::v3::Metadata dynamic_metadata_; + testing::NiceMock connection_; testing::NiceMock local_info_; }; @@ -3107,6 +3121,538 @@ TEST_F(HttpFilterTest, ResponseTrailerMutationExceedSizeLimit) { EXPECT_EQ(1, config_->stats().rejected_header_mutations_.value()); } +TEST_F(HttpFilterTest, MetadataOptionsOverride) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + forwarding_namespaces: + untyped: + - untyped_ns_1 + typed: + - typed_ns_1 + receiving_namespaces: + untyped: + - untyped_receiving_ns_1 + )EOF"); + ExtProcPerRoute override_cfg; + const std::string override_yaml = R"EOF( + overrides: + metadata_options: + forwarding_namespaces: + untyped: + - untyped_ns_2 + typed: + - typed_ns_2 + receiving_namespaces: + untyped: + - untyped_receiving_ns_2 + )EOF"; + TestUtility::loadFromYaml(override_yaml, override_cfg); + + FilterConfigPerRoute route_config(override_cfg); + + EXPECT_CALL(decoder_callbacks_, traversePerFilterConfig(_)) + .WillOnce( + testing::Invoke([&](std::function cb) { + cb(route_config); + })); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "3"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(false, absl::nullopt); + + ASSERT_EQ(filter_->encodingState().untypedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->encodingState().untypedForwardingMetadataNamespaces()[0], "untyped_ns_2"); + ASSERT_EQ(filter_->decodingState().untypedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().untypedForwardingMetadataNamespaces()[0], "untyped_ns_2"); + + ASSERT_EQ(filter_->encodingState().typedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().typedForwardingMetadataNamespaces()[0], "typed_ns_2"); + + ASSERT_EQ(filter_->encodingState().untypedReceivingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->encodingState().untypedReceivingMetadataNamespaces()[0], + "untyped_receiving_ns_2"); + ASSERT_EQ(filter_->decodingState().untypedReceivingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().untypedReceivingMetadataNamespaces()[0], + "untyped_receiving_ns_2"); + + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + filter_->onDestroy(); +} + +// Validate that when metadata options are not specified as an override, the less-specific +// namespaces lists are used. +TEST_F(HttpFilterTest, MetadataOptionsNoOverride) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + forwarding_namespaces: + untyped: + - untyped_ns_1 + typed: + - typed_ns_1 + receiving_namespaces: + untyped: + - untyped_receiving_ns_1 + )EOF"); + ExtProcPerRoute override_cfg; + const std::string override_yaml = R"EOF( + overrides: {} + )EOF"; + TestUtility::loadFromYaml(override_yaml, override_cfg); + + FilterConfigPerRoute route_config(override_cfg); + + EXPECT_CALL(decoder_callbacks_, traversePerFilterConfig(_)) + .WillOnce( + testing::Invoke([&](std::function cb) { + cb(route_config); + })); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "3"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(false, absl::nullopt); + + ASSERT_EQ(filter_->encodingState().untypedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->encodingState().untypedForwardingMetadataNamespaces()[0], "untyped_ns_1"); + ASSERT_EQ(filter_->decodingState().untypedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().untypedForwardingMetadataNamespaces()[0], "untyped_ns_1"); + + ASSERT_EQ(filter_->encodingState().typedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().typedForwardingMetadataNamespaces()[0], "typed_ns_1"); + + ASSERT_EQ(filter_->encodingState().untypedReceivingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->encodingState().untypedReceivingMetadataNamespaces()[0], + "untyped_receiving_ns_1"); + ASSERT_EQ(filter_->decodingState().untypedReceivingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().untypedReceivingMetadataNamespaces()[0], + "untyped_receiving_ns_1"); + + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + filter_->onDestroy(); +} + +// Verify that the filter sets the processing request with dynamic metadata +// including when the metadata is on the connection stream info +TEST_F(HttpFilterTest, SendDynamicMetadata) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + forwarding_namespaces: + untyped: + - connection.and.request.have.data + - connection.has.data + - request.has.data + - neither.have.data + - untyped.and.typed.connection.data + - typed.connection.data + - untyped.connection.data + typed: + - untyped.and.typed.connection.data + - typed.connection.data + - typed.request.data + - untyped.connection.data + )EOF"); + + const std::string request_yaml = R"EOF( + filter_metadata: + connection.and.request.have.data: + data: request + request.has.data: + data: request + typed_filter_metadata: + typed.request.data: + # We are using ExtProcOverrides just because we know it is built and imported already. + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides + request_attributes: + - request_typed + )EOF"; + + const std::string connection_yaml = R"EOF( + filter_metadata: + connection.and.request.have.data: + data: connection_untyped + connection.has.data: + data: connection_untyped + untyped.and.typed.connection.data: + data: connection_untyped + untyped.connection.data: + data: connection_untyped + not.selected.data: + data: connection_untyped + typed_filter_metadata: + untyped.and.typed.connection.data: + # We are using ExtProcOverrides just because we know it is built and imported already. + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides + request_attributes: + - connection_typed + typed.connection.data: + # We are using ExtProcOverrides just because we know it is built and imported already. + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides + request_attributes: + - connection_typed + not.selected.data: + # We are using ExtProcOverrides just because we know it is built and imported already. + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides + request_attributes: + - connection_typed + )EOF"; + + envoy::config::core::v3::Metadata connection_metadata; + TestUtility::loadFromYaml(request_yaml, dynamic_metadata_); + TestUtility::loadFromYaml(connection_yaml, connection_metadata); + connection_.stream_info_.metadata_ = connection_metadata; + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + // ensure the metadata that is attached to the processing request is identical to + // the metadata we specified above + EXPECT_EQ("request", last_request_.metadata_context() + .filter_metadata() + .at("connection.and.request.have.data") + .fields() + .at("data") + .string_value()); + + EXPECT_EQ("request", last_request_.metadata_context() + .filter_metadata() + .at("request.has.data") + .fields() + .at("data") + .string_value()); + + EXPECT_EQ("connection_untyped", last_request_.metadata_context() + .filter_metadata() + .at("connection.has.data") + .fields() + .at("data") + .string_value()); + + EXPECT_EQ("connection_untyped", last_request_.metadata_context() + .filter_metadata() + .at("untyped.and.typed.connection.data") + .fields() + .at("data") + .string_value()); + + EXPECT_EQ(0, last_request_.metadata_context().filter_metadata().count("neither.have.data")); + + EXPECT_EQ(0, last_request_.metadata_context().filter_metadata().count("not.selected.data")); + + EXPECT_EQ(0, last_request_.metadata_context().filter_metadata().count("typed.connection.data")); + + envoy::extensions::filters::http::ext_proc::v3::ExtProcOverrides typed_any; + last_request_.metadata_context() + .typed_filter_metadata() + .at("typed.connection.data") + .UnpackTo(&typed_any); + ASSERT_EQ(1, typed_any.request_attributes().size()); + EXPECT_EQ("connection_typed", typed_any.request_attributes()[0]); + + last_request_.metadata_context() + .typed_filter_metadata() + .at("untyped.and.typed.connection.data") + .UnpackTo(&typed_any); + ASSERT_EQ(1, typed_any.request_attributes().size()); + EXPECT_EQ("connection_typed", typed_any.request_attributes()[0]); + + EXPECT_EQ( + 0, last_request_.metadata_context().typed_filter_metadata().count("untyped.connection.data")); + + EXPECT_EQ(0, last_request_.metadata_context().typed_filter_metadata().count("not.selected.data")); + + processResponseHeaders(false, absl::nullopt); + + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the filter emits +// dynamic metadata. +TEST_F(HttpFilterTest, EmitDynamicMetadata) { + // Configure the filter to only pass response headers to ext server. + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + receiving_namespaces: + untyped: + - envoy.filters.http.ext_proc + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_proc"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_EQ("bar", dynamic_metadata_.filter_metadata() + .at("envoy.filters.http.ext_proc") + .fields() + .at("foo") + .string_value()); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the filter emits +// dynamic metadata to namespaces other than its own. +TEST_F(HttpFilterTest, EmitDynamicMetadataArbitraryNamespace) { + // Configure the filter to only pass response headers to ext server. + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + receiving_namespaces: + untyped: + - envoy.filters.http.ext_authz + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_authz"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_EQ("bar", dynamic_metadata_.filter_metadata() + .at("envoy.filters.http.ext_authz") + .fields() + .at("foo") + .string_value()); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the +// filter does not emit metadata when no allowed namespaces are configured. +TEST_F(HttpFilterTest, DisableEmitDynamicMetadata) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_proc"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_EQ(0, dynamic_metadata_.filter_metadata().size()); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the +// filter does not emit metadata to namespaces which are not allowed. +TEST_F(HttpFilterTest, DisableEmittingDynamicMetadataToDisallowedNamespaces) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + receiving_namespaces: + untyped: + - envoy.filters.http.ext_proc + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_authz"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_EQ(0, dynamic_metadata_.filter_metadata().size()); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the filter emits +// dynamic metadata and later emissions overwrite earlier ones. +TEST_F(HttpFilterTest, EmitDynamicMetadataUseLast) { + // Configure the filter to only pass response headers to ext server. + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + receiving_namespaces: + untyped: + - envoy.filters.http.ext_proc + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct batbaz; + (*batbaz.mutable_fields())["bat"].set_string_value("baz"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_proc"].mutable_struct_value(); + *mut_struct = batbaz; + }); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_proc"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_FALSE(dynamic_metadata_.filter_metadata() + .at("envoy.filters.http.ext_proc") + .fields() + .contains("bat")); + + EXPECT_EQ("bar", dynamic_metadata_.filter_metadata() + .at("envoy.filters.http.ext_proc") + .fields() + .at("foo") + .string_value()); + + filter_->onDestroy(); +} + class HttpFilter2Test : public HttpFilterTest, public ::Envoy::Http::HttpConnectionManagerImplMixin {}; diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 867bca9cef3e..a81d62141edd 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -913,6 +913,7 @@ envoy_cc_test_library( "//envoy/http:filter_interface", "//envoy/registry", "//envoy/server:filter_config_interface", + "//source/common/protobuf", "//source/extensions/filters/http/common:pass_through_filter_lib", "//test/extensions/filters/http/common:empty_http_filter_config_lib", ], diff --git a/test/integration/filters/stream_info_to_headers_filter.cc b/test/integration/filters/stream_info_to_headers_filter.cc index 41077688ff15..489e4d561440 100644 --- a/test/integration/filters/stream_info_to_headers_filter.cc +++ b/test/integration/filters/stream_info_to_headers_filter.cc @@ -1,6 +1,7 @@ #include "envoy/registry/registry.h" #include "envoy/server/filter_config.h" +#include "source/common/protobuf/protobuf.h" #include "source/extensions/filters/http/common/pass_through_filter.h" #include "test/extensions/filters/http/common/empty_http_filter_config.h" @@ -15,6 +16,38 @@ std::string toUsec(MonotonicTime time) { return absl::StrCat(time.time_since_epo } // namespace +void addValueHeaders(Http::ResponseHeaderMap& headers, std::string key_prefix, + const ProtobufWkt::Value& val) { + switch (val.kind_case()) { + case ProtobufWkt::Value::kNullValue: + headers.addCopy(Http::LowerCaseString(key_prefix), "null"); + break; + case ProtobufWkt::Value::kNumberValue: + headers.addCopy(Http::LowerCaseString(key_prefix), std::to_string(val.number_value())); + break; + case ProtobufWkt::Value::kStringValue: + headers.addCopy(Http::LowerCaseString(key_prefix), val.string_value()); + break; + case ProtobufWkt::Value::kBoolValue: + headers.addCopy(Http::LowerCaseString(key_prefix), val.bool_value() ? "true" : "false"); + break; + case ProtobufWkt::Value::kListValue: { + const auto& vals = val.list_value().values(); + for (auto i = 0; i < vals.size(); ++i) { + addValueHeaders(headers, key_prefix + "." + std::to_string(i), vals[i]); + } + break; + } + case ProtobufWkt::Value::kStructValue: + for (const auto& field : val.struct_value().fields()) { + addValueHeaders(headers, key_prefix + "." + field.first, field.second); + } + break; + default: + break; + } +} + // A filter that sticks stream info into headers for integration testing. class StreamInfoToHeadersFilter : public Http::PassThroughFilter { public: @@ -97,6 +130,17 @@ class StreamInfoToHeadersFilter : public Http::PassThroughFilter { upstream_timing.connectionPoolCallbackLatency().value().count()); } } + + if (decoder_callbacks_->streamInfo().dynamicMetadata().filter_metadata_size() > 0) { + const auto& md = decoder_callbacks_->streamInfo().dynamicMetadata().filter_metadata(); + for (const auto& md_entry : md) { + std::string key_prefix = md_entry.first; + for (const auto& field : md_entry.second.fields()) { + addValueHeaders(headers, key_prefix + "." + field.first, field.second); + } + } + } + return Http::FilterHeadersStatus::Continue; } };