Skip to content

Commit

Permalink
eds: Adding eds caching support to grpc-mux (#28273)
Browse files Browse the repository at this point in the history
Continuation of PR #28079 (as part of the work for issue #26749).

Currently after an EDS-cluster update, Envoy waits for an EDS response. If a timeout occurs, the EDS-cluster will be used without endpoints.

This PR adds the use of caching into the GrpcMux. The GrpcMux object adds an EDS resource to the cache when it is received/updated, and removes it when there are no longer subscriptions (watchers).

A runtime flag (disabled by default) is added to guard the use of the cache; it will be enabled in a future PR when ADS is used.

Next PR will plumb this into ADS, and add fetching of resources from the cache as part of the EdsClusterImpl.

The entire change can be viewed here: adisuissa/envoy@f0b7ac8

Risk Level: Low - the disabled runtime flag should prevent the use of the cache in non-test code.
Testing: Added unit tests.
Docs Changes: N/A.
Release Notes: N/A (future PR).
Platform Specific Features: N/A.
Runtime guard: disabled by default: envoy_restart_features_use_eds_cache_for_ads

Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa authored Aug 2, 2023
1 parent 4a84e88 commit 6e7d486
Show file tree
Hide file tree
Showing 31 changed files with 1,064 additions and 97 deletions.
1 change: 1 addition & 0 deletions envoy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ envoy_cc_library(
name = "grpc_mux_interface",
hdrs = ["grpc_mux.h"],
deps = [
":eds_resources_cache_interface",
":subscription_interface",
"//envoy/stats:stats_macros",
"//source/common/common:cleanup_lib",
Expand Down
7 changes: 7 additions & 0 deletions envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/common/exception.h"
#include "envoy/common/pure.h"
#include "envoy/config/eds_resources_cache.h"
#include "envoy/config/subscription.h"
#include "envoy/stats/stats_macros.h"

Expand Down Expand Up @@ -105,6 +106,12 @@ class GrpcMux {

virtual void requestOnDemandUpdate(const std::string& type_url,
const absl::flat_hash_set<std::string>& for_update) PURE;

/**
* Returns an EdsResourcesCache for this GrpcMux if there is one.
* @return EdsResourcesCacheOptRef optional eds resources cache for the gRPC-mux.
*/
virtual EdsResourcesCacheOptRef edsResourcesCache() PURE;
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
2 changes: 1 addition & 1 deletion envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class MuxFactory : public Config::UntypedFactory {
const LocalInfo::LocalInfo& local_info,
std::unique_ptr<CustomConfigValidators>&& config_validators,
BackOffStrategyPtr&& backoff_strategy, OptRef<XdsConfigTracker> xds_config_tracker,
OptRef<XdsResourcesDelegate> xds_resources_delegate) PURE;
OptRef<XdsResourcesDelegate> xds_resources_delegate, bool use_eds_resources_cache) PURE;
};

} // namespace Config
Expand Down
2 changes: 2 additions & 0 deletions source/common/config/null_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class NullGrpcMuxImpl : public GrpcMux,
ENVOY_BUG(false, "unexpected request for on demand update");
}

// The null mux holds no EDS resources cache, so this always returns an empty optional.
EdsResourcesCacheOptRef edsResourcesCache() override { return absl::nullopt; }

void onWriteable() override {}
void onStreamEstablished() override {}
void onEstablishmentFailure() override {}
Expand Down
3 changes: 3 additions & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ FALSE_RUNTIME_GUARD(envoy_reloadable_features_enable_include_histograms);
FALSE_RUNTIME_GUARD(envoy_reloadable_features_refresh_rtt_after_request);
// TODO(danzh) false deprecate it once QUICHE has its own enable/disable flag.
FALSE_RUNTIME_GUARD(envoy_reloadable_features_quic_reject_all);
// TODO(adisuissa) this will be enabled by default once the work on the feature is
// done in EdsClusterImpl.
FALSE_RUNTIME_GUARD(envoy_restart_features_use_eds_cache_for_ads);

