From 6e7d4864227b95055ecc384ea27b413ce0208103 Mon Sep 17 00:00:00 2001 From: "Adi (Suissa) Peleg" Date: Tue, 1 Aug 2023 21:08:19 -0400 Subject: [PATCH] eds: Adding eds caching support to grpc-mux (#28273) Continuation of PR #28079 (as part of the work for issue #26749). Currently after an EDS-cluster update, Envoy waits for an EDS response. If a timeout occurs, the EDS-cluster will be used without endpoints. This PR adds the use of caching into the GrpcMux. The GrpcMux object adds an EDS resource to the cache when it is received/updated, and removes it when there are no longer subscriptions (watchers). A runtime flag is added to disable the use of the cache, and will be enabled in a future PR when ADS is used. Next PR will plumb this into ADS, and add fetching of resources from the cache as part of the EdsClusterImpl. The entire change can be looked here: adisuissa@f0b7ac8 Risk Level: Low - the disabled runtime flag should prevent the use of the cache in non-tests code. Testing: Added unit tests. Docs Changes: N/A. Release Notes: N/A (future PR). Platform Specific Features: N/A. Runtime guard: disabled by default: envoy_restart_features_use_eds_cache_for_ads Signed-off-by: Adi Suissa-Peleg --- envoy/config/BUILD | 1 + envoy/config/grpc_mux.h | 7 + envoy/config/subscription_factory.h | 2 +- source/common/config/null_grpc_mux_impl.h | 2 + source/common/runtime/runtime_features.cc | 3 + .../common/upstream/cluster_manager_impl.cc | 4 +- .../extensions/config_subscription/grpc/BUILD | 6 + .../grpc_collection_subscription_factory.cc | 4 +- .../config_subscription/grpc/grpc_mux_impl.cc | 50 +++- .../config_subscription/grpc/grpc_mux_impl.h | 44 +++- .../grpc/grpc_subscription_factory.cc | 10 +- .../grpc/new_grpc_mux_impl.cc | 39 ++- .../grpc/new_grpc_mux_impl.h | 22 +- .../config_subscription/grpc/watch_map.cc | 53 +++- .../config_subscription/grpc/watch_map.h | 20 +- .../config_subscription/grpc/xds_mux/BUILD | 2 + .../grpc/xds_mux/grpc_mux_impl.cc | 50 +++- .../grpc/xds_mux/grpc_mux_impl.h | 12 +- .../config/grpc_subscription_test_harness.h | 8 +- .../extensions/clusters/eds/eds_speed_test.cc | 4 +- .../extensions/config_subscription/grpc/BUILD | 6 + .../grpc/delta_subscription_impl_test.cc | 6 +- .../grpc/delta_subscription_test_harness.h | 6 +- .../grpc/grpc_mux_impl_test.cc | 239 +++++++++++++++++- .../grpc/new_grpc_mux_impl_test.cc | 197 ++++++++++++++- .../grpc/watch_map_test.cc | 114 ++++++++- .../grpc/xds_grpc_mux_impl_test.cc | 191 +++++++++++++- test/mocks/config/BUILD | 10 + test/mocks/config/eds_resources_cache.cc | 16 ++ test/mocks/config/eds_resources_cache.h | 31 +++ test/mocks/config/mocks.h | 2 + 31 files changed, 1064 insertions(+), 97 deletions(-) create mode 100644 test/mocks/config/eds_resources_cache.cc create mode 100644 test/mocks/config/eds_resources_cache.h diff --git a/envoy/config/BUILD b/envoy/config/BUILD index 1944cb4ff14d..5ef3fea5f5cf 100644 --- a/envoy/config/BUILD +++ b/envoy/config/BUILD @@ -66,6 +66,7 @@ envoy_cc_library( name = "grpc_mux_interface", hdrs = ["grpc_mux.h"], deps = [ + ":eds_resources_cache_interface", ":subscription_interface", "//envoy/stats:stats_macros", "//source/common/common:cleanup_lib", diff --git a/envoy/config/grpc_mux.h b/envoy/config/grpc_mux.h index e945fe5e3d32..a08724fcdf6c 100644 --- a/envoy/config/grpc_mux.h +++ b/envoy/config/grpc_mux.h @@ -4,6 +4,7 @@ #include "envoy/common/exception.h" #include "envoy/common/pure.h" +#include "envoy/config/eds_resources_cache.h" #include "envoy/config/subscription.h" #include "envoy/stats/stats_macros.h" @@ -105,6 +106,12 @@ class GrpcMux { virtual void requestOnDemandUpdate(const std::string& type_url, const absl::flat_hash_set& for_update) PURE; + + /** + * Returns an EdsResourcesCache for this GrpcMux if there is one. + * @return EdsResourcesCacheOptRef optional eds resources cache for the gRPC-mux. + */ + virtual EdsResourcesCacheOptRef edsResourcesCache() PURE; }; using GrpcMuxPtr = std::unique_ptr; diff --git a/envoy/config/subscription_factory.h b/envoy/config/subscription_factory.h index c59a648a44d3..de9d02100ef3 100644 --- a/envoy/config/subscription_factory.h +++ b/envoy/config/subscription_factory.h @@ -121,7 +121,7 @@ class MuxFactory : public Config::UntypedFactory { const LocalInfo::LocalInfo& local_info, std::unique_ptr&& config_validators, BackOffStrategyPtr&& backoff_strategy, OptRef xds_config_tracker, - OptRef xds_resources_delegate) PURE; + OptRef xds_resources_delegate, bool use_eds_resources_cache) PURE; }; } // namespace Config diff --git a/source/common/config/null_grpc_mux_impl.h b/source/common/config/null_grpc_mux_impl.h index b4211e3a21ec..5e82f42e5447 100644 --- a/source/common/config/null_grpc_mux_impl.h +++ b/source/common/config/null_grpc_mux_impl.h @@ -27,6 +27,8 @@ class NullGrpcMuxImpl : public GrpcMux, ENVOY_BUG(false, "unexpected request for on demand update"); } + EdsResourcesCacheOptRef edsResourcesCache() override { return absl::nullopt; } + void onWriteable() override {} void onStreamEstablished() override {} void onEstablishmentFailure() override {} diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 123a64b2cec6..019897e37059 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -119,6 +119,9 @@ FALSE_RUNTIME_GUARD(envoy_reloadable_features_enable_include_histograms); FALSE_RUNTIME_GUARD(envoy_reloadable_features_refresh_rtt_after_request); // TODO(danzh) false deprecate it once QUICHE has its own enable/disable flag. FALSE_RUNTIME_GUARD(envoy_reloadable_features_quic_reject_all); +// TODO(adisuissa) this will be enabled by default once the work on the feature is +// done in EdsClusterImpl. +FALSE_RUNTIME_GUARD(envoy_restart_features_use_eds_cache_for_ads); // Block of non-boolean flags. Use of int flags is deprecated. Do not add more. ABSL_FLAG(uint64_t, re2_max_program_size_error_level, 100, ""); // NOLINT diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index a26a46a8a213..3c1d580d2bf5 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -418,7 +418,7 @@ ClusterManagerImpl::ClusterManagerImpl( ->createUncachedRawAsyncClient(), main_thread_dispatcher, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info, std::move(custom_config_validators), std::move(backoff_strategy), - makeOptRefFromPtr(xds_config_tracker_.get()), {}); + makeOptRefFromPtr(xds_config_tracker_.get()), {}, false); } else { Config::Utility::checkTransportVersion(dyn_resources.ads_config()); auto xds_delegate_opt_ref = makeOptRefFromPtr(xds_resources_delegate_.get()); @@ -439,7 +439,7 @@ ClusterManagerImpl::ClusterManagerImpl( ->createUncachedRawAsyncClient(), main_thread_dispatcher, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info, std::move(custom_config_validators), std::move(backoff_strategy), - makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref); + makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, false); } } else { ads_mux_ = std::make_unique(); diff --git a/source/extensions/config_subscription/grpc/BUILD b/source/extensions/config_subscription/grpc/BUILD index a3e3fce0d05e..a703ad258a90 100644 --- a/source/extensions/config_subscription/grpc/BUILD +++ b/source/extensions/config_subscription/grpc/BUILD @@ -14,6 +14,7 @@ envoy_cc_library( srcs = ["grpc_mux_impl.cc"], hdrs = ["grpc_mux_impl.h"], deps = [ + ":eds_resources_cache_lib", ":grpc_stream_lib", ":xds_source_id_lib", "//envoy/config:custom_config_validators_interface", @@ -34,6 +35,7 @@ envoy_cc_library( "//source/common/memory:utils_lib", "//source/common/protobuf", "@com_google_absl//absl/container:btree", + "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) @@ -44,6 +46,7 @@ envoy_cc_library( hdrs = ["new_grpc_mux_impl.h"], deps = [ ":delta_subscription_state_lib", + ":eds_resources_cache_lib", ":grpc_stream_lib", ":pausable_ack_queue_lib", ":watch_map_lib", @@ -54,6 +57,7 @@ envoy_cc_library( "//source/common/config:xds_context_params_lib", "//source/common/config:xds_resource_lib", "//source/common/memory:utils_lib", + "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) @@ -63,6 +67,7 @@ envoy_cc_library( srcs = ["grpc_subscription_impl.cc"], hdrs = ["grpc_subscription_impl.h"], deps = [ + ":eds_resources_cache_lib", ":grpc_mux_lib", ":new_grpc_mux_lib", "//envoy/config:subscription_interface", @@ -210,6 +215,7 @@ envoy_cc_library( "//source/common/common:minimal_logger_lib", "//source/common/common:utility_lib", "//source/common/config:decoded_resource_lib", + "//source/common/config:resource_name_lib", "//source/common/config:utility_lib", "//source/common/config:xds_resource_lib", "//source/common/protobuf", diff --git a/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc b/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc index 987524f0c204..25c11bb02b58 100644 --- a/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc +++ b/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc @@ -31,7 +31,9 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create( data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, Utility::parseRateLimitSettings(api_config_source), data.local_info_, std::move(custom_config_validators), std::move(backoff_strategy), - data.xds_config_tracker_), + data.xds_config_tracker_, + // No EDS resources cache needed from collections. + /*eds_resources_cache=*/nullptr), data.callbacks_, data.resource_decoder_, data.stats_, data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_), /*is_aggregated=*/false, data.options_); diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc index 924596483a2c..72a9a41a33ae 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc @@ -6,6 +6,7 @@ #include "source/common/config/utility.h" #include "source/common/memory/utils.h" #include "source/common/protobuf/protobuf.h" +#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h" #include "source/extensions/config_subscription/grpc/xds_source_id.h" #include "absl/container/btree_map.h" @@ -56,21 +57,21 @@ std::string convertToWildcard(const std::string& resource_name) { } } // namespace -GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, - Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node, - CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef xds_resources_delegate, - const std::string& target_xds_authority) +GrpcMuxImpl::GrpcMuxImpl( + const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client, + Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, + Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node, + CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, + EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority) : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, std::move(backoff_strategy), rate_limit_settings), local_info_(local_info), skip_subsequent_node_(skip_subsequent_node), config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker), - xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority), - first_stream_request_(true), dispatcher_(dispatcher), + xds_resources_delegate_(xds_resources_delegate), + eds_resources_cache_(std::move(eds_resources_cache)), + target_xds_authority_(target_xds_authority), first_stream_request_(true), + dispatcher_(dispatcher), dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback( [this](absl::string_view resource_type_url) { onDynamicContextUpdate(resource_type_url); @@ -187,8 +188,14 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) { + // Resource cache is only used for EDS resources. + EdsResourcesCacheOptRef resources_cache{absl::nullopt}; + if (eds_resources_cache_ && + (type_url == Config::getTypeUrl())) { + resources_cache = makeOptRefFromPtr(eds_resources_cache_.get()); + } auto watch = std::make_unique(resources, callbacks, resource_decoder, type_url, - *this, options, local_info_); + *this, options, local_info_, resources_cache); ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url); // Lazily kick off the requests based on first subscription. This has the @@ -412,6 +419,19 @@ void GrpcMuxImpl::processDiscoveryResources(const std::vectorcallbacks_.onConfigUpdate(found_resources, version_info); + // Resource cache is only used for EDS resources. + if (eds_resources_cache_ && + (type_url == Config::getTypeUrl())) { + for (const auto& resource : found_resources) { + const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment = + dynamic_cast( + resource.get().resource()); + eds_resources_cache_->setResource(resource.get().name(), cluster_load_assignment); + } + // No need to remove resources from the cache, as currently only non-collection + // subscriptions are supported, and these resources are removed in the call + // to updateWatchInterest(). + } } } @@ -532,7 +552,7 @@ class GrpcMuxFactory : public MuxFactory { const envoy::config::core::v3::ApiConfigSource& ads_config, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef xds_resources_delegate) override { + XdsResourcesDelegateOptRef xds_resources_delegate, bool use_eds_resources_cache) override { return std::make_shared( local_info, std::move(async_client), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( @@ -540,6 +560,10 @@ class GrpcMuxFactory : public MuxFactory { scope, Utility::parseRateLimitSettings(ads_config), ads_config.set_node_on_first_message_only(), std::move(config_validators), std::move(backoff_strategy), xds_config_tracker, xds_resources_delegate, + (use_eds_resources_cache && + Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) + ? std::make_unique(dispatcher) + : nullptr, Config::Utility::getGrpcControlPlane(ads_config).value_or("")); } }; diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/grpc_mux_impl.h index 83ac9c558c7e..a018dd0a57e1 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.h @@ -7,6 +7,7 @@ #include "envoy/common/random_generator.h" #include "envoy/common/time.h" #include "envoy/config/custom_config_validators.h" +#include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" #include "envoy/config/xds_config_tracker.h" @@ -20,6 +21,7 @@ #include "source/common/common/logger.h" #include "source/common/common/utility.h" #include "source/common/config/api_version.h" +#include "source/common/config/resource_name.h" #include "source/common/config/ttl.h" #include "source/common/config/utility.h" #include "source/common/config/xds_context_params.h" @@ -44,7 +46,7 @@ class GrpcMuxImpl : public GrpcMux, bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, - const std::string& target_xds_authority); + EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority); ~GrpcMuxImpl() override; @@ -72,6 +74,10 @@ class GrpcMuxImpl : public GrpcMux, void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { } + EdsResourcesCacheOptRef edsResourcesCache() override { + return makeOptRefFromPtr(eds_resources_cache_.get()); + } + void handleDiscoveryResponse( std::unique_ptr&& message); @@ -99,17 +105,29 @@ class GrpcMuxImpl : public GrpcMux, SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder, const std::string& type_url, GrpcMuxImpl& parent, const SubscriptionOptions& options, - const LocalInfo::LocalInfo& local_info) + const LocalInfo::LocalInfo& local_info, + EdsResourcesCacheOptRef eds_resources_cache) : callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url), parent_(parent), subscription_options_(options), local_info_(local_info), - watches_(parent.apiStateFor(type_url).watches_) { + watches_(parent.apiStateFor(type_url).watches_), + eds_resources_cache_(eds_resources_cache) { updateResources(resources); + // If eds resources cache is provided, then the type must be ClusterLoadAssignment. + ASSERT( + !eds_resources_cache_.has_value() || + (type_url == Config::getTypeUrl())); } ~GrpcMuxWatchImpl() override { watches_.erase(iter_); if (!resources_.empty()) { parent_.queueDiscoveryRequest(type_url_); + if (eds_resources_cache_.has_value()) { + // All resources are to be removed, so remove them from the cache. + for (const auto& resource_name : resources_) { + eds_resources_cache_->removeResource(resource_name); + } + } } } @@ -131,7 +149,12 @@ class GrpcMuxImpl : public GrpcMux, private: void updateResources(const absl::flat_hash_set& resources) { - resources_.clear(); + // Finding the list of removed resources by keeping the current resources + // set until the end the function and computing the diff. + // Temporarily keep the resources prior to the update to find which ones + // were removed. + std::set previous_resources; + previous_resources.swap(resources_); std::transform( resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()), [this](const std::string& resource_name) -> std::string { @@ -148,6 +171,16 @@ class GrpcMuxImpl : public GrpcMux, } return resource_name; }); + if (eds_resources_cache_.has_value()) { + // Compute the removed resources and remove them from the cache. + std::vector removed_resources; + std::set_difference(previous_resources.begin(), previous_resources.end(), + resources_.begin(), resources_.end(), + std::back_inserter(removed_resources)); + for (const auto& resource_name : removed_resources) { + eds_resources_cache_->removeResource(resource_name); + } + } // move this watch to the beginning of the list iter_ = watches_.emplace(watches_.begin(), this); } @@ -157,6 +190,8 @@ class GrpcMuxImpl : public GrpcMux, const LocalInfo::LocalInfo& local_info_; WatchList& watches_; WatchList::iterator iter_; + // Optional cache for the specific ClusterLoadAssignments of this watch. + EdsResourcesCacheOptRef eds_resources_cache_; }; // Per muxed API state. @@ -212,6 +247,7 @@ class GrpcMuxImpl : public GrpcMux, CustomConfigValidatorsPtr config_validators_; XdsConfigTrackerOptRef xds_config_tracker_; XdsResourcesDelegateOptRef xds_resources_delegate_; + EdsResourcesCachePtr eds_resources_cache_; const std::string target_xds_authority_; bool first_stream_request_; diff --git a/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc b/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc index c7296942e9e7..435df4f3a1ef 100644 --- a/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc +++ b/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc @@ -33,6 +33,7 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat Utility::parseRateLimitSettings(api_config_source), data.local_info_, api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_, + /*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS. control_plane_id); } else { mux = std::make_shared( @@ -44,6 +45,7 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat Utility::parseRateLimitSettings(api_config_source), api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_, + /*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS. control_plane_id); } return std::make_unique( @@ -73,7 +75,9 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, Utility::parseRateLimitSettings(api_config_source), data.local_info_, api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - std::move(backoff_strategy), data.xds_config_tracker_); + std::move(backoff_strategy), data.xds_config_tracker_, + /*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS. + ); } else { mux = std::make_shared( Config::Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), @@ -81,7 +85,9 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti ->createUncachedRawAsyncClient(), data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, Utility::parseRateLimitSettings(api_config_source), data.local_info_, - std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_); + std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_, + /*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS. + ); } return std::make_unique( std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_, diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc index 2bf989b4c4b1..8507e3b525fd 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc @@ -11,6 +11,7 @@ #include "source/common/memory/utils.h" #include "source/common/protobuf/protobuf.h" #include "source/common/protobuf/utility.h" +#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h" namespace Envoy { namespace Config { @@ -34,14 +35,12 @@ class AllMuxesState { using AllMuxes = ThreadSafeSingleton; } // namespace -NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, - Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, - Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, - const LocalInfo::LocalInfo& local_info, - CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker) +NewGrpcMuxImpl::NewGrpcMuxImpl( + Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, + CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, + XdsConfigTrackerOptRef xds_config_tracker, EdsResourcesCachePtr eds_resources_cache) : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, std::move(backoff_strategy), rate_limit_settings), local_info_(local_info), config_validators_(std::move(config_validators)), @@ -49,7 +48,8 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, [this](absl::string_view resource_type_url) { onDynamicContextUpdate(resource_type_url); })), - dispatcher_(dispatcher), xds_config_tracker_(xds_config_tracker) { + dispatcher_(dispatcher), xds_config_tracker_(xds_config_tracker), + eds_resources_cache_(std::move(eds_resources_cache)) { AllMuxes::get().insert(this); } @@ -242,9 +242,16 @@ void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) { void NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use_namespace_matching) { - subscriptions_.emplace(type_url, std::make_unique( - type_url, local_info_, use_namespace_matching, dispatcher_, - *config_validators_.get(), xds_config_tracker_)); + // Resource cache is only used for EDS resources. + EdsResourcesCacheOptRef resources_cache{absl::nullopt}; + if (eds_resources_cache_ && + (type_url == Config::getTypeUrl())) { + resources_cache = makeOptRefFromPtr(eds_resources_cache_.get()); + } + subscriptions_.emplace( + type_url, std::make_unique(type_url, local_info_, use_namespace_matching, + dispatcher_, *config_validators_.get(), + xds_config_tracker_, resources_cache)); subscription_ordering_.emplace_back(type_url); } @@ -340,13 +347,17 @@ class NewGrpcMuxFactory : public MuxFactory { const envoy::config::core::v3::ApiConfigSource& ads_config, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - OptRef) override { + OptRef, bool use_eds_resources_cache) override { return std::make_shared( std::move(async_client), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"), scope, Utility::parseRateLimitSettings(ads_config), local_info, - std::move(config_validators), std::move(backoff_strategy), xds_config_tracker); + std::move(config_validators), std::move(backoff_strategy), xds_config_tracker, + (use_eds_resources_cache && + Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) + ? std::make_unique(dispatcher) + : nullptr); } }; diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h index 39a27eeed84b..2c82453a7488 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h @@ -4,6 +4,7 @@ #include "envoy/common/random_generator.h" #include "envoy/common/token_bucket.h" +#include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" #include "envoy/config/xds_config_tracker.h" @@ -11,6 +12,7 @@ #include "source/common/common/logger.h" #include "source/common/config/api_version.h" +#include "source/common/config/resource_name.h" #include "source/common/grpc/common.h" #include "source/common/runtime/runtime_features.h" #include "source/extensions/config_subscription/grpc/delta_subscription_state.h" @@ -36,7 +38,8 @@ class NewGrpcMuxImpl const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker); + XdsConfigTrackerOptRef xds_config_tracker, + EdsResourcesCachePtr eds_resources_cache); ~NewGrpcMuxImpl() override; @@ -58,6 +61,10 @@ class NewGrpcMuxImpl void requestOnDemandUpdate(const std::string& type_url, const absl::flat_hash_set& for_update) override; + EdsResourcesCacheOptRef edsResourcesCache() override { + return makeOptRefFromPtr(eds_resources_cache_.get()); + } + ScopedResume pause(const std::string& type_url) override; ScopedResume pause(const std::vector type_urls) override; @@ -86,9 +93,15 @@ class NewGrpcMuxImpl SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info, const bool use_namespace_matching, Event::Dispatcher& dispatcher, CustomConfigValidators& config_validators, - XdsConfigTrackerOptRef xds_config_tracker) - : watch_map_(use_namespace_matching, type_url, config_validators), - sub_state_(type_url, watch_map_, local_info, dispatcher, xds_config_tracker) {} + XdsConfigTrackerOptRef xds_config_tracker, + EdsResourcesCacheOptRef eds_resources_cache) + : watch_map_(use_namespace_matching, type_url, config_validators, eds_resources_cache), + sub_state_(type_url, watch_map_, local_info, dispatcher, xds_config_tracker) { + // If eds resources cache is provided, then the type must be ClusterLoadAssignment. + ASSERT( + !eds_resources_cache.has_value() || + (type_url == Config::getTypeUrl())); + } WatchMap watch_map_; DeltaSubscriptionState sub_state_; @@ -182,6 +195,7 @@ class NewGrpcMuxImpl Common::CallbackHandlePtr dynamic_update_callback_handle_; Event::Dispatcher& dispatcher_; XdsConfigTrackerOptRef xds_config_tracker_; + EdsResourcesCachePtr eds_resources_cache_; // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is // true because it may contain dangling pointers. diff --git a/source/extensions/config_subscription/grpc/watch_map.cc b/source/extensions/config_subscription/grpc/watch_map.cc index b9a1907fd3d3..9c16a4182e45 100644 --- a/source/extensions/config_subscription/grpc/watch_map.cc +++ b/source/extensions/config_subscription/grpc/watch_map.cc @@ -64,8 +64,20 @@ WatchMap::updateWatchInterest(Watch* watch, watch->resource_names_ = update_to_these_names; - return AddedRemoved(findAdditions(newly_added_to_watch, watch), - findRemovals(newly_removed_from_watch, watch)); + // First resources are added and only then removed, so a watch won't be removed + // if its interest has been replaced (rather than completely removed). + absl::flat_hash_set added_resources = findAdditions(newly_added_to_watch, watch); + absl::flat_hash_set removed_resources = + findRemovals(newly_removed_from_watch, watch); + // Remove cached resource that are no longer relevant. + if (eds_resources_cache_.has_value()) { + for (const auto& resource_name : removed_resources) { + // This may pass a resource_name that is not in the cache, for example + // if the resource contents has never arrived. + eds_resources_cache_->removeResource(resource_name); + } + } + return AddedRemoved(std::move(added_resources), std::move(removed_resources)); } absl::flat_hash_set WatchMap::watchesInterestedIn(const std::string& resource_name) { @@ -121,6 +133,10 @@ void WatchMap::onConfigUpdate(const std::vector& resources, ASSERT(deferred_removed_during_update_ == nullptr); deferred_removed_during_update_ = std::make_unique>(); Cleanup cleanup([this] { removeDeferredWatches(); }); + // The xDS server may send a resource that Envoy isn't interested in. This bit array + // will hold an "interesting" bit for each of the resources sent in the update. + std::vector interesting_resources; + interesting_resources.reserve(resources.size()); // Build a map from watches, to the set of updated resources that each watch cares about. Each // entry in the map is then a nice little bundle that can be fed directly into the individual // onConfigUpdate()s. @@ -130,6 +146,9 @@ void WatchMap::onConfigUpdate(const std::vector& resources, for (const auto& interested_watch : interested_in_r) { per_watch_updates[interested_watch].emplace_back(*r); } + // Set the corresponding interested_resources entry to true iff there is a + // watch interested in the resource. + interesting_resources.emplace_back(!interested_in_r.empty()); } // Execute external config validators. @@ -159,6 +178,23 @@ void WatchMap::onConfigUpdate(const std::vector& resources, watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info); } } + + if (eds_resources_cache_.has_value()) { + // Add/update the watched resources to/in the cache. + // Only resources that have a watcher should be updated. + for (uint32_t resource_idx = 0; resource_idx < resources.size(); ++resource_idx) { + if (interesting_resources[resource_idx]) { + const auto& resource = resources[resource_idx]; + const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment = + dynamic_cast( + resource.get()->resource()); + eds_resources_cache_->setResource(resource.get()->name(), cluster_load_assignment); + } + } + // Note: No need to remove resources from the cache, as currently only non-collection + // subscriptions are supported, and these resources are removed in the call + // to updateWatchInterest(). + } } void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField& resources, @@ -242,6 +278,19 @@ void WatchMap::onConfigUpdate( cur_watch->callbacks_.onConfigUpdate({}, {}, system_version_info); } } + + if (eds_resources_cache_.has_value()) { + // Add/update the watched resources to/in the cache. + for (const auto& resource : decoded_resources) { + const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment = + dynamic_cast( + resource.get()->resource()); + eds_resources_cache_->setResource(resource.get()->name(), cluster_load_assignment); + } + // No need to remove resources from the cache, as currently only non-collection + // subscriptions are supported, and these resources are removed in the call + // to updateWatchInterest(). + } } void WatchMap::onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) { diff --git a/source/extensions/config_subscription/grpc/watch_map.h b/source/extensions/config_subscription/grpc/watch_map.h index cd5c1973fd02..47cd43b4581f 100644 --- a/source/extensions/config_subscription/grpc/watch_map.h +++ b/source/extensions/config_subscription/grpc/watch_map.h @@ -5,11 +5,13 @@ #include #include "envoy/config/custom_config_validators.h" +#include "envoy/config/eds_resources_cache.h" #include "envoy/config/subscription.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "source/common/common/assert.h" #include "source/common/common/logger.h" +#include "source/common/config/resource_name.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" @@ -59,12 +61,25 @@ struct Watch { // update the subscription accordingly. // // A WatchMap is assumed to be dedicated to a single type_url type of resource (EDS, CDS, etc). +// +// The WatchMap can also store the fetched resources in a cache, and allow others to fetch +// resources directly from the cache. This is done for EDS in the following case: +// Assume an active EDS cluster exists with some load-assignment that is kept in the cache. +// If the cluster is updated, and no load-assignment is sent from the xDS server, the +// cached version will be used. +// The WatchMap is responsible to update the cache with the resource contents, and it is +// up to the specific xDS type subscription handler (i.e., EdsClusterImpl), to fetch +// the resource from the cache. class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable { public: WatchMap(const bool use_namespace_matching, const std::string& type_url, - CustomConfigValidators& config_validators) + CustomConfigValidators& config_validators, EdsResourcesCacheOptRef eds_resources_cache) : use_namespace_matching_(use_namespace_matching), type_url_(type_url), - config_validators_(config_validators) {} + config_validators_(config_validators), eds_resources_cache_(eds_resources_cache) { + // If eds resources cache is provided, then the type must be ClusterLoadAssignment. + ASSERT(!eds_resources_cache_.has_value() || + (type_url == Config::getTypeUrl())); + } // Adds 'callbacks' to the WatchMap, with every possible resource being watched. // (Use updateWatchInterest() to narrow it down to some specific names). @@ -133,6 +148,7 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable::GrpcMuxImpl( Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, - const std::string& target_xds_authority) + EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority) : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, std::move(backoff_strategy), rate_limit_settings), subscription_state_factory_(std::move(subscription_state_factory)), @@ -53,7 +55,9 @@ GrpcMuxImpl::GrpcMuxImpl( onDynamicContextUpdate(resource_type_url); })), config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker), - xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority) { + xds_resources_delegate_(xds_resources_delegate), + eds_resources_cache_(std::move(eds_resources_cache)), + target_xds_authority_(target_xds_authority) { Config::Utility::checkLocalInfo("ads", local_info); AllMuxes::get().insert(this); } @@ -84,12 +88,19 @@ Config::GrpcMuxWatchPtr GrpcMuxImpl::addWatch( const SubscriptionOptions& options) { auto watch_map = watch_maps_.find(type_url); if (watch_map == watch_maps_.end()) { + // Resource cache is only used for EDS resources. + EdsResourcesCacheOptRef resources_cache{absl::nullopt}; + if (eds_resources_cache_ && + (type_url == Config::getTypeUrl())) { + resources_cache = makeOptRefFromPtr(eds_resources_cache_.get()); + } + // We don't yet have a subscription for type_url! Make one! - watch_map = - watch_maps_ - .emplace(type_url, std::make_unique(options.use_namespace_matching_, type_url, - *config_validators_.get())) - .first; + watch_map = watch_maps_ + .emplace(type_url, + std::make_unique(options.use_namespace_matching_, type_url, + *config_validators_.get(), resources_cache)) + .first; subscriptions_.emplace(type_url, subscription_state_factory_->makeSubscriptionState( type_url, *watch_maps_[type_url], resource_decoder, xds_config_tracker_, xds_resources_delegate_, @@ -367,11 +378,12 @@ GrpcMuxDelta::GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispat const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker) + XdsConfigTrackerOptRef xds_config_tracker, + EdsResourcesCachePtr eds_resources_cache) : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, local_info, std::move(async_client), dispatcher, service_method, scope, rate_limit_settings, std::move(config_validators), std::move(backoff_strategy), - xds_config_tracker) {} + xds_config_tracker, absl::nullopt, std::move(eds_resources_cache)) {} // GrpcStreamCallbacks for GrpcMuxDelta void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url, @@ -392,11 +404,13 @@ GrpcMuxSotw::GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatch BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, + EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority) : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, local_info, std::move(async_client), dispatcher, service_method, scope, rate_limit_settings, std::move(config_validators), std::move(backoff_strategy), - xds_config_tracker, xds_resources_delegate, target_xds_authority) {} + xds_config_tracker, xds_resources_delegate, std::move(eds_resources_cache), + target_xds_authority) {} Config::GrpcMuxWatchPtr NullGrpcMuxImpl::addWatch(const std::string&, const absl::flat_hash_set&, @@ -416,7 +430,7 @@ class DeltaGrpcMuxFactory : public MuxFactory { const envoy::config::core::v3::ApiConfigSource& ads_config, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef) override { + XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override { return std::make_shared( std::move(async_client), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( @@ -424,7 +438,11 @@ class DeltaGrpcMuxFactory : public MuxFactory { "DeltaAggregatedResources"), scope, Envoy::Config::Utility::parseRateLimitSettings(ads_config), local_info, ads_config.set_node_on_first_message_only(), std::move(config_validators), - std::move(backoff_strategy), xds_config_tracker); + std::move(backoff_strategy), xds_config_tracker, + (use_eds_resources_cache && + Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) + ? std::make_unique(dispatcher) + : nullptr); } }; @@ -438,7 +456,7 @@ class SotwGrpcMuxFactory : public MuxFactory { const envoy::config::core::v3::ApiConfigSource& ads_config, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef) override { + XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override { return std::make_shared( std::move(async_client), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( @@ -446,7 +464,11 @@ class SotwGrpcMuxFactory : public MuxFactory { "StreamAggregatedResources"), scope, Envoy::Config::Utility::parseRateLimitSettings(ads_config), local_info, ads_config.set_node_on_first_message_only(), std::move(config_validators), - std::move(backoff_strategy), xds_config_tracker); + std::move(backoff_strategy), xds_config_tracker, absl::nullopt, + (use_eds_resources_cache && + Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) + ? std::make_unique(dispatcher) + : nullptr); } }; diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h index 7669ca73cffd..67746534b092 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h @@ -66,6 +66,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, + EdsResourcesCachePtr eds_resources_cache = nullptr, const std::string& target_xds_authority = ""); ~GrpcMuxImpl() override; @@ -108,6 +109,10 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, genericHandleResponse(message->type_url(), *message, control_plane_stats); } + EdsResourcesCacheOptRef edsResourcesCache() override { + return makeOptRefFromPtr(eds_resources_cache_.get()); + } + GrpcStream& grpcStreamForTest() { return grpc_stream_; } protected: @@ -209,6 +214,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, CustomConfigValidatorsPtr config_validators_; XdsConfigTrackerOptRef xds_config_tracker_; XdsResourcesDelegateOptRef xds_resources_delegate_; + EdsResourcesCachePtr eds_resources_cache_; const std::string target_xds_authority_; // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is @@ -224,7 +230,8 @@ class GrpcMuxDelta : public GrpcMuxImpl&) override { ENVOY_BUG(false, "unexpected request for on demand update"); } + + EdsResourcesCacheOptRef edsResourcesCache() override { return {}; } }; } // namespace XdsMux diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index e1e41448ef87..c478901eca49 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -64,15 +64,17 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, local_info_, true, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + /*eds_resources_cache=*/nullptr); } else { mux_ = std::make_shared( local_info_, std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, true, std::move(config_validators_), std::move(backoff_strategy), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/ - XdsResourcesDelegateOptRef(), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/""); } subscription_ = std::make_unique( diff --git a/test/extensions/clusters/eds/eds_speed_test.cc b/test/extensions/clusters/eds/eds_speed_test.cc index d30b1e1e1129..68c076f7b6f2 100644 --- a/test/extensions/clusters/eds/eds_speed_test.cc +++ b/test/extensions/clusters/eds/eds_speed_test.cc @@ -58,7 +58,8 @@ class EdsSpeedTest { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"), scope_, {}, local_info_, true, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef())); + /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef(), + /*xds_resources_delegate=*/{}, /*eds_resources_cache=*/nullptr)); } else { grpc_mux_.reset(new Config::GrpcMuxImpl( local_info_, std::unique_ptr(async_client_), @@ -68,6 +69,7 @@ class EdsSpeedTest { scope_, {}, true, std::move(config_validators_), std::move(backoff_strategy), /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/Config::XdsResourcesDelegateOptRef(), + /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/"")); } resetCluster(R"EOF( diff --git a/test/extensions/config_subscription/grpc/BUILD b/test/extensions/config_subscription/grpc/BUILD index 46e75f764c04..f043d00891c0 100644 --- a/test/extensions/config_subscription/grpc/BUILD +++ b/test/extensions/config_subscription/grpc/BUILD @@ -22,6 +22,7 @@ envoy_cc_test( "//test/mocks:common_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", @@ -43,6 +44,7 @@ envoy_cc_test( "//envoy/config:xds_config_tracker_interface", "//envoy/config:xds_resources_delegate_interface", "//source/common/config:api_version_lib", + "//source/common/config:null_grpc_mux_lib", "//source/common/config:protobuf_link_hacks", "//source/common/protobuf", "//source/common/stats:isolated_store_lib", @@ -51,6 +53,7 @@ envoy_cc_test( "//test/mocks:common_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", @@ -76,6 +79,7 @@ envoy_cc_test( "//source/extensions/config_subscription/grpc:grpc_subscription_lib", "//test/mocks:common_lib", "//test/mocks/config:config_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", @@ -181,6 +185,7 @@ envoy_cc_test( "//test/mocks:common_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", @@ -228,6 +233,7 @@ envoy_cc_test( "//source/extensions/config_subscription/grpc:watch_map_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/test_common:utility_lib", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", diff --git a/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc b/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc index 2b435f510aac..14a7760b6501 100644 --- a/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc +++ b/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc @@ -159,13 +159,15 @@ TEST_P(DeltaSubscriptionNoGrpcStreamTest, NoGrpcStream) { std::unique_ptr(async_client), dispatcher, *method_descriptor, *stats_store.rootScope(), rate_limit_settings, local_info, false, std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*eds_resources_cache=*/nullptr); } else { xds_context = std::make_shared( std::unique_ptr(async_client), dispatcher, *method_descriptor, *stats_store.rootScope(), rate_limit_settings, local_info, std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*eds_resources_cache=*/nullptr); } GrpcSubscriptionImplPtr subscription = std::make_unique( diff --git a/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h b/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h index 77c4310137aa..a597f4f7b2dc 100644 --- a/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h +++ b/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h @@ -55,13 +55,15 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, local_info_, false, std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*eds_resources_cache=*/nullptr); } else { xds_context_ = std::make_shared( std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, local_info_, std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*eds_resources_cache=*/nullptr); } subscription_ = std::make_unique( xds_context_, callbacks_, resource_decoder_, stats_, diff --git a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc index af24bf376a88..abe52c8c7098 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc @@ -8,6 +8,7 @@ #include "source/common/common/empty_string.h" #include "source/common/config/api_version.h" +#include "source/common/config/null_grpc_mux_impl.h" #include "source/common/config/protobuf_link_hacks.h" #include "source/common/config/utility.h" #include "source/common/protobuf/protobuf.h" @@ -17,6 +18,7 @@ #include "test/common/stats/stat_test_utility.h" #include "test/mocks/common.h" #include "test/mocks/config/custom_config_validators.h" +#include "test/mocks/config/eds_resources_cache.h" #include "test/mocks/config/mocks.h" #include "test/mocks/event/mocks.h" #include "test/mocks/grpc/mocks.h" @@ -69,7 +71,8 @@ class GrpcMuxImplTestBase : public testing::Test { SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""); + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); } void setup(const RateLimitSettings& custom_rate_limit_settings) { @@ -82,7 +85,8 @@ class GrpcMuxImplTestBase : public testing::Test { SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""); + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); } void expectSendMessage(const std::string& type_url, @@ -123,6 +127,7 @@ class GrpcMuxImplTestBase : public testing::Test { Envoy::Config::RateLimitSettings rate_limit_settings_; Stats::Gauge& control_plane_connected_state_; Stats::Gauge& control_plane_pending_requests_; + MockEdsResourcesCache* eds_resources_cache_{nullptr}; }; class GrpcMuxImplTest : public GrpcMuxImplTestBase { @@ -962,7 +967,7 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { random_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - /*target_xds_authority=*/""), + /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/""), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); @@ -981,12 +986,238 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/""), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); } +// Validates that the EDS cache getter returns the cache. +TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEds) { + eds_resources_cache_ = new NiceMock(); + setup(); + EXPECT_NE({}, grpc_mux_->edsResourcesCache()); +} + +// Validates that the EDS cache getter returns empty if there is no cache. +TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) { + setup(); + EXPECT_EQ({}, grpc_mux_->edsResourcesCache()); +} + +// Validate that an EDS resource is cached if there's a cache. +TEST_F(GrpcMuxImplTest, CacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto eds_sub = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Envoy will unsubscribe from all resources. + expectSendMessage(type_url, {}, "1"); + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); +} + +// Validate that an update to an EDS resource watcher is reflected in the cache, +// if there's a cache. +TEST_F(GrpcMuxImplTest, UpdateCacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto eds_sub = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Update the cache to another resource. + expectSendMessage(type_url, {}, "1"); + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {"y"}, "1"); + eds_sub->update({"y"}); + + // Envoy will unsubscribe from all resources. + expectSendMessage(type_url, {}, "1"); + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); +} + +// Validate that adding and removing watchers reflects on the cache changes, +// if there's a cache. +TEST_F(GrpcMuxImplTest, AddRemoveSubscriptions) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + + { + auto eds_sub = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, + const std::string&) { EXPECT_EQ(1, resources.size()); })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); // Ack. + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Watcher (eds_sub) going out of scope, the resource should be removed, as well as + // the interest. + expectSendMessage(type_url, {}, "1"); + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + } + + // Update to a new resource interest. + { + expectSendMessage(type_url, {"y"}, "1"); + auto eds_sub2 = grpc_mux_->addWatch(type_url, {"y"}, callbacks_, resource_decoder, {}); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "2")) + .WillOnce(Invoke([](const std::vector& resources, + const std::string&) { EXPECT_EQ(1, resources.size()); })); + EXPECT_CALL(*eds_resources_cache_, setResource("y", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"y"}, "2"); // Ack. + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Watcher (eds_sub2) going out of scope, the resource should be removed, as well as + // the interest. + expectSendMessage(type_url, {}, "2"); + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + } +} + +/** + * Tests the NullGrpcMuxImpl object to increase code-coverage. + */ +class NullGrpcMuxImplTest : public testing::Test { +public: + NullGrpcMuxImplTest() {} + NullGrpcMuxImpl null_mux_; + NiceMock callbacks_; +}; + +TEST_F(NullGrpcMuxImplTest, StartImplemented) { EXPECT_NO_THROW(null_mux_.start()); } + +TEST_F(NullGrpcMuxImplTest, PauseImplemented) { + ScopedResume scoped; + EXPECT_NO_THROW(scoped = null_mux_.pause("ignored")); +} + +TEST_F(NullGrpcMuxImplTest, PauseMultipleArgsImplemented) { + ScopedResume scoped; + const std::vector params = {"ignored", "another_ignored"}; + EXPECT_NO_THROW(scoped = null_mux_.pause(params)); +} + +TEST_F(NullGrpcMuxImplTest, RequestOnDemandNotImplemented) { + EXPECT_ENVOY_BUG(null_mux_.requestOnDemandUpdate("type_url", {"for_update"}), + "unexpected request for on demand update"); +} + +TEST_F(NullGrpcMuxImplTest, AddWatchRaisesException) { + NiceMock callbacks; + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + + EXPECT_THROW_WITH_REGEX(null_mux_.addWatch("type_url", {}, callbacks, resource_decoder, {}), + EnvoyException, "ADS must be configured to support an ADS config source"); +} + +TEST_F(NullGrpcMuxImplTest, NoEdsResourcesCache) { EXPECT_EQ({}, null_mux_.edsResourcesCache()); } +TEST_F(NullGrpcMuxImplTest, OnWriteableImplemented) { EXPECT_NO_THROW(null_mux_.onWriteable()); } +TEST_F(NullGrpcMuxImplTest, OnStreamEstablishedImplemented) { + EXPECT_NO_THROW(null_mux_.onStreamEstablished()); +} +TEST_F(NullGrpcMuxImplTest, OnEstablishmentFailureImplemented) { + EXPECT_NO_THROW(null_mux_.onEstablishmentFailure()); +} +TEST_F(NullGrpcMuxImplTest, OnDiscoveryResponseImplemented) { + std::unique_ptr response; + Stats::TestUtil::TestStore stats; + ControlPlaneStats cp_stats{Utility::generateControlPlaneStats(*stats.rootScope())}; + EXPECT_NO_THROW(null_mux_.onDiscoveryResponse(std::move(response), cp_stats)); +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc index 3712709f9f73..ebec8abfc6c4 100644 --- a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc @@ -17,6 +17,7 @@ #include "test/config/v2_link_hacks.h" #include "test/mocks/common.h" #include "test/mocks/config/custom_config_validators.h" +#include "test/mocks/config/eds_resources_cache.h" #include "test/mocks/config/mocks.h" #include "test/mocks/event/mocks.h" #include "test/mocks/grpc/mocks.h" @@ -70,7 +71,8 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam { "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), *stats_.rootScope(), rate_limit_settings_, local_info_, false, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + std::unique_ptr(eds_resources_cache_)); return; } grpc_mux_ = std::make_unique( @@ -79,7 +81,8 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam { "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), *stats_.rootScope(), rate_limit_settings_, local_info_, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + std::unique_ptr(eds_resources_cache_)); } void expectSendMessage(const std::string& type_url, @@ -172,6 +175,7 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam { ControlPlaneStats control_plane_stats_; Stats::Gauge& control_plane_connected_state_; bool should_use_unified_; + MockEdsResourcesCache* eds_resources_cache_{nullptr}; }; class NewGrpcMuxImplTest : public NewGrpcMuxImplTestBase { @@ -561,6 +565,195 @@ TEST_P(NewGrpcMuxImplTest, Shutdown) { // There won't be any unsubscribe messages for the legacy mux either for the same reason } +// Validates that the EDS cache getter returns the cache. +TEST_P(NewGrpcMuxImplTest, EdsResourcesCacheForEds) { + eds_resources_cache_ = new NiceMock(); + setup(); + EXPECT_NE({}, grpc_mux_->edsResourcesCache()); +} + +// Validates that the EDS cache getter returns empty if there is no cache. +TEST_P(NewGrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) { + setup(); + EXPECT_EQ({}, grpc_mux_->edsResourcesCache()); +} + +// Validate that an EDS resource is cached if there's a cache. +TEST_P(NewGrpcMuxImplTest, CacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto watch = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder_, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, {}); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + auto res = response->add_resources(); + res->set_name("x"); + res->set_version("1"); + load_assignment.set_cluster_name("x"); + res->mutable_resource()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, + const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {}, {}); // Ack. + onDiscoveryResponse(std::move(response)); + } + + // Envoy will unsubscribe from all resources. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {}, {"x"}); +} + +// Validate that an update to an EDS resource watcher is reflected in the cache, +// if there's a cache. +TEST_P(NewGrpcMuxImplTest, UpdateCacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto watch = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder_, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, {}); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + auto res = response->add_resources(); + res->set_name("x"); + res->set_version("1"); + load_assignment.set_cluster_name("x"); + res->mutable_resource()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, + const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {}, {}); // Ack. + onDiscoveryResponse(std::move(response)); + } + + // Update the cache to another resource. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {"y"}, {"x"}); + watch->update({"y"}); + + // Envoy will unsubscribe from all resources. + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + expectSendMessage(type_url, {}, {"y"}); +} + +// Validate that adding and removing watchers reflects on the cache changes, +// if there's a cache. +TEST_P(NewGrpcMuxImplTest, AddRemoveSubscriptions) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + + { + auto watch1 = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder_, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, {}); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + auto res = response->add_resources(); + res->set_name("x"); + res->set_version("1"); + load_assignment.set_cluster_name("x"); + res->mutable_resource()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1")) + .WillOnce(Invoke([&load_assignment]( + const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {}, {}); // Ack. + onDiscoveryResponse(std::move(response)); + } + + // Watcher (watch1) going out of scope, the resource should be removed, as well as + // the interest. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {}, {"x"}); + } + + // Update to a new resource interest. + { + expectSendMessage(type_url, {"y"}, {}); + auto watch2 = grpc_mux_->addWatch(type_url, {"y"}, callbacks_, resource_decoder_, {}); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + auto res = response->add_resources(); + res->set_name("y"); + res->set_version("2"); + load_assignment.set_cluster_name("y"); + res->mutable_resource()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "2")) + .WillOnce(Invoke([&load_assignment]( + const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("y", ProtoEq(load_assignment))); + expectSendMessage(type_url, {}, {}); // Ack. + onDiscoveryResponse(std::move(response)); + } + + // Watcher (watch2) going out of scope, the resource should be removed, as well as + // the interest. + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + expectSendMessage(type_url, {}, {"y"}); + } +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/watch_map_test.cc b/test/extensions/config_subscription/grpc/watch_map_test.cc index dcdbe9a418ee..771c26cea73c 100644 --- a/test/extensions/config_subscription/grpc/watch_map_test.cc +++ b/test/extensions/config_subscription/grpc/watch_map_test.cc @@ -9,6 +9,7 @@ #include "source/extensions/config_subscription/grpc/watch_map.h" #include "test/mocks/config/custom_config_validators.h" +#include "test/mocks/config/eds_resources_cache.h" #include "test/mocks/config/mocks.h" #include "test/test_common/utility.h" @@ -127,7 +128,7 @@ TEST(WatchMapTest, Basic) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); { @@ -201,7 +202,7 @@ TEST(WatchMapTest, Overlap) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -259,12 +260,99 @@ TEST(WatchMapTest, Overlap) { } } +// Checks that a resource is added to the cache the first time it is received, +// and removed when there's no more interest. +TEST(WatchMapTest, CacheResourceAddResource) { + MockSubscriptionCallbacks callbacks1; + MockSubscriptionCallbacks callbacks2; + TestUtility::TestOpaqueResourceDecoderImpl + resource_decoder("cluster_name"); + NiceMock config_validators; + NiceMock eds_resources_cache; + const std::string eds_type_url = + Config::getTypeUrl(); + WatchMap watch_map(false, eds_type_url, config_validators, + makeOptRef(eds_resources_cache)); + // The test uses 2 watchers to ensure that interest is kept regardless of + // which watcher was the first to add a watch for the assignment. + Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); + Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); + + Protobuf::RepeatedPtrField updated_resources; + envoy::config::endpoint::v3::ClusterLoadAssignment alice; + alice.set_cluster_name("alice"); + updated_resources.Add()->PackFrom(alice); + + // First watch becomes interested. + { + absl::flat_hash_set update_to({"alice", "dummy"}); + // "alice" isn't known - no need to remove from the cache. + EXPECT_CALL(eds_resources_cache, removeResource("alice")).Times(0); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, update_to); + EXPECT_EQ(update_to, added_removed.added_); // add to subscription + EXPECT_TRUE(added_removed.removed_.empty()); + watch_map.updateWatchInterest(watch2, {"dummy"}); + + // *Only* first watch receives update. + expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version1"); + expectNoUpdate(callbacks2, "version1"); + // A call for SotW and a call for Delta to the cache's setResource method. + EXPECT_CALL(eds_resources_cache, setResource("alice", ProtoEq(alice))).Times(2); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version1"); + } + // Second watch becomes interested. + { + absl::flat_hash_set update_to({"alice", "dummy"}); + // "alice" is known, and there's still interest - no removal. + EXPECT_CALL(eds_resources_cache, removeResource("alice")).Times(0); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, update_to); + EXPECT_TRUE(added_removed.added_.empty()); // nothing happens + EXPECT_TRUE(added_removed.removed_.empty()); + + // Both watches receive update. + expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version2"); + expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version2"); + // A call for SotW and a call for Delta to the cache's setResource method. + EXPECT_CALL(eds_resources_cache, setResource("alice", ProtoEq(alice))).Times(2); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version2"); + } + // First watch loses interest. + { + // "alice" is known, and there's still interest - no removal. + EXPECT_CALL(eds_resources_cache, removeResource("alice")).Times(0); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, {"dummy"}); + EXPECT_TRUE(added_removed.added_.empty()); // nothing happens + EXPECT_TRUE(added_removed.removed_.empty()); + + // Both watches receive the update. For watch2, this is obviously desired. + expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version3"); + // For watch1, it's more subtle: the WatchMap sees that this update has no + // resources watch1 cares about, but also knows that watch1 previously had + // some resources. So, it must inform watch1 that it now has no resources. + // (SotW only: delta's explicit removals avoid the need for this guessing.) + expectEmptySotwNoDeltaUpdate(callbacks1, "version3"); + // A call for SotW and a call for Delta to the cache's setResource method. + EXPECT_CALL(eds_resources_cache, setResource("alice", ProtoEq(alice))).Times(2); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version3"); + } + // Second watch loses interest. + { + // A call for the cache's removeResource method as there's no more + // interest in "alice". + EXPECT_CALL(eds_resources_cache, removeResource("alice")); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, {"dummy"}); + EXPECT_TRUE(added_removed.added_.empty()); + EXPECT_EQ(absl::flat_hash_set({"alice"}), + added_removed.removed_); // remove from subscription + } +} + // These are regression tests for #11877, validate that when two watches point at the same // watched resource, and an update to one of the watches removes one or both of them, that // WatchMap defers deletes and doesn't crash. class SameWatchRemoval : public testing::Test { public: - SameWatchRemoval() : watch_map_(false, "ClusterLoadAssignmentType", config_validators) {} + SameWatchRemoval() : watch_map_(false, "ClusterLoadAssignmentType", config_validators, {}) {} void SetUp() override { envoy::config::endpoint::v3::ClusterLoadAssignment alice; @@ -343,7 +431,7 @@ TEST(WatchMapTest, AddRemoveAdd) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -400,7 +488,7 @@ TEST(WatchMapTest, UninterestingUpdate) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"alice"}); @@ -445,7 +533,7 @@ TEST(WatchMapTest, WatchingEverything) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); /*Watch* watch1 = */ watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); // watch1 never specifies any names, and so is treated as interested in everything. @@ -482,7 +570,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); @@ -516,7 +604,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { TEST(WatchMapTest, OnConfigUpdateFailed) { NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); // calling on empty map doesn't break watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); @@ -538,7 +626,7 @@ TEST(WatchMapTest, OnConfigUpdateXdsTpGlobCollections) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"xdstp://foo/bar/baz/*?some=thing&thing=some"}); @@ -583,7 +671,7 @@ TEST(WatchMapTest, OnConfigUpdateXdsTpSingletons) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"xdstp://foo/bar/baz?some=thing&thing=some"}); @@ -624,7 +712,7 @@ TEST(WatchMapTest, OnConfigUpdateUsingNamespaces) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(true, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(true, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); @@ -677,6 +765,10 @@ TEST(WatchMapTest, OnConfigUpdateUsingNamespaces) { } } +// TODO(adip): Add tests that use the eds cache. +// Needs to test the following function onConfigUpdate (sotw&delta) and +// updateWatchInterest + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc index 8530afb7a64f..0ecb2e8cad5e 100644 --- a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc @@ -17,6 +17,7 @@ #include "test/config/v2_link_hacks.h" #include "test/mocks/common.h" #include "test/mocks/config/custom_config_validators.h" +#include "test/mocks/config/eds_resources_cache.h" #include "test/mocks/config/mocks.h" #include "test/mocks/event/mocks.h" #include "test/mocks/grpc/mocks.h" @@ -58,17 +59,7 @@ class GrpcMuxImplTestBase : public testing::Test { control_plane_pending_requests_(stats_.gauge("control_plane.pending_requests", Stats::Gauge::ImportMode::NeverImport)) {} - void setup() { - grpc_mux_ = std::make_unique( - std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, local_info_, true, std::move(config_validators_), - std::make_unique( - SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, - random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); - } + void setup() { setup(rate_limit_settings_); } void setup(const RateLimitSettings& custom_rate_limit_settings) { grpc_mux_ = std::make_unique( @@ -80,7 +71,9 @@ class GrpcMuxImplTestBase : public testing::Test { std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); } void expectSendMessage(const std::string& type_url, @@ -138,6 +131,7 @@ class GrpcMuxImplTestBase : public testing::Test { Envoy::Config::RateLimitSettings rate_limit_settings_; Stats::Gauge& control_plane_connected_state_; Stats::Gauge& control_plane_pending_requests_; + MockEdsResourcesCache* eds_resources_cache_{nullptr}; }; class GrpcMuxImplTest : public GrpcMuxImplTestBase { @@ -1045,6 +1039,177 @@ TEST_F(GrpcMuxImplTest, AllMuxesStateTest) { EXPECT_TRUE(grpc_mux_1->isShutdown()); } +// Validates that the EDS cache getter returns the cache. +TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEds) { + eds_resources_cache_ = new NiceMock(); + setup(); + EXPECT_NE({}, grpc_mux_->edsResourcesCache()); +} + +// Validates that the EDS cache getter returns empty if there is no cache. +TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) { + setup(); + EXPECT_EQ({}, grpc_mux_->edsResourcesCache()); +} + +// Validate that an EDS resource is cached if there's a cache. +TEST_F(GrpcMuxImplTest, CacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto eds_sub = makeWatch(type_url, {"x"}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Envoy will unsubscribe from all resources. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {}, "1"); +} + +// Validate that an update to an EDS resource watcher is reflected in the cache, +// if there's a cache. +TEST_F(GrpcMuxImplTest, UpdateCacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto eds_sub = makeWatch(type_url, {"x"}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Update the cache to another resource. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {"y"}, "1"); + eds_sub->update({"y"}); + + // Envoy will unsubscribe from all resources. + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + expectSendMessage(type_url, {}, "1"); +} + +// Validate that adding and removing watchers reflects on the cache changes, +// if there's a cache. +TEST_F(GrpcMuxImplTest, AddRemoveSubscriptions) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + + { + auto eds_sub = makeWatch(type_url, {"x"}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, + const std::string&) { EXPECT_EQ(1, resources.size()); })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); // Ack. + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Watcher (eds_sub) going out of scope, the resource should be removed, as well as + // the interest. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {}, "1"); + } + + // Update to a new resource interest. + { + expectSendMessage(type_url, {"y"}, "1"); + auto eds_sub2 = makeWatch(type_url, {"y"}); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "2")) + .WillOnce(Invoke([](const std::vector& resources, + const std::string&) { EXPECT_EQ(1, resources.size()); })); + EXPECT_CALL(*eds_resources_cache_, setResource("y", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"y"}, "2"); // Ack. + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Watcher (eds_sub2) going out of scope, the resource should be removed, as well as + // the interest. + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + expectSendMessage(type_url, {}, "2"); + } +} + class NullGrpcMuxImplTest : public testing::Test { public: NullGrpcMuxImplTest() : null_mux_(std::make_unique()) {} @@ -1080,6 +1245,8 @@ TEST_F(NullGrpcMuxImplTest, AddWatchRaisesException) { EnvoyException, "ADS must be configured to support an ADS config source"); } +TEST_F(NullGrpcMuxImplTest, NoEdsResourcesCache) { EXPECT_EQ({}, null_mux_->edsResourcesCache()); } + } // namespace } // namespace XdsMux } // namespace Config diff --git a/test/mocks/config/BUILD b/test/mocks/config/BUILD index ea9057e1aed7..82c7ba41ecf0 100644 --- a/test/mocks/config/BUILD +++ b/test/mocks/config/BUILD @@ -35,3 +35,13 @@ envoy_cc_mock( "//envoy/config:config_validator_interface", ], ) + +envoy_cc_mock( + name = "eds_resources_cache_mocks", + srcs = ["eds_resources_cache.cc"], + hdrs = ["eds_resources_cache.h"], + deps = [ + "//envoy/config:eds_resources_cache_interface", + "//source/common/common:logger_lib", + ], +) diff --git a/test/mocks/config/eds_resources_cache.cc b/test/mocks/config/eds_resources_cache.cc new file mode 100644 index 000000000000..1f11831228d5 --- /dev/null +++ b/test/mocks/config/eds_resources_cache.cc @@ -0,0 +1,16 @@ +#include "test/mocks/config/eds_resources_cache.h" + +#include "source/common/common/logger.h" + +namespace Envoy { +namespace Config { +using testing::_; +using testing::Return; + +using testing::Invoke; +MockEdsResourcesCache::MockEdsResourcesCache() { + ON_CALL(*this, getResource(_, _)).WillByDefault(Return(absl::nullopt)); +} + +} // namespace Config +} // namespace Envoy diff --git a/test/mocks/config/eds_resources_cache.h b/test/mocks/config/eds_resources_cache.h new file mode 100644 index 000000000000..72661c0312bd --- /dev/null +++ b/test/mocks/config/eds_resources_cache.h @@ -0,0 +1,31 @@ +#pragma once + +#include "envoy/config/eds_resources_cache.h" + +#include "gmock/gmock.h" + +namespace Envoy { +namespace Config { + +class MockEdsResourcesCache : public EdsResourcesCache { +public: + MockEdsResourcesCache(); + ~MockEdsResourcesCache() override = default; + + MOCK_METHOD(void, setResource, + (absl::string_view resource_name, + const envoy::config::endpoint::v3::ClusterLoadAssignment& resource)); + MOCK_METHOD(void, removeResource, (absl::string_view resource_name)); + MOCK_METHOD(OptRef, getResource, + (absl::string_view resource_name, EdsResourceRemovalCallback* removal_cb)); + MOCK_METHOD(void, removeCallback, + (absl::string_view resource_name, EdsResourceRemovalCallback* removal_cb)); + MOCK_METHOD(uint32_t, cacheSizeForTest, (), (const)); + + MOCK_METHOD(void, setExpiryTimer, + (absl::string_view resource_name, std::chrono::milliseconds ms)); + MOCK_METHOD(void, disableExpiryTimer, (absl::string_view resource_name)); +}; + +} // namespace Config +} // namespace Envoy diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index 4147b4046f1d..d6fe23e3c3f9 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -129,6 +129,8 @@ class MockGrpcMux : public GrpcMux { const absl::flat_hash_set& add_these_names)); MOCK_METHOD(bool, paused, (const std::string& type_url), (const)); + + MOCK_METHOD(EdsResourcesCacheOptRef, edsResourcesCache, ()); }; class MockGrpcStreamCallbacks