From 709d1c31749a6ba2eab2865927f10300570ac533 Mon Sep 17 00:00:00 2001 From: chaoqin-li1123 <55518381+chaoqin-li1123@users.noreply.github.com> Date: Wed, 9 Sep 2020 10:45:08 -0500 Subject: [PATCH] On demand loading of ScopedRouteConfiguration (#12640) Add a field to the current protobuf of ScopedRouteConfiguration to enable on demand scoped route table loading. The on demand scope route tables will be loaded lazily. The lazy loading feature of route table associated with scope is achieved by extending the current vhds on_demand filter to support lazy loading of RouteConfigurationscoped route discovery service.If a scoped route configuration is set to be loaded lazily, upon a http request using SRDS, when the corresponding route table of a scope is not found, post a callback to control plane, request the route table from the management server, after the route table has been initialized, continue the filter chain. https://docs.google.com/document/d/15GX30U5CH2bsWUyQRkiiQ_nbMCoklvgP_ObrDaSlkuc/edit?usp=sharing Risk Level: Low Testing: add unit tests and integration test to verifiy behavior changes Fixes #10641 Signed-off-by: chaoqinli --- api/envoy/config/route/v3/scoped_route.proto | 3 + .../config/route/v4alpha/scoped_route.proto | 3 + .../http_filters/on_demand_updates_filter.rst | 14 +- .../intro/arch_overview/http/http_routing.rst | 3 +- .../envoy/config/route/v3/scoped_route.proto | 3 + .../config/route/v4alpha/scoped_route.proto | 3 + include/envoy/http/filter.h | 9 - include/envoy/router/scopes.h | 78 ++++ source/common/http/BUILD | 1 + source/common/http/async_client_impl.h | 1 - source/common/http/conn_manager_impl.cc | 73 ++- source/common/http/conn_manager_impl.h | 61 +-- source/common/http/filter_manager.cc | 3 +- source/common/http/filter_manager.h | 5 +- source/common/router/scoped_config_impl.cc | 9 + source/common/router/scoped_config_impl.h | 73 +-- source/common/router/scoped_rds.cc | 161 ++++++- source/common/router/scoped_rds.h | 72 ++- .../http/on_demand/on_demand_update.cc | 16 +- .../filters/http/on_demand/on_demand_update.h | 1 + test/common/grpc/grpc_client_integration.h | 2 +- test/common/router/scoped_rds_test.cc | 441 +++++++++++++++++- .../http/on_demand/on_demand_filter_test.cc | 6 +- test/integration/BUILD | 2 +- .../scoped_rds_integration_test.cc | 318 ++++++++++++- test/test_common/utility.h | 13 + 26 files changed, 1194 insertions(+), 180 deletions(-) diff --git a/api/envoy/config/route/v3/scoped_route.proto b/api/envoy/config/route/v3/scoped_route.proto index f2b28ed974c0..d6611b0b1d06 100644 --- a/api/envoy/config/route/v3/scoped_route.proto +++ b/api/envoy/config/route/v3/scoped_route.proto @@ -104,6 +104,9 @@ message ScopedRouteConfiguration { repeated Fragment fragments = 1 [(validate.rules).repeated = {min_items: 1}]; } + // Whether the RouteConfiguration should be loaded on demand. + bool on_demand = 4; + // The name assigned to the routing scope. string name = 1 [(validate.rules).string = {min_bytes: 1}]; diff --git a/api/envoy/config/route/v4alpha/scoped_route.proto b/api/envoy/config/route/v4alpha/scoped_route.proto index b1f6915c161b..33fc756a60a4 100644 --- a/api/envoy/config/route/v4alpha/scoped_route.proto +++ b/api/envoy/config/route/v4alpha/scoped_route.proto @@ -104,6 +104,9 @@ message ScopedRouteConfiguration { repeated Fragment fragments = 1 [(validate.rules).repeated = {min_items: 1}]; } + // Whether the RouteConfiguration should be loaded on demand. + bool on_demand = 4; + // The name assigned to the routing scope. string name = 1 [(validate.rules).string = {min_bytes: 1}]; diff --git a/docs/root/configuration/http/http_filters/on_demand_updates_filter.rst b/docs/root/configuration/http/http_filters/on_demand_updates_filter.rst index d856d3e7597c..256fd634da1b 100644 --- a/docs/root/configuration/http/http_filters/on_demand_updates_filter.rst +++ b/docs/root/configuration/http/http_filters/on_demand_updates_filter.rst @@ -1,15 +1,21 @@ .. _config_http_filters_on_demand: -On-demand VHDS Updates -====================== +On-demand VHDS and S/RDS Updates +================================ -The on-demand VHDS filter is used to request a :ref:`virtual host ` +The on demand filter can be used to support either on demand VHDS or S/RDS update if configured in the filter chain. + +The on-demand update filter can be used to request a :ref:`virtual host ` data if it's not already present in the :ref:`Route Configuration `. The contents of the *Host* or *:authority* header is used to create the on-demand request. For an on-demand request to be created, :ref:`VHDS ` must be enabled and either *Host* or *:authority* header be present. -On-demand VHDS cannot be used with SRDS at this point. +The on-demand update filter can also be used to request a *Route Configuration* data if RouteConfiguration is specified to be +loaded on demand in the :ref:`Scoped RouteConfiguration `. +The contents of the HTTP header is used to find the scope and create the on-demand request. + +On-demand VHDS and on-demand S/RDS can not be used at the same time at this point. Configuration ------------- diff --git a/docs/root/intro/arch_overview/http/http_routing.rst b/docs/root/intro/arch_overview/http/http_routing.rst index 884963dfcb80..c133890b159e 100644 --- a/docs/root/intro/arch_overview/http/http_routing.rst +++ b/docs/root/intro/arch_overview/http/http_routing.rst @@ -59,10 +59,11 @@ Route Scope Scoped routing enables Envoy to put constraints on search space of domains and route rules. A :ref:`Route Scope` associates a key with a :ref:`route table `. For each request, a scope key is computed dynamically by the HTTP connection manager to pick the :ref:`route table`. +RouteConfiguration associated with scope can be loaded on demand with :ref:`v3 API reference ` configured and on demand filed in protobuf set to true. The Scoped RDS (SRDS) API contains a set of :ref:`Scopes ` resources, each defining independent routing configuration, along with a :ref:`ScopeKeyBuilder ` -defining the key construction algorithm used by Envoy to look up the scope corresponding to each request. +defining the key construction algorithm used by Envoy to look up the scope corresponding to each request. For example, for the following scoped route configuration, Envoy will look into the "addr" header value, split the header value by ";" first, and use the first value for key 'x-foo-key' as the scope key. If the "addr" header value is "foo=1;x-foo-key=127.0.0.1;x-bar-key=1.1.1.1", then "127.0.0.1" will be computed as the scope key to look up for corresponding route configuration. diff --git a/generated_api_shadow/envoy/config/route/v3/scoped_route.proto b/generated_api_shadow/envoy/config/route/v3/scoped_route.proto index f2b28ed974c0..d6611b0b1d06 100644 --- a/generated_api_shadow/envoy/config/route/v3/scoped_route.proto +++ b/generated_api_shadow/envoy/config/route/v3/scoped_route.proto @@ -104,6 +104,9 @@ message ScopedRouteConfiguration { repeated Fragment fragments = 1 [(validate.rules).repeated = {min_items: 1}]; } + // Whether the RouteConfiguration should be loaded on demand. + bool on_demand = 4; + // The name assigned to the routing scope. string name = 1 [(validate.rules).string = {min_bytes: 1}]; diff --git a/generated_api_shadow/envoy/config/route/v4alpha/scoped_route.proto b/generated_api_shadow/envoy/config/route/v4alpha/scoped_route.proto index b1f6915c161b..33fc756a60a4 100644 --- a/generated_api_shadow/envoy/config/route/v4alpha/scoped_route.proto +++ b/generated_api_shadow/envoy/config/route/v4alpha/scoped_route.proto @@ -104,6 +104,9 @@ message ScopedRouteConfiguration { repeated Fragment fragments = 1 [(validate.rules).repeated = {min_items: 1}]; } + // Whether the RouteConfiguration should be loaded on demand. + bool on_demand = 4; + // The name assigned to the routing scope. string name = 1 [(validate.rules).string = {min_bytes: 1}]; diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 1bb611a2a1b1..440792171783 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -523,15 +523,6 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { */ virtual void requestRouteConfigUpdate(RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) PURE; - - /** - * - * @return absl::optional. Contains a value if a non-scoped RDS - * route config provider is used. Scoped RDS provides are not supported at the moment, as - * retrieval of a route configuration in their case requires passing of http request headers - * as a parameter. - */ - virtual absl::optional routeConfig() PURE; }; /** diff --git a/include/envoy/router/scopes.h b/include/envoy/router/scopes.h index 9eda8e8e3d9b..a7949facb92e 100644 --- a/include/envoy/router/scopes.h +++ b/include/envoy/router/scopes.h @@ -8,6 +8,77 @@ namespace Envoy { namespace Router { +/** + * Scope key fragment base class. + */ +class ScopeKeyFragmentBase { +public: + bool operator!=(const ScopeKeyFragmentBase& other) const { return !(*this == other); } + + bool operator==(const ScopeKeyFragmentBase& other) const { + if (typeid(*this) == typeid(other)) { + return hash() == other.hash(); + } + return false; + } + virtual ~ScopeKeyFragmentBase() = default; + + // Hash of the fragment. + virtual uint64_t hash() const PURE; +}; + +/** + * Scope Key is composed of non-null fragments. + **/ +class ScopeKey { +public: + ScopeKey() = default; + ScopeKey(ScopeKey&& other) = default; + + // Scopekey is not copy-assignable and copy-constructible as it contains unique_ptr inside itself. + ScopeKey(const ScopeKey&) = delete; + ScopeKey operator=(const ScopeKey&) = delete; + + // Caller should guarantee the fragment is not nullptr. + void addFragment(std::unique_ptr&& fragment) { + ASSERT(fragment != nullptr, "null fragment not allowed in ScopeKey."); + updateHash(*fragment); + fragments_.emplace_back(std::move(fragment)); + } + + uint64_t hash() const { return hash_; } + bool operator!=(const ScopeKey& other) const; + bool operator==(const ScopeKey& other) const; + +private: + // Update the key's hash with the new fragment hash. + void updateHash(const ScopeKeyFragmentBase& fragment) { + std::stringbuf buffer; + buffer.sputn(reinterpret_cast(&hash_), sizeof(hash_)); + const auto& fragment_hash = fragment.hash(); + buffer.sputn(reinterpret_cast(&fragment_hash), sizeof(fragment_hash)); + hash_ = HashUtil::xxHash64(buffer.str()); + } + + uint64_t hash_{0}; + std::vector> fragments_; +}; + +using ScopeKeyPtr = std::unique_ptr; + +// String fragment. +class StringKeyFragment : public ScopeKeyFragmentBase { +public: + explicit StringKeyFragment(absl::string_view value) + : value_(value), hash_(HashUtil::xxHash64(value_)) {} + + uint64_t hash() const override { return hash_; } + +private: + const std::string value_; + const uint64_t hash_; +}; + /** * The scoped routing configuration. */ @@ -22,6 +93,13 @@ class ScopedConfig : public Envoy::Config::ConfigProvider::Config { * @return ConfigConstSharedPtr the router's Config matching the request headers. */ virtual ConfigConstSharedPtr getRouteConfig(const Http::HeaderMap& headers) const PURE; + + /** + * Based on the incoming HTTP request headers, returns the hash value of its scope key. + * @param headers the request headers to match the scoped routing configuration against. + * @return unique_ptr of the scope key computed from header. + */ + virtual ScopeKeyPtr computeScopeKey(const Http::HeaderMap&) const { return {}; } }; using ScopedConfigConstSharedPtr = std::shared_ptr; diff --git a/source/common/http/BUILD b/source/common/http/BUILD index 55324b89cc6a..580a30ac64bc 100644 --- a/source/common/http/BUILD +++ b/source/common/http/BUILD @@ -240,6 +240,7 @@ envoy_cc_library( "//source/common/http/http3:well_known_names", "//source/common/network:utility_lib", "//source/common/router:config_lib", + "//source/common/router:scoped_rds_lib", "//source/common/stats:timespan_lib", "//source/common/stream_info:stream_info_lib", "//source/common/tracing:http_tracer_lib", diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index a365320cdf3c..1653599d9f27 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -86,7 +86,6 @@ class AsyncStreamImpl : public AsyncClient::Stream, void requestRouteConfigUpdate(Http::RouteConfigUpdatedCallbackSharedPtr) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } - absl::optional routeConfig() override { return {}; } // Http::AsyncClient::Stream void sendHeaders(RequestHeaderMap& headers, bool end_stream) override; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index d9f59fcc234b..b21b1d78a339 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -480,13 +480,61 @@ void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_re } } +// TODO(chaoqin-li1123): Make on demand vhds and on demand srds works at the same time. void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestRouteConfigUpdate( - const std::string host_header, Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) { + absl::optional route_config = parent_.routeConfig(); + Event::Dispatcher& thread_local_dispatcher = + parent_.connection_manager_.read_callbacks_->connection().dispatcher(); + if (route_config.has_value() && route_config.value()->usesVhds()) { + ASSERT(!parent_.request_headers_->Host()->value().empty()); + const auto& host_header = absl::AsciiStrToLower(parent_.request_headers_->getHostValue()); + requestVhdsUpdate(host_header, thread_local_dispatcher, std::move(route_config_updated_cb)); + return; + } else if (parent_.snapped_scoped_routes_config_ != nullptr) { + Router::ScopeKeyPtr scope_key = + parent_.snapped_scoped_routes_config_->computeScopeKey(*parent_.request_headers_); + // If scope_key is not null, the scope exists but RouteConfiguration is not initialized. + if (scope_key != nullptr) { + requestSrdsUpdate(std::move(scope_key), thread_local_dispatcher, + std::move(route_config_updated_cb)); + return; + } + } + // Continue the filter chain if no on demand update is requested. + (*route_config_updated_cb)(false); +} + +void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestVhdsUpdate( + const std::string& host_header, Event::Dispatcher& thread_local_dispatcher, Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) { route_config_provider_->requestVirtualHostsUpdate(host_header, thread_local_dispatcher, std::move(route_config_updated_cb)); } +void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestSrdsUpdate( + Router::ScopeKeyPtr scope_key, Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) { + // Since inline scope_route_config_provider is not fully implemented and never used, + // dynamic cast in constructor always succeed and the pointer should not be null here. + ASSERT(scoped_route_config_provider_ != nullptr); + Http::RouteConfigUpdatedCallback scoped_route_config_updated_cb = + Http::RouteConfigUpdatedCallback( + [this, weak_route_config_updated_cb = std::weak_ptr( + route_config_updated_cb)](bool scope_exist) { + // If the callback can be locked, this ActiveStream is still alive. + if (auto cb = weak_route_config_updated_cb.lock()) { + // Refresh the route before continue the filter chain. + if (scope_exist) { + parent_.refreshCachedRoute(); + } + (*cb)(scope_exist && parent_.hasCachedRoute()); + } + }); + scoped_route_config_provider_->onDemandRdsUpdate(std::move(scope_key), thread_local_dispatcher, + std::move(scoped_route_config_updated_cb)); +} + ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager, uint32_t buffer_limit) : connection_manager_(connection_manager), @@ -520,11 +568,12 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect connection_manager.config_.routeConfigProvider() != nullptr) { route_config_update_requester_ = std::make_unique( - connection_manager.config_.routeConfigProvider()); + connection_manager.config_.routeConfigProvider(), *this); } else if (connection_manager_.config_.isRoutable() && connection_manager.config_.scopedRouteConfigProvider() != nullptr) { route_config_update_requester_ = - std::make_unique(); + std::make_unique( + connection_manager.config_.scopedRouteConfigProvider(), *this); } ScopeTrackerScopeState scope(this, connection_manager_.read_callbacks_->connection().dispatcher()); @@ -1138,21 +1187,18 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedTracingCustomTags() { } } +// TODO(chaoqin-li1123): Make on demand vhds and on demand srds works at the same time. void ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate( - Event::Dispatcher& thread_local_dispatcher, Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) { - ASSERT(!request_headers_->Host()->value().empty()); - const auto& host_header = absl::AsciiStrToLower(request_headers_->getHostValue()); - route_config_update_requester_->requestRouteConfigUpdate(host_header, thread_local_dispatcher, - std::move(route_config_updated_cb)); + route_config_update_requester_->requestRouteConfigUpdate(route_config_updated_cb); } absl::optional ConnectionManagerImpl::ActiveStream::routeConfig() { - if (connection_manager_.config_.routeConfigProvider() == nullptr) { - return {}; + if (connection_manager_.config_.routeConfigProvider() != nullptr) { + return absl::optional( + connection_manager_.config_.routeConfigProvider()->config()); } - return absl::optional( - connection_manager_.config_.routeConfigProvider()->config()); + return {}; } void ConnectionManagerImpl::ActiveStream::onLocalReply(Code code) { @@ -1196,7 +1242,8 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& heade connection_manager_.config_.dateProvider().setDateHeader(headers); } - // Following setReference() is safe because serverName() is constant for the life of the listener. + // Following setReference() is safe because serverName() is constant for the life of the + // listener. const auto transformation = connection_manager_.config_.serverHeaderTransformation(); if (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::OVERWRITE || (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::APPEND_IF_ABSENT && diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index aa3b5cdf6739..b84a814120f1 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -43,6 +43,7 @@ #include "common/http/user_agent.h" #include "common/http/utility.h" #include "common/local_reply/local_reply.h" +#include "common/router/scoped_rds.h" #include "common/stream_info/stream_info_impl.h" #include "common/tracing/http_tracer_impl.h" @@ -112,34 +113,35 @@ class ConnectionManagerImpl : Logger::Loggable, private: struct ActiveStream; - // Used to abstract making of RouteConfig update request. - // RdsRouteConfigUpdateRequester is used when an RdsRouteConfigProvider is configured, - // NullRouteConfigUpdateRequester is used in all other cases (specifically when - // ScopedRdsConfigProvider/InlineScopedRoutesConfigProvider is configured) - class RouteConfigUpdateRequester { + class RdsRouteConfigUpdateRequester { public: - virtual ~RouteConfigUpdateRequester() = default; - virtual void requestRouteConfigUpdate(const std::string, Event::Dispatcher&, - Http::RouteConfigUpdatedCallbackSharedPtr) { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; - }; - }; - - class RdsRouteConfigUpdateRequester : public RouteConfigUpdateRequester { - public: - RdsRouteConfigUpdateRequester(Router::RouteConfigProvider* route_config_provider) - : route_config_provider_(route_config_provider) {} - void requestRouteConfigUpdate( - const std::string host_header, Event::Dispatcher& thread_local_dispatcher, - Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) override; + RdsRouteConfigUpdateRequester(Router::RouteConfigProvider* route_config_provider, + ActiveStream& parent) + : route_config_provider_(route_config_provider), parent_(parent) {} + + RdsRouteConfigUpdateRequester(Config::ConfigProvider* scoped_route_config_provider, + ActiveStream& parent) + // Expect the dynamic cast to succeed because only ScopedRdsConfigProvider is fully + // implemented. Inline provider will be cast to nullptr here but it is not full implemented + // and can't not be used at this point. Should change this implementation if we have a + // functional inline scope route provider in the future. + : scoped_route_config_provider_( + dynamic_cast(scoped_route_config_provider)), + parent_(parent) {} + + void + requestRouteConfigUpdate(Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb); + void requestVhdsUpdate(const std::string& host_header, + Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb); + void requestSrdsUpdate(Router::ScopeKeyPtr scope_key, + Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb); private: Router::RouteConfigProvider* route_config_provider_; - }; - - class NullRouteConfigUpdateRequester : public RouteConfigUpdateRequester { - public: - NullRouteConfigUpdateRequester() = default; + Router::ScopedRdsConfigProvider* scoped_route_config_provider_; + ActiveStream& parent_; }; /** @@ -267,9 +269,6 @@ class ConnectionManagerImpl : Logger::Loggable, Router::RouteConstSharedPtr route(const Router::RouteCallback& cb) override; void clearRouteCache() override; absl::optional routeConfig() override; - void requestRouteConfigUpdate( - Event::Dispatcher& thread_local_dispatcher, - Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) override; Tracing::Span& activeSpan() override; void onResponseDataTooLarge() override; void onRequestDataTooLarge() override; @@ -286,6 +285,8 @@ class ConnectionManagerImpl : Logger::Loggable, void refreshCachedRoute(); void refreshCachedRoute(const Router::RouteCallback& cb); + void requestRouteConfigUpdate( + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) override; void refreshCachedTracingCustomTags(); @@ -362,8 +363,10 @@ class ConnectionManagerImpl : Logger::Loggable, absl::optional cached_route_; absl::optional cached_cluster_info_; const std::string* decorated_operation_{nullptr}; - std::unique_ptr route_config_update_requester_; + std::unique_ptr route_config_update_requester_; std::unique_ptr tracing_custom_tags_{nullptr}; + + friend FilterManager; }; using ActiveStreamPtr = std::unique_ptr; @@ -431,4 +434,4 @@ class ConnectionManagerImpl : Logger::Loggable, }; } // namespace Http -} // namespace Envoy +} // namespace Envoy \ No newline at end of file diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 35c82287465f..d81cdc00fdaa 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -1308,8 +1308,7 @@ Network::Socket::OptionsSharedPtr ActiveStreamDecoderFilter::getUpstreamSocketOp void ActiveStreamDecoderFilter::requestRouteConfigUpdate( Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) { - parent_.filter_manager_callbacks_.requestRouteConfigUpdate(dispatcher(), - std::move(route_config_updated_cb)); + parent_.filter_manager_callbacks_.requestRouteConfigUpdate(std::move(route_config_updated_cb)); } absl::optional ActiveStreamDecoderFilter::routeConfig() { diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 9423e0a8ca4f..7af9c11ecfd5 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -204,7 +204,7 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, void requestRouteConfigUpdate( Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) override; - absl::optional routeConfig() override; + absl::optional routeConfig(); StreamDecoderFilterSharedPtr handle_; bool is_grpc_request_{}; @@ -429,8 +429,7 @@ class FilterManagerCallbacks { * Update the current route configuration. */ virtual void - requestRouteConfigUpdate(Event::Dispatcher& thread_local_dispatcher, - Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) PURE; + requestRouteConfigUpdate(Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) PURE; /** * Returns the current active span. diff --git a/source/common/router/scoped_config_impl.cc b/source/common/router/scoped_config_impl.cc index 8cef0a7e4a30..ef8c7e612471 100644 --- a/source/common/router/scoped_config_impl.cc +++ b/source/common/router/scoped_config_impl.cc @@ -153,5 +153,14 @@ ScopedConfigImpl::getRouteConfig(const Http::HeaderMap& headers) const { return nullptr; } +ScopeKeyPtr ScopedConfigImpl::computeScopeKey(const Http::HeaderMap& headers) const { + ScopeKeyPtr scope_key = scope_key_builder_.computeScopeKey(headers); + if (scope_key && + scoped_route_info_by_key_.find(scope_key->hash()) != scoped_route_info_by_key_.end()) { + return scope_key; + } + return nullptr; +} + } // namespace Router } // namespace Envoy diff --git a/source/common/router/scoped_config_impl.h b/source/common/router/scoped_config_impl.h index 5a1703caf82c..17c6847de79d 100644 --- a/source/common/router/scoped_config_impl.h +++ b/source/common/router/scoped_config_impl.h @@ -22,77 +22,6 @@ namespace Router { using envoy::extensions::filters::network::http_connection_manager::v3::ScopedRoutes; -/** - * Scope key fragment base class. - */ -class ScopeKeyFragmentBase { -public: - bool operator!=(const ScopeKeyFragmentBase& other) const { return !(*this == other); } - - bool operator==(const ScopeKeyFragmentBase& other) const { - if (typeid(*this) == typeid(other)) { - return hash() == other.hash(); - } - return false; - } - virtual ~ScopeKeyFragmentBase() = default; - - // Hash of the fragment. - virtual uint64_t hash() const PURE; -}; - -/** - * Scope Key is composed of non-null fragments. - **/ -class ScopeKey { -public: - ScopeKey() = default; - ScopeKey(ScopeKey&& other) = default; - - // Scopekey is not copy-assignable and copy-constructible as it contains unique_ptr inside itself. - ScopeKey(const ScopeKey&) = delete; - ScopeKey operator=(const ScopeKey&) = delete; - - // Caller should guarantee the fragment is not nullptr. - void addFragment(std::unique_ptr&& fragment) { - ASSERT(fragment != nullptr, "null fragment not allowed in ScopeKey."); - updateHash(*fragment); - fragments_.emplace_back(std::move(fragment)); - } - - uint64_t hash() const { return hash_; } - bool operator!=(const ScopeKey& other) const; - bool operator==(const ScopeKey& other) const; - -private: - // Update the key's hash with the new fragment hash. - void updateHash(const ScopeKeyFragmentBase& fragment) { - std::stringbuf buffer; - buffer.sputn(reinterpret_cast(&hash_), sizeof(hash_)); - const auto& fragment_hash = fragment.hash(); - buffer.sputn(reinterpret_cast(&fragment_hash), sizeof(fragment_hash)); - hash_ = HashUtil::xxHash64(buffer.str()); - } - - uint64_t hash_{0}; - std::vector> fragments_; -}; - -using ScopeKeyPtr = std::unique_ptr; - -// String fragment. -class StringKeyFragment : public ScopeKeyFragmentBase { -public: - explicit StringKeyFragment(absl::string_view value) - : value_(value), hash_(HashUtil::xxHash64(value_)) {} - - uint64_t hash() const override { return hash_; } - -private: - const std::string value_; - const uint64_t hash_; -}; - /** * Base class for fragment builders. */ @@ -190,6 +119,8 @@ class ScopedConfigImpl : public ScopedConfig { // Envoy::Router::ScopedConfig Router::ConfigConstSharedPtr getRouteConfig(const Http::HeaderMap& headers) const override; + // The return value is not null only if the scope corresponding to the header exists. + ScopeKeyPtr computeScopeKey(const Http::HeaderMap& headers) const override; private: ScopeKeyBuilderImpl scope_key_builder_; diff --git a/source/common/router/scoped_rds.cc b/source/common/router/scoped_rds.cc index 8d7c7566b764..d9ca4781e7b5 100644 --- a/source/common/router/scoped_rds.cc +++ b/source/common/router/scoped_rds.cc @@ -105,7 +105,7 @@ ScopedRdsConfigSubscription::ScopedRdsConfigSubscription( factory_context.messageValidationContext().dynamicValidationVisitor(), "name"), factory_context_(factory_context), name_(name), scope_key_builder_(scope_key_builder), scope_(factory_context.scope().createScope(stat_prefix + "scoped_rds." + name + ".")), - stats_({ALL_SCOPED_RDS_STATS(POOL_COUNTER(*scope_))}), + stats_({ALL_SCOPED_RDS_STATS(POOL_COUNTER(*scope_), POOL_GAUGE(*scope_))}), rds_config_source_(std::move(rds_config_source)), stat_prefix_(stat_prefix), route_config_provider_manager_(route_config_provider_manager) { const auto resource_name = getResourceName(); @@ -121,19 +121,102 @@ ScopedRdsConfigSubscription::ScopedRdsConfigSubscription( }); } +// Constructor for RdsRouteConfigProviderHelper when scope is eager loading. +// Initialize RdsRouteConfigProvider by default. ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProviderHelper( ScopedRdsConfigSubscription& parent, std::string scope_name, envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds, Init::Manager& init_manager) - : parent_(parent), scope_name_(scope_name), - route_provider_(std::dynamic_pointer_cast( - parent_.route_config_provider_manager_.createRdsRouteConfigProvider( - rds, parent_.factory_context_, parent_.stat_prefix_, init_manager))), + : parent_(parent), scope_name_(scope_name), on_demand_(false) { + initRdsConfigProvider(rds, init_manager); +} + +// Constructor for RdsRouteConfigProviderHelper when scope is on demand. +// Leave the RdsRouteConfigProvider uninitialized. +ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProviderHelper( + ScopedRdsConfigSubscription& parent, std::string scope_name) + : parent_(parent), scope_name_(scope_name), on_demand_(true) { + parent_.stats_.on_demand_scopes_.inc(); +} + +// When on demand callback is received from main thread, there are 4 cases. +// 1. Scope is not found, post a scope not found callback back to worker thread. +// 2. Scope is found but route provider has not been initialized, create route provider. +// 3. After route provider has been initialized, if RouteConfiguration has been fetched, +// post scope found callback to worker thread. +// 4. After route provider has been initialized, if RouteConfiguration is null, +// cache the callback and wait for RouteConfiguration to come. +void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::addOnDemandUpdateCallback( + std::function callback) { + // If RouteConfiguration has been initialized, run the callback to continue in filter chain, + // otherwise cache it and wait for the route table to be initialized. If RouteConfiguration hasn't + // been initialized, routeConfig() return a shared_ptr to NullConfigImpl. The name of + // NullConfigImpl is an empty string. + if (route_provider_ != nullptr && !routeConfig()->name().empty()) { + callback(); + return; + } + on_demand_update_callbacks_.push_back(callback); + // Initialize the rds provider if it has not been initialized. There is potential race here + // because other worker threads may also post callback to on demand update the RouteConfiguration + // associated with this scope. If rds provider has been initialized, just wait for + // RouteConfiguration to be updated. + maybeInitRdsConfigProvider(); +} + +void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::runOnDemandUpdateCallback() { + for (auto& callback : on_demand_update_callbacks_) { + callback(); + } + on_demand_update_callbacks_.clear(); +} + +void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::initRdsConfigProvider( + envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds, + Init::Manager& init_manager) { + route_provider_ = std::dynamic_pointer_cast( + parent_.route_config_provider_manager_.createRdsRouteConfigProvider( + rds, parent_.factory_context_, parent_.stat_prefix_, init_manager)); + + rds_update_callback_handle_ = route_provider_->subscription().addUpdateCallback([this]() { + // Subscribe to RDS update. + parent_.onRdsConfigUpdate(scope_name_, route_provider_->subscription()); + }); + parent_.stats_.active_scopes_.inc(); +} - rds_update_callback_handle_(route_provider_->subscription().addUpdateCallback([this]() { - // Subscribe to RDS update. - parent_.onRdsConfigUpdate(scope_name_, route_provider_->subscription()); - })) {} +void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::maybeInitRdsConfigProvider() { + // If the route provider have been initialized, return and wait for rds config update. + if (route_provider_ != nullptr) { + return; + } + + // Create a init_manager to create a rds provider. + // No transitive warming dependency here because only on demand update reach this point. + std::unique_ptr srds_init_mgr = + std::make_unique(fmt::format("SRDS on demand init manager.")); + std::unique_ptr srds_initialization_continuation = + std::make_unique([this, &srds_init_mgr] { + Init::WatcherImpl noop_watcher( + fmt::format("SRDS on demand ConfigUpdate watcher: {}", scope_name_), + []() { /*Do nothing.*/ }); + srds_init_mgr->initialize(noop_watcher); + }); + // Create route provider. + envoy::extensions::filters::network::http_connection_manager::v3::Rds rds; + rds.mutable_config_source()->MergeFrom(parent_.rds_config_source_); + rds.set_route_config_name( + parent_.scoped_route_map_[scope_name_]->configProto().route_configuration_name()); + initRdsConfigProvider(rds, *srds_init_mgr); + ENVOY_LOG(debug, fmt::format("Scope on demand update: {}", scope_name_)); + // If RouteConfiguration hasn't been initialized, routeConfig() return a shared_ptr to + // NullConfigImpl. The name of NullConfigImpl is an empty string. + if (routeConfig()->name().empty()) { + return; + } + // If RouteConfiguration has been initialized, apply update to all the threads. + parent_.onRdsConfigUpdate(scope_name_, route_provider_->subscription()); +} bool ScopedRdsConfigSubscription::addOrUpdateScopes( const std::vector& resources, Init::Manager& init_manager, @@ -148,14 +231,24 @@ bool ScopedRdsConfigSubscription::addOrUpdateScopes( dynamic_cast( resource.get().resource()); const std::string scope_name = scoped_route_config.name(); - // TODO(stevenzzz): Creating a new RdsRouteConfigProvider likely expensive, migrate RDS to - // config-provider-framework to make it light weight. rds.set_route_config_name(scoped_route_config.route_configuration_name()); - auto rds_config_provider_helper = - std::make_unique(*this, scope_name, rds, init_manager); - auto scoped_route_info = std::make_shared( - std::move(scoped_route_config), rds_config_provider_helper->routeConfig()); - route_provider_by_scope_.insert({scope_name, std::move(rds_config_provider_helper)}); + std::unique_ptr rds_config_provider_helper; + std::shared_ptr scoped_route_info = nullptr; + if (scoped_route_config.on_demand() == false) { + // For default scopes, create a rds helper with rds provider initialized. + rds_config_provider_helper = + std::make_unique(*this, scope_name, rds, init_manager); + scoped_route_info = std::make_shared( + std::move(scoped_route_config), rds_config_provider_helper->routeConfig()); + } else { + // For on demand scopes, create a rds helper with rds provider uninitialized. + rds_config_provider_helper = + std::make_unique(*this, scope_name); + // scope_route_info->routeConfig() will be nullptr, because RouteConfiguration is not loaded. + scoped_route_info = + std::make_shared(std::move(scoped_route_config), nullptr); + } + route_provider_by_scope_[scope_name] = std::move(rds_config_provider_helper); scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName(); scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info; updated_scopes.push_back(scoped_route_info); @@ -164,6 +257,9 @@ bool ScopedRdsConfigSubscription::addOrUpdateScopes( scoped_route_info->scopeName(), version_info); } + // scoped_route_info of both eager loading and on demand scopes will be propagated to work + // threads. Upon a scoped RouteConfiguration miss, if the scope exists, an on demand update + // callback will be posted to main thread. if (!updated_scopes.empty()) { applyConfigUpdate([updated_scopes](ConfigProvider::ConfigConstSharedPtr config) -> ConfigProvider::ConfigConstSharedPtr { @@ -274,6 +370,7 @@ void ScopedRdsConfigSubscription::onConfigUpdate( if (any_applied) { setLastConfigInfo(absl::optional({absl::nullopt, version_info})); } + stats_.all_scopes_.set(scoped_route_map_.size()); stats_.config_reload_.inc(); } @@ -294,6 +391,8 @@ void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_nam thread_local_scoped_config->addOrUpdateRoutingScopes({new_scoped_route_info}); return config; }); + // The data plane may wait for the route configuration to come back. + route_provider_by_scope_[scope_name]->runOnDemandUpdateCallback(); } // TODO(stevenzzzz): see issue #7508, consider generalizing this function as it overlaps with @@ -363,6 +462,36 @@ ScopedRdsConfigSubscription::detectUpdateConflictAndCleanupRemoved( return clean_removed_resources; } +void ScopedRdsConfigSubscription::onDemandRdsUpdate( + std::shared_ptr scope_key, Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallback&& route_config_updated_cb, + std::weak_ptr weak_subscription) { + factory_context_.dispatcher().post([this, &thread_local_dispatcher, scope_key, + route_config_updated_cb, weak_subscription]() { + // If the subscription has been destroyed, return immediately. + if (!weak_subscription.lock()) { + thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(false); }); + return; + } + + auto iter = scope_name_by_hash_.find(scope_key->hash()); + // Return to filter chain if we can't find the scope. + // The scope may have been destroyed when callback reach the main thread. + if (iter == scope_name_by_hash_.end()) { + thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(false); }); + return; + } + // Wrap the thread local dispatcher inside the callback. + std::function thread_local_updated_callback = [route_config_updated_cb, + &thread_local_dispatcher]() { + thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(true); }); + }; + std::string scope_name = iter->second; + // On demand initialization inside main thread. + route_provider_by_scope_[scope_name]->addOnDemandUpdateCallback(thread_local_updated_callback); + }); +} + ScopedRdsConfigProvider::ScopedRdsConfigProvider( ScopedRdsConfigSubscriptionSharedPtr&& subscription) : MutableConfigProviderCommonBase(std::move(subscription), ConfigProvider::ApiType::Delta) {} diff --git a/source/common/router/scoped_rds.h b/source/common/router/scoped_rds.h index b00ab1a4ef8a..c3cdb7557047 100644 --- a/source/common/router/scoped_rds.h +++ b/source/common/router/scoped_rds.h @@ -39,6 +39,11 @@ Envoy::Config::ConfigProviderPtr create( class ScopedRoutesConfigProviderManager; // A ConfigProvider for inline scoped routing configuration. +// InlineScopedRoutesConfigProvider is not fully implemented at this point. It doesn't load +// ScopedRouteConfigurations and propagate them to worker threads. If +// InlineScopedRoutesConfigProvider is fully implemented, when it is loading +// ScopedRouteConfiguration, the on demand field should be ignored and all scopes should be loaded +// eagerly. class InlineScopedRoutesConfigProvider : public Envoy::Config::ImmutableConfigProviderBase { public: InlineScopedRoutesConfigProvider(ProtobufTypes::ConstMessagePtrVector&& config_protos, @@ -77,14 +82,22 @@ class InlineScopedRoutesConfigProvider : public Envoy::Config::ImmutableConfigPr * All SRDS stats. @see stats_macros.h */ // clang-format off -#define ALL_SCOPED_RDS_STATS(COUNTER) \ +#define ALL_SCOPED_RDS_STATS(COUNTER, GAUGE) \ COUNTER(config_reload) \ - COUNTER(update_empty) + COUNTER(update_empty) \ + GAUGE(all_scopes, Accumulate) \ + GAUGE(on_demand_scopes, Accumulate) \ + GAUGE(active_scopes, Accumulate) // clang-format on struct ScopedRdsStats { - ALL_SCOPED_RDS_STATS(GENERATE_COUNTER_STRUCT) + ALL_SCOPED_RDS_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) + + static ScopedRdsStats generateStats(const std::string& prefix, Stats::Scope& scope) { + return ScopedRdsStats{ + ALL_SCOPED_RDS_STATS(POOL_COUNTER_PREFIX(scope, prefix), POOL_GAUGE_PREFIX(scope, prefix))}; + } }; // A scoped RDS subscription to be used with the dynamic scoped RDS ConfigProvider. @@ -111,6 +124,12 @@ class ScopedRdsConfigSubscription const ScopedRouteMap& scopedRouteMap() const { return scoped_route_map_; } + void + onDemandRdsUpdate(std::shared_ptr scope_key, + Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallback&& route_config_updated_cb, + std::weak_ptr weak_subscription); + private: // A helper class that takes care of the life cycle management of a RDS route provider and the // update callback handle. @@ -119,15 +138,42 @@ class ScopedRdsConfigSubscription ScopedRdsConfigSubscription& parent, std::string scope_name, envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds, Init::Manager& init_manager); - ~RdsRouteConfigProviderHelper() { rds_update_callback_handle_->remove(); } + + RdsRouteConfigProviderHelper(ScopedRdsConfigSubscription& parent, std::string scope_name); + + ~RdsRouteConfigProviderHelper() { + // Only remove the rds update when the rds provider has been initialized. + if (route_provider_) { + rds_update_callback_handle_->remove(); + parent_.stats_.active_scopes_.dec(); + } + if (on_demand_) { + parent_.stats_.on_demand_scopes_.dec(); + } + } ConfigConstSharedPtr routeConfig() { return route_provider_->config(); } + void addOnDemandUpdateCallback(std::function callback); + + // Runs all the callback from worker thread to continue filter chain. + void runOnDemandUpdateCallback(); + + // If route provider has not been initialized, initialize it. + void maybeInitRdsConfigProvider(); + + // Initialize route provider and register for rds update. + void initRdsConfigProvider( + envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds, + Init::Manager& init_manager); + ScopedRdsConfigSubscription& parent_; std::string scope_name_; + bool on_demand_; RdsRouteConfigProviderImplSharedPtr route_provider_; // This handle_ is owned by the route config provider's RDS subscription, when the helper // destructs, the handle is deleted as well. Common::CallbackHandle* rds_update_callback_handle_; + std::vector> on_demand_update_callbacks_; }; using RdsRouteConfigProviderHelperPtr = std::unique_ptr; @@ -176,10 +222,6 @@ class ScopedRdsConfigSubscription // ScopedRouteInfo by scope name. ScopedRouteMap scoped_route_map_; - // RdsRouteConfigProvider by scope name. - absl::flat_hash_map route_provider_by_scope_; - // A map of (hash, scope-name), used to detect the key conflict between scopes. - absl::flat_hash_map scope_name_by_hash_; // For creating RDS subscriptions. Server::Configuration::ServerFactoryContext& factory_context_; const std::string name_; @@ -191,6 +233,11 @@ class ScopedRdsConfigSubscription const envoy::config::core::v3::ConfigSource rds_config_source_; const std::string stat_prefix_; RouteConfigProviderManager& route_config_provider_manager_; + + // RdsRouteConfigProvider by scope name. + absl::flat_hash_map route_provider_by_scope_; + // A map of (hash, scope-name), used to detect the key conflict between scopes. + absl::flat_hash_map scope_name_by_hash_; }; using ScopedRdsConfigSubscriptionSharedPtr = std::shared_ptr; @@ -201,9 +248,16 @@ class ScopedRdsConfigProvider : public Envoy::Config::MutableConfigProviderCommo public: ScopedRdsConfigProvider(ScopedRdsConfigSubscriptionSharedPtr&& subscription); - ScopedRdsConfigSubscription& subscription() { + ScopedRdsConfigSubscription& subscription() const { return *static_cast(subscription_.get()); } + void onDemandRdsUpdate(std::shared_ptr scope_key, + Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallback&& route_config_updated_cb) const { + subscription().onDemandRdsUpdate( + std::move(scope_key), thread_local_dispatcher, std::move(route_config_updated_cb), + std::weak_ptr(subscription_)); + } }; // A ConfigProviderManager for scoped routing configuration that creates static/inline and dynamic diff --git a/source/extensions/filters/http/on_demand/on_demand_update.cc b/source/extensions/filters/http/on_demand/on_demand_update.cc index da5b2ec6bc10..ef75595fb7ff 100644 --- a/source/extensions/filters/http/on_demand/on_demand_update.cc +++ b/source/extensions/filters/http/on_demand/on_demand_update.cc @@ -2,6 +2,7 @@ #include "common/common/assert.h" #include "common/common/enum_to_int.h" +#include "common/common/logger.h" #include "common/http/codes.h" namespace Envoy { @@ -10,16 +11,20 @@ namespace HttpFilters { namespace OnDemand { Http::FilterHeadersStatus OnDemandRouteUpdate::decodeHeaders(Http::RequestHeaderMap&, bool) { - if (callbacks_->route() != nullptr || - !(callbacks_->routeConfig().has_value() && callbacks_->routeConfig().value()->usesVhds())) { + + if (callbacks_->route() != nullptr) { filter_iteration_state_ = Http::FilterHeadersStatus::Continue; return filter_iteration_state_; } + // decodeHeaders() is interrupted. + decode_headers_active_ = true; route_config_updated_callback_ = std::make_shared(Http::RouteConfigUpdatedCallback( [this](bool route_exists) -> void { onRouteConfigUpdateCompletion(route_exists); })); - callbacks_->requestRouteConfigUpdate(route_config_updated_callback_); filter_iteration_state_ = Http::FilterHeadersStatus::StopIteration; + callbacks_->requestRouteConfigUpdate(route_config_updated_callback_); + // decodeHeaders() is completed. + decode_headers_active_ = false; return filter_iteration_state_; } @@ -48,6 +53,11 @@ void OnDemandRouteUpdate::onDestroy() { route_config_updated_callback_.reset(); void OnDemandRouteUpdate::onRouteConfigUpdateCompletion(bool route_exists) { filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + // Don't call continueDecoding in the middle of decodeHeaders() + if (decode_headers_active_) { + return; + } + if (route_exists && // route can be resolved after an on-demand // VHDS update !callbacks_->decodingBuffer() && // Redirects with body not yet supported. diff --git a/source/extensions/filters/http/on_demand/on_demand_update.h b/source/extensions/filters/http/on_demand/on_demand_update.h index 455ef4160aa5..55c9a382453c 100644 --- a/source/extensions/filters/http/on_demand/on_demand_update.h +++ b/source/extensions/filters/http/on_demand/on_demand_update.h @@ -33,6 +33,7 @@ class OnDemandRouteUpdate : public Http::StreamDecoderFilter { Http::StreamDecoderFilterCallbacks* callbacks_{}; Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_callback_; Envoy::Http::FilterHeadersStatus filter_iteration_state_{Http::FilterHeadersStatus::Continue}; + bool decode_headers_active_{false}; }; } // namespace OnDemand diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index e8d63387db6d..a7bd2ee4b5d7 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -140,4 +140,4 @@ class DeltaSotwIntegrationParamTest #endif // ENVOY_GOOGLE_GRPC } // namespace Grpc -} // namespace Envoy +} // namespace Envoy \ No newline at end of file diff --git a/test/common/router/scoped_rds_test.cc b/test/common/router/scoped_rds_test.cc index b00466a702a4..acbcd0a0db84 100644 --- a/test/common/router/scoped_rds_test.cc +++ b/test/common/router/scoped_rds_test.cc @@ -105,6 +105,8 @@ class ScopedRoutesTestBase : public testing::Test { ScopedRoutesConfigProviderManagerPtr config_provider_manager_; Event::SimulatedTimeSystem time_system_; + + NiceMock event_dispatcher_; }; class ScopedRdsTest : public ScopedRoutesTestBase { @@ -180,6 +182,17 @@ name: foo_scoped_routes srds_subscription_ = server_factory_context_.cluster_manager_.subscription_factory_.callbacks_; } + void srdsUpdateWithYaml(std::vector const& config_yamls, + std::string const& version) { + std::vector resources; + resources.reserve(config_yamls.size()); + for (std::string const& config_yaml : config_yamls) { + resources.push_back(parseScopedRouteConfigurationFromYaml(config_yaml)); + } + const auto decoded_resources = TestUtility::decodeResources(resources); + EXPECT_NO_THROW(srds_subscription_->onConfigUpdate(decoded_resources.refvec_, version)); + } + // Helper function which pushes an update to given RDS subscription, the start(_) of the // subscription must have been called. void pushRdsConfig(const std::vector& route_config_names, @@ -198,6 +211,9 @@ name: foo_scoped_routes TestUtility::parseYaml( fmt::format(route_config_tmpl, name)); const auto decoded_resources = TestUtility::decodeResources({route_config}); + if (rds_subscription_by_name_.find(name) == rds_subscription_by_name_.end()) { + continue; + } rds_subscription_by_name_[name]->onConfigUpdate(decoded_resources.refvec_, version); } } @@ -219,6 +235,13 @@ name: foo_scoped_routes absl::flat_hash_map rds_subscription_by_config_subscription_; absl::flat_hash_map rds_subscription_by_name_; + + Envoy::Stats::Gauge& all_scopes_{server_factory_context_.scope_.gauge( + "foo.scoped_rds.foo_scoped_routes.all_scopes", Stats::Gauge::ImportMode::Accumulate)}; + Envoy::Stats::Gauge& active_scopes_{server_factory_context_.scope_.gauge( + "foo.scoped_rds.foo_scoped_routes.active_scopes", Stats::Gauge::ImportMode::Accumulate)}; + Envoy::Stats::Gauge& on_demand_scopes_{server_factory_context_.scope_.gauge( + "foo.scoped_rds.foo_scoped_routes.on_demand_scopes", Stats::Gauge::ImportMode::Accumulate)}; }; // Tests that multiple uniquely named non-conflict resources are allowed in config updates. @@ -248,6 +271,8 @@ route_configuration_name: foo_routes EXPECT_EQ(1UL, server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") .value()); + EXPECT_EQ(2UL, all_scopes_.value()); + EXPECT_EQ(2UL, active_scopes_.value()); // Verify the config is a ScopedConfigImpl instance, both scopes point to "" as RDS hasn't kicked // in yet(NullConfigImpl returned). @@ -279,7 +304,7 @@ route_configuration_name: foo_routes // Delete foo_scope2. const auto decoded_resources_2 = TestUtility::decodeResources({resource}); EXPECT_NO_THROW(srds_subscription_->onConfigUpdate(decoded_resources_2.refvec_, "3")); - EXPECT_EQ(getScopedRouteMap().size(), 1); + EXPECT_EQ(1UL, all_scopes_.value()); EXPECT_EQ(getScopedRouteMap().count("foo_scope"), 1); EXPECT_EQ(2UL, server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") @@ -323,7 +348,7 @@ route_configuration_name: foo_routes EXPECT_EQ(1UL, server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") .value()); - EXPECT_EQ(getScopedRouteMap().size(), 2); + EXPECT_EQ(2UL, all_scopes_.value()); // Verify the config is a ScopedConfigImpl instance, both scopes point to "" as RDS hasn't kicked // in yet(NullConfigImpl returned). @@ -357,7 +382,7 @@ route_configuration_name: foo_routes *deletes.Add() = "foo_scope2"; const auto decoded_resources_2 = TestUtility::decodeResources({resource}); EXPECT_NO_THROW(srds_subscription_->onConfigUpdate(decoded_resources_2.refvec_, deletes, "2")); - EXPECT_EQ(getScopedRouteMap().size(), 1); + EXPECT_EQ(1UL, all_scopes_.value()); EXPECT_EQ(getScopedRouteMap().count("foo_scope"), 1); EXPECT_EQ(2UL, server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") @@ -517,7 +542,7 @@ route_configuration_name: foo_routes server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") .value()); // foo_scope is deleted, and foo_scope2 is added. - EXPECT_EQ(getScopedRouteMap().size(), 2UL); + EXPECT_EQ(all_scopes_.value(), 2UL); EXPECT_EQ(getScopedRouteMap().count("foo_scope1"), 0); EXPECT_EQ(getScopedRouteMap().count("foo_scope2"), 1); EXPECT_EQ(getScopedRouteMap().count("foo_scope3"), 1); @@ -543,7 +568,7 @@ route_configuration_name: foo_routes EXPECT_THROW_WITH_REGEX( srds_subscription_->onConfigUpdate(decoded_resources_3.refvec_, "3"), EnvoyException, "scope key conflict found, first scope is 'foo_scope2', second scope is 'foo_scope4'"); - EXPECT_EQ(getScopedRouteMap().size(), 2UL); + EXPECT_EQ(2UL, all_scopes_.value()); EXPECT_EQ(getScopedRouteMap().count("foo_scope1"), 0); EXPECT_EQ(getScopedRouteMap().count("foo_scope2"), 1); EXPECT_EQ(getScopedRouteMap().count("foo_scope3"), 1); @@ -559,7 +584,7 @@ route_configuration_name: foo_routes EXPECT_EQ(server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") .value(), 3UL); - EXPECT_EQ(getScopedRouteMap().size(), 2UL); + EXPECT_EQ(2UL, all_scopes_.value()); EXPECT_EQ(getScopedRouteMap().count("foo_scope3"), 1); EXPECT_EQ(getScopedRouteMap().count("foo_scope4"), 1); EXPECT_EQ(getScopedRdsProvider() @@ -854,7 +879,7 @@ route_configuration_name: foo_routes EXPECT_EQ(1UL, server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") .value()); - EXPECT_EQ(getScopedRouteMap().size(), 2); + EXPECT_EQ(2UL, all_scopes_.value()); const std::string config_yaml3 = R"EOF( name: bar_scope @@ -877,7 +902,7 @@ route_configuration_name: foo_routes EXPECT_EQ(2UL, server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") .value()); - EXPECT_EQ(getScopedRouteMap().size(), 2); + EXPECT_EQ(2UL, all_scopes_.value()); } // Tests whether scope key conflict with updated scopes is ignored. @@ -907,7 +932,7 @@ route_configuration_name: foo_routes EXPECT_EQ(1UL, server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") .value()); - EXPECT_EQ(getScopedRouteMap().size(), 2); + EXPECT_EQ(2UL, all_scopes_.value()); const std::string config_yaml3 = R"EOF( name: bar_scope @@ -930,7 +955,403 @@ route_configuration_name: foo_routes EXPECT_EQ(2UL, server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") .value()); - EXPECT_EQ(getScopedRouteMap().size(), 2); + EXPECT_EQ(2UL, all_scopes_.value()); +} + +// Compare behavior of a lazy scope and an eager scope scopes that share that same route +// configuration. Route config of on demand scope shouldn't be loaded. +TEST_F(ScopedRdsTest, OnDemandScopeNotLoadedWithoutRequest) { + setup(); + init_watcher_.expectReady(); + context_init_manager_.initialize(init_watcher_); + // Scope should be loaded eagerly by default. + const std::string eager_resource = R"EOF( +name: foo_scope +route_configuration_name: foo_routes +key: + fragments: + - string_key: x-foo-key +)EOF"; + + // On demand scope should be loaded lazily. + const std::string lazy_resource = R"EOF( +name: foo_scope2 +route_configuration_name: foo_routes +on_demand: true +key: + fragments: + - string_key: x-bar-key +)EOF"; + + srdsUpdateWithYaml({lazy_resource, eager_resource}, "1"); + EXPECT_EQ(1UL, + server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") + .value()); + EXPECT_EQ(2UL, all_scopes_.value()); + + // Verify the config is a ScopedConfigImpl instance, both scopes point to "" as RDS hasn't kicked + // in yet(NullConfigImpl returned). + ASSERT_THAT(getScopedRdsProvider(), Not(IsNull())); + ASSERT_THAT(getScopedRdsProvider()->config(), Not(IsNull())); + // Route config for foo key is NullConfigImpl and route config for bar key is nullptr + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + ""); + EXPECT_THAT(getScopedRdsProvider()->config()->getRouteConfig( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-bar-key"}}), + IsNull()); + pushRdsConfig({"foo_routes"}, "111"); + // Scope foo now have route config but route config for scope bar is still nullptr. + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + "foo_routes"); + EXPECT_THAT(getScopedRdsProvider()->config()->getRouteConfig( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-bar-key"}}), + IsNull()); + EXPECT_EQ(2UL, all_scopes_.value()); + EXPECT_EQ(1UL, active_scopes_.value()); + EXPECT_EQ(1UL, on_demand_scopes_.value()); +} + +// Push Rds update after on demand request, route configuration should be initialized. +TEST_F(ScopedRdsTest, PushRdsAfterOndemandRequest) { + setup(); + init_watcher_.expectReady(); + context_init_manager_.initialize(init_watcher_); + // Scope should be loaded eagerly by default. + const std::string eager_resource = R"EOF( +name: foo_scope +route_configuration_name: foo_routes +key: + fragments: + - string_key: x-foo-key +)EOF"; + + // On demand scope should be loaded lazily. + const std::string lazy_resource = R"EOF( +name: foo_scope2 +route_configuration_name: foo_routes +on_demand: true +key: + fragments: + - string_key: x-bar-key +)EOF"; + + srdsUpdateWithYaml({eager_resource, lazy_resource}, "1"); + EXPECT_EQ(1UL, + server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") + .value()); + EXPECT_EQ(2UL, all_scopes_.value()); + + // Verify the config is a ScopedConfigImpl instance, both scopes point to "" as RDS hasn't kicked + // in yet(NullConfigImpl returned). + ASSERT_THAT(getScopedRdsProvider(), Not(IsNull())); + ASSERT_THAT(getScopedRdsProvider()->config(), Not(IsNull())); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + ""); + EXPECT_THAT(getScopedRdsProvider()->config()->getRouteConfig( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-bar-key"}}), + IsNull()); + EXPECT_EQ(1UL, active_scopes_.value()); + + ScopeKeyPtr scope_key = getScopedRdsProvider()->config()->computeScopeKey( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-bar-key"}}); + EXPECT_CALL(event_dispatcher_, post(_)).Times(1); + std::function route_config_updated_cb = [](bool) {}; + getScopedRdsProvider()->onDemandRdsUpdate(std::move(scope_key), event_dispatcher_, + std::move(route_config_updated_cb)); + // After on demand request, push rds update, both scopes should find the route configuration. + pushRdsConfig({"foo_routes"}, "111"); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + "foo_routes"); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-bar-key"}}) + ->name(), + "foo_routes"); + // Now we have 1 active on demand scope and 1 eager loading scope. + EXPECT_EQ(2UL, all_scopes_.value()); + EXPECT_EQ(2UL, active_scopes_.value()); + EXPECT_EQ(1UL, on_demand_scopes_.value()); +} + +TEST_F(ScopedRdsTest, PushRdsBeforeOndemandRequest) { + setup(); + init_watcher_.expectReady(); + context_init_manager_.initialize(init_watcher_); + // Scope should be loaded eagerly by default. + const std::string eager_resource = R"EOF( +name: foo_scope +route_configuration_name: foo_routes +key: + fragments: + - string_key: x-foo-key +)EOF"; + + // On demand scope should be loaded lazily. + const std::string lazy_resource = R"EOF( +name: foo_scope2 +route_configuration_name: foo_routes +on_demand: true +key: + fragments: + - string_key: x-bar-key +)EOF"; + + srdsUpdateWithYaml({eager_resource, lazy_resource}, "1"); + EXPECT_EQ(1UL, + server_factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload") + .value()); + EXPECT_EQ(2UL, all_scopes_.value()); + + // Verify the config is a ScopedConfigImpl instance, both scopes point to "" as RDS hasn't kicked + // in yet(NullConfigImpl returned). + ASSERT_THAT(getScopedRdsProvider(), Not(IsNull())); + ASSERT_THAT(getScopedRdsProvider()->config(), Not(IsNull())); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + ""); + EXPECT_THAT(getScopedRdsProvider()->config()->getRouteConfig( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-bar-key"}}), + IsNull()); + // Push rds update before on demand srds request. + pushRdsConfig({"foo_routes"}, "111"); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + "foo_routes"); + ScopeKeyPtr scope_key = getScopedRdsProvider()->config()->computeScopeKey( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-bar-key"}}); + EXPECT_CALL(server_factory_context_.dispatcher_, post(_)).Times(1); + EXPECT_CALL(event_dispatcher_, post(_)).Times(1); + std::function route_config_updated_cb = [](bool) {}; + getScopedRdsProvider()->onDemandRdsUpdate(std::move(scope_key), event_dispatcher_, + std::move(route_config_updated_cb)); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-bar-key"}}) + ->name(), + "foo_routes"); +} + +// Change a scope from lazy to eager will enable eager loading. +TEST_F(ScopedRdsTest, UpdateOnDemandScopeToEagerScope) { + setup(); + init_watcher_.expectReady(); + context_init_manager_.initialize(init_watcher_); + // On demand scope should be loaded lazily. + const std::string lazy_resource = R"EOF( +name: foo_scope +route_configuration_name: foo_routes +on_demand: true +key: + fragments: + - string_key: x-foo-key +)EOF"; + + srdsUpdateWithYaml({lazy_resource}, "1"); + ASSERT_THAT(getScopedRdsProvider(), Not(IsNull())); + ASSERT_THAT(getScopedRdsProvider()->config(), Not(IsNull())); + + EXPECT_THAT(getScopedRdsProvider()->config()->getRouteConfig( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}), + IsNull()); + EXPECT_EQ(0UL, active_scopes_.value()); + EXPECT_EQ(1UL, on_demand_scopes_.value()); + // The on demand scope will be overwritten. + const std::string eager_resource = R"EOF( +name: foo_scope +route_configuration_name: foo_routes +key: + fragments: + - string_key: x-foo-key +)EOF"; + srdsUpdateWithYaml({eager_resource}, "2"); + EXPECT_EQ(1UL, all_scopes_.value()); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + ""); + pushRdsConfig({"foo_routes"}, "111"); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + "foo_routes"); + // Now we have 1 eager scope. + EXPECT_EQ(1UL, active_scopes_.value()); + EXPECT_EQ(0UL, on_demand_scopes_.value()); + EXPECT_EQ(1UL, all_scopes_.value()); +} + +// Change a scope from eager to lazy will delete the route table. +TEST_F(ScopedRdsTest, UpdateEagerScopeToOnDemandScope) { + setup(); + init_watcher_.expectReady(); + context_init_manager_.initialize(init_watcher_); + + const std::string eager_resource = R"EOF( +name: foo_scope +route_configuration_name: foo_routes +key: + fragments: + - string_key: x-foo-key +)EOF"; + + srdsUpdateWithYaml({eager_resource}, "1"); + EXPECT_EQ(1UL, active_scopes_.value()); + EXPECT_EQ(0UL, on_demand_scopes_.value()); + // The scope is eager loading and rds update will be accepted. + pushRdsConfig({"foo_routes"}, "111"); + ASSERT_THAT(getScopedRdsProvider(), Not(IsNull())); + ASSERT_THAT(getScopedRdsProvider()->config(), Not(IsNull())); + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + "foo_routes"); + // Update the scope to on demand, rds provider and the route config will be deleted. + const std::string lazy_resource = R"EOF( + name: foo_scope + route_configuration_name: foo_routes + on_demand: true + key: + fragments: + - string_key: x-bar-key + )EOF"; + srdsUpdateWithYaml({lazy_resource}, "2"); + EXPECT_THAT(getScopedRdsProvider()->config()->getRouteConfig( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}), + IsNull()); + // The new scope will be on demand and inactive after srds update. + EXPECT_EQ(0UL, active_scopes_.value()); + EXPECT_EQ(1UL, on_demand_scopes_.value()); + EXPECT_EQ(1UL, all_scopes_.value()); +} // namespace + +// Post on demand callbacks multiple times, all should be executed after rds update. +TEST_F(ScopedRdsTest, MultipleOnDemandUpdatedCallback) { + setup(); + init_watcher_.expectReady(); + context_init_manager_.initialize(init_watcher_); + // On demand scope should be loaded lazily. + const std::string lazy_resource = R"EOF( +name: foo_scope +route_configuration_name: foo_routes +on_demand: true +key: + fragments: + - string_key: x-foo-key +)EOF"; + srdsUpdateWithYaml({lazy_resource}, "1"); + + EXPECT_EQ(0UL, active_scopes_.value()); + EXPECT_EQ(1UL, on_demand_scopes_.value()); + // All the on demand updated callbacks will be executed when the route table comes. + for (int i = 0; i < 5; i++) { + ScopeKeyPtr scope_key = getScopedRdsProvider()->config()->computeScopeKey( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}); + std::function route_config_updated_cb = [](bool) {}; + getScopedRdsProvider()->onDemandRdsUpdate(std::move(scope_key), event_dispatcher_, + std::move(route_config_updated_cb)); + } + // After on demand request, push rds update, the callbacks will be executed. + EXPECT_CALL(event_dispatcher_, post(_)).Times(5); + pushRdsConfig({"foo_routes"}, "111"); + // Route table have been fetched, callbacks will be executed immediately. + for (int i = 0; i < 5; i++) { + EXPECT_CALL(event_dispatcher_, post(_)).Times(1); + ScopeKeyPtr scope_key = getScopedRdsProvider()->config()->computeScopeKey( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}); + std::function route_config_updated_cb = [](bool) {}; + getScopedRdsProvider()->onDemandRdsUpdate(std::move(scope_key), event_dispatcher_, + std::move(route_config_updated_cb)); + } + // Activating the same on_demand scope multiple times, active_scopes is still 1. + EXPECT_EQ(getScopedRdsProvider() + ->config() + ->getRouteConfig(TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}) + ->name(), + "foo_routes"); + EXPECT_EQ(1UL, active_scopes_.value()); + EXPECT_EQ(1UL, on_demand_scopes_.value()); +} + +TEST_F(ScopedRdsTest, DanglingSubscriptionOnDemandUpdate) { + setup(); + std::function route_config_updated_cb = [](bool) {}; + Event::PostCb temp_post_cb; + EXPECT_CALL(server_factory_context_.dispatcher_, post(_)) + .WillOnce(testing::SaveArg<0>(&temp_post_cb)); + std::shared_ptr scope_key = + getScopedRdsProvider()->config()->computeScopeKey( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}); + getScopedRdsProvider()->onDemandRdsUpdate(scope_key, event_dispatcher_, + std::move(route_config_updated_cb)); + // Destroy the scoped_rds subscription by destroying its only config provider. + provider_.reset(); + EXPECT_CALL(event_dispatcher_, post(_)).Times(1); + EXPECT_NO_THROW(temp_post_cb()); +} + +// Delete the on demand scope before on demand update in main thread. +TEST_F(ScopedRdsTest, OnDemandScopeDeleted) { + setup(); + init_watcher_.expectReady(); + context_init_manager_.initialize(init_watcher_); + // On demand scope should be loaded lazily. + const std::string lazy_resource = R"EOF( +name: foo_scope +route_configuration_name: foo_routes +on_demand: true +key: + fragments: + - string_key: x-foo-key +)EOF"; + + srdsUpdateWithYaml({lazy_resource}, "1"); + EXPECT_EQ(0UL, active_scopes_.value()); + EXPECT_EQ(1UL, on_demand_scopes_.value()); + // All the on demand updated callbacks will be executed when the route table comes. + { + ScopeKeyPtr scope_key = getScopedRdsProvider()->config()->computeScopeKey( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}); + std::function route_config_updated_cb = [](bool scope_exist) { + EXPECT_TRUE(scope_exist); + }; + getScopedRdsProvider()->onDemandRdsUpdate(std::move(scope_key), event_dispatcher_, + std::move(route_config_updated_cb)); + } + // After on demand request, push rds update, the callbacks will be executed. + EXPECT_CALL(event_dispatcher_, post(_)).Times(1); + pushRdsConfig({"foo_routes"}, "111"); + + ScopeKeyPtr scope_key = getScopedRdsProvider()->config()->computeScopeKey( + TestRequestHeaderMapImpl{{"Addr", "x-foo-key;x-foo-key"}}); + // Delete the scope route. + EXPECT_NO_THROW(srds_subscription_->onConfigUpdate({}, "2")); + EXPECT_EQ(0UL, all_scopes_.value()); + EXPECT_CALL(event_dispatcher_, post(_)).Times(1); + // Scope no longer exists after srds update. + std::function route_config_updated_cb = [](bool scope_exist) { + EXPECT_FALSE(scope_exist); + }; + getScopedRdsProvider()->onDemandRdsUpdate(std::move(scope_key), event_dispatcher_, + std::move(route_config_updated_cb)); } } // namespace diff --git a/test/extensions/filters/http/on_demand/on_demand_filter_test.cc b/test/extensions/filters/http/on_demand/on_demand_filter_test.cc index d226ee4dcec6..2df0f265d24e 100644 --- a/test/extensions/filters/http/on_demand/on_demand_filter_test.cc +++ b/test/extensions/filters/http/on_demand/on_demand_filter_test.cc @@ -34,8 +34,6 @@ TEST_F(OnDemandFilterTest, TestDecodeHeaders) { Http::TestRequestHeaderMapImpl headers; std::shared_ptr route_config_ptr{new NiceMock()}; EXPECT_CALL(decoder_callbacks_, route()).WillOnce(Return(nullptr)); - EXPECT_CALL(decoder_callbacks_, routeConfig()).Times(2).WillRepeatedly(Return(route_config_ptr)); - EXPECT_CALL(*route_config_ptr, usesVhds()).WillOnce(Return(true)); EXPECT_CALL(decoder_callbacks_, requestRouteConfigUpdate(_)); EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(headers, true)); } @@ -51,8 +49,8 @@ TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteConfigIsNotAvailable) { Http::TestRequestHeaderMapImpl headers; std::shared_ptr route_config_ptr{new NiceMock()}; EXPECT_CALL(decoder_callbacks_, route()).WillOnce(Return(nullptr)); - EXPECT_CALL(decoder_callbacks_, routeConfig()).WillOnce(Return(absl::nullopt)); - EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, true)); + EXPECT_CALL(decoder_callbacks_, requestRouteConfigUpdate(_)); + EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(headers, true)); } TEST_F(OnDemandFilterTest, TestDecodeTrailers) { diff --git a/test/integration/BUILD b/test/integration/BUILD index 95d0780a96a4..2433d82c18e6 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -1460,11 +1460,11 @@ envoy_cc_test( "//test/common/grpc:grpc_client_integration_lib", "//test/test_common:resources_lib", "//test/test_common:utility_lib", - "@envoy_api//envoy/api/v2:pkg_cc_proto", "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/config/route/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) diff --git a/test/integration/scoped_rds_integration_test.cc b/test/integration/scoped_rds_integration_test.cc index 89fb71c261ab..d80ab1b87e53 100644 --- a/test/integration/scoped_rds_integration_test.cc +++ b/test/integration/scoped_rds_integration_test.cc @@ -1,10 +1,10 @@ -#include "envoy/api/v2/discovery.pb.h" #include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/config/core/v3/config_source.pb.h" #include "envoy/config/core/v3/grpc_service.pb.h" #include "envoy/config/route/v3/route.pb.h" #include "envoy/config/route/v3/scoped_route.pb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/service/discovery/v3/discovery.pb.h" #include "common/config/api_version.h" #include "common/config/version_converter.h" @@ -111,10 +111,11 @@ class ScopedRdsIntegrationTest : public HttpIntegrationTest, } else { srds_api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); } + srds_api_config_source->set_transport_api_version( + envoy::config::core::v3::ApiVersion::V3); grpc_service = srds_api_config_source->add_grpc_services(); setGrpcService(*grpc_service, "srds_cluster", getScopedRdsFakeUpstream().localAddress()); }); - HttpIntegrationTest::initialize(); } @@ -256,6 +257,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersionsAndGrpcTypes, ScopedRdsIntegrationTest, DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS); // Test that a SRDS DiscoveryResponse is successfully processed. + TEST_P(ScopedRdsIntegrationTest, BasicSuccess) { const std::string scope_tmpl = R"EOF( name: {} @@ -461,7 +463,317 @@ route_configuration_name: foo_route1 {"Addr", "x-foo-key=foo"}}, 456, Http::TestResponseHeaderMapImpl{{":status", "200"}, {"service", "bluh"}}, 123, /*cluster_0*/ 0); + cleanupUpstreamAndDownstream(); +} + +// Test that a scoped route config update is performed on demand and http request will succeed. +TEST_P(ScopedRdsIntegrationTest, OnDemandUpdateSuccess) { + config_helper_.addFilter(R"EOF( + name: envoy.filters.http.on_demand + )EOF"); + const std::string scope_route1 = R"EOF( +name: foo_scope1 +route_configuration_name: foo_route1 +on_demand: true +key: + fragments: + - string_key: foo +)EOF"; + on_server_init_function_ = [this, &scope_route1]() { + createScopedRdsStream(); + sendSrdsResponse({scope_route1}, {scope_route1}, {}, "1"); + }; + initialize(); + registerTestServerPorts({"http"}); + + const std::string route_config_tmpl = R"EOF( + name: {} + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/" }} + route: {{ cluster: {} }} +)EOF"; + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + // Request that match lazily loaded scope will trigger on demand loading. + auto response = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/meh"}, + {":authority", "host"}, + {":scheme", "http"}, + {"Addr", "x-foo-key=foo"}}); + createRdsStream("foo_route1"); + sendRdsResponse(fmt::format(route_config_tmpl, "foo_route1", "cluster_0"), "1"); + test_server_->waitForCounterGe("http.config_test.rds.foo_route1.update_success", 1); + + waitForNextUpstreamRequest(); + // Send response headers, and end_stream if there is no response body. + upstream_request_->encodeHeaders(default_response_headers_, true); + + response->waitForHeaders(); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + + cleanupUpstreamAndDownstream(); +} + +// With on demand update filter configured, scope not match should still return 404 +TEST_P(ScopedRdsIntegrationTest, OnDemandUpdateScopeNotMatch) { + + config_helper_.addFilter(R"EOF( + name: envoy.filters.http.on_demand + )EOF"); + + const std::string scope_tmpl = R"EOF( +name: {} +route_configuration_name: {} +key: + fragments: + - string_key: {} +)EOF"; + const std::string scope_route1 = fmt::format(scope_tmpl, "foo_scope1", "foo_route1", "foo-route"); + + const std::string route_config_tmpl = R"EOF( + name: {} + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/meh" }} + route: {{ cluster: {} }} +)EOF"; + + on_server_init_function_ = [&]() { + createScopedRdsStream(); + sendSrdsResponse({scope_route1}, {scope_route1}, {}, "1"); + createRdsStream("foo_route1"); + // CreateRdsStream waits for connection which is fired by RDS subscription. + sendRdsResponse(fmt::format(route_config_tmpl, "foo_route1", "cluster_0"), "1"); + }; + initialize(); + registerTestServerPorts({"http"}); + + // No scope key matches "bar". + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/meh"}, + {":authority", "host"}, + {":scheme", "http"}, + {"Addr", "x-foo-key=bar"}}); + response->waitForEndStream(); + verifyResponse(std::move(response), "404", Http::TestResponseHeaderMapImpl{}, ""); + cleanupUpstreamAndDownstream(); +} + +// With on demand update filter configured, scope match but virtual host don't match, should still +// return 404 +TEST_P(ScopedRdsIntegrationTest, OnDemandUpdatePrimaryVirtualHostNotMatch) { + + config_helper_.addFilter(R"EOF( + name: envoy.filters.http.on_demand + )EOF"); + + const std::string scope_tmpl = R"EOF( +name: {} +route_configuration_name: {} +key: + fragments: + - string_key: {} +)EOF"; + const std::string scope_route1 = fmt::format(scope_tmpl, "foo_scope1", "foo_route1", "foo-route"); + + const std::string route_config_tmpl = R"EOF( + name: {} + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/meh" }} + route: {{ cluster: {} }} +)EOF"; + + on_server_init_function_ = [&]() { + createScopedRdsStream(); + sendSrdsResponse({scope_route1}, {scope_route1}, {}, "1"); + createRdsStream("foo_route1"); + // CreateRdsStream waits for connection which is fired by RDS subscription. + sendRdsResponse(fmt::format(route_config_tmpl, "foo_route1", "cluster_0"), "1"); + }; + initialize(); + registerTestServerPorts({"http"}); + + // No virtual host matches "neh". + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/neh"}, + {":authority", "host"}, + {":scheme", "http"}, + {"Addr", "x-foo-key=foo"}}); + response->waitForEndStream(); + verifyResponse(std::move(response), "404", Http::TestResponseHeaderMapImpl{}, ""); + cleanupUpstreamAndDownstream(); +} + +// With on demand update filter configured, scope match but virtual host don't match, should still +// return 404 +TEST_P(ScopedRdsIntegrationTest, OnDemandUpdateVirtualHostNotMatch) { + + config_helper_.addFilter(R"EOF( + name: envoy.filters.http.on_demand + )EOF"); + + const std::string scope_route1 = R"EOF( +name: foo_scope +route_configuration_name: foo_route1 +key: + fragments: + - string_key: foo +)EOF"; + const std::string scope_route2 = R"EOF( +name: bar_scope +route_configuration_name: foo_route1 +on_demand: true +key: + fragments: + - string_key: bar +)EOF"; + const std::string route_config_tmpl = R"EOF( + name: {} + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/meh" }} + route: {{ cluster: {} }} +)EOF"; + + on_server_init_function_ = [&]() { + createScopedRdsStream(); + sendSrdsResponse({scope_route1, scope_route2}, {scope_route1, scope_route2}, {}, "1"); + createRdsStream("foo_route1"); + // CreateRdsStream waits for connection which is fired by RDS subscription. + sendRdsResponse(fmt::format(route_config_tmpl, "foo_route1", "cluster_0"), "1"); + }; + initialize(); + registerTestServerPorts({"http"}); + + // No scope key matches "bar". + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/neh"}, + {":authority", "host"}, + {":scheme", "http"}, + {"Addr", "x-foo-key=bar"}}); + response->waitForEndStream(); + verifyResponse(std::move(response), "404", Http::TestResponseHeaderMapImpl{}, ""); + cleanupUpstreamAndDownstream(); +} + +// Eager and lazy scopes share the same route configuration +TEST_P(ScopedRdsIntegrationTest, DifferentPriorityScopeShareRoute) { + config_helper_.addFilter(R"EOF( + name: envoy.filters.http.on_demand + )EOF"); + + const std::string scope_route1 = R"EOF( +name: foo_scope +route_configuration_name: foo_route1 +key: + fragments: + - string_key: foo +)EOF"; + const std::string scope_route2 = R"EOF( +name: bar_scope +route_configuration_name: foo_route1 +on_demand: true +key: + fragments: + - string_key: bar +)EOF"; + + const std::string route_config_tmpl = R"EOF( + name: {} + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/" }} + route: {{ cluster: {} }} +)EOF"; + + on_server_init_function_ = [&]() { + createScopedRdsStream(); + sendSrdsResponse({scope_route1, scope_route2}, {scope_route1, scope_route2}, {}, "1"); + createRdsStream("foo_route1"); + // CreateRdsStream waits for connection which is fired by RDS subscription. + sendRdsResponse(fmt::format(route_config_tmpl, "foo_route1", "cluster_0"), "1"); + }; + initialize(); + registerTestServerPorts({"http"}); + codec_client_ = makeHttpConnection(lookupPort("http")); + test_server_->waitForCounterGe("http.config_test.rds.foo_route1.update_success", 1); + cleanupUpstreamAndDownstream(); + // "foo" request should succeed because the foo scope is loaded eagerly by default. + // "bar" request will initialize rds provider on demand and also succeed. + for (const std::string& scope_key : std::vector{"foo", "bar"}) { + sendRequestAndVerifyResponse( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/meh"}, + {":authority", "host"}, + {":scheme", "http"}, + {"Addr", fmt::format("x-foo-key={}", scope_key)}}, + 456, Http::TestResponseHeaderMapImpl{{":status", "200"}, {"service", scope_key}}, 123, 0); + } +} + +TEST_P(ScopedRdsIntegrationTest, OnDemandUpdateAfterActiveStreamDestroyed) { + config_helper_.addFilter(R"EOF( + name: envoy.filters.http.on_demand + )EOF"); + const std::string scope_route1 = R"EOF( +name: foo_scope1 +route_configuration_name: foo_route1 +on_demand: true +key: + fragments: + - string_key: foo +)EOF"; + on_server_init_function_ = [this, &scope_route1]() { + createScopedRdsStream(); + sendSrdsResponse({scope_route1}, {scope_route1}, {}, "1"); + }; + initialize(); + registerTestServerPorts({"http"}); + + const std::string route_config_tmpl = R"EOF( + name: {} + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/" }} + route: {{ cluster: {} }} +)EOF"; + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + // A request that match lazily loaded scope will trigger on demand loading. + auto response = codec_client_->makeHeaderOnlyRequest( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", "/meh"}, + {":authority", "host"}, + {":scheme", "http"}, + {"Addr", "x-foo-key=foo"}}); + test_server_->waitForCounterGe("http.config_test.rds.foo_route1.update_attempt", 1); + // Close the connection and destroy the active stream. + cleanupUpstreamAndDownstream(); + // Push rds update, on demand updated callback is post to worker thread. + // There is no exception thrown even when active stream is dead because weak_ptr can't be locked. + createRdsStream("foo_route1"); + sendRdsResponse(fmt::format(route_config_tmpl, "foo_route1", "cluster_0"), "1"); + test_server_->waitForCounterGe("http.config_test.rds.foo_route1.update_success", 1); } } // namespace -} // namespace Envoy +} // namespace Envoy \ No newline at end of file diff --git a/test/test_common/utility.h b/test/test_common/utility.h index 8c1d983c3ada..20a20246ef82 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -651,6 +651,19 @@ class TestUtility { return decoded_resources; } + template + static Config::DecodedResourcesWrapper decodeResources(std::vector resources, + const std::string& name_field = "name") { + Config::DecodedResourcesWrapper decoded_resources; + for (const auto& resource : resources) { + auto owned_resource = std::make_unique(resource); + decoded_resources.owned_resources_.emplace_back(new Config::DecodedResourceImpl( + std::move(owned_resource), MessageUtil::getStringField(resource, name_field), {}, "")); + decoded_resources.refvec_.emplace_back(*decoded_resources.owned_resources_.back()); + } + return decoded_resources; + } + template static Config::DecodedResourcesWrapper decodeResources(const Protobuf::RepeatedPtrField& resources,