// Block of non-boolean flags. Use of int flags is deprecated. Do not add more.
ABSL_FLAG(uint64_t, re2_max_program_size_error_level, 100, ""); // NOLINT
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ ClusterManagerImpl::ClusterManagerImpl(
->createUncachedRawAsyncClient(),
main_thread_dispatcher, random_, *stats_.rootScope(), dyn_resources.ads_config(),
local_info, std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {});
makeOptRefFromPtr(xds_config_tracker_.get()), {}, false);
} else {
Config::Utility::checkTransportVersion(dyn_resources.ads_config());
auto xds_delegate_opt_ref = makeOptRefFromPtr(xds_resources_delegate_.get());
Expand All @@ -439,7 +439,7 @@ ClusterManagerImpl::ClusterManagerImpl(
->createUncachedRawAsyncClient(),
main_thread_dispatcher, random_, *stats_.rootScope(), dyn_resources.ads_config(),
local_info, std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref);
makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, false);
}
} else {
ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
Expand Down
6 changes: 6 additions & 0 deletions source/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ envoy_cc_library(
srcs = ["grpc_mux_impl.cc"],
hdrs = ["grpc_mux_impl.h"],
deps = [
":eds_resources_cache_lib",
":grpc_stream_lib",
":xds_source_id_lib",
"//envoy/config:custom_config_validators_interface",
Expand All @@ -34,6 +35,7 @@ envoy_cc_library(
"//source/common/memory:utils_lib",
"//source/common/protobuf",
"@com_google_absl//absl/container:btree",
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand All @@ -44,6 +46,7 @@ envoy_cc_library(
hdrs = ["new_grpc_mux_impl.h"],
deps = [
":delta_subscription_state_lib",
":eds_resources_cache_lib",
":grpc_stream_lib",
":pausable_ack_queue_lib",
":watch_map_lib",
Expand All @@ -54,6 +57,7 @@ envoy_cc_library(
"//source/common/config:xds_context_params_lib",
"//source/common/config:xds_resource_lib",
"//source/common/memory:utils_lib",
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand All @@ -63,6 +67,7 @@ envoy_cc_library(
srcs = ["grpc_subscription_impl.cc"],
hdrs = ["grpc_subscription_impl.h"],
deps = [
":eds_resources_cache_lib",
":grpc_mux_lib",
":new_grpc_mux_lib",
"//envoy/config:subscription_interface",
Expand Down Expand Up @@ -210,6 +215,7 @@ envoy_cc_library(
"//source/common/common:minimal_logger_lib",
"//source/common/common:utility_lib",
"//source/common/config:decoded_resource_lib",
"//source/common/config:resource_name_lib",
"//source/common/config:utility_lib",
"//source/common/config:xds_resource_lib",
"//source/common/protobuf",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create(
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
data.xds_config_tracker_),
data.xds_config_tracker_,
// No EDS resources cache needed from collections.
/*eds_resources_cache=*/nullptr),
data.callbacks_, data.resource_decoder_, data.stats_, data.dispatcher_,
Utility::configSourceInitialFetchTimeout(data.config_), /*is_aggregated=*/false,
data.options_);
Expand Down
50 changes: 37 additions & 13 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "source/common/config/utility.h"
#include "source/common/memory/utils.h"
#include "source/common/protobuf/protobuf.h"
#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
#include "source/extensions/config_subscription/grpc/xds_source_id.h"

#include "absl/container/btree_map.h"
Expand Down Expand Up @@ -56,21 +57,21 @@ std::string convertToWildcard(const std::string& resource_name) {
}
} // namespace

GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node,
CustomConfigValidatorsPtr&& config_validators,
BackOffStrategyPtr backoff_strategy,
XdsConfigTrackerOptRef xds_config_tracker,
XdsResourcesDelegateOptRef xds_resources_delegate,
const std::string& target_xds_authority)
GrpcMuxImpl::GrpcMuxImpl(
const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node,
CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy,
XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate,
EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority)
: grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope,
std::move(backoff_strategy), rate_limit_settings),
local_info_(local_info), skip_subsequent_node_(skip_subsequent_node),
config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker),
xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority),
first_stream_request_(true), dispatcher_(dispatcher),
xds_resources_delegate_(xds_resources_delegate),
eds_resources_cache_(std::move(eds_resources_cache)),
target_xds_authority_(target_xds_authority), first_stream_request_(true),
dispatcher_(dispatcher),
dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback(
[this](absl::string_view resource_type_url) {
onDynamicContextUpdate(resource_type_url);
Expand Down Expand Up @@ -187,8 +188,14 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder,
const SubscriptionOptions& options) {
// Resource cache is only used for EDS resources.
EdsResourcesCacheOptRef resources_cache{absl::nullopt};
if (eds_resources_cache_ &&
(type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
}
auto watch = std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url,
*this, options, local_info_);
*this, options, local_info_, resources_cache);
ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);

// Lazily kick off the requests based on first subscription. This has the
Expand Down Expand Up @@ -412,6 +419,19 @@ void GrpcMuxImpl::processDiscoveryResources(const std::vector<DecodedResourcePtr
// updates in the message for EDS/RDS.
if (!found_resources.empty()) {
watch->callbacks_.onConfigUpdate(found_resources, version_info);
// Resource cache is only used for EDS resources.
if (eds_resources_cache_ &&
(type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
for (const auto& resource : found_resources) {
const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
resource.get().resource());
eds_resources_cache_->setResource(resource.get().name(), cluster_load_assignment);
}
// No need to remove resources from the cache, as currently only non-collection
// subscriptions are supported, and these resources are removed in the call
// to updateWatchInterest().
}
}
}

Expand Down Expand Up @@ -532,14 +552,18 @@ class GrpcMuxFactory : public MuxFactory {
const envoy::config::core::v3::ApiConfigSource& ads_config,
const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
XdsResourcesDelegateOptRef xds_resources_delegate) override {
XdsResourcesDelegateOptRef xds_resources_delegate, bool use_eds_resources_cache) override {
return std::make_shared<Config::GrpcMuxImpl>(
local_info, std::move(async_client), dispatcher,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
scope, Utility::parseRateLimitSettings(ads_config),
ads_config.set_node_on_first_message_only(), std::move(config_validators),
std::move(backoff_strategy), xds_config_tracker, xds_resources_delegate,
(use_eds_resources_cache &&
Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
: nullptr,
Config::Utility::getGrpcControlPlane(ads_config).value_or(""));
}
};
Expand Down
44 changes: 40 additions & 4 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "envoy/common/random_generator.h"
#include "envoy/common/time.h"
#include "envoy/config/custom_config_validators.h"
#include "envoy/config/endpoint/v3/endpoint.pb.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/config/subscription.h"
#include "envoy/config/xds_config_tracker.h"
Expand All @@ -20,6 +21,7 @@
#include "source/common/common/logger.h"
#include "source/common/common/utility.h"
#include "source/common/config/api_version.h"
#include "source/common/config/resource_name.h"
#include "source/common/config/ttl.h"
#include "source/common/config/utility.h"
#include "source/common/config/xds_context_params.h"
Expand All @@ -44,7 +46,7 @@ class GrpcMuxImpl : public GrpcMux,
bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators,
BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
XdsResourcesDelegateOptRef xds_resources_delegate,
const std::string& target_xds_authority);
EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority);

~GrpcMuxImpl() override;

Expand Down Expand Up @@ -72,6 +74,10 @@ class GrpcMuxImpl : public GrpcMux,
void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
}

