diff --git a/envoy/config/BUILD b/envoy/config/BUILD index 1944cb4ff14d..5ef3fea5f5cf 100644 --- a/envoy/config/BUILD +++ b/envoy/config/BUILD @@ -66,6 +66,7 @@ envoy_cc_library( name = "grpc_mux_interface", hdrs = ["grpc_mux.h"], deps = [ + ":eds_resources_cache_interface", ":subscription_interface", "//envoy/stats:stats_macros", "//source/common/common:cleanup_lib", diff --git a/envoy/config/grpc_mux.h b/envoy/config/grpc_mux.h index e945fe5e3d32..a08724fcdf6c 100644 --- a/envoy/config/grpc_mux.h +++ b/envoy/config/grpc_mux.h @@ -4,6 +4,7 @@ #include "envoy/common/exception.h" #include "envoy/common/pure.h" +#include "envoy/config/eds_resources_cache.h" #include "envoy/config/subscription.h" #include "envoy/stats/stats_macros.h" @@ -105,6 +106,12 @@ class GrpcMux { virtual void requestOnDemandUpdate(const std::string& type_url, const absl::flat_hash_set& for_update) PURE; + + /** + * Returns an EdsResourcesCache for this GrpcMux if there is one. + * @return EdsResourcesCacheOptRef optional eds resources cache for the gRPC-mux. + */ + virtual EdsResourcesCacheOptRef edsResourcesCache() PURE; }; using GrpcMuxPtr = std::unique_ptr; diff --git a/envoy/config/subscription_factory.h b/envoy/config/subscription_factory.h index c59a648a44d3..de9d02100ef3 100644 --- a/envoy/config/subscription_factory.h +++ b/envoy/config/subscription_factory.h @@ -121,7 +121,7 @@ class MuxFactory : public Config::UntypedFactory { const LocalInfo::LocalInfo& local_info, std::unique_ptr&& config_validators, BackOffStrategyPtr&& backoff_strategy, OptRef xds_config_tracker, - OptRef xds_resources_delegate) PURE; + OptRef xds_resources_delegate, bool use_eds_resources_cache) PURE; }; } // namespace Config diff --git a/source/common/config/null_grpc_mux_impl.h b/source/common/config/null_grpc_mux_impl.h index b4211e3a21ec..5e82f42e5447 100644 --- a/source/common/config/null_grpc_mux_impl.h +++ b/source/common/config/null_grpc_mux_impl.h @@ -27,6 +27,8 @@ class NullGrpcMuxImpl : public GrpcMux, ENVOY_BUG(false, "unexpected request for on demand update"); } + EdsResourcesCacheOptRef edsResourcesCache() override { return absl::nullopt; } + void onWriteable() override {} void onStreamEstablished() override {} void onEstablishmentFailure() override {} diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 123a64b2cec6..019897e37059 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -119,6 +119,9 @@ FALSE_RUNTIME_GUARD(envoy_reloadable_features_enable_include_histograms); FALSE_RUNTIME_GUARD(envoy_reloadable_features_refresh_rtt_after_request); // TODO(danzh) false deprecate it once QUICHE has its own enable/disable flag. FALSE_RUNTIME_GUARD(envoy_reloadable_features_quic_reject_all); +// TODO(adisuissa) this will be enabled by default once the work on the feature is +// done in EdsClusterImpl. +FALSE_RUNTIME_GUARD(envoy_restart_features_use_eds_cache_for_ads); // Block of non-boolean flags. Use of int flags is deprecated. Do not add more. ABSL_FLAG(uint64_t, re2_max_program_size_error_level, 100, ""); // NOLINT diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index a26a46a8a213..3c1d580d2bf5 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -418,7 +418,7 @@ ClusterManagerImpl::ClusterManagerImpl( ->createUncachedRawAsyncClient(), main_thread_dispatcher, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info, std::move(custom_config_validators), std::move(backoff_strategy), - makeOptRefFromPtr(xds_config_tracker_.get()), {}); + makeOptRefFromPtr(xds_config_tracker_.get()), {}, false); } else { Config::Utility::checkTransportVersion(dyn_resources.ads_config()); auto xds_delegate_opt_ref = makeOptRefFromPtr(xds_resources_delegate_.get()); @@ -439,7 +439,7 @@ ClusterManagerImpl::ClusterManagerImpl( ->createUncachedRawAsyncClient(), main_thread_dispatcher, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info, std::move(custom_config_validators), std::move(backoff_strategy), - makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref); + makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, false); } } else { ads_mux_ = std::make_unique(); diff --git a/source/extensions/config_subscription/grpc/BUILD b/source/extensions/config_subscription/grpc/BUILD index a3e3fce0d05e..a703ad258a90 100644 --- a/source/extensions/config_subscription/grpc/BUILD +++ b/source/extensions/config_subscription/grpc/BUILD @@ -14,6 +14,7 @@ envoy_cc_library( srcs = ["grpc_mux_impl.cc"], hdrs = ["grpc_mux_impl.h"], deps = [ + ":eds_resources_cache_lib", ":grpc_stream_lib", ":xds_source_id_lib", "//envoy/config:custom_config_validators_interface", @@ -34,6 +35,7 @@ envoy_cc_library( "//source/common/memory:utils_lib", "//source/common/protobuf", "@com_google_absl//absl/container:btree", + "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) @@ -44,6 +46,7 @@ envoy_cc_library( hdrs = ["new_grpc_mux_impl.h"], deps = [ ":delta_subscription_state_lib", + ":eds_resources_cache_lib", ":grpc_stream_lib", ":pausable_ack_queue_lib", ":watch_map_lib", @@ -54,6 +57,7 @@ envoy_cc_library( "//source/common/config:xds_context_params_lib", "//source/common/config:xds_resource_lib", "//source/common/memory:utils_lib", + "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) @@ -63,6 +67,7 @@ envoy_cc_library( srcs = ["grpc_subscription_impl.cc"], hdrs = ["grpc_subscription_impl.h"], deps = [ + ":eds_resources_cache_lib", ":grpc_mux_lib", ":new_grpc_mux_lib", "//envoy/config:subscription_interface", @@ -210,6 +215,7 @@ envoy_cc_library( "//source/common/common:minimal_logger_lib", "//source/common/common:utility_lib", "//source/common/config:decoded_resource_lib", + "//source/common/config:resource_name_lib", "//source/common/config:utility_lib", "//source/common/config:xds_resource_lib", "//source/common/protobuf", diff --git a/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc b/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc index 987524f0c204..25c11bb02b58 100644 --- a/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc +++ b/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc @@ -31,7 +31,9 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create( data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, Utility::parseRateLimitSettings(api_config_source), data.local_info_, std::move(custom_config_validators), std::move(backoff_strategy), - data.xds_config_tracker_), + data.xds_config_tracker_, + // No EDS resources cache needed from collections. + /*eds_resources_cache=*/nullptr), data.callbacks_, data.resource_decoder_, data.stats_, data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_), /*is_aggregated=*/false, data.options_); diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc index 924596483a2c..72a9a41a33ae 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc @@ -6,6 +6,7 @@ #include "source/common/config/utility.h" #include "source/common/memory/utils.h" #include "source/common/protobuf/protobuf.h" +#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h" #include "source/extensions/config_subscription/grpc/xds_source_id.h" #include "absl/container/btree_map.h" @@ -56,21 +57,21 @@ std::string convertToWildcard(const std::string& resource_name) { } } // namespace -GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, - Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node, - CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef xds_resources_delegate, - const std::string& target_xds_authority) +GrpcMuxImpl::GrpcMuxImpl( + const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client, + Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, + Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node, + CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, + EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority) : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, std::move(backoff_strategy), rate_limit_settings), local_info_(local_info), skip_subsequent_node_(skip_subsequent_node), config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker), - xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority), - first_stream_request_(true), dispatcher_(dispatcher), + xds_resources_delegate_(xds_resources_delegate), + eds_resources_cache_(std::move(eds_resources_cache)), + target_xds_authority_(target_xds_authority), first_stream_request_(true), + dispatcher_(dispatcher), dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback( [this](absl::string_view resource_type_url) { onDynamicContextUpdate(resource_type_url); @@ -187,8 +188,14 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) { + // Resource cache is only used for EDS resources. + EdsResourcesCacheOptRef resources_cache{absl::nullopt}; + if (eds_resources_cache_ && + (type_url == Config::getTypeUrl())) { + resources_cache = makeOptRefFromPtr(eds_resources_cache_.get()); + } auto watch = std::make_unique(resources, callbacks, resource_decoder, type_url, - *this, options, local_info_); + *this, options, local_info_, resources_cache); ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url); // Lazily kick off the requests based on first subscription. This has the @@ -412,6 +419,19 @@ void GrpcMuxImpl::processDiscoveryResources(const std::vectorcallbacks_.onConfigUpdate(found_resources, version_info); + // Resource cache is only used for EDS resources. + if (eds_resources_cache_ && + (type_url == Config::getTypeUrl())) { + for (const auto& resource : found_resources) { + const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment = + dynamic_cast( + resource.get().resource()); + eds_resources_cache_->setResource(resource.get().name(), cluster_load_assignment); + } + // No need to remove resources from the cache, as currently only non-collection + // subscriptions are supported, and these resources are removed in the call + // to updateWatchInterest(). + } } } @@ -532,7 +552,7 @@ class GrpcMuxFactory : public MuxFactory { const envoy::config::core::v3::ApiConfigSource& ads_config, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef xds_resources_delegate) override { + XdsResourcesDelegateOptRef xds_resources_delegate, bool use_eds_resources_cache) override { return std::make_shared( local_info, std::move(async_client), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( @@ -540,6 +560,10 @@ class GrpcMuxFactory : public MuxFactory { scope, Utility::parseRateLimitSettings(ads_config), ads_config.set_node_on_first_message_only(), std::move(config_validators), std::move(backoff_strategy), xds_config_tracker, xds_resources_delegate, + (use_eds_resources_cache && + Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) + ? std::make_unique(dispatcher) + : nullptr, Config::Utility::getGrpcControlPlane(ads_config).value_or("")); } }; diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/grpc_mux_impl.h index 83ac9c558c7e..a018dd0a57e1 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.h @@ -7,6 +7,7 @@ #include "envoy/common/random_generator.h" #include "envoy/common/time.h" #include "envoy/config/custom_config_validators.h" +#include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" #include "envoy/config/xds_config_tracker.h" @@ -20,6 +21,7 @@ #include "source/common/common/logger.h" #include "source/common/common/utility.h" #include "source/common/config/api_version.h" +#include "source/common/config/resource_name.h" #include "source/common/config/ttl.h" #include "source/common/config/utility.h" #include "source/common/config/xds_context_params.h" @@ -44,7 +46,7 @@ class GrpcMuxImpl : public GrpcMux, bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, - const std::string& target_xds_authority); + EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority); ~GrpcMuxImpl() override; @@ -72,6 +74,10 @@ class GrpcMuxImpl : public GrpcMux, void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { } + EdsResourcesCacheOptRef edsResourcesCache() override { + return makeOptRefFromPtr(eds_resources_cache_.get()); + } + void handleDiscoveryResponse( std::unique_ptr&& message); @@ -99,17 +105,29 @@ class GrpcMuxImpl : public GrpcMux, SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder, const std::string& type_url, GrpcMuxImpl& parent, const SubscriptionOptions& options, - const LocalInfo::LocalInfo& local_info) + const LocalInfo::LocalInfo& local_info, + EdsResourcesCacheOptRef eds_resources_cache) : callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url), parent_(parent), subscription_options_(options), local_info_(local_info), - watches_(parent.apiStateFor(type_url).watches_) { + watches_(parent.apiStateFor(type_url).watches_), + eds_resources_cache_(eds_resources_cache) { updateResources(resources); + // If eds resources cache is provided, then the type must be ClusterLoadAssignment. + ASSERT( + !eds_resources_cache_.has_value() || + (type_url == Config::getTypeUrl())); } ~GrpcMuxWatchImpl() override { watches_.erase(iter_); if (!resources_.empty()) { parent_.queueDiscoveryRequest(type_url_); + if (eds_resources_cache_.has_value()) { + // All resources are to be removed, so remove them from the cache. + for (const auto& resource_name : resources_) { + eds_resources_cache_->removeResource(resource_name); + } + } } } @@ -131,7 +149,12 @@ class GrpcMuxImpl : public GrpcMux, private: void updateResources(const absl::flat_hash_set& resources) { - resources_.clear(); + // Finding the list of removed resources by keeping the current resources + // set until the end the function and computing the diff. + // Temporarily keep the resources prior to the update to find which ones + // were removed. + std::set previous_resources; + previous_resources.swap(resources_); std::transform( resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()), [this](const std::string& resource_name) -> std::string { @@ -148,6 +171,16 @@ class GrpcMuxImpl : public GrpcMux, } return resource_name; }); + if (eds_resources_cache_.has_value()) { + // Compute the removed resources and remove them from the cache. + std::vector removed_resources; + std::set_difference(previous_resources.begin(), previous_resources.end(), + resources_.begin(), resources_.end(), + std::back_inserter(removed_resources)); + for (const auto& resource_name : removed_resources) { + eds_resources_cache_->removeResource(resource_name); + } + } // move this watch to the beginning of the list iter_ = watches_.emplace(watches_.begin(), this); } @@ -157,6 +190,8 @@ class GrpcMuxImpl : public GrpcMux, const LocalInfo::LocalInfo& local_info_; WatchList& watches_; WatchList::iterator iter_; + // Optional cache for the specific ClusterLoadAssignments of this watch. + EdsResourcesCacheOptRef eds_resources_cache_; }; // Per muxed API state. @@ -212,6 +247,7 @@ class GrpcMuxImpl : public GrpcMux, CustomConfigValidatorsPtr config_validators_; XdsConfigTrackerOptRef xds_config_tracker_; XdsResourcesDelegateOptRef xds_resources_delegate_; + EdsResourcesCachePtr eds_resources_cache_; const std::string target_xds_authority_; bool first_stream_request_; diff --git a/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc b/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc index c7296942e9e7..435df4f3a1ef 100644 --- a/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc +++ b/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc @@ -33,6 +33,7 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat Utility::parseRateLimitSettings(api_config_source), data.local_info_, api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_, + /*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS. control_plane_id); } else { mux = std::make_shared( @@ -44,6 +45,7 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat Utility::parseRateLimitSettings(api_config_source), api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_, + /*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS. control_plane_id); } return std::make_unique( @@ -73,7 +75,9 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, Utility::parseRateLimitSettings(api_config_source), data.local_info_, api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - std::move(backoff_strategy), data.xds_config_tracker_); + std::move(backoff_strategy), data.xds_config_tracker_, + /*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS. + ); } else { mux = std::make_shared( Config::Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), @@ -81,7 +85,9 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti ->createUncachedRawAsyncClient(), data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, Utility::parseRateLimitSettings(api_config_source), data.local_info_, - std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_); + std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_, + /*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS. + ); } return std::make_unique( std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_, diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc index 2bf989b4c4b1..8507e3b525fd 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc @@ -11,6 +11,7 @@ #include "source/common/memory/utils.h" #include "source/common/protobuf/protobuf.h" #include "source/common/protobuf/utility.h" +#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h" namespace Envoy { namespace Config { @@ -34,14 +35,12 @@ class AllMuxesState { using AllMuxes = ThreadSafeSingleton; } // namespace -NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, - Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, - Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, - const LocalInfo::LocalInfo& local_info, - CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker) +NewGrpcMuxImpl::NewGrpcMuxImpl( + Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, + CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, + XdsConfigTrackerOptRef xds_config_tracker, EdsResourcesCachePtr eds_resources_cache) : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, std::move(backoff_strategy), rate_limit_settings), local_info_(local_info), config_validators_(std::move(config_validators)), @@ -49,7 +48,8 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, [this](absl::string_view resource_type_url) { onDynamicContextUpdate(resource_type_url); })), - dispatcher_(dispatcher), xds_config_tracker_(xds_config_tracker) { + dispatcher_(dispatcher), xds_config_tracker_(xds_config_tracker), + eds_resources_cache_(std::move(eds_resources_cache)) { AllMuxes::get().insert(this); } @@ -242,9 +242,16 @@ void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) { void NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use_namespace_matching) { - subscriptions_.emplace(type_url, std::make_unique( - type_url, local_info_, use_namespace_matching, dispatcher_, - *config_validators_.get(), xds_config_tracker_)); + // Resource cache is only used for EDS resources. + EdsResourcesCacheOptRef resources_cache{absl::nullopt}; + if (eds_resources_cache_ && + (type_url == Config::getTypeUrl())) { + resources_cache = makeOptRefFromPtr(eds_resources_cache_.get()); + } + subscriptions_.emplace( + type_url, std::make_unique(type_url, local_info_, use_namespace_matching, + dispatcher_, *config_validators_.get(), + xds_config_tracker_, resources_cache)); subscription_ordering_.emplace_back(type_url); } @@ -340,13 +347,17 @@ class NewGrpcMuxFactory : public MuxFactory { const envoy::config::core::v3::ApiConfigSource& ads_config, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - OptRef) override { + OptRef, bool use_eds_resources_cache) override { return std::make_shared( std::move(async_client), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"), scope, Utility::parseRateLimitSettings(ads_config), local_info, - std::move(config_validators), std::move(backoff_strategy), xds_config_tracker); + std::move(config_validators), std::move(backoff_strategy), xds_config_tracker, + (use_eds_resources_cache && + Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) + ? std::make_unique(dispatcher) + : nullptr); } }; diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h index 39a27eeed84b..2c82453a7488 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h @@ -4,6 +4,7 @@ #include "envoy/common/random_generator.h" #include "envoy/common/token_bucket.h" +#include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" #include "envoy/config/xds_config_tracker.h" @@ -11,6 +12,7 @@ #include "source/common/common/logger.h" #include "source/common/config/api_version.h" +#include "source/common/config/resource_name.h" #include "source/common/grpc/common.h" #include "source/common/runtime/runtime_features.h" #include "source/extensions/config_subscription/grpc/delta_subscription_state.h" @@ -36,7 +38,8 @@ class NewGrpcMuxImpl const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker); + XdsConfigTrackerOptRef xds_config_tracker, + EdsResourcesCachePtr eds_resources_cache); ~NewGrpcMuxImpl() override; @@ -58,6 +61,10 @@ class NewGrpcMuxImpl void requestOnDemandUpdate(const std::string& type_url, const absl::flat_hash_set& for_update) override; + EdsResourcesCacheOptRef edsResourcesCache() override { + return makeOptRefFromPtr(eds_resources_cache_.get()); + } + ScopedResume pause(const std::string& type_url) override; ScopedResume pause(const std::vector type_urls) override; @@ -86,9 +93,15 @@ class NewGrpcMuxImpl SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info, const bool use_namespace_matching, Event::Dispatcher& dispatcher, CustomConfigValidators& config_validators, - XdsConfigTrackerOptRef xds_config_tracker) - : watch_map_(use_namespace_matching, type_url, config_validators), - sub_state_(type_url, watch_map_, local_info, dispatcher, xds_config_tracker) {} + XdsConfigTrackerOptRef xds_config_tracker, + EdsResourcesCacheOptRef eds_resources_cache) + : watch_map_(use_namespace_matching, type_url, config_validators, eds_resources_cache), + sub_state_(type_url, watch_map_, local_info, dispatcher, xds_config_tracker) { + // If eds resources cache is provided, then the type must be ClusterLoadAssignment. + ASSERT( + !eds_resources_cache.has_value() || + (type_url == Config::getTypeUrl())); + } WatchMap watch_map_; DeltaSubscriptionState sub_state_; @@ -182,6 +195,7 @@ class NewGrpcMuxImpl Common::CallbackHandlePtr dynamic_update_callback_handle_; Event::Dispatcher& dispatcher_; XdsConfigTrackerOptRef xds_config_tracker_; + EdsResourcesCachePtr eds_resources_cache_; // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is // true because it may contain dangling pointers. diff --git a/source/extensions/config_subscription/grpc/watch_map.cc b/source/extensions/config_subscription/grpc/watch_map.cc index b9a1907fd3d3..9c16a4182e45 100644 --- a/source/extensions/config_subscription/grpc/watch_map.cc +++ b/source/extensions/config_subscription/grpc/watch_map.cc @@ -64,8 +64,20 @@ WatchMap::updateWatchInterest(Watch* watch, watch->resource_names_ = update_to_these_names; - return AddedRemoved(findAdditions(newly_added_to_watch, watch), - findRemovals(newly_removed_from_watch, watch)); + // First resources are added and only then removed, so a watch won't be removed + // if its interest has been replaced (rather than completely removed). + absl::flat_hash_set added_resources = findAdditions(newly_added_to_watch, watch); + absl::flat_hash_set removed_resources = + findRemovals(newly_removed_from_watch, watch); + // Remove cached resource that are no longer relevant. + if (eds_resources_cache_.has_value()) { + for (const auto& resource_name : removed_resources) { + // This may pass a resource_name that is not in the cache, for example + // if the resource contents has never arrived. + eds_resources_cache_->removeResource(resource_name); + } + } + return AddedRemoved(std::move(added_resources), std::move(removed_resources)); } absl::flat_hash_set WatchMap::watchesInterestedIn(const std::string& resource_name) { @@ -121,6 +133,10 @@ void WatchMap::onConfigUpdate(const std::vector& resources, ASSERT(deferred_removed_during_update_ == nullptr); deferred_removed_during_update_ = std::make_unique>(); Cleanup cleanup([this] { removeDeferredWatches(); }); + // The xDS server may send a resource that Envoy isn't interested in. This bit array + // will hold an "interesting" bit for each of the resources sent in the update. + std::vector interesting_resources; + interesting_resources.reserve(resources.size()); // Build a map from watches, to the set of updated resources that each watch cares about. Each // entry in the map is then a nice little bundle that can be fed directly into the individual // onConfigUpdate()s. @@ -130,6 +146,9 @@ void WatchMap::onConfigUpdate(const std::vector& resources, for (const auto& interested_watch : interested_in_r) { per_watch_updates[interested_watch].emplace_back(*r); } + // Set the corresponding interested_resources entry to true iff there is a + // watch interested in the resource. + interesting_resources.emplace_back(!interested_in_r.empty()); } // Execute external config validators. @@ -159,6 +178,23 @@ void WatchMap::onConfigUpdate(const std::vector& resources, watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info); } } + + if (eds_resources_cache_.has_value()) { + // Add/update the watched resources to/in the cache. + // Only resources that have a watcher should be updated. + for (uint32_t resource_idx = 0; resource_idx < resources.size(); ++resource_idx) { + if (interesting_resources[resource_idx]) { + const auto& resource = resources[resource_idx]; + const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment = + dynamic_cast( + resource.get()->resource()); + eds_resources_cache_->setResource(resource.get()->name(), cluster_load_assignment); + } + } + // Note: No need to remove resources from the cache, as currently only non-collection + // subscriptions are supported, and these resources are removed in the call + // to updateWatchInterest(). + } } void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField& resources, @@ -242,6 +278,19 @@ void WatchMap::onConfigUpdate( cur_watch->callbacks_.onConfigUpdate({}, {}, system_version_info); } } + + if (eds_resources_cache_.has_value()) { + // Add/update the watched resources to/in the cache. + for (const auto& resource : decoded_resources) { + const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment = + dynamic_cast( + resource.get()->resource()); + eds_resources_cache_->setResource(resource.get()->name(), cluster_load_assignment); + } + // No need to remove resources from the cache, as currently only non-collection + // subscriptions are supported, and these resources are removed in the call + // to updateWatchInterest(). + } } void WatchMap::onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) { diff --git a/source/extensions/config_subscription/grpc/watch_map.h b/source/extensions/config_subscription/grpc/watch_map.h index cd5c1973fd02..47cd43b4581f 100644 --- a/source/extensions/config_subscription/grpc/watch_map.h +++ b/source/extensions/config_subscription/grpc/watch_map.h @@ -5,11 +5,13 @@ #include #include "envoy/config/custom_config_validators.h" +#include "envoy/config/eds_resources_cache.h" #include "envoy/config/subscription.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "source/common/common/assert.h" #include "source/common/common/logger.h" +#include "source/common/config/resource_name.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" @@ -59,12 +61,25 @@ struct Watch { // update the subscription accordingly. // // A WatchMap is assumed to be dedicated to a single type_url type of resource (EDS, CDS, etc). +// +// The WatchMap can also store the fetched resources in a cache, and allow others to fetch +// resources directly from the cache. This is done for EDS in the following case: +// Assume an active EDS cluster exists with some load-assignment that is kept in the cache. +// If the cluster is updated, and no load-assignment is sent from the xDS server, the +// cached version will be used. +// The WatchMap is responsible to update the cache with the resource contents, and it is +// up to the specific xDS type subscription handler (i.e., EdsClusterImpl), to fetch +// the resource from the cache. class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable { public: WatchMap(const bool use_namespace_matching, const std::string& type_url, - CustomConfigValidators& config_validators) + CustomConfigValidators& config_validators, EdsResourcesCacheOptRef eds_resources_cache) : use_namespace_matching_(use_namespace_matching), type_url_(type_url), - config_validators_(config_validators) {} + config_validators_(config_validators), eds_resources_cache_(eds_resources_cache) { + // If eds resources cache is provided, then the type must be ClusterLoadAssignment. + ASSERT(!eds_resources_cache_.has_value() || + (type_url == Config::getTypeUrl())); + } // Adds 'callbacks' to the WatchMap, with every possible resource being watched. // (Use updateWatchInterest() to narrow it down to some specific names). @@ -133,6 +148,7 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable::GrpcMuxImpl( Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, - const std::string& target_xds_authority) + EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority) : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, std::move(backoff_strategy), rate_limit_settings), subscription_state_factory_(std::move(subscription_state_factory)), @@ -53,7 +55,9 @@ GrpcMuxImpl::GrpcMuxImpl( onDynamicContextUpdate(resource_type_url); })), config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker), - xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority) { + xds_resources_delegate_(xds_resources_delegate), + eds_resources_cache_(std::move(eds_resources_cache)), + target_xds_authority_(target_xds_authority) { Config::Utility::checkLocalInfo("ads", local_info); AllMuxes::get().insert(this); } @@ -84,12 +88,19 @@ Config::GrpcMuxWatchPtr GrpcMuxImpl::addWatch( const SubscriptionOptions& options) { auto watch_map = watch_maps_.find(type_url); if (watch_map == watch_maps_.end()) { + // Resource cache is only used for EDS resources. + EdsResourcesCacheOptRef resources_cache{absl::nullopt}; + if (eds_resources_cache_ && + (type_url == Config::getTypeUrl())) { + resources_cache = makeOptRefFromPtr(eds_resources_cache_.get()); + } + // We don't yet have a subscription for type_url! Make one! - watch_map = - watch_maps_ - .emplace(type_url, std::make_unique(options.use_namespace_matching_, type_url, - *config_validators_.get())) - .first; + watch_map = watch_maps_ + .emplace(type_url, + std::make_unique(options.use_namespace_matching_, type_url, + *config_validators_.get(), resources_cache)) + .first; subscriptions_.emplace(type_url, subscription_state_factory_->makeSubscriptionState( type_url, *watch_maps_[type_url], resource_decoder, xds_config_tracker_, xds_resources_delegate_, @@ -367,11 +378,12 @@ GrpcMuxDelta::GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispat const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker) + XdsConfigTrackerOptRef xds_config_tracker, + EdsResourcesCachePtr eds_resources_cache) : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, local_info, std::move(async_client), dispatcher, service_method, scope, rate_limit_settings, std::move(config_validators), std::move(backoff_strategy), - xds_config_tracker) {} + xds_config_tracker, absl::nullopt, std::move(eds_resources_cache)) {} // GrpcStreamCallbacks for GrpcMuxDelta void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url, @@ -392,11 +404,13 @@ GrpcMuxSotw::GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatch BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, + EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority) : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, local_info, std::move(async_client), dispatcher, service_method, scope, rate_limit_settings, std::move(config_validators), std::move(backoff_strategy), - xds_config_tracker, xds_resources_delegate, target_xds_authority) {} + xds_config_tracker, xds_resources_delegate, std::move(eds_resources_cache), + target_xds_authority) {} Config::GrpcMuxWatchPtr NullGrpcMuxImpl::addWatch(const std::string&, const absl::flat_hash_set&, @@ -416,7 +430,7 @@ class DeltaGrpcMuxFactory : public MuxFactory { const envoy::config::core::v3::ApiConfigSource& ads_config, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef) override { + XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override { return std::make_shared( std::move(async_client), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( @@ -424,7 +438,11 @@ class DeltaGrpcMuxFactory : public MuxFactory { "DeltaAggregatedResources"), scope, Envoy::Config::Utility::parseRateLimitSettings(ads_config), local_info, ads_config.set_node_on_first_message_only(), std::move(config_validators), - std::move(backoff_strategy), xds_config_tracker); + std::move(backoff_strategy), xds_config_tracker, + (use_eds_resources_cache && + Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) + ? std::make_unique(dispatcher) + : nullptr); } }; @@ -438,7 +456,7 @@ class SotwGrpcMuxFactory : public MuxFactory { const envoy::config::core::v3::ApiConfigSource& ads_config, const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef) override { + XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override { return std::make_shared( std::move(async_client), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( @@ -446,7 +464,11 @@ class SotwGrpcMuxFactory : public MuxFactory { "StreamAggregatedResources"), scope, Envoy::Config::Utility::parseRateLimitSettings(ads_config), local_info, ads_config.set_node_on_first_message_only(), std::move(config_validators), - std::move(backoff_strategy), xds_config_tracker); + std::move(backoff_strategy), xds_config_tracker, absl::nullopt, + (use_eds_resources_cache && + Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) + ? std::make_unique(dispatcher) + : nullptr); } }; diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h index 7669ca73cffd..67746534b092 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h @@ -66,6 +66,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, + EdsResourcesCachePtr eds_resources_cache = nullptr, const std::string& target_xds_authority = ""); ~GrpcMuxImpl() override; @@ -108,6 +109,10 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, genericHandleResponse(message->type_url(), *message, control_plane_stats); } + EdsResourcesCacheOptRef edsResourcesCache() override { + return makeOptRefFromPtr(eds_resources_cache_.get()); + } + GrpcStream& grpcStreamForTest() { return grpc_stream_; } protected: @@ -209,6 +214,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, CustomConfigValidatorsPtr config_validators_; XdsConfigTrackerOptRef xds_config_tracker_; XdsResourcesDelegateOptRef xds_resources_delegate_; + EdsResourcesCachePtr eds_resources_cache_; const std::string target_xds_authority_; // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is @@ -224,7 +230,8 @@ class GrpcMuxDelta : public GrpcMuxImpl&) override { ENVOY_BUG(false, "unexpected request for on demand update"); } + + EdsResourcesCacheOptRef edsResourcesCache() override { return {}; } }; } // namespace XdsMux diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index e1e41448ef87..c478901eca49 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -64,15 +64,17 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, local_info_, true, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + /*eds_resources_cache=*/nullptr); } else { mux_ = std::make_shared( local_info_, std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, true, std::move(config_validators_), std::move(backoff_strategy), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/ - XdsResourcesDelegateOptRef(), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/""); } subscription_ = std::make_unique( diff --git a/test/extensions/clusters/eds/eds_speed_test.cc b/test/extensions/clusters/eds/eds_speed_test.cc index d30b1e1e1129..68c076f7b6f2 100644 --- a/test/extensions/clusters/eds/eds_speed_test.cc +++ b/test/extensions/clusters/eds/eds_speed_test.cc @@ -58,7 +58,8 @@ class EdsSpeedTest { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"), scope_, {}, local_info_, true, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef())); + /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef(), + /*xds_resources_delegate=*/{}, /*eds_resources_cache=*/nullptr)); } else { grpc_mux_.reset(new Config::GrpcMuxImpl( local_info_, std::unique_ptr(async_client_), @@ -68,6 +69,7 @@ class EdsSpeedTest { scope_, {}, true, std::move(config_validators_), std::move(backoff_strategy), /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/Config::XdsResourcesDelegateOptRef(), + /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/"")); } resetCluster(R"EOF( diff --git a/test/extensions/config_subscription/grpc/BUILD b/test/extensions/config_subscription/grpc/BUILD index 46e75f764c04..f043d00891c0 100644 --- a/test/extensions/config_subscription/grpc/BUILD +++ b/test/extensions/config_subscription/grpc/BUILD @@ -22,6 +22,7 @@ envoy_cc_test( "//test/mocks:common_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", @@ -43,6 +44,7 @@ envoy_cc_test( "//envoy/config:xds_config_tracker_interface", "//envoy/config:xds_resources_delegate_interface", "//source/common/config:api_version_lib", + "//source/common/config:null_grpc_mux_lib", "//source/common/config:protobuf_link_hacks", "//source/common/protobuf", "//source/common/stats:isolated_store_lib", @@ -51,6 +53,7 @@ envoy_cc_test( "//test/mocks:common_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", @@ -76,6 +79,7 @@ envoy_cc_test( "//source/extensions/config_subscription/grpc:grpc_subscription_lib", "//test/mocks:common_lib", "//test/mocks/config:config_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", @@ -181,6 +185,7 @@ envoy_cc_test( "//test/mocks:common_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", @@ -228,6 +233,7 @@ envoy_cc_test( "//source/extensions/config_subscription/grpc:watch_map_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", + "//test/mocks/config:eds_resources_cache_mocks", "//test/test_common:utility_lib", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", diff --git a/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc b/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc index 2b435f510aac..14a7760b6501 100644 --- a/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc +++ b/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc @@ -159,13 +159,15 @@ TEST_P(DeltaSubscriptionNoGrpcStreamTest, NoGrpcStream) { std::unique_ptr(async_client), dispatcher, *method_descriptor, *stats_store.rootScope(), rate_limit_settings, local_info, false, std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*eds_resources_cache=*/nullptr); } else { xds_context = std::make_shared( std::unique_ptr(async_client), dispatcher, *method_descriptor, *stats_store.rootScope(), rate_limit_settings, local_info, std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*eds_resources_cache=*/nullptr); } GrpcSubscriptionImplPtr subscription = std::make_unique( diff --git a/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h b/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h index 77c4310137aa..a597f4f7b2dc 100644 --- a/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h +++ b/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h @@ -55,13 +55,15 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, local_info_, false, std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*eds_resources_cache=*/nullptr); } else { xds_context_ = std::make_shared( std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, local_info_, std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*eds_resources_cache=*/nullptr); } subscription_ = std::make_unique( xds_context_, callbacks_, resource_decoder_, stats_, diff --git a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc index af24bf376a88..abe52c8c7098 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc @@ -8,6 +8,7 @@ #include "source/common/common/empty_string.h" #include "source/common/config/api_version.h" +#include "source/common/config/null_grpc_mux_impl.h" #include "source/common/config/protobuf_link_hacks.h" #include "source/common/config/utility.h" #include "source/common/protobuf/protobuf.h" @@ -17,6 +18,7 @@ #include "test/common/stats/stat_test_utility.h" #include "test/mocks/common.h" #include "test/mocks/config/custom_config_validators.h" +#include "test/mocks/config/eds_resources_cache.h" #include "test/mocks/config/mocks.h" #include "test/mocks/event/mocks.h" #include "test/mocks/grpc/mocks.h" @@ -69,7 +71,8 @@ class GrpcMuxImplTestBase : public testing::Test { SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""); + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); } void setup(const RateLimitSettings& custom_rate_limit_settings) { @@ -82,7 +85,8 @@ class GrpcMuxImplTestBase : public testing::Test { SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""); + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); } void expectSendMessage(const std::string& type_url, @@ -123,6 +127,7 @@ class GrpcMuxImplTestBase : public testing::Test { Envoy::Config::RateLimitSettings rate_limit_settings_; Stats::Gauge& control_plane_connected_state_; Stats::Gauge& control_plane_pending_requests_; + MockEdsResourcesCache* eds_resources_cache_{nullptr}; }; class GrpcMuxImplTest : public GrpcMuxImplTestBase { @@ -962,7 +967,7 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { random_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - /*target_xds_authority=*/""), + /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/""), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); @@ -981,12 +986,238 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/""), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); } +// Validates that the EDS cache getter returns the cache. +TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEds) { + eds_resources_cache_ = new NiceMock(); + setup(); + EXPECT_NE({}, grpc_mux_->edsResourcesCache()); +} + +// Validates that the EDS cache getter returns empty if there is no cache. +TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) { + setup(); + EXPECT_EQ({}, grpc_mux_->edsResourcesCache()); +} + +// Validate that an EDS resource is cached if there's a cache. +TEST_F(GrpcMuxImplTest, CacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto eds_sub = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Envoy will unsubscribe from all resources. + expectSendMessage(type_url, {}, "1"); + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); +} + +// Validate that an update to an EDS resource watcher is reflected in the cache, +// if there's a cache. +TEST_F(GrpcMuxImplTest, UpdateCacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto eds_sub = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Update the cache to another resource. + expectSendMessage(type_url, {}, "1"); + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {"y"}, "1"); + eds_sub->update({"y"}); + + // Envoy will unsubscribe from all resources. + expectSendMessage(type_url, {}, "1"); + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); +} + +// Validate that adding and removing watchers reflects on the cache changes, +// if there's a cache. +TEST_F(GrpcMuxImplTest, AddRemoveSubscriptions) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + + { + auto eds_sub = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, + const std::string&) { EXPECT_EQ(1, resources.size()); })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); // Ack. + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Watcher (eds_sub) going out of scope, the resource should be removed, as well as + // the interest. + expectSendMessage(type_url, {}, "1"); + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + } + + // Update to a new resource interest. + { + expectSendMessage(type_url, {"y"}, "1"); + auto eds_sub2 = grpc_mux_->addWatch(type_url, {"y"}, callbacks_, resource_decoder, {}); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "2")) + .WillOnce(Invoke([](const std::vector& resources, + const std::string&) { EXPECT_EQ(1, resources.size()); })); + EXPECT_CALL(*eds_resources_cache_, setResource("y", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"y"}, "2"); // Ack. + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Watcher (eds_sub2) going out of scope, the resource should be removed, as well as + // the interest. + expectSendMessage(type_url, {}, "2"); + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + } +} + +/** + * Tests the NullGrpcMuxImpl object to increase code-coverage. + */ +class NullGrpcMuxImplTest : public testing::Test { +public: + NullGrpcMuxImplTest() {} + NullGrpcMuxImpl null_mux_; + NiceMock callbacks_; +}; + +TEST_F(NullGrpcMuxImplTest, StartImplemented) { EXPECT_NO_THROW(null_mux_.start()); } + +TEST_F(NullGrpcMuxImplTest, PauseImplemented) { + ScopedResume scoped; + EXPECT_NO_THROW(scoped = null_mux_.pause("ignored")); +} + +TEST_F(NullGrpcMuxImplTest, PauseMultipleArgsImplemented) { + ScopedResume scoped; + const std::vector params = {"ignored", "another_ignored"}; + EXPECT_NO_THROW(scoped = null_mux_.pause(params)); +} + +TEST_F(NullGrpcMuxImplTest, RequestOnDemandNotImplemented) { + EXPECT_ENVOY_BUG(null_mux_.requestOnDemandUpdate("type_url", {"for_update"}), + "unexpected request for on demand update"); +} + +TEST_F(NullGrpcMuxImplTest, AddWatchRaisesException) { + NiceMock callbacks; + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + + EXPECT_THROW_WITH_REGEX(null_mux_.addWatch("type_url", {}, callbacks, resource_decoder, {}), + EnvoyException, "ADS must be configured to support an ADS config source"); +} + +TEST_F(NullGrpcMuxImplTest, NoEdsResourcesCache) { EXPECT_EQ({}, null_mux_.edsResourcesCache()); } +TEST_F(NullGrpcMuxImplTest, OnWriteableImplemented) { EXPECT_NO_THROW(null_mux_.onWriteable()); } +TEST_F(NullGrpcMuxImplTest, OnStreamEstablishedImplemented) { + EXPECT_NO_THROW(null_mux_.onStreamEstablished()); +} +TEST_F(NullGrpcMuxImplTest, OnEstablishmentFailureImplemented) { + EXPECT_NO_THROW(null_mux_.onEstablishmentFailure()); +} +TEST_F(NullGrpcMuxImplTest, OnDiscoveryResponseImplemented) { + std::unique_ptr response; + Stats::TestUtil::TestStore stats; + ControlPlaneStats cp_stats{Utility::generateControlPlaneStats(*stats.rootScope())}; + EXPECT_NO_THROW(null_mux_.onDiscoveryResponse(std::move(response), cp_stats)); +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc index 3712709f9f73..ebec8abfc6c4 100644 --- a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc @@ -17,6 +17,7 @@ #include "test/config/v2_link_hacks.h" #include "test/mocks/common.h" #include "test/mocks/config/custom_config_validators.h" +#include "test/mocks/config/eds_resources_cache.h" #include "test/mocks/config/mocks.h" #include "test/mocks/event/mocks.h" #include "test/mocks/grpc/mocks.h" @@ -70,7 +71,8 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam { "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), *stats_.rootScope(), rate_limit_settings_, local_info_, false, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + std::unique_ptr(eds_resources_cache_)); return; } grpc_mux_ = std::make_unique( @@ -79,7 +81,8 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam { "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), *stats_.rootScope(), rate_limit_settings_, local_info_, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + std::unique_ptr(eds_resources_cache_)); } void expectSendMessage(const std::string& type_url, @@ -172,6 +175,7 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam { ControlPlaneStats control_plane_stats_; Stats::Gauge& control_plane_connected_state_; bool should_use_unified_; + MockEdsResourcesCache* eds_resources_cache_{nullptr}; }; class NewGrpcMuxImplTest : public NewGrpcMuxImplTestBase { @@ -561,6 +565,195 @@ TEST_P(NewGrpcMuxImplTest, Shutdown) { // There won't be any unsubscribe messages for the legacy mux either for the same reason } +// Validates that the EDS cache getter returns the cache. +TEST_P(NewGrpcMuxImplTest, EdsResourcesCacheForEds) { + eds_resources_cache_ = new NiceMock(); + setup(); + EXPECT_NE({}, grpc_mux_->edsResourcesCache()); +} + +// Validates that the EDS cache getter returns empty if there is no cache. +TEST_P(NewGrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) { + setup(); + EXPECT_EQ({}, grpc_mux_->edsResourcesCache()); +} + +// Validate that an EDS resource is cached if there's a cache. +TEST_P(NewGrpcMuxImplTest, CacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto watch = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder_, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, {}); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + auto res = response->add_resources(); + res->set_name("x"); + res->set_version("1"); + load_assignment.set_cluster_name("x"); + res->mutable_resource()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, + const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {}, {}); // Ack. + onDiscoveryResponse(std::move(response)); + } + + // Envoy will unsubscribe from all resources. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {}, {"x"}); +} + +// Validate that an update to an EDS resource watcher is reflected in the cache, +// if there's a cache. +TEST_P(NewGrpcMuxImplTest, UpdateCacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto watch = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder_, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, {}); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + auto res = response->add_resources(); + res->set_name("x"); + res->set_version("1"); + load_assignment.set_cluster_name("x"); + res->mutable_resource()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, + const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {}, {}); // Ack. + onDiscoveryResponse(std::move(response)); + } + + // Update the cache to another resource. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {"y"}, {"x"}); + watch->update({"y"}); + + // Envoy will unsubscribe from all resources. + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + expectSendMessage(type_url, {}, {"y"}); +} + +// Validate that adding and removing watchers reflects on the cache changes, +// if there's a cache. +TEST_P(NewGrpcMuxImplTest, AddRemoveSubscriptions) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + + { + auto watch1 = grpc_mux_->addWatch(type_url, {"x"}, callbacks_, resource_decoder_, {}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, {}); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + auto res = response->add_resources(); + res->set_name("x"); + res->set_version("1"); + load_assignment.set_cluster_name("x"); + res->mutable_resource()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1")) + .WillOnce(Invoke([&load_assignment]( + const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {}, {}); // Ack. + onDiscoveryResponse(std::move(response)); + } + + // Watcher (watch1) going out of scope, the resource should be removed, as well as + // the interest. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {}, {"x"}); + } + + // Update to a new resource interest. + { + expectSendMessage(type_url, {"y"}, {}); + auto watch2 = grpc_mux_->addWatch(type_url, {"y"}, callbacks_, resource_decoder_, {}); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + auto res = response->add_resources(); + res->set_name("y"); + res->set_version("2"); + load_assignment.set_cluster_name("y"); + res->mutable_resource()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "2")) + .WillOnce(Invoke([&load_assignment]( + const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("y", ProtoEq(load_assignment))); + expectSendMessage(type_url, {}, {}); // Ack. + onDiscoveryResponse(std::move(response)); + } + + // Watcher (watch2) going out of scope, the resource should be removed, as well as + // the interest. + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + expectSendMessage(type_url, {}, {"y"}); + } +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/watch_map_test.cc b/test/extensions/config_subscription/grpc/watch_map_test.cc index dcdbe9a418ee..771c26cea73c 100644 --- a/test/extensions/config_subscription/grpc/watch_map_test.cc +++ b/test/extensions/config_subscription/grpc/watch_map_test.cc @@ -9,6 +9,7 @@ #include "source/extensions/config_subscription/grpc/watch_map.h" #include "test/mocks/config/custom_config_validators.h" +#include "test/mocks/config/eds_resources_cache.h" #include "test/mocks/config/mocks.h" #include "test/test_common/utility.h" @@ -127,7 +128,7 @@ TEST(WatchMapTest, Basic) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); { @@ -201,7 +202,7 @@ TEST(WatchMapTest, Overlap) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -259,12 +260,99 @@ TEST(WatchMapTest, Overlap) { } } +// Checks that a resource is added to the cache the first time it is received, +// and removed when there's no more interest. +TEST(WatchMapTest, CacheResourceAddResource) { + MockSubscriptionCallbacks callbacks1; + MockSubscriptionCallbacks callbacks2; + TestUtility::TestOpaqueResourceDecoderImpl + resource_decoder("cluster_name"); + NiceMock config_validators; + NiceMock eds_resources_cache; + const std::string eds_type_url = + Config::getTypeUrl(); + WatchMap watch_map(false, eds_type_url, config_validators, + makeOptRef(eds_resources_cache)); + // The test uses 2 watchers to ensure that interest is kept regardless of + // which watcher was the first to add a watch for the assignment. + Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); + Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); + + Protobuf::RepeatedPtrField updated_resources; + envoy::config::endpoint::v3::ClusterLoadAssignment alice; + alice.set_cluster_name("alice"); + updated_resources.Add()->PackFrom(alice); + + // First watch becomes interested. + { + absl::flat_hash_set update_to({"alice", "dummy"}); + // "alice" isn't known - no need to remove from the cache. + EXPECT_CALL(eds_resources_cache, removeResource("alice")).Times(0); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, update_to); + EXPECT_EQ(update_to, added_removed.added_); // add to subscription + EXPECT_TRUE(added_removed.removed_.empty()); + watch_map.updateWatchInterest(watch2, {"dummy"}); + + // *Only* first watch receives update. + expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version1"); + expectNoUpdate(callbacks2, "version1"); + // A call for SotW and a call for Delta to the cache's setResource method. + EXPECT_CALL(eds_resources_cache, setResource("alice", ProtoEq(alice))).Times(2); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version1"); + } + // Second watch becomes interested. + { + absl::flat_hash_set update_to({"alice", "dummy"}); + // "alice" is known, and there's still interest - no removal. + EXPECT_CALL(eds_resources_cache, removeResource("alice")).Times(0); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, update_to); + EXPECT_TRUE(added_removed.added_.empty()); // nothing happens + EXPECT_TRUE(added_removed.removed_.empty()); + + // Both watches receive update. + expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version2"); + expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version2"); + // A call for SotW and a call for Delta to the cache's setResource method. + EXPECT_CALL(eds_resources_cache, setResource("alice", ProtoEq(alice))).Times(2); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version2"); + } + // First watch loses interest. + { + // "alice" is known, and there's still interest - no removal. + EXPECT_CALL(eds_resources_cache, removeResource("alice")).Times(0); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, {"dummy"}); + EXPECT_TRUE(added_removed.added_.empty()); // nothing happens + EXPECT_TRUE(added_removed.removed_.empty()); + + // Both watches receive the update. For watch2, this is obviously desired. + expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version3"); + // For watch1, it's more subtle: the WatchMap sees that this update has no + // resources watch1 cares about, but also knows that watch1 previously had + // some resources. So, it must inform watch1 that it now has no resources. + // (SotW only: delta's explicit removals avoid the need for this guessing.) + expectEmptySotwNoDeltaUpdate(callbacks1, "version3"); + // A call for SotW and a call for Delta to the cache's setResource method. + EXPECT_CALL(eds_resources_cache, setResource("alice", ProtoEq(alice))).Times(2); + doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version3"); + } + // Second watch loses interest. + { + // A call for the cache's removeResource method as there's no more + // interest in "alice". + EXPECT_CALL(eds_resources_cache, removeResource("alice")); + AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, {"dummy"}); + EXPECT_TRUE(added_removed.added_.empty()); + EXPECT_EQ(absl::flat_hash_set({"alice"}), + added_removed.removed_); // remove from subscription + } +} + // These are regression tests for #11877, validate that when two watches point at the same // watched resource, and an update to one of the watches removes one or both of them, that // WatchMap defers deletes and doesn't crash. class SameWatchRemoval : public testing::Test { public: - SameWatchRemoval() : watch_map_(false, "ClusterLoadAssignmentType", config_validators) {} + SameWatchRemoval() : watch_map_(false, "ClusterLoadAssignmentType", config_validators, {}) {} void SetUp() override { envoy::config::endpoint::v3::ClusterLoadAssignment alice; @@ -343,7 +431,7 @@ TEST(WatchMapTest, AddRemoveAdd) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -400,7 +488,7 @@ TEST(WatchMapTest, UninterestingUpdate) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"alice"}); @@ -445,7 +533,7 @@ TEST(WatchMapTest, WatchingEverything) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); /*Watch* watch1 = */ watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); // watch1 never specifies any names, and so is treated as interested in everything. @@ -482,7 +570,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); @@ -516,7 +604,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { TEST(WatchMapTest, OnConfigUpdateFailed) { NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); // calling on empty map doesn't break watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); @@ -538,7 +626,7 @@ TEST(WatchMapTest, OnConfigUpdateXdsTpGlobCollections) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"xdstp://foo/bar/baz/*?some=thing&thing=some"}); @@ -583,7 +671,7 @@ TEST(WatchMapTest, OnConfigUpdateXdsTpSingletons) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"xdstp://foo/bar/baz?some=thing&thing=some"}); @@ -624,7 +712,7 @@ TEST(WatchMapTest, OnConfigUpdateUsingNamespaces) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(true, "ClusterLoadAssignmentType", config_validators); + WatchMap watch_map(true, "ClusterLoadAssignmentType", config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); @@ -677,6 +765,10 @@ TEST(WatchMapTest, OnConfigUpdateUsingNamespaces) { } } +// TODO(adip): Add tests that use the eds cache. +// Needs to test the following function onConfigUpdate (sotw&delta) and +// updateWatchInterest + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc index 8530afb7a64f..0ecb2e8cad5e 100644 --- a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc @@ -17,6 +17,7 @@ #include "test/config/v2_link_hacks.h" #include "test/mocks/common.h" #include "test/mocks/config/custom_config_validators.h" +#include "test/mocks/config/eds_resources_cache.h" #include "test/mocks/config/mocks.h" #include "test/mocks/event/mocks.h" #include "test/mocks/grpc/mocks.h" @@ -58,17 +59,7 @@ class GrpcMuxImplTestBase : public testing::Test { control_plane_pending_requests_(stats_.gauge("control_plane.pending_requests", Stats::Gauge::ImportMode::NeverImport)) {} - void setup() { - grpc_mux_ = std::make_unique( - std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, local_info_, true, std::move(config_validators_), - std::make_unique( - SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, - random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); - } + void setup() { setup(rate_limit_settings_); } void setup(const RateLimitSettings& custom_rate_limit_settings) { grpc_mux_ = std::make_unique( @@ -80,7 +71,9 @@ class GrpcMuxImplTestBase : public testing::Test { std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); } void expectSendMessage(const std::string& type_url, @@ -138,6 +131,7 @@ class GrpcMuxImplTestBase : public testing::Test { Envoy::Config::RateLimitSettings rate_limit_settings_; Stats::Gauge& control_plane_connected_state_; Stats::Gauge& control_plane_pending_requests_; + MockEdsResourcesCache* eds_resources_cache_{nullptr}; }; class GrpcMuxImplTest : public GrpcMuxImplTestBase { @@ -1045,6 +1039,177 @@ TEST_F(GrpcMuxImplTest, AllMuxesStateTest) { EXPECT_TRUE(grpc_mux_1->isShutdown()); } +// Validates that the EDS cache getter returns the cache. +TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEds) { + eds_resources_cache_ = new NiceMock(); + setup(); + EXPECT_NE({}, grpc_mux_->edsResourcesCache()); +} + +// Validates that the EDS cache getter returns empty if there is no cache. +TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) { + setup(); + EXPECT_EQ({}, grpc_mux_->edsResourcesCache()); +} + +// Validate that an EDS resource is cached if there's a cache. +TEST_F(GrpcMuxImplTest, CacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto eds_sub = makeWatch(type_url, {"x"}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Envoy will unsubscribe from all resources. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {}, "1"); +} + +// Validate that an update to an EDS resource watcher is reflected in the cache, +// if there's a cache. +TEST_F(GrpcMuxImplTest, UpdateCacheEdsResource) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto eds_sub = makeWatch(type_url, {"x"}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Update the cache to another resource. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {"y"}, "1"); + eds_sub->update({"y"}); + + // Envoy will unsubscribe from all resources. + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + expectSendMessage(type_url, {}, "1"); +} + +// Validate that adding and removing watchers reflects on the cache changes, +// if there's a cache. +TEST_F(GrpcMuxImplTest, AddRemoveSubscriptions) { + // Create the cache that will also be passed to the GrpcMux object via setup(). + eds_resources_cache_ = new NiceMock(); + setup(); + + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + + { + auto eds_sub = makeWatch(type_url, {"x"}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, + const std::string&) { EXPECT_EQ(1, resources.size()); })); + EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"x"}, "1"); // Ack. + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Watcher (eds_sub) going out of scope, the resource should be removed, as well as + // the interest. + EXPECT_CALL(*eds_resources_cache_, removeResource("x")); + expectSendMessage(type_url, {}, "1"); + } + + // Update to a new resource interest. + { + expectSendMessage(type_url, {"y"}, "1"); + auto eds_sub2 = makeWatch(type_url, {"y"}); + + // Reply with the resource, it will be added to the cache. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, "2")) + .WillOnce(Invoke([](const std::vector& resources, + const std::string&) { EXPECT_EQ(1, resources.size()); })); + EXPECT_CALL(*eds_resources_cache_, setResource("y", ProtoEq(load_assignment))); + expectSendMessage(type_url, {"y"}, "2"); // Ack. + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Watcher (eds_sub2) going out of scope, the resource should be removed, as well as + // the interest. + EXPECT_CALL(*eds_resources_cache_, removeResource("y")); + expectSendMessage(type_url, {}, "2"); + } +} + class NullGrpcMuxImplTest : public testing::Test { public: NullGrpcMuxImplTest() : null_mux_(std::make_unique()) {} @@ -1080,6 +1245,8 @@ TEST_F(NullGrpcMuxImplTest, AddWatchRaisesException) { EnvoyException, "ADS must be configured to support an ADS config source"); } +TEST_F(NullGrpcMuxImplTest, NoEdsResourcesCache) { EXPECT_EQ({}, null_mux_->edsResourcesCache()); } + } // namespace } // namespace XdsMux } // namespace Config diff --git a/test/mocks/config/BUILD b/test/mocks/config/BUILD index ea9057e1aed7..82c7ba41ecf0 100644 --- a/test/mocks/config/BUILD +++ b/test/mocks/config/BUILD @@ -35,3 +35,13 @@ envoy_cc_mock( "//envoy/config:config_validator_interface", ], ) + +envoy_cc_mock( + name = "eds_resources_cache_mocks", + srcs = ["eds_resources_cache.cc"], + hdrs = ["eds_resources_cache.h"], + deps = [ + "//envoy/config:eds_resources_cache_interface", + "//source/common/common:logger_lib", + ], +) diff --git a/test/mocks/config/eds_resources_cache.cc b/test/mocks/config/eds_resources_cache.cc new file mode 100644 index 000000000000..1f11831228d5 --- /dev/null +++ b/test/mocks/config/eds_resources_cache.cc @@ -0,0 +1,16 @@ +#include "test/mocks/config/eds_resources_cache.h" + +#include "source/common/common/logger.h" + +namespace Envoy { +namespace Config { +using testing::_; +using testing::Return; + +using testing::Invoke; +MockEdsResourcesCache::MockEdsResourcesCache() { + ON_CALL(*this, getResource(_, _)).WillByDefault(Return(absl::nullopt)); +} + +} // namespace Config +} // namespace Envoy diff --git a/test/mocks/config/eds_resources_cache.h b/test/mocks/config/eds_resources_cache.h new file mode 100644 index 000000000000..72661c0312bd --- /dev/null +++ b/test/mocks/config/eds_resources_cache.h @@ -0,0 +1,31 @@ +#pragma once + +#include "envoy/config/eds_resources_cache.h" + +#include "gmock/gmock.h" + +namespace Envoy { +namespace Config { + +class MockEdsResourcesCache : public EdsResourcesCache { +public: + MockEdsResourcesCache(); + ~MockEdsResourcesCache() override = default; + + MOCK_METHOD(void, setResource, + (absl::string_view resource_name, + const envoy::config::endpoint::v3::ClusterLoadAssignment& resource)); + MOCK_METHOD(void, removeResource, (absl::string_view resource_name)); + MOCK_METHOD(OptRef, getResource, + (absl::string_view resource_name, EdsResourceRemovalCallback* removal_cb)); + MOCK_METHOD(void, removeCallback, + (absl::string_view resource_name, EdsResourceRemovalCallback* removal_cb)); + MOCK_METHOD(uint32_t, cacheSizeForTest, (), (const)); + + MOCK_METHOD(void, setExpiryTimer, + (absl::string_view resource_name, std::chrono::milliseconds ms)); + MOCK_METHOD(void, disableExpiryTimer, (absl::string_view resource_name)); +}; + +} // namespace Config +} // namespace Envoy diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index 4147b4046f1d..d6fe23e3c3f9 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -129,6 +129,8 @@ class MockGrpcMux : public GrpcMux { const absl::flat_hash_set& add_these_names)); MOCK_METHOD(bool, paused, (const std::string& type_url), (const)); + + MOCK_METHOD(EdsResourcesCacheOptRef, edsResourcesCache, ()); }; class MockGrpcStreamCallbacks