// Exposes this mux's EDS resources cache as an optional reference, built from the
// pointer owned by eds_resources_cache_.
// NOTE(review): presumably yields an empty optional when no cache was supplied
// (eds_resources_cache_ is null) - confirm against makeOptRefFromPtr().
EdsResourcesCacheOptRef edsResourcesCache() override {
return makeOptRefFromPtr(eds_resources_cache_.get());
}

void handleDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);

Expand Down Expand Up @@ -99,17 +105,29 @@ class GrpcMuxImpl : public GrpcMux,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const std::string& type_url,
GrpcMuxImpl& parent, const SubscriptionOptions& options,
const LocalInfo::LocalInfo& local_info)
const LocalInfo::LocalInfo& local_info,
EdsResourcesCacheOptRef eds_resources_cache)
: callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url),
parent_(parent), subscription_options_(options), local_info_(local_info),
watches_(parent.apiStateFor(type_url).watches_) {
watches_(parent.apiStateFor(type_url).watches_),
eds_resources_cache_(eds_resources_cache) {
updateResources(resources);
// If eds resources cache is provided, then the type must be ClusterLoadAssignment.
ASSERT(
!eds_resources_cache_.has_value() ||
(type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>()));
}

// Tears down the watch: unregisters it from the per-type watch list and, if it
// was subscribed to any resources, queues a discovery request for its type URL.
// When an EDS resources cache is attached, the watch's resources are also
// removed from that cache.
~GrpcMuxWatchImpl() override {
watches_.erase(iter_);
if (!resources_.empty()) {
parent_.queueDiscoveryRequest(type_url_);
if (eds_resources_cache_.has_value()) {
// All resources are to be removed, so remove them from the cache.
// NOTE(review): removal is keyed by resource name only - confirm that no
// other watch on this mux still relies on the same cached resource.
for (const auto& resource_name : resources_) {
eds_resources_cache_->removeResource(resource_name);
}
}
}
}

Expand All @@ -131,7 +149,12 @@ class GrpcMuxImpl : public GrpcMux,

private:
void updateResources(const absl::flat_hash_set<std::string>& resources) {
resources_.clear();
// Find the list of removed resources by keeping the current resources
// set until the end of the function and computing the diff.
// Temporarily keep the resources prior to the update to find which ones
// were removed.
std::set<std::string> previous_resources;
previous_resources.swap(resources_);
std::transform(
resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()),
[this](const std::string& resource_name) -> std::string {
Expand All @@ -148,6 +171,16 @@ class GrpcMuxImpl : public GrpcMux,
}
return resource_name;
});
if (eds_resources_cache_.has_value()) {
// Compute the removed resources and remove them from the cache.
std::vector<std::string> removed_resources;
std::set_difference(previous_resources.begin(), previous_resources.end(),
resources_.begin(), resources_.end(),
std::back_inserter(removed_resources));
for (const auto& resource_name : removed_resources) {
eds_resources_cache_->removeResource(resource_name);
}
}
// move this watch to the beginning of the list
iter_ = watches_.emplace(watches_.begin(), this);
}
Expand All @@ -157,6 +190,8 @@ class GrpcMuxImpl : public GrpcMux,
const LocalInfo::LocalInfo& local_info_;
WatchList& watches_;
WatchList::iterator iter_;
// Optional cache for the specific ClusterLoadAssignments of this watch.
EdsResourcesCacheOptRef eds_resources_cache_;
};

// Per muxed API state.
Expand Down Expand Up @@ -212,6 +247,7 @@ class GrpcMuxImpl : public GrpcMux,
CustomConfigValidatorsPtr config_validators_;
XdsConfigTrackerOptRef xds_config_tracker_;
XdsResourcesDelegateOptRef xds_resources_delegate_;
EdsResourcesCachePtr eds_resources_cache_;
const std::string target_xds_authority_;
bool first_stream_request_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_,
/*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS.
control_plane_id);
} else {
mux = std::make_shared<Config::GrpcMuxImpl>(
Expand All @@ -44,6 +45,7 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat
Utility::parseRateLimitSettings(api_config_source),
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_,
/*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS.
control_plane_id);
}
return std::make_unique<GrpcSubscriptionImpl>(
Expand Down Expand Up @@ -73,15 +75,19 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_);
std::move(backoff_strategy), data.xds_config_tracker_,
/*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS.
);
} else {
mux = std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(),
api_config_source, data.scope_, true)
->createUncachedRawAsyncClient(),
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_);
std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_,
/*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS.
);
}
return std::make_unique<GrpcSubscriptionImpl>(
std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
Expand Down
Loading

0 comments on commit 6e7d486

Please sign in to comment.