diff --git a/api/envoy/config/bootstrap/v3/bootstrap.proto b/api/envoy/config/bootstrap/v3/bootstrap.proto index a5fcbef104b3..f12f9819dcf8 100644 --- a/api/envoy/config/bootstrap/v3/bootstrap.proto +++ b/api/envoy/config/bootstrap/v3/bootstrap.proto @@ -438,6 +438,7 @@ message Admin { } // Cluster manager :ref:`architecture overview `. +// [#next-free-field: 6] message ClusterManager { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v2.ClusterManager"; @@ -478,6 +479,11 @@ message ClusterManager { // ` :ref:`GRPC // `. core.v3.ApiConfigSource load_stats_config = 4; + + // Whether the ClusterManager will create clusters on the worker threads + // inline during requests. This will save memory and CPU cycles in cases where + // there are lots of inactive clusters and > 1 worker thread. + bool enable_deferred_cluster_creation = 5; } // Allows you to specify different watchdog configs for different subsystems. diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 9ad064268a4c..d9a6e5d8b1c1 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -30,6 +30,14 @@ new_features: change: | added %RESPONSE_FLAGS_LONG% substitution string, that will output a pascal case string representing the resonse flags. The output response flags will correspond with %RESPONSE_FLAGS%, only with a long textual string representation. +- area: config + change: | + Added the capability to defer broadcasting of certain clusters (CDS, EDS) to + worker threads from the main thread. This optimization can save a significant + amount of memory in cases where there are (1) a large number of workers and + (2) a large amount of config, most of which is unused. This capability is + guarded by :ref:`enable_deferred_cluster_creation + <envoy_v3_api_field_config.bootstrap.v3.ClusterManager.enable_deferred_cluster_creation>`. - area: extension_discovery_service change: | added ECDS support for :ref:` downstream network filters`.
diff --git a/docs/root/configuration/upstream/cluster_manager/cluster_stats.rst b/docs/root/configuration/upstream/cluster_manager/cluster_stats.rst index 3dd58c0d7a0c..7deafe39415b 100644 --- a/docs/root/configuration/upstream/cluster_manager/cluster_stats.rst +++ b/docs/root/configuration/upstream/cluster_manager/cluster_stats.rst @@ -28,6 +28,17 @@ upstreams and control plane xDS clusters. active_clusters, Gauge, Number of currently active (warmed) clusters warming_clusters, Gauge, Number of currently warming (not active) clusters + +In addition to the cluster manager stats, there is a per worker thread local +cluster manager statistics tree rooted at +*thread_local_cluster_manager.<thread_name>.* with the following statistics. + +.. csv-table:: + :header: Name, Type, Description + :widths: 1, 1, 2 + + clusters_inflated, Gauge, Number of clusters the worker has initialized. If using cluster deferral this number should be <= (cluster_added - cluster_removed). + .. _config_cluster_stats: Every cluster has a statistics tree rooted at *cluster.<name>.* with the following statistics: diff --git a/envoy/upstream/cluster_manager.h b/envoy/upstream/cluster_manager.h index 319cbec9c754..fffb69b81574 100644 --- a/envoy/upstream/cluster_manager.h +++ b/envoy/upstream/cluster_manager.h @@ -43,6 +43,7 @@ namespace Upstream { * ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the * ClusterManager. */ +using ThreadLocalClusterCommand = std::function; class ClusterUpdateCallbacks { public: virtual ~ClusterUpdateCallbacks() = default; @@ -50,11 +51,12 @@ class ClusterUpdateCallbacks { /** * onClusterAddOrUpdate is called when a new cluster is added or an existing cluster * is updated in the ClusterManager. - * @param cluster is the ThreadLocalCluster that represents the updated - * cluster. + * @param cluster_name the name of the changed cluster. + * @param get_cluster is a callable that will provide the ThreadLocalCluster that represents the + * updated cluster.
It should be used within the call or discarded. */ - virtual void onClusterAddOrUpdate(ThreadLocalCluster& cluster) PURE; - + virtual void onClusterAddOrUpdate(absl::string_view cluster_name, + ThreadLocalClusterCommand& get_cluster) PURE; /** * onClusterRemoval is called when a cluster is removed; the argument is the cluster name. * @param cluster_name is the name of the removed cluster. diff --git a/source/common/upstream/cluster_discovery_manager.cc b/source/common/upstream/cluster_discovery_manager.cc index ac6a87f3a78c..b9ab3e8d42f4 100644 --- a/source/common/upstream/cluster_discovery_manager.cc +++ b/source/common/upstream/cluster_discovery_manager.cc @@ -9,13 +9,15 @@ namespace Upstream { namespace { -using ClusterAddedCb = std::function; +using ClusterAddedCb = std::function; class ClusterCallbacks : public ClusterUpdateCallbacks { public: ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {} - void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); }; + void onClusterAddOrUpdate(absl::string_view cluster_name, ThreadLocalClusterCommand&) override { + cb_(cluster_name); + }; void onClusterRemoval(const std::string&) override {} @@ -28,12 +30,12 @@ class ClusterCallbacks : public ClusterUpdateCallbacks { ClusterDiscoveryManager::ClusterDiscoveryManager( std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler) : thread_name_(std::move(thread_name)) { - callbacks_ = std::make_unique([this](ThreadLocalCluster& cluster) { + callbacks_ = std::make_unique([this](absl::string_view cluster_name) { ENVOY_LOG(trace, "cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle " "callback in {}", - cluster.info()->name(), enumToInt(ClusterDiscoveryStatus::Available), thread_name_); - processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available); + cluster_name, enumToInt(ClusterDiscoveryStatus::Available), thread_name_); + processClusterName(cluster_name, 
ClusterDiscoveryStatus::Available); }); callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_); } diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 3c1d580d2bf5..726f783a910c 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -289,6 +289,7 @@ ClusterManagerImpl::ClusterManagerImpl( const Server::Instance& server) : factory_(factory), runtime_(runtime), stats_(stats), tls_(tls), random_(api.randomGenerator()), + deferred_cluster_creation_(bootstrap.cluster_manager().enable_deferred_cluster_creation()), bind_config_(bootstrap.cluster_manager().has_upstream_bind_config() ? absl::make_optional(bootstrap.cluster_manager().upstream_bind_config()) : absl::nullopt), @@ -528,6 +529,13 @@ ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) { POOL_GAUGE_PREFIX(scope, final_prefix))}; } +ThreadLocalClusterManagerStats +ClusterManagerImpl::ThreadLocalClusterManagerImpl::generateStats(Stats::Scope& scope, + const std::string& thread_name) { + const std::string final_prefix = absl::StrCat("thread_local_cluster_manager.", thread_name); + return {ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(POOL_GAUGE_PREFIX(scope, final_prefix))}; +} + void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) { // This routine is called when a cluster has finished initializing. The cluster has not yet // been setup for cross-thread updates to avoid needless updates during initialization. The order @@ -549,6 +557,8 @@ void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) { } // Now setup for cross-thread updates. + // This is used by cluster types such as EDS clusters to drain the connection pools of removed + // hosts. 
cluster_data->second->member_update_cb_ = cluster.prioritySet().addMemberUpdateCb( [&cluster, this](const HostVector&, const HostVector& hosts_removed) -> void { if (cluster.info()->lbConfig().close_connections_on_host_set_change()) { @@ -569,6 +579,8 @@ void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) { } }); + // This is used by cluster types such as EDS clusters to update the cluster + // without draining the cluster. cluster_data->second->priority_update_cb_ = cluster.prioritySet().addPriorityUpdateCb( [&cm_cluster, this](uint32_t priority, const HostVector& hosts_added, const HostVector& hosts_removed) { @@ -793,7 +805,8 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { ENVOY_LOG(debug, "removing cluster {}", cluster_name); tls_.runOnAllThreads([cluster_name](OptRef cluster_manager) { - ASSERT(cluster_manager->thread_local_clusters_.count(cluster_name) == 1); + ASSERT(cluster_manager->thread_local_clusters_.contains(cluster_name) || + cluster_manager->thread_local_deferred_clusters_.contains(cluster_name)); ENVOY_LOG(debug, "removing TLS cluster {}", cluster_name); for (auto cb_it = cluster_manager->update_callbacks_.begin(); cb_it != cluster_manager->update_callbacks_.end();) { @@ -804,7 +817,11 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { (*curr_cb_it)->onClusterRemoval(cluster_name); } cluster_manager->thread_local_clusters_.erase(cluster_name); + cluster_manager->thread_local_deferred_clusters_.erase(cluster_name); + cluster_manager->local_stats_.clusters_inflated_.set( + cluster_manager->thread_local_clusters_.size()); }); + cluster_initialization_map_.erase(cluster_name); } auto existing_warming_cluster = warming_clusters_.find(cluster_name); @@ -973,7 +990,7 @@ ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view if (entry != cluster_manager.thread_local_clusters_.end()) { return entry->second.get(); } else { - return nullptr; + return 
cluster_manager.initializeClusterInlineIfExists(cluster); } } @@ -1054,7 +1071,6 @@ void ClusterManagerImpl::drainConnections(const std::string& cluster, DrainConnectionsHostPredicate predicate) { ENVOY_LOG_EVENT(debug, "drain_connections_call", "drainConnections called for cluster {}", cluster); - tls_.runOnAllThreads([cluster, predicate](OptRef cluster_manager) { auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster); if (cluster_entry != cluster_manager->thread_local_clusters_.end()) { @@ -1067,7 +1083,6 @@ void ClusterManagerImpl::drainConnections(const std::string& cluster, void ClusterManagerImpl::drainConnections(DrainConnectionsHostPredicate predicate) { ENVOY_LOG_EVENT(debug, "drain_connections_call_for_all_clusters", "drainConnections called for all clusters"); - tls_.runOnAllThreads([predicate](OptRef cluster_manager) { for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) { cluster_entry.second->drainConnPools(predicate, @@ -1088,12 +1103,49 @@ void ClusterManagerImpl::checkActiveStaticCluster(const std::string& cluster) { void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster, const HostVector& hosts_removed) { + // Drain the connection pools for the given hosts. Workers on which the + // cluster is still deferred have nothing to drain and skip the removal. tls_.runOnAllThreads([name = cluster.info()->name(), hosts_removed](OptRef cluster_manager) { cluster_manager->removeHosts(name, hosts_removed); }); } +bool ClusterManagerImpl::deferralIsSupportedForCluster( + const ClusterInfoConstSharedPtr& info) const { + if (!deferred_cluster_creation_) { + return false; + } + + // Certain cluster types are unsupported for deferred initialization. + // We check `clusterType()` first (preferred) and fall back to `type()`, + // because of how custom clusters were added by leveraging an `Any` + // config.
+ if (auto custom_cluster_type = info->clusterType(); custom_cluster_type.has_value()) { + // TODO(kbaichoo): make it configurable what custom types are supported? + static const std::array supported_well_known_cluster_types = { + "envoy.clusters.aggregate", "envoy.cluster.eds", "envoy.clusters.redis", + "envoy.cluster.static"}; + if (std::find(supported_well_known_cluster_types.begin(), + supported_well_known_cluster_types.end(), + custom_cluster_type->name()) == supported_well_known_cluster_types.end()) { + return false; + } + + } else { + // Check DiscoveryType instead. + static constexpr std::array + supported_cluster_types = {envoy::config::cluster::v3::Cluster::EDS, + envoy::config::cluster::v3::Cluster::STATIC}; + if (std::find(supported_cluster_types.begin(), supported_cluster_types.end(), info->type()) == + supported_cluster_types.end()) { + return false; + } + } + + return true; +} + void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster, ThreadLocalClusterUpdateParams&& params) { bool add_or_update_cluster = false; @@ -1119,42 +1171,237 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_ HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap(); pending_cluster_creations_.erase(cm_cluster.cluster().info()->name()); - tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params), - add_or_update_cluster, load_balancer_factory, map = std::move(host_map)]( - OptRef cluster_manager) { - ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr; - if (add_or_update_cluster) { - if (cluster_manager->thread_local_clusters_.contains(info->name())) { - ENVOY_LOG(debug, "updating TLS cluster {}", info->name()); - } else { - ENVOY_LOG(debug, "adding TLS cluster {}", info->name()); - } - new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info, - load_balancer_factory); - 
cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster); - } + // Populate the cluster initialization object based on this update. + ClusterInitializationObjectConstSharedPtr cluster_initialization_object = + addOrUpdateClusterInitializationObjectIfSupported(params, cm_cluster.cluster().info(), + load_balancer_factory, host_map); - for (const auto& per_priority : params.per_priority_update_params_) { - cluster_manager->updateClusterMembership( - info->name(), per_priority.priority_, per_priority.update_hosts_params_, - per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_, - per_priority.weighted_priority_health_, per_priority.overprovisioning_factor_, map); - } + tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params), + add_or_update_cluster, load_balancer_factory, map = std::move(host_map), + cluster_initialization_object = std::move(cluster_initialization_object)]( + OptRef cluster_manager) { + ASSERT(cluster_manager.has_value(), + "Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation."); + + // `cluster_manager` is the thread local cluster manager of the thread running + // this callback, so the change below is applied to that thread's state. + if (const bool defer_unused_clusters = + cluster_initialization_object != nullptr && + !cluster_manager->thread_local_clusters_.contains(info->name()) && + !Envoy::Thread::MainThread::isMainThread(); + defer_unused_clusters) { + // Save the cluster initialization object. + ENVOY_LOG(debug, "Deferring add or update for TLS cluster {}", info->name()); + cluster_manager->thread_local_deferred_clusters_[info->name()] = + cluster_initialization_object; + + // Invoke similar logic of onClusterAddOrUpdate. + ThreadLocalClusterCommand command = [&cluster_manager, + cluster_name = info->name()]() -> ThreadLocalCluster& { + // If we have multiple callbacks only the first one needs to use the + // command to initialize the cluster.
+ auto existing_cluster_entry = cluster_manager->thread_local_clusters_.find(cluster_name); + if (existing_cluster_entry != cluster_manager->thread_local_clusters_.end()) { + return *existing_cluster_entry->second; + } - if (new_cluster != nullptr) { + auto* cluster_entry = cluster_manager->initializeClusterInlineIfExists(cluster_name); + ASSERT(cluster_entry != nullptr, "Deferred clusters initiailization should not fail."); + return *cluster_entry; + }; for (auto cb_it = cluster_manager->update_callbacks_.begin(); cb_it != cluster_manager->update_callbacks_.end();) { // The current callback may remove itself from the list, so a handle for // the next item is fetched before calling the callback. auto curr_cb_it = cb_it; ++cb_it; - (*curr_cb_it)->onClusterAddOrUpdate(*new_cluster); + (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command); + } + + } else { + // Broadcast + ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr; + if (add_or_update_cluster) { + if (cluster_manager->thread_local_clusters_.contains(info->name())) { + ENVOY_LOG(debug, "updating TLS cluster {}", info->name()); + } else { + ENVOY_LOG(debug, "adding TLS cluster {}", info->name()); + } + + new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info, + load_balancer_factory); + cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster); + cluster_manager->local_stats_.clusters_inflated_.set( + cluster_manager->thread_local_clusters_.size()); + } + + for (const auto& per_priority : params.per_priority_update_params_) { + cluster_manager->updateClusterMembership( + info->name(), per_priority.priority_, per_priority.update_hosts_params_, + per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_, + per_priority.weighted_priority_health_, per_priority.overprovisioning_factor_, map); + } + + if (new_cluster != nullptr) { + ThreadLocalClusterCommand command = [&new_cluster]() -> ThreadLocalCluster& { + return 
*new_cluster; + }; + for (auto cb_it = cluster_manager->update_callbacks_.begin(); + cb_it != cluster_manager->update_callbacks_.end();) { + // The current callback may remove itself from the list, so a handle for + // the next item is fetched before calling the callback. + auto curr_cb_it = cb_it; + ++cb_it; + (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command); + } } } }); } +ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr +ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported( + const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info, + LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map) { + if (!deferralIsSupportedForCluster(cluster_info)) { + return nullptr; + } + + const std::string& cluster_name = cluster_info->name(); + auto entry = cluster_initialization_map_.find(cluster_name); + // TODO(kbaichoo): if EDS can be configured via cluster_type() then modify the + // merging logic below. + // We should only merge if the cluster type is the same as before and this is + // an EDS cluster. This is due to the fact that EDS clusters get + // ClusterLoadAssignment from the configuration server but pass per priority + // deltas in updates to the ClusterManager. In the future we may decide to + // change how the updates propagate among those components. + const bool should_merge_with_prior_cluster = + entry != cluster_initialization_map_.end() && + entry->second->cluster_info_->type() == cluster_info->type() && + cluster_info->type() == envoy::config::cluster::v3::Cluster::EDS; + + if (should_merge_with_prior_cluster) { + // We need to copy from an existing Cluster Initialization Object. In + // particular, only update the params with changed priority. 
+ auto new_initialization_object = std::make_shared( + entry->second->per_priority_state_, params, std::move(cluster_info), load_balancer_factory, + map); + cluster_initialization_map_[cluster_name] = new_initialization_object; + return new_initialization_object; + } else { + // We need to create a fresh Cluster Initialization Object. + auto new_initialization_object = std::make_shared( + params, std::move(cluster_info), load_balancer_factory, map); + cluster_initialization_map_[cluster_name] = new_initialization_object; + return new_initialization_object; + } +} + +ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry* +ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExists( + absl::string_view cluster) { + auto entry = thread_local_deferred_clusters_.find(cluster); + if (entry == thread_local_deferred_clusters_.end()) { + // Unknown cluster. + return nullptr; + } + + // Create the cluster inline. + const ClusterInitializationObjectConstSharedPtr& initialization_object = entry->second; + ENVOY_LOG(debug, "initializing TLS cluster {} inline", cluster); + auto cluster_entry = std::make_unique( + *this, initialization_object->cluster_info_, initialization_object->load_balancer_factory_); + ClusterEntry* cluster_entry_ptr = cluster_entry.get(); + + thread_local_clusters_[cluster] = std::move(cluster_entry); + local_stats_.clusters_inflated_.set(thread_local_clusters_.size()); + + for (const auto& [_, per_priority] : initialization_object->per_priority_state_) { + updateClusterMembership(initialization_object->cluster_info_->name(), per_priority.priority_, + per_priority.update_hosts_params_, per_priority.locality_weights_, + per_priority.hosts_added_, per_priority.hosts_removed_, + per_priority.weighted_priority_health_, + per_priority.overprovisioning_factor_, + initialization_object->cross_priority_host_map_); + } + + // Remove the CIO as we've initialized the cluster. 
+ thread_local_deferred_clusters_.erase(entry); + + return cluster_entry_ptr; +} + +ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject( + const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info, + LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map) + : cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory), + cross_priority_host_map_(map) { + // Copy the update since the map is empty. + for (const auto& update : params.per_priority_update_params_) { + per_priority_state_.emplace(update.priority_, update); + } +} + +ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject( + const absl::flat_hash_map& per_priority_state, + const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info, + LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map) + : per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)), + load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map) { + + ASSERT(cluster_info_->type() == envoy::config::cluster::v3::Cluster::EDS, + fmt::format("Using merge constructor on possible non-mergable cluster of type {}", + cluster_info_->type())); + // Because EDS Clusters receive the entire ClusterLoadAssignment but only + // provides the delta we must process the hosts_added and hosts_removed and + // not simply overwrite with hosts added. + for (const auto& update : update_params.per_priority_update_params_) { + auto it = per_priority_state_.find(update.priority_); + if (it != per_priority_state_.end()) { + auto& priority_state = it->second; + // Merge the two per_priorities. 
+ priority_state.update_hosts_params_ = update.update_hosts_params_; + priority_state.locality_weights_ = update.locality_weights_; + priority_state.weighted_priority_health_ = update.weighted_priority_health_; + priority_state.overprovisioning_factor_ = update.overprovisioning_factor_; + + // Merge the hosts vectors to just have hosts added. + // Assumes that the old host_added_ is exclusive to new hosts_added_ and + // new hosts_removed_ only refers to the old hosts_added_. + ASSERT(priority_state.hosts_removed_.empty(), + "Cluster Initialization Object should apply hosts " + "removed updates to hosts_added vector!"); + + // TODO(kbaichoo): replace with a more efficient algorithm. For example + // if the EDS cluster exposed the LoadAssignment we could just merge by + // overwriting hosts_added. + if (!update.hosts_removed_.empty()) { + // Remove all hosts to be removed from the old host_added. + auto& host_added = priority_state.hosts_added_; + auto removed_section = std::remove_if( + host_added.begin(), host_added.end(), + [hosts_removed = std::cref(update.hosts_removed_)](const HostSharedPtr& ptr) { + return std::find(hosts_removed.get().begin(), hosts_removed.get().end(), ptr) != + hosts_removed.get().end(); + }); + priority_state.hosts_added_.erase(removed_section, priority_state.hosts_added_.end()); + } + + // Add updated host_added. + priority_state.hosts_added_.reserve(priority_state.hosts_added_.size() + + update.hosts_added_.size()); + std::copy(update.hosts_added_.begin(), update.hosts_added_.end(), + std::back_inserter(priority_state.hosts_added_)); + + } else { + // Just copy the new priority. 
+ per_priority_state_.emplace(update.priority_, update); + } + } +} + void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) { tls_.runOnAllThreads([host](OptRef cluster_manager) { cluster_manager->onHostHealthFailure(host); @@ -1266,8 +1513,8 @@ ClusterManagerImpl::allocateOdCdsApi(const envoy::config::core::v3::ConfigSource OptRef odcds_resources_locator, ProtobufMessage::ValidationVisitor& validation_visitor) { // TODO(krnowak): Instead of creating a new handle every time, store the handles internally and - // return an already existing one if the config or locator matches. Note that this may need a way - // to clean up the unused handles, so we can close the unnecessary connections. + // return an already existing one if the config or locator matches. Note that this may need a + // way to clean up the unused handles, so we can close the unnecessary connections. auto odcds = OdCdsApiImpl::create(odcds_config, odcds_resources_locator, *this, *this, *stats_.rootScope(), validation_visitor); return OdCdsApiHandleImpl::create(*this, std::move(odcds)); @@ -1281,28 +1528,28 @@ ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std auto [handle, discovery_in_progress, invoker] = cluster_manager.cdm_.addCallback(name, std::move(callback)); - // This check will catch requests for discoveries from this thread only. If other thread requested - // the same discovery, we will detect it in the main thread later. + // This check will catch requests for discoveries from this thread only. If other thread + // requested the same discovery, we will detect it in the main thread later. 
if (discovery_in_progress) { ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress, something else " "in thread {} has already requested it", name, cluster_manager.thread_local_dispatcher_.name()); - // This worker thread has already requested a discovery of a cluster with this name, so nothing - // more left to do here. + // This worker thread has already requested a discovery of a cluster with this name, so + // nothing more left to do here. // // We can't "just" return handle here, because handle is a part of the structured binding done // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like - // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here - - // it needs to be moved. + // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here + // - it needs to be moved. return std::move(handle); } ENVOY_LOG( debug, "cm odcds: forwarding the on-demand discovery request for cluster {} to the main thread", name); - // This seems to be the first request for discovery of this cluster in this worker thread. Rest of - // the process may only happen in the main thread. + // This seems to be the first request for discovery of this cluster in this worker thread. Rest + // of the process may only happen in the main thread. dispatcher_.post([this, odcds = std::move(odcds), timeout, name = std::move(name), invoker = std::move(invoker), &thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] { @@ -1321,8 +1568,8 @@ ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std if (auto it = pending_cluster_creations_.find(name); it != pending_cluster_creations_.end()) { ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name); - // We already began the discovery process for this cluster, nothing to do. 
If we got here, it - // means that it was other worker thread that requested the discovery. + // We already began the discovery process for this cluster, nothing to do. If we got here, + // it means that it was other worker thread that requested the discovery. return; } // Start the discovery. If the cluster gets discovered, cluster manager will warm it up and @@ -1338,8 +1585,8 @@ ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std // We can't "just" return handle here, because handle is a part of the structured binding done // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like - // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here - it - // needs to be moved. + // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here - + // it needs to be moved. return std::move(handle); } @@ -1424,7 +1671,8 @@ ClusterManagerImpl::dumpClusterConfigs(const Matchers::StringMatcher& name_match ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl( ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, const absl::optional& local_cluster_params) - : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(dispatcher.name(), *this) { + : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(dispatcher.name(), *this), + local_stats_(generateStats(*parent.stats_.rootScope(), dispatcher.name())) { // If local cluster is defined then we need to initialize it first. 
if (local_cluster_params.has_value()) { const auto& local_cluster_name = local_cluster_params->info_->name(); @@ -1432,6 +1680,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl thread_local_clusters_[local_cluster_name] = std::make_unique( *this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_); local_priority_set_ = &thread_local_clusters_[local_cluster_name]->prioritySet(); + local_stats_.clusters_inflated_.set(thread_local_clusters_.size()); } } @@ -1472,8 +1721,18 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeTcpConn( void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts( const std::string& name, const HostVector& hosts_removed) { - ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end()); - const auto& cluster_entry = thread_local_clusters_[name]; + auto entry = thread_local_clusters_.find(name); + // The if should only be possible if deferred cluster creation is enabled. + if (entry == thread_local_clusters_.end()) { + ASSERT( + parent_.deferred_cluster_creation_, + fmt::format("Cannot find ThreadLocalCluster {}, but deferred cluster creation is disabled.", + name)); + ASSERT(thread_local_deferred_clusters_.find(name) != thread_local_deferred_clusters_.end(), + "Cluster with removed host is neither deferred or inflated!"); + return; + } + const auto& cluster_entry = entry->second; ENVOY_LOG(debug, "removing hosts for TLS cluster {} removed {}", name, hosts_removed.size()); // We need to go through and purge any connection pools for hosts that got deleted. @@ -1510,9 +1769,9 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( // Network::ConnectionCallbacks. The last removed tcp conn will remove the TcpConnectionsMap // from host_tcp_conn_map_, so do not cache it between iterations. // - // TODO(ggreenway) PERF: If there are a large number of connections, this could take a long time - // and halt other useful work. 
Consider breaking up this work. Note that this behavior is noted - // in the configuration documentation in cluster setting + // TODO(ggreenway) PERF: If there are a large number of connections, this could take a long + // time and halt other useful work. Consider breaking up this work. Note that this behavior is + // noted in the configuration documentation in cluster setting // "close_connections_on_host_health_failure". Update the docs if this if this changes. while (true) { const auto& it = host_tcp_conn_map_.find(host); @@ -1654,8 +1913,8 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() // cluster. // // TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of - // the hosts inside of the HostImpl destructor. That is a change with wide implications, so we are - // going with a more targeted approach for now. + // the hosts inside of the HostImpl destructor. That is a change with wide implications, so we + // are going with a more targeted approach for now. drainConnPools(); } diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 84ea8b79dbd4..a69082be5812 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -222,6 +222,18 @@ struct ClusterManagerStats { ALL_CLUSTER_MANAGER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) }; +/** + * All thread local cluster manager stats. @see stats_macros.h + */ +#define ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(GAUGE) GAUGE(clusters_inflated, NeverImport) + +/** + * Struct definition for all thread local cluster manager stats.
@see stats_macros.h + */ +struct ThreadLocalClusterManagerStats { + ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(GENERATE_GAUGE_STRUCT) +}; + /** * Implementation of ClusterManager that reads from a proto configuration, maintains a central * cluster list, as well as thread local caches of each cluster and associated connection pools. @@ -370,8 +382,10 @@ class ClusterManagerImpl : public ClusterManager, struct PerPriority { PerPriority(uint32_t priority, const HostVector& hosts_added, const HostVector& hosts_removed) : hosts_added_(hosts_added), hosts_removed_(hosts_removed), priority_(priority) {} - - const HostVector hosts_added_; + // TODO(kbaichoo): make the hosts_added_ vector const and have the + // cluster initialization object have a stripped down version of this + // struct. + HostVector hosts_added_; const HostVector hosts_removed_; PrioritySet::UpdateHostsParams update_hosts_params_; LocalityWeightsConstSharedPtr locality_weights_; @@ -389,6 +403,34 @@ class ClusterManagerImpl : public ClusterManager, std::vector per_priority_update_params_; }; + /** + * A cluster initialization object (CIO) encapsulates the relevant information + * to create a cluster inline when there is traffic to it. We can thus use the + * CIO to defer instantiating clusters on workers until they are used. 
+ */ + struct ClusterInitializationObject { + ClusterInitializationObject(const ThreadLocalClusterUpdateParams& params, + ClusterInfoConstSharedPtr cluster_info, + LoadBalancerFactorySharedPtr load_balancer_factory, + HostMapConstSharedPtr map); + + ClusterInitializationObject( + const absl::flat_hash_map& + per_priority_state, + const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info, + LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map); + + absl::flat_hash_map per_priority_state_; + const ClusterInfoConstSharedPtr cluster_info_; + const LoadBalancerFactorySharedPtr load_balancer_factory_; + const HostMapConstSharedPtr cross_priority_host_map_; + }; + + using ClusterInitializationObjectConstSharedPtr = + std::shared_ptr; + using ClusterInitializationMap = + absl::flat_hash_map; + /** * An implementation of an on-demand CDS handle. It forwards the discovery request to the cluster * manager that created the handle. @@ -635,9 +677,22 @@ class ClusterManagerImpl : public ClusterManager, // Upstream::ClusterLifecycleCallbackHandler ClusterUpdateCallbacksHandlePtr addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) override; + /** + * Transparently initialize the given thread local cluster if possible using + * the Cluster Initialization object. + * + * @return The ClusterEntry of the newly initialized cluster or nullptr if there + * is no deferred cluster with that name. + */ + ClusterEntry* initializeClusterInlineIfExists(absl::string_view cluster); + ClusterManagerImpl& parent_; Event::Dispatcher& thread_local_dispatcher_; + // Known clusters will exclusively exist in either `thread_local_clusters_` + // or `thread_local_deferred_clusters_`. absl::flat_hash_map thread_local_clusters_; + // Maps from a given cluster name to the CIO for that cluster. 
+ ClusterInitializationMap thread_local_deferred_clusters_; ClusterConnectivityState cluster_manager_state_; @@ -651,6 +706,11 @@ class ClusterManagerImpl : public ClusterManager, const PrioritySet* local_priority_set_{}; bool destroying_{}; ClusterDiscoveryManager cdm_; + ThreadLocalClusterManagerStats local_stats_; + + private: + static ThreadLocalClusterManagerStats generateStats(Stats::Scope& scope, + const std::string& thread_name); }; struct ClusterData : public ClusterManagerCluster { @@ -783,6 +843,18 @@ class ClusterManagerImpl : public ClusterManager, void notifyClusterDiscoveryStatus(absl::string_view name, ClusterDiscoveryStatus status); private: + /** + * Builds the cluster initialization object for this given cluster. + * @return a ClusterInitializationObjectSharedPtr that can be used to create + * this cluster or nullptr if deferred cluster creation is off or the cluster + * type is not supported. + */ + ClusterInitializationObjectConstSharedPtr addOrUpdateClusterInitializationObjectIfSupported( + const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info, + LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map); + + bool deferralIsSupportedForCluster(const ClusterInfoConstSharedPtr& info) const; + ClusterManagerFactory& factory_; Runtime::Loader& runtime_; Stats::Store& stats_; @@ -796,6 +868,8 @@ class ClusterManagerImpl : public ClusterManager, private: ClusterMap warming_clusters_; + const bool deferred_cluster_creation_; + ClusterInitializationMap cluster_initialization_map_; absl::optional bind_config_; Outlier::EventLoggerSharedPtr outlier_event_logger_; const LocalInfo::LocalInfo& local_info_; diff --git a/source/common/upstream/cluster_update_tracker.cc b/source/common/upstream/cluster_update_tracker.cc index 15eefc0d2ca5..cf4e2274d185 100644 --- a/source/common/upstream/cluster_update_tracker.cc +++ b/source/common/upstream/cluster_update_tracker.cc @@ -12,11 +12,12 @@ 
ClusterUpdateTracker::ClusterUpdateTracker(ClusterManager& cm, const std::string } } -void ClusterUpdateTracker::onClusterAddOrUpdate(ThreadLocalCluster& cluster) { - if (cluster.info()->name() != cluster_name_) { +void ClusterUpdateTracker::onClusterAddOrUpdate(absl::string_view cluster_name, + ThreadLocalClusterCommand& get_cluster) { + if (cluster_name != cluster_name_) { return; } - thread_local_cluster_ = cluster; + thread_local_cluster_ = get_cluster(); } void ClusterUpdateTracker::onClusterRemoval(const std::string& cluster) { diff --git a/source/common/upstream/cluster_update_tracker.h b/source/common/upstream/cluster_update_tracker.h index 4660faf3c6d9..cfef30ae7abf 100644 --- a/source/common/upstream/cluster_update_tracker.h +++ b/source/common/upstream/cluster_update_tracker.h @@ -17,7 +17,8 @@ class ClusterUpdateTracker : public ClusterUpdateCallbacks { ThreadLocalClusterOptRef threadLocalCluster() { return thread_local_cluster_; }; // ClusterUpdateCallbacks - void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override; + void onClusterAddOrUpdate(absl::string_view cluster_name, + ThreadLocalClusterCommand& get_cluster) override; void onClusterRemoval(const std::string& cluster) override; private: diff --git a/source/extensions/clusters/aggregate/cluster.cc b/source/extensions/clusters/aggregate/cluster.cc index ca7d40cca6e5..f7d780c546cf 100644 --- a/source/extensions/clusters/aggregate/cluster.cc +++ b/source/extensions/clusters/aggregate/cluster.cc @@ -113,10 +113,12 @@ void AggregateClusterLoadBalancer::refresh(OptRef excluded_cl priority_context_ = std::move(priority_context); } -void AggregateClusterLoadBalancer::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) { - if (std::find(clusters_->begin(), clusters_->end(), cluster.info()->name()) != clusters_->end()) { - ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", - cluster.info()->name(), parent_info_->name()); +void 
AggregateClusterLoadBalancer::onClusterAddOrUpdate( + absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) { + if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) { + ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", cluster_name, + parent_info_->name()); + auto& cluster = get_cluster(); refresh(); addMemberUpdateCallbackForCluster(cluster); } diff --git a/source/extensions/clusters/aggregate/cluster.h b/source/extensions/clusters/aggregate/cluster.h index 6990b07e92e2..3e2a6ae4f334 100644 --- a/source/extensions/clusters/aggregate/cluster.h +++ b/source/extensions/clusters/aggregate/cluster.h @@ -70,7 +70,8 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer, const ClusterSetConstSharedPtr& clusters); // Upstream::ClusterUpdateCallbacks - void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) override; + void onClusterAddOrUpdate(absl::string_view cluster_name, + Upstream::ThreadLocalClusterCommand& get_cluster) override; void onClusterRemoval(const std::string& cluster_name) override; // Upstream::LoadBalancer diff --git a/source/extensions/clusters/eds/eds.cc b/source/extensions/clusters/eds/eds.cc index 4aaea4dfba38..9e1fe0b82315 100644 --- a/source/extensions/clusters/eds/eds.cc +++ b/source/extensions/clusters/eds/eds.cc @@ -385,6 +385,9 @@ void EdsClusterImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReas std::pair EdsClusterFactory::createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster, ClusterFactoryContext& context) { + // TODO(kbaichoo): EDS cluster should be able to support loading its + // configuration from the CustomClusterType protobuf. Currently it does not. 
+ // See: https://github.com/envoyproxy/envoy/issues/28752 if (!cluster.has_eds_cluster_config()) { throw EnvoyException("cannot create an EDS cluster without an EDS config"); } diff --git a/source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.cc b/source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.cc index 3ab49ca0bf26..c1314d18c90d 100644 --- a/source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.cc +++ b/source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.cc @@ -111,8 +111,8 @@ ProxyFilterConfig::ThreadLocalClusterInfo::~ThreadLocalClusterInfo() { } } -void ProxyFilterConfig::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) { - const std::string& cluster_name = cluster.info()->name(); +void ProxyFilterConfig::onClusterAddOrUpdate(absl::string_view cluster_name, + Upstream::ThreadLocalClusterCommand&) { ENVOY_LOG(debug, "thread local cluster {} added or updated", cluster_name); ThreadLocalClusterInfo& tls_cluster_info = *tls_slot_; auto it = tls_cluster_info.pending_clusters_.find(cluster_name); diff --git a/source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.h b/source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.h index db9a417a2ad9..e63b96107db8 100644 --- a/source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.h +++ b/source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.h @@ -59,7 +59,8 @@ class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks, Upstream::ClusterUpdateCallbacksHandlePtr addThreadLocalClusterUpdateCallbacks(); // Upstream::ClusterUpdateCallbacks - void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) override; + void onClusterAddOrUpdate(absl::string_view cluster_name, + Upstream::ThreadLocalClusterCommand&) override; void onClusterRemoval(const std::string&) override; private: diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc 
b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index 8c95c5f62e45..e745658cf5af 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -99,7 +99,10 @@ InstanceImpl::ThreadLocalPool::ThreadLocalPool( cluster_update_handle_ = parent->cm_.addThreadLocalClusterUpdateCallbacks(*this); Upstream::ThreadLocalCluster* cluster = parent->cm_.getThreadLocalCluster(cluster_name_); if (cluster != nullptr) { - onClusterAddOrUpdateNonVirtual(*cluster); + Upstream::ThreadLocalClusterCommand command = [&cluster]() -> Upstream::ThreadLocalCluster& { + return *cluster; + }; + onClusterAddOrUpdateNonVirtual(cluster->info()->name(), command); } } @@ -116,8 +119,8 @@ InstanceImpl::ThreadLocalPool::~ThreadLocalPool() { } void InstanceImpl::ThreadLocalPool::onClusterAddOrUpdateNonVirtual( - Upstream::ThreadLocalCluster& cluster) { - if (cluster.info()->name() != cluster_name_) { + absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) { + if (cluster_name != cluster_name_) { return; } // Ensure the filter is not deleted in the main thread during this method. @@ -132,6 +135,7 @@ void InstanceImpl::ThreadLocalPool::onClusterAddOrUpdateNonVirtual( } ASSERT(cluster_ == nullptr); + auto& cluster = get_cluster(); cluster_ = &cluster; // Update username and password when cluster updates. 
auth_username_ = ProtocolOptionsConfigImpl::authUsername(cluster_->info(), shared_parent->api_); diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h index a7d43c5fcdb7..fe33f44ae5a0 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h @@ -159,14 +159,16 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this& hosts_added); void onHostsRemoved(const std::vector& hosts_removed); void drainClients(); // Upstream::ClusterUpdateCallbacks - void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) override { - onClusterAddOrUpdateNonVirtual(cluster); + void onClusterAddOrUpdate(absl::string_view cluster_name, + Upstream::ThreadLocalClusterCommand& get_cluster) override { + onClusterAddOrUpdateNonVirtual(cluster_name, get_cluster); } void onClusterRemoval(const std::string& cluster_name) override; diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 903e46fc1ec3..26e12bceba9a 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -17,7 +17,10 @@ UdpProxyFilter::UdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks, for (const auto& entry : config_->allClusterNames()) { Upstream::ThreadLocalCluster* cluster = config->clusterManager().getThreadLocalCluster(entry); if (cluster != nullptr) { - onClusterAddOrUpdate(*cluster); + Upstream::ThreadLocalClusterCommand command = [&cluster]() -> Upstream::ThreadLocalCluster& { + return *cluster; + }; + onClusterAddOrUpdate(cluster->info()->name(), command); } } @@ -36,9 +39,11 @@ UdpProxyFilter::~UdpProxyFilter() { } } -void UdpProxyFilter::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) { - auto cluster_name = cluster.info()->name(); +void 
UdpProxyFilter::onClusterAddOrUpdate(absl::string_view cluster_name, + Upstream::ThreadLocalClusterCommand& get_cluster) { ENVOY_LOG(debug, "udp proxy: attaching to cluster {}", cluster_name); + + auto& cluster = get_cluster(); ASSERT((!cluster_infos_.contains(cluster_name)) || &cluster_infos_[cluster_name]->cluster_ != &cluster); diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index a31b66c25696..a632e05951d8 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -394,7 +394,8 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, void fillProxyStreamInfo(); // Upstream::ClusterUpdateCallbacks - void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) final; + void onClusterAddOrUpdate(absl::string_view cluster_name, + Upstream::ThreadLocalClusterCommand& get_cluster) final; void onClusterRemoval(const std::string& cluster_name) override; const UdpProxyFilterConfigSharedPtr config_; diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index 84abeda51cf3..70993af3d200 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -121,6 +121,26 @@ class DeltaSotwIntegrationParamTest SotwOrDelta sotwOrDelta() const { return std::get<2>(GetParam()); } }; +class DeltaSotwDeferredClustersIntegrationParamTest + : public BaseGrpcClientIntegrationParamTest, + public testing::TestWithParam< + std::tuple> { +public: + ~DeltaSotwDeferredClustersIntegrationParamTest() override = default; + static std::string protocolTestParamsToString( + const ::testing::TestParamInfo< + std::tuple>& p) { + return fmt::format("{}_{}_{}_{}", TestUtility::ipVersionToString(std::get<0>(p.param)), + std::get<1>(p.param) == ClientType::GoogleGrpc ? "GoogleGrpc" : "EnvoyGrpc", + std::get<2>(p.param) == SotwOrDelta::Delta ? 
"Delta" : "StateOfTheWorld", + std::get<3>(p.param) == true ? "DeferredClusters" : ""); + } + Network::Address::IpVersion ipVersion() const override { return std::get<0>(GetParam()); } + ClientType clientType() const override { return std::get<1>(GetParam()); } + SotwOrDelta sotwOrDelta() const { return std::get<2>(GetParam()); } + bool useDeferredCluster() const { return std::get<3>(GetParam()); } +}; + // Skip tests based on gRPC client type. #define SKIP_IF_GRPC_CLIENT(client_type) \ if (clientType() == (client_type)) { \ @@ -142,6 +162,11 @@ class DeltaSotwIntegrationParamTest testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ testing::Values(Grpc::SotwOrDelta::Sotw, Grpc::SotwOrDelta::Delta)) +#define DELTA_SOTW_GRPC_CLIENT_DEFERRED_CLUSTERS_INTEGRATION_PARAMS \ + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ + testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ + testing::Values(Grpc::SotwOrDelta::Sotw, Grpc::SotwOrDelta::Delta), \ + testing::Values(true, false)) #define UNIFIED_LEGACY_GRPC_CLIENT_INTEGRATION_PARAMS \ testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index b9dd58f3dcab..4b5bf65ce077 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -58,6 +58,28 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "deferred_cluster_initialization_test", + srcs = ["deferred_cluster_initialization_test.cc"], + external_deps = [ + "abseil_base", + ], + deps = [ + ":test_cluster_manager", + "//envoy/upstream:cluster_manager_interface", + "//source/extensions/clusters/eds:eds_lib", + "//source/extensions/clusters/static:static_cluster_lib", + "//test/mocks/config:config_mocks", + "//test/test_common:simulated_time_system_lib", + 
"//test/test_common:utility_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", + "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", + ], +) + envoy_cc_test( name = "cluster_manager_impl_test", size = "large", diff --git a/test/common/upstream/cluster_discovery_manager_test.cc b/test/common/upstream/cluster_discovery_manager_test.cc index e1a4175687e1..c88bb75de319 100644 --- a/test/common/upstream/cluster_discovery_manager_test.cc +++ b/test/common/upstream/cluster_discovery_manager_test.cc @@ -36,7 +36,8 @@ class TestClusterLifecycleCallbackHandler : public ClusterLifecycleCallbackHandl void invokeClusterAdded(ThreadLocalCluster& cluster) { for (auto& cb : update_callbacks_) { - cb->onClusterAddOrUpdate(cluster); + ThreadLocalClusterCommand command = [&cluster]() -> ThreadLocalCluster& { return cluster; }; + cb->onClusterAddOrUpdate(cluster.info()->name(), command); } } diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 322668d9ebbb..b6de4c485a27 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -91,7 +91,7 @@ class ClusterManagerImplTest : public testing::Test { router_context_(factory_.stats_.symbolTable()), registered_dns_factory_(dns_resolver_factory_) {} - void create(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + virtual void create(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) { cluster_manager_ = std::make_unique( bootstrap, factory_, factory_.stats_, factory_.tls_, factory_.runtime_, factory_.local_info_, log_manager_, factory_.dispatcher_, admin_, validation_context_, @@ -1224,10 +1224,27 @@ TEST_F(ClusterManagerImplTest, VerifyBufferLimits) { factory_.tls_.shutdownThread(); } -TEST_F(ClusterManagerImplTest, 
ShutdownOrder) { +class ClusterManagerLifecycleTest : public ClusterManagerImplTest, + public testing::WithParamInterface { +protected: + void create(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override { + if (useDeferredCluster()) { + auto bootstrap_with_deferred_cluster = bootstrap; + bootstrap_with_deferred_cluster.mutable_cluster_manager() + ->set_enable_deferred_cluster_creation(true); + ClusterManagerImplTest::create(bootstrap_with_deferred_cluster); + } else { + ClusterManagerImplTest::create(bootstrap); + } + } + bool useDeferredCluster() const { return GetParam(); } +}; + +INSTANTIATE_TEST_SUITE_P(ClusterManagerLifecycleTest, ClusterManagerLifecycleTest, testing::Bool()); + +TEST_P(ClusterManagerLifecycleTest, ShutdownOrder) { const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", clustersJson({defaultStaticClusterJson("cluster_1")})); - create(parseBootstrapFromV3Json(json)); Cluster& cluster = cluster_manager_->activeClusters().begin()->second; EXPECT_EQ("cluster_1", cluster.info()->name()); @@ -1242,11 +1259,21 @@ TEST_F(ClusterManagerImplTest, ShutdownOrder) { cluster_manager_->getThreadLocalCluster("cluster_1")->loadBalancer().chooseHost(nullptr)); // Local reference, primary reference, thread local reference, host reference - EXPECT_EQ(4U, cluster.info().use_count()); + if (useDeferredCluster()) { + // Additional reference in Cluster Initialization Object. + EXPECT_EQ(5U, cluster.info().use_count()); + } else { + EXPECT_EQ(4U, cluster.info().use_count()); + } // Thread local reference should be gone. factory_.tls_.shutdownThread(); - EXPECT_EQ(3U, cluster.info().use_count()); + if (useDeferredCluster()) { + // Additional reference in Cluster Initialization Object. 
+ EXPECT_EQ(4U, cluster.info().use_count()); + } else { + EXPECT_EQ(3U, cluster.info().use_count()); + } } TEST_F(ClusterManagerImplTest, TwoEqualCommonLbConfigSharedPool) { @@ -1275,7 +1302,7 @@ TEST_F(ClusterManagerImplTest, TwoUnequalCommonLbConfigSharedPool) { EXPECT_NE(common_config_ptr_a, common_config_ptr_b); } -TEST_F(ClusterManagerImplTest, InitializeOrder) { +TEST_P(ClusterManagerLifecycleTest, InitializeOrder) { time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); const std::string json = fmt::sprintf( @@ -1487,7 +1514,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster5.get())); } -TEST_F(ClusterManagerImplTest, DynamicRemoveWithLocalCluster) { +TEST_P(ClusterManagerLifecycleTest, DynamicRemoveWithLocalCluster) { InSequence s; // Setup a cluster manager with a static local cluster. @@ -1547,7 +1574,7 @@ TEST_F(ClusterManagerImplTest, DynamicRemoveWithLocalCluster) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); } -TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) { +TEST_P(ClusterManagerLifecycleTest, RemoveWarmingCluster) { time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); create(defaultConfig()); @@ -1592,7 +1619,7 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); } -TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { +TEST_P(ClusterManagerLifecycleTest, TestModifyWarmingClusterDuringInitialization) { const std::string json = fmt::sprintf( R"EOF( { @@ -1691,7 +1718,7 @@ TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cds_cluster.get())); } -TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { +TEST_P(ClusterManagerLifecycleTest, ModifyWarmingCluster) { time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); create(defaultConfig()); @@ -1775,7 +1802,7 @@ 
TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { // Regression test for https://github.com/envoyproxy/envoy/issues/14598. // Make sure the revert isn't blocked due to being the same as the active version. -TEST_F(ClusterManagerImplTest, TestRevertWarmingCluster) { +TEST_P(ClusterManagerLifecycleTest, TestRevertWarmingCluster) { time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); create(defaultConfig()); @@ -1855,7 +1882,7 @@ TEST_F(ClusterManagerImplTest, TestRevertWarmingCluster) { } // Verify that shutting down the cluster manager destroys warming clusters. -TEST_F(ClusterManagerImplTest, ShutdownWithWarming) { +TEST_P(ClusterManagerLifecycleTest, ShutdownWithWarming) { create(defaultConfig()); InSequence s; @@ -1877,7 +1904,7 @@ TEST_F(ClusterManagerImplTest, ShutdownWithWarming) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); } -TEST_F(ClusterManagerImplTest, DynamicAddRemove) { +TEST_P(ClusterManagerLifecycleTest, DynamicAddRemove) { create(defaultConfig()); InSequence s; @@ -1894,7 +1921,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(*cluster1, initializePhase()).Times(0); EXPECT_CALL(*cluster1, initialize(_)); - EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_)); + EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_, _)); EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(defaultStaticCluster("fake_cluster"), "")); checkStats(1 /*added*/, 0 /*modified*/, 0 /*removed*/, 0 /*active*/, 1 /*warming*/); EXPECT_EQ(1, cluster_manager_->warmingClusterCount()); @@ -1923,7 +1950,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { // Test inline init. 
initialize_callback(); })); - EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_)); + EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_, _)); EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(update_cluster, "")); EXPECT_EQ(cluster2->info_, cluster_manager_->getThreadLocalCluster("fake_cluster")->info()); @@ -1981,7 +2008,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { } // Validates that a callback can remove itself from the callbacks list. -TEST_F(ClusterManagerImplTest, ClusterAddOrUpdateCallbackRemovalDuringIteration) { +TEST_P(ClusterManagerLifecycleTest, ClusterAddOrUpdateCallbackRemovalDuringIteration) { create(defaultConfig()); InSequence s; @@ -1998,10 +2025,11 @@ TEST_F(ClusterManagerImplTest, ClusterAddOrUpdateCallbackRemovalDuringIteration) .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(*cluster1, initializePhase()).Times(0); EXPECT_CALL(*cluster1, initialize(_)); - EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_)).WillOnce(Invoke([&cb](ThreadLocalCluster&) { - // This call will remove the callback from the list. - cb.reset(); - })); + EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_, _)) + .WillOnce(Invoke([&cb](absl::string_view, ThreadLocalClusterCommand&) { + // This call will remove the callback from the list. + cb.reset(); + })); EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(defaultStaticCluster("fake_cluster"), "")); checkStats(1 /*added*/, 0 /*modified*/, 0 /*removed*/, 0 /*active*/, 1 /*warming*/); EXPECT_EQ(1, cluster_manager_->warmingClusterCount()); @@ -2029,7 +2057,7 @@ TEST_F(ClusterManagerImplTest, ClusterAddOrUpdateCallbackRemovalDuringIteration) })); // There shouldn't be a call to onClusterAddOrUpdate on the callbacks as the // handler was removed. 
- EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_)).Times(0); + EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_, _)).Times(0); EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(update_cluster, "")); checkStats(1 /*added*/, 1 /*modified*/, 0 /*removed*/, 1 /*active*/, 0 /*warming*/); @@ -2039,7 +2067,7 @@ TEST_F(ClusterManagerImplTest, ClusterAddOrUpdateCallbackRemovalDuringIteration) EXPECT_TRUE(Mock::VerifyAndClearExpectations(callbacks.get())); } -TEST_F(ClusterManagerImplTest, AddOrUpdateClusterStaticExists) { +TEST_P(ClusterManagerLifecycleTest, AddOrUpdateClusterStaticExists) { const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", clustersJson({defaultStaticClusterJson("fake_cluster")})); std::shared_ptr cluster1(new NiceMock()); @@ -2068,7 +2096,7 @@ TEST_F(ClusterManagerImplTest, AddOrUpdateClusterStaticExists) { } // Verifies that we correctly propagate the host_set state to the TLS clusters. -TEST_F(ClusterManagerImplTest, HostsPostedToTlsCluster) { +TEST_P(ClusterManagerLifecycleTest, HostsPostedToTlsCluster) { const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", clustersJson({defaultStaticClusterJson("fake_cluster")})); std::shared_ptr cluster1(new NiceMock()); @@ -2117,7 +2145,7 @@ TEST_F(ClusterManagerImplTest, HostsPostedToTlsCluster) { } // Test that we close all HTTP connection pool connections when there is a host health failure. -TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { +TEST_P(ClusterManagerLifecycleTest, CloseHttpConnectionsOnHealthFailure) { const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", clustersJson({defaultStaticClusterJson("some_cluster")})); std::shared_ptr cluster1(new NiceMock()); @@ -2184,7 +2212,7 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { // Test that we drain or close all HTTP or TCP connection pool connections when there is a host // health failure and 'CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE' set to true. 
-TEST_F(ClusterManagerImplTest, +TEST_P(ClusterManagerLifecycleTest, CloseConnectionsOnHealthFailureWithCloseConnectionsOnHostHealthFailure) { const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", clustersJson({defaultStaticClusterJson("some_cluster")})); @@ -2240,7 +2268,7 @@ TEST_F(ClusterManagerImplTest, // Verify that the pool gets deleted if it is idle, and that a crash does not occur due to // deleting a container while iterating through it (see `do_not_delete_` in // `ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure()`). -TEST_F(ClusterManagerImplTest, CloseHttpConnectionsAndDeletePoolOnHealthFailure) { +TEST_P(ClusterManagerLifecycleTest, CloseHttpConnectionsAndDeletePoolOnHealthFailure) { const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", clustersJson({defaultStaticClusterJson("some_cluster")})); std::shared_ptr cluster1(new NiceMock()); @@ -2287,7 +2315,7 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsAndDeletePoolOnHealthFailure) } // Test that we close all TCP connection pool connections when there is a host health failure. -TEST_F(ClusterManagerImplTest, CloseTcpConnectionPoolsOnHealthFailure) { +TEST_P(ClusterManagerLifecycleTest, CloseTcpConnectionPoolsOnHealthFailure) { const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", clustersJson({defaultStaticClusterJson("some_cluster")})); std::shared_ptr cluster1(new NiceMock()); @@ -2354,7 +2382,7 @@ TEST_F(ClusterManagerImplTest, CloseTcpConnectionPoolsOnHealthFailure) { // Test that we close all TCP connection pool connections when there is a host health failure, // when configured to do so. 
-TEST_F(ClusterManagerImplTest, CloseTcpConnectionsOnHealthFailure) { +TEST_P(ClusterManagerLifecycleTest, CloseTcpConnectionsOnHealthFailure) { const std::string yaml = R"EOF( static_resources: clusters: @@ -2432,7 +2460,7 @@ TEST_F(ClusterManagerImplTest, CloseTcpConnectionsOnHealthFailure) { // Test that we do not close TCP connection pool connections when there is a host health failure, // when not configured to do so. -TEST_F(ClusterManagerImplTest, DoNotCloseTcpConnectionsOnHealthFailure) { +TEST_P(ClusterManagerLifecycleTest, DoNotCloseTcpConnectionsOnHealthFailure) { const std::string yaml = R"EOF( static_resources: clusters: @@ -2482,7 +2510,7 @@ TEST_F(ClusterManagerImplTest, DoNotCloseTcpConnectionsOnHealthFailure) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); } -TEST_F(ClusterManagerImplTest, DynamicHostRemove) { +TEST_P(ClusterManagerLifecycleTest, DynamicHostRemove) { const std::string yaml = R"EOF( static_resources: clusters: @@ -2619,7 +2647,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { factory_.tls_.shutdownThread(); } -TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { +TEST_P(ClusterManagerLifecycleTest, DynamicHostRemoveWithTls) { const std::string yaml = R"EOF( static_resources: clusters: @@ -2843,7 +2871,7 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { // Test that default DNS resolver with TCP lookups is used, when there are no DNS custom resolvers // configured per cluster and `dns_resolver_options.use_tcp_for_dns_lookups` is set in bootstrap // config. -TEST_F(ClusterManagerImplTest, UseTcpInDefaultDnsResolver) { +TEST_P(ClusterManagerLifecycleTest, UseTcpInDefaultDnsResolver) { const std::string yaml = R"EOF( dns_resolution_config: dns_resolver_options: @@ -2871,7 +2899,7 @@ TEST_F(ClusterManagerImplTest, UseTcpInDefaultDnsResolver) { // Test that custom DNS resolver is used, when custom resolver is configured // per cluster and deprecated field `dns_resolvers` is specified. 
-TEST_F(ClusterManagerImplTest, CustomDnsResolverSpecifiedViaDeprecatedField) { +TEST_P(ClusterManagerLifecycleTest, CustomDnsResolverSpecifiedViaDeprecatedField) { const std::string yaml = R"EOF( static_resources: clusters: @@ -2907,7 +2935,7 @@ TEST_F(ClusterManagerImplTest, CustomDnsResolverSpecifiedViaDeprecatedField) { // Test that custom DNS resolver is used, when custom resolver is configured // per cluster and deprecated field `dns_resolvers` is specified with multiple resolvers. -TEST_F(ClusterManagerImplTest, CustomDnsResolverSpecifiedViaDeprecatedFieldMultipleResolvers) { +TEST_P(ClusterManagerLifecycleTest, CustomDnsResolverSpecifiedViaDeprecatedFieldMultipleResolvers) { const std::string yaml = R"EOF( static_resources: clusters: @@ -3585,7 +3613,7 @@ TEST_F(ClusterManagerImplTest, TypedDnsResolverConfigSpecifiedOveridingDeprecate // ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(), where a removal at one // priority from the ConnPoolsContainer would delete the ConnPoolsContainer mid-iteration over the // pool. -TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) { +TEST_P(ClusterManagerLifecycleTest, DynamicHostRemoveDefaultPriority) { const std::string yaml = R"EOF( static_resources: clusters: @@ -3677,7 +3705,7 @@ class MockTcpConnPoolWithDestroy : public Tcp::ConnectionPool::MockInstance { // Regression test for https://github.com/envoyproxy/envoy/issues/3518. Make sure we handle a // drain callback during CP destroy. -TEST_F(ClusterManagerImplTest, ConnPoolDestroyWithDraining) { +TEST_P(ClusterManagerLifecycleTest, ConnPoolDestroyWithDraining) { const std::string yaml = R"EOF( static_resources: clusters: @@ -3793,7 +3821,7 @@ TEST_F(ClusterManagerImplTest, OriginalDstInitialization) { // there's no hosts changes in between. // Also tests that if hosts are added/removed between mergeable updates, delivery will // happen and the scheduled update will be cancelled. 
-TEST_F(ClusterManagerImplTest, MergedUpdates) { +TEST_P(ClusterManagerLifecycleTest, MergedUpdates) { createWithLocalClusterUpdate(); // Ensure we see the right set of added/removed hosts on every call. @@ -3930,7 +3958,7 @@ TEST_F(ClusterManagerImplTest, MergedUpdates) { } // Tests that mergeable updates outside of a window get applied immediately. -TEST_F(ClusterManagerImplTest, MergedUpdatesOutOfWindow) { +TEST_P(ClusterManagerLifecycleTest, MergedUpdatesOutOfWindow) { createWithLocalClusterUpdate(); // Ensure we see the right set of added/removed hosts on every call. @@ -3966,7 +3994,7 @@ TEST_F(ClusterManagerImplTest, MergedUpdatesOutOfWindow) { } // Tests that mergeable updates inside of a window are not applied immediately. -TEST_F(ClusterManagerImplTest, MergedUpdatesInsideWindow) { +TEST_P(ClusterManagerLifecycleTest, MergedUpdatesInsideWindow) { createWithLocalClusterUpdate(); Cluster& cluster = cluster_manager_->activeClusters().begin()->second; @@ -3994,7 +4022,7 @@ TEST_F(ClusterManagerImplTest, MergedUpdatesInsideWindow) { // Tests that mergeable updates outside of a window get applied immediately when // merging is disabled, and that the counters are correct. -TEST_F(ClusterManagerImplTest, MergedUpdatesOutOfWindowDisabled) { +TEST_P(ClusterManagerLifecycleTest, MergedUpdatesOutOfWindowDisabled) { createWithLocalClusterUpdate(false); // Ensure we see the right set of added/removed hosts on every call. @@ -4027,7 +4055,7 @@ TEST_F(ClusterManagerImplTest, MergedUpdatesOutOfWindowDisabled) { EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); } -TEST_F(ClusterManagerImplTest, MergedUpdatesDestroyedOnUpdate) { +TEST_P(ClusterManagerLifecycleTest, MergedUpdatesDestroyedOnUpdate) { // We create the default cluster, although for this test we won't use it since // we can only update dynamic clusters. 
createWithLocalClusterUpdate(); @@ -4332,7 +4360,7 @@ TEST_F(ClusterManagerImplTest, HttpPoolDataForwardsCallsToConnectionPool) { // Test that the read only cross-priority host map in the main thread is correctly synchronized to // the worker thread when the cluster's host set is updated. -TEST_F(ClusterManagerImplTest, CrossPriorityHostMapSyncTest) { +TEST_P(ClusterManagerLifecycleTest, CrossPriorityHostMapSyncTest) { std::string yaml = R"EOF( static_resources: clusters: @@ -5461,7 +5489,7 @@ TEST_F(TcpKeepaliveTest, TcpKeepaliveWithAllOptions) { } // Make sure the drainConnections() with a predicate can correctly exclude a host. -TEST_F(ClusterManagerImplTest, DrainConnectionsPredicate) { +TEST_P(ClusterManagerLifecycleTest, DrainConnectionsPredicate) { const std::string yaml = R"EOF( static_resources: clusters: @@ -5506,7 +5534,7 @@ TEST_F(ClusterManagerImplTest, DrainConnectionsPredicate) { }); } -TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { +TEST_P(ClusterManagerLifecycleTest, ConnPoolsDrainedOnHostSetChange) { const std::string yaml = R"EOF( static_resources: clusters: @@ -5649,7 +5677,7 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { hosts_added, {}, absl::nullopt, 100); } -TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) { +TEST_P(ClusterManagerLifecycleTest, ConnPoolsNotDrainedOnHostSetChange) { const std::string yaml = R"EOF( static_resources: clusters: @@ -5718,7 +5746,7 @@ TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) { hosts_added, {}, absl::nullopt, 100); } -TEST_F(ClusterManagerImplTest, ConnPoolsIdleDeleted) { +TEST_P(ClusterManagerLifecycleTest, ConnPoolsIdleDeleted) { TestScopedRuntime scoped_runtime; const std::string yaml = R"EOF( diff --git a/test/common/upstream/cluster_update_tracker_test.cc b/test/common/upstream/cluster_update_tracker_test.cc index c97ac7831db1..9ab61529bf07 100644 --- a/test/common/upstream/cluster_update_tracker_test.cc +++ 
b/test/common/upstream/cluster_update_tracker_test.cc @@ -52,14 +52,16 @@ TEST_F(ClusterUpdateTrackerTest, ShouldProperlyHandleUpdateCallbacks) { { // Simulate addition of an irrelevant cluster. - cluster_tracker.onClusterAddOrUpdate(irrelevant_); + ThreadLocalClusterCommand command = [this]() -> ThreadLocalCluster& { return irrelevant_; }; + cluster_tracker.onClusterAddOrUpdate("unrelated_cluster", command); EXPECT_FALSE(cluster_tracker.threadLocalCluster().has_value()); } { // Simulate addition of the relevant cluster. - cluster_tracker.onClusterAddOrUpdate(expected_); + ThreadLocalClusterCommand command = [this]() -> ThreadLocalCluster& { return expected_; }; + cluster_tracker.onClusterAddOrUpdate(cluster_name_, command); ASSERT_TRUE(cluster_tracker.threadLocalCluster().has_value()); EXPECT_EQ(cluster_tracker.threadLocalCluster()->get().info(), expected_.cluster_.info_); diff --git a/test/common/upstream/deferred_cluster_initialization_test.cc b/test/common/upstream/deferred_cluster_initialization_test.cc new file mode 100644 index 000000000000..ff0824bf0e71 --- /dev/null +++ b/test/common/upstream/deferred_cluster_initialization_test.cc @@ -0,0 +1,815 @@ +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" +#include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/config/cluster/v3/cluster.pb.validate.h" +#include "envoy/config/core/v3/base.pb.h" +#include "envoy/config/endpoint/v3/endpoint.pb.h" +#include "envoy/config/endpoint/v3/endpoint_components.pb.h" +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/extensions/clusters/eds/eds.h" +#include "source/extensions/clusters/static/static_cluster.h" + +#include "test/common/upstream/test_cluster_manager.h" +#include "test/mocks/config/mocks.h" +#include "test/mocks/protobuf/mocks.h" +#include "test/mocks/server/instance.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +namespace Envoy { +namespace Upstream { +namespace { + +using 
testing::_; + +using ClusterType = absl::variant; + +void setClusterType(envoy::config::cluster::v3::Cluster& cluster, const ClusterType& cluster_type) { + cluster.clear_cluster_discovery_type(); + if (absl::holds_alternative(cluster_type)) { + cluster.set_type(absl::get(cluster_type)); + } else if (absl::holds_alternative( + cluster_type)) { + cluster.mutable_cluster_type()->CopyFrom( + absl::get(cluster_type)); + } +} + +bool hostsInHostsVector(const Envoy::Upstream::HostVector& host_vector, + std::vector host_ports) { + size_t matches = 0; + std::sort(host_ports.begin(), host_ports.end()); + for (const auto& host : host_vector) { + if (std::binary_search(host_ports.begin(), host_ports.end(), host->address()->ip()->port())) { + ++matches; + } + } + return matches == host_ports.size(); +} + +envoy::config::cluster::v3::Cluster parseClusterFromV3Yaml(const std::string& yaml_config, + const ClusterType& cluster_type) { + auto cluster = parseClusterFromV3Yaml(yaml_config); + setClusterType(cluster, cluster_type); + return cluster; +} + +class DeferredClusterInitializationTest : public testing::TestWithParam { +protected: + DeferredClusterInitializationTest() + : http_context_(factory_.stats_.symbolTable()), grpc_context_(factory_.stats_.symbolTable()), + router_context_(factory_.stats_.symbolTable()) {} + + void create(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + cluster_manager_ = std::make_unique( + bootstrap, factory_, factory_.stats_, factory_.tls_, factory_.runtime_, + factory_.local_info_, log_manager_, factory_.dispatcher_, admin_, validation_context_, + *factory_.api_, http_context_, grpc_context_, router_context_, server_); + cluster_manager_->setPrimaryClustersInitializedCb( + [this, bootstrap]() { cluster_manager_->initializeSecondaryClusters(bootstrap); }); + } + + ClusterType getStaticClusterType() const { + if (GetParam()) { + envoy::config::cluster::v3::Cluster::CustomClusterType custom_cluster_type; + 
custom_cluster_type.set_name("envoy.cluster.static"); + return custom_cluster_type; + } + + return envoy::config::cluster::v3::Cluster::STATIC; + } + + ClusterType getEdsClusterType() const { + if (GetParam()) { + ASSERT(false, "EDS cluster support via CustomClusterType unimplemented."); + envoy::config::cluster::v3::Cluster::CustomClusterType custom_cluster_type; + return custom_cluster_type; + } + + return envoy::config::cluster::v3::Cluster::EDS; + } + + envoy::config::bootstrap::v3::Bootstrap + parseBootstrapFromV3YamlEnableDeferredCluster(const std::string& yaml) { + envoy::config::bootstrap::v3::Bootstrap bootstrap; + TestUtility::loadFromYaml(yaml, bootstrap); + bootstrap.mutable_cluster_manager()->set_enable_deferred_cluster_creation(true); + ClusterType cluster_type = getStaticClusterType(); + for (auto& cluster : *bootstrap.mutable_static_resources()->mutable_clusters()) { + setClusterType(cluster, cluster_type); + } + return bootstrap; + } + + uint64_t readGauge(const std::string& gauge_name) const { + auto gauge_or = factory_.stats_.findGaugeByString(gauge_name); + ASSERT(gauge_or.has_value()); + return gauge_or.value().get().value(); + } + + NiceMock factory_; + NiceMock validation_context_; + std::unique_ptr cluster_manager_; + AccessLog::MockAccessLogManager log_manager_; + NiceMock admin_; + Http::ContextImpl http_context_; + Grpc::ContextImpl grpc_context_; + Router::ContextImpl router_context_; + NiceMock server_; +}; + +class StaticClusterTest : public DeferredClusterInitializationTest {}; + +INSTANTIATE_TEST_SUITE_P(UseCustomClusterType, StaticClusterTest, testing::Bool()); + +// Test that bootstrap static clusters are deferred initialized. 
+TEST_P(StaticClusterTest, BootstrapStaticClustersAreDeferredInitialized) { + const std::string yaml = R"EOF( + static_resources: + clusters: + - name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11002 + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(yaml); + + EXPECT_LOG_CONTAINS("debug", "Deferring add or update for TLS cluster cluster_1", + create(bootstrap)); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); +} + +// Test that CDS clusters are deferred initialized.
+TEST_P(StaticClusterTest, CdsStaticClustersAreDeferredInitialized) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + clusters: + - name: bootstrap_cluster + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: bootstrap_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + const std::string static_cds_cluster_yaml = R"EOF( + name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11002 + )EOF"; + + EXPECT_LOG_CONTAINS("debug", "Deferring add or update for TLS cluster cluster_1", { + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(static_cds_cluster_yaml, getStaticClusterType()), "version1")); + }); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); +} + +// Test that we can merge deferred cds cluster configuration. 
+TEST_P(StaticClusterTest, MergeStaticCdsClusterUpdates) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + + { + const std::string static_cds_cluster_yaml_v1 = R"EOF( + name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60001 + )EOF"; + + EXPECT_LOG_CONTAINS("debug", "Deferring add or update for TLS cluster cluster_1", { + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(static_cds_cluster_yaml_v1, getStaticClusterType()), "version1")); + }); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + } + + { + const std::string static_cds_cluster_yaml_v2 = R"EOF( + name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + priority: 0 + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11002 + priority: 1 + )EOF"; + + EXPECT_LOG_CONTAINS("debug", "Deferring add or update for TLS cluster cluster_1", { + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(static_cds_cluster_yaml_v2, getStaticClusterType()), "version2")); + }); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + } + + EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); + Cluster& cluster = 
cluster_manager_->activeClusters().find("cluster_1")->second; + + // Check we only know of the two endpoints from the recent update. + EXPECT_EQ(cluster.info()->endpointStats().membership_total_.value(), 2); + EXPECT_EQ(cluster.prioritySet().crossPriorityHostMap()->size(), 2); + auto& host_sets_vector = cluster.prioritySet().hostSetsPerPriority(); + for (auto& host_set : host_sets_vector) { + EXPECT_EQ(host_set->hosts().size(), 1); + } +} + +// Test that an active deferred cds cluster can get updated after initialization. +TEST_P(StaticClusterTest, ActiveClusterGetsUpdated) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + + { + const std::string static_cds_cluster_yaml_v1 = R"EOF( + name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60001 + )EOF"; + + EXPECT_LOG_CONTAINS("debug", "Deferring add or update for TLS cluster cluster_1", { + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(static_cds_cluster_yaml_v1, getStaticClusterType()), "version1")); + }); + EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); + } + + { + const std::string static_cds_cluster_yaml_v2 = R"EOF( + name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + priority: 0 + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + 
port_value: 11002 + priority: 1 + )EOF"; + // The cluster is already inflated, so this update should be applied right away, not deferred. + EXPECT_LOG_CONTAINS("debug", "updating TLS cluster cluster_1", { + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(static_cds_cluster_yaml_v2, getStaticClusterType()), "version2")); + }); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); + } + + Cluster& cluster = cluster_manager_->activeClusters().find("cluster_1")->second; + + // Check we only know of the two endpoints from the recent update. + EXPECT_EQ(cluster.info()->endpointStats().membership_total_.value(), 2); + EXPECT_EQ(cluster.prioritySet().crossPriorityHostMap()->size(), 2); + auto& host_sets_vector = cluster.prioritySet().hostSetsPerPriority(); + for (auto& host_set : host_sets_vector) { + EXPECT_EQ(host_set->hosts().size(), 1); + } +} + +// Test that removed deferred cds clusters have their cluster initialization object removed. +TEST_P(StaticClusterTest, RemoveDeferredCluster) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + + const std::string static_cds_cluster_yaml_v1 = R"EOF( + name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: cluster_1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60001 + )EOF"; + + EXPECT_LOG_CONTAINS("debug", "Deferring add or update for TLS cluster cluster_1", { + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(static_cds_cluster_yaml_v1, getStaticClusterType()), "version1")); + }); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + + cluster_manager_->removeCluster("cluster_1"); +
EXPECT_EQ(factory_.stats_.counter("cluster_manager.cluster_removed").value(), 1); + EXPECT_EQ(cluster_manager_->getThreadLocalCluster("cluster_1"), nullptr); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); +} + +class MockConfigSubscriptionFactory : public Config::ConfigSubscriptionFactory { +public: + std::string name() const override { return "envoy.config_subscription.rest"; } + MOCK_METHOD(Config::SubscriptionPtr, create, (SubscriptionData & data), (override)); +}; + +class EdsTest : public DeferredClusterInitializationTest { +protected: + void doOnConfigUpdateVerifyNoThrow( + const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment) { + const auto decoded_resources = + TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + VERBOSE_EXPECT_NO_THROW(callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, "")); + } + + void addEndpoint(envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment, + uint32_t port, uint32_t priority = 0) { + auto* endpoints = cluster_load_assignment.add_endpoints(); + endpoints->set_priority(priority); + auto* socket_address = endpoints->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("1.2.3.4"); + socket_address->set_port_value(port); + } + + NiceMock factory_; + Registry::InjectFactory registered_{factory_}; + Config::SubscriptionCallbacks* callbacks_{nullptr}; +}; + +// TODO(kbaichoo): when Eds Cluster supports getting its config via +// custom_cluster_type then we can enable these tests to run with that config as +// well. +INSTANTIATE_TEST_SUITE_P(UseCustomClusterType, EdsTest, testing::ValuesIn({false})); + +// Test that hosts can be added to deferred initialized eds cluster. 
+TEST_P(EdsTest, ShouldMergeAddingHosts) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + clusters: + - name: eds + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: bootstrap_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + + const std::string eds_cluster_yaml = R"EOF( + name: cluster_1 + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + eds_cluster_config: + service_name: fare + eds_config: + api_config_source: + api_type: REST + transport_api_version: V3 + cluster_names: + - eds + refresh_delay: 1s + )EOF"; + + EXPECT_CALL(factory_, create(_)) + .WillOnce(testing::Invoke([this](Config::ConfigSubscriptionFactory::SubscriptionData& data) { + callbacks_ = &data.callbacks_; + return std::make_unique>(); + })); + + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(eds_cluster_yaml, getEdsClusterType()), "version1")); + + envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + for (int i = 1; i < 11; ++i) { + addEndpoint(cluster_load_assignment, 1000 * i); + const auto decoded_resources = + TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + } + + EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); + Cluster& cluster = cluster_manager_->activeClusters().find("cluster_1")->second; + EXPECT_EQ(cluster.info()->endpointStats().membership_total_.value(), 10); + 
EXPECT_TRUE(hostsInHostsVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts(), + {1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000})); +} + +// Test that removed hosts do not appear when initializing a deferred eds cluster. +TEST_P(EdsTest, ShouldNotHaveRemovedHosts) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + clusters: + - name: eds + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: bootstrap_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + + const std::string eds_cluster_yaml = R"EOF( + name: cluster_1 + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + eds_cluster_config: + service_name: fare + eds_config: + api_config_source: + api_type: REST + transport_api_version: V3 + cluster_names: + - eds + refresh_delay: 1s + )EOF"; + + EXPECT_CALL(factory_, create(_)) + .WillOnce(testing::Invoke([this](Config::ConfigSubscriptionFactory::SubscriptionData& data) { + callbacks_ = &data.callbacks_; + return std::make_unique>(); + })); + + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(eds_cluster_yaml, getEdsClusterType()), "version1")); + + // ClusterLoadAssignment should contain all hosts to be kept for the + // cluster. If a host is not in a subsequent update it is removed. 
+ envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + addEndpoint(cluster_load_assignment, 1000); + addEndpoint(cluster_load_assignment, 2000); + addEndpoint(cluster_load_assignment, 3000); + addEndpoint(cluster_load_assignment, 4000); + auto decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + cluster_load_assignment.clear_endpoints(); + addEndpoint(cluster_load_assignment, 1000); + addEndpoint(cluster_load_assignment, 2000); + decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); + Cluster& cluster = cluster_manager_->activeClusters().find("cluster_1")->second; + EXPECT_EQ(cluster.info()->endpointStats().membership_total_.value(), 2); + EXPECT_TRUE( + hostsInHostsVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts(), {1000, 2000})); +} + +// Test that removed hosts that were added again appear when initializing a deferred eds cluster. 
+TEST_P(EdsTest, ShouldHaveHostThatWasAddedAfterRemoval) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + clusters: + - name: eds + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: bootstrap_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + + const std::string eds_cluster_yaml = R"EOF( + name: cluster_1 + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + eds_cluster_config: + service_name: fare + eds_config: + api_config_source: + api_type: REST + transport_api_version: V3 + cluster_names: + - eds + refresh_delay: 1s + )EOF"; + + EXPECT_CALL(factory_, create(_)) + .WillOnce(testing::Invoke([this](Config::ConfigSubscriptionFactory::SubscriptionData& data) { + callbacks_ = &data.callbacks_; + return std::make_unique>(); + })); + + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(eds_cluster_yaml, getEdsClusterType()), "version1")); + + // ClusterLoadAssignment should contain all hosts to be kept for the + // cluster. If a host is not in a subsequent update it is removed. 
+ envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + addEndpoint(cluster_load_assignment, 1000); + addEndpoint(cluster_load_assignment, 2000); + addEndpoint(cluster_load_assignment, 3000); + addEndpoint(cluster_load_assignment, 4000); + auto decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + cluster_load_assignment.clear_endpoints(); + addEndpoint(cluster_load_assignment, 1000); + addEndpoint(cluster_load_assignment, 2000); + decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + addEndpoint(cluster_load_assignment, 3000); + decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); + Cluster& cluster = cluster_manager_->activeClusters().find("cluster_1")->second; + EXPECT_EQ(cluster.info()->endpointStats().membership_total_.value(), 3); + EXPECT_TRUE(hostsInHostsVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts(), + {1000, 2000, 3000})); +} + +// Test merging multiple priorities for a deferred eds cluster. 
+TEST_P(EdsTest, MultiplePrioritiesShouldMergeCorrectly) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + clusters: + - name: eds + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: bootstrap_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + + const std::string eds_cluster_yaml = R"EOF( + name: cluster_1 + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + eds_cluster_config: + service_name: fare + eds_config: + api_config_source: + api_type: REST + transport_api_version: V3 + cluster_names: + - eds + refresh_delay: 1s + )EOF"; + + EXPECT_CALL(factory_, create(_)) + .WillOnce(testing::Invoke([this](Config::ConfigSubscriptionFactory::SubscriptionData& data) { + callbacks_ = &data.callbacks_; + return std::make_unique>(); + })); + + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(eds_cluster_yaml, getEdsClusterType()), "version1")); + + envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + addEndpoint(cluster_load_assignment, 1000); + addEndpoint(cluster_load_assignment, 2000); + addEndpoint(cluster_load_assignment, 3000, 2); + auto decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + cluster_load_assignment.clear_endpoints(); + addEndpoint(cluster_load_assignment, 1000); + addEndpoint(cluster_load_assignment, 4000); + addEndpoint(cluster_load_assignment, 5000, 1); + decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + 
EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); + Cluster& cluster = cluster_manager_->activeClusters().find("cluster_1")->second; + EXPECT_EQ(cluster.prioritySet().hostSetsPerPriority().size(), 3); + EXPECT_TRUE( + hostsInHostsVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts(), {1000, 4000})); + EXPECT_TRUE(hostsInHostsVector(cluster.prioritySet().hostSetsPerPriority()[1]->hosts(), {5000})); + EXPECT_TRUE(cluster.prioritySet().hostSetsPerPriority()[2]->hosts().empty()); +} + +// Test updating an initialized deferred eds cluster. +TEST_P(EdsTest, ActiveClusterGetsUpdated) { + const std::string bootstrap_yaml = R"EOF( + static_resources: + clusters: + - name: eds + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: bootstrap_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 60000 + )EOF"; + + auto bootstrap = parseBootstrapFromV3YamlEnableDeferredCluster(bootstrap_yaml); + create(bootstrap); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 0); + + const std::string eds_cluster_yaml = R"EOF( + name: cluster_1 + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + eds_cluster_config: + service_name: fare + eds_config: + api_config_source: + api_type: REST + transport_api_version: V3 + cluster_names: + - eds + refresh_delay: 1s + )EOF"; + + EXPECT_CALL(factory_, create(_)) + .WillOnce(testing::Invoke([this](Config::ConfigSubscriptionFactory::SubscriptionData& data) { + callbacks_ = &data.callbacks_; + return std::make_unique>(); + })); + + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(eds_cluster_yaml, getEdsClusterType()), "version1")); + + envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; + 
cluster_load_assignment.set_cluster_name("fare"); + addEndpoint(cluster_load_assignment, 1000); + addEndpoint(cluster_load_assignment, 2000); + addEndpoint(cluster_load_assignment, 3000); + auto decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + + EXPECT_LOG_CONTAINS("debug", "initializing TLS cluster cluster_1 inline", + cluster_manager_->getThreadLocalCluster("cluster_1")); + EXPECT_EQ(readGauge("thread_local_cluster_manager.test_thread.clusters_inflated"), 1); + Cluster& cluster = cluster_manager_->activeClusters().find("cluster_1")->second; + EXPECT_TRUE( + hostsInHostsVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts(), {1000, 2000})); + + cluster_load_assignment.clear_endpoints(); + addEndpoint(cluster_load_assignment, 1000); + addEndpoint(cluster_load_assignment, 4000); + decoded_resources = TestUtility::decodeResources({cluster_load_assignment}, "cluster_name"); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + EXPECT_TRUE( + hostsInHostsVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts(), {1000, 4000})); +} + +} // namespace +} // namespace Upstream +} // namespace Envoy diff --git a/test/extensions/clusters/aggregate/cluster_integration_test.cc b/test/extensions/clusters/aggregate/cluster_integration_test.cc index aad8f587b5a3..1ada724a520b 100644 --- a/test/extensions/clusters/aggregate/cluster_integration_test.cc +++ b/test/extensions/clusters/aggregate/cluster_integration_test.cc @@ -122,10 +122,13 @@ const std::string& config() { Platform::null_device_path)); } -class AggregateIntegrationTest : public testing::TestWithParam, - public HttpIntegrationTest { +class AggregateIntegrationTest + : public testing::TestWithParam>, + public HttpIntegrationTest { public: - AggregateIntegrationTest() : HttpIntegrationTest(Http::CodecType::HTTP1, GetParam(), config()) { + AggregateIntegrationTest() + : 
HttpIntegrationTest(Http::CodecType::HTTP1, std::get<0>(GetParam()), config()), + deferred_cluster_creation_(std::get<1>(GetParam())) { use_lds_ = false; } @@ -137,16 +140,20 @@ class AggregateIntegrationTest : public testing::TestWithParamset_enable_deferred_cluster_creation( + deferred_cluster_creation_); + }); HttpIntegrationTest::initialize(); addFakeUpstream(Http::CodecType::HTTP2); addFakeUpstream(Http::CodecType::HTTP2); cluster1_ = ConfigHelper::buildStaticCluster( FirstClusterName, fake_upstreams_[FirstUpstreamIndex]->localAddress()->ip()->port(), - Network::Test::getLoopbackAddressString(GetParam())); + Network::Test::getLoopbackAddressString(version_)); cluster2_ = ConfigHelper::buildStaticCluster( SecondClusterName, fake_upstreams_[SecondUpstreamIndex]->localAddress()->ip()->port(), - Network::Test::getLoopbackAddressString(GetParam())); + Network::Test::getLoopbackAddressString(version_)); // Let Envoy establish its connection to the CDS server. acceptXdsConnection(); @@ -173,12 +180,14 @@ class AggregateIntegrationTest : public testing::TestWithParamstartGrpcStream(); } + const bool deferred_cluster_creation_; envoy::config::cluster::v3::Cluster cluster1_; envoy::config::cluster::v3::Cluster cluster2_; }; -INSTANTIATE_TEST_SUITE_P(IpVersions, AggregateIntegrationTest, - testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); +INSTANTIATE_TEST_SUITE_P( + IpVersions, AggregateIntegrationTest, + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool())); TEST_P(AggregateIntegrationTest, ClusterUpDownUp) { // Calls our initialize(), which includes establishing a listener, route, and cluster. 
diff --git a/test/extensions/clusters/aggregate/cluster_update_test.cc b/test/extensions/clusters/aggregate/cluster_update_test.cc index f82aa0965eb4..08a1d0d47af9 100644 --- a/test/extensions/clusters/aggregate/cluster_update_test.cc +++ b/test/extensions/clusters/aggregate/cluster_update_test.cc @@ -31,7 +31,8 @@ envoy::config::bootstrap::v3::Bootstrap parseBootstrapFromV2Yaml(const std::stri return bootstrap; } -class AggregateClusterUpdateTest : public Event::TestUsingSimulatedTime, public testing::Test { +class AggregateClusterUpdateTest : public Event::TestUsingSimulatedTime, + public testing::TestWithParam { public: AggregateClusterUpdateTest() : http_context_(stats_store_.symbolTable()), grpc_context_(stats_store_.symbolTable()), @@ -39,6 +40,8 @@ class AggregateClusterUpdateTest : public Event::TestUsingSimulatedTime, public void initialize(const std::string& yaml_config) { auto bootstrap = parseBootstrapFromV2Yaml(yaml_config); + const bool use_deferred_cluster = GetParam(); + bootstrap.mutable_cluster_manager()->set_enable_deferred_cluster_creation(use_deferred_cluster); cluster_manager_ = std::make_unique( bootstrap, factory_, factory_.stats_, factory_.tls_, factory_.runtime_, factory_.local_info_, log_manager_, factory_.dispatcher_, admin_, validation_context_, @@ -77,12 +80,14 @@ class AggregateClusterUpdateTest : public Event::TestUsingSimulatedTime, public )EOF"; }; -TEST_F(AggregateClusterUpdateTest, NoHealthyUpstream) { +INSTANTIATE_TEST_SUITE_P(DeferredClusters, AggregateClusterUpdateTest, testing::Bool()); + +TEST_P(AggregateClusterUpdateTest, NoHealthyUpstream) { initialize(default_yaml_config_); EXPECT_EQ(nullptr, cluster_->loadBalancer().chooseHost(nullptr)); } -TEST_F(AggregateClusterUpdateTest, BasicFlow) { +TEST_P(AggregateClusterUpdateTest, BasicFlow) { initialize(default_yaml_config_); std::unique_ptr callbacks( @@ -132,7 +137,7 @@ TEST_F(AggregateClusterUpdateTest, BasicFlow) { EXPECT_EQ("127.0.0.1:11001", host->address()->asString()); 
} -TEST_F(AggregateClusterUpdateTest, LoadBalancingTest) { +TEST_P(AggregateClusterUpdateTest, LoadBalancingTest) { initialize(default_yaml_config_); EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(Upstream::defaultStaticCluster("primary"), "")); auto primary = cluster_manager_->getThreadLocalCluster("primary"); @@ -242,7 +247,7 @@ TEST_F(AggregateClusterUpdateTest, LoadBalancingTest) { } } -TEST_F(AggregateClusterUpdateTest, InitializeAggregateClusterAfterOtherClusters) { +TEST_P(AggregateClusterUpdateTest, InitializeAggregateClusterAfterOtherClusters) { const std::string config = R"EOF( static_resources: clusters: diff --git a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc index c9ce02d13dc4..fc78353c5964 100644 --- a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc @@ -591,7 +591,12 @@ TEST_F(RedisConnPoolImplTest, NoClusterAtConstruction) { EXPECT_EQ(nullptr, request); // Now add the cluster. Request to the cluster should succeed. - update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_); + { + Upstream::ThreadLocalClusterCommand command = [this]() -> Upstream::ThreadLocalCluster& { + return cm_.thread_local_cluster_; + }; + update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_.info()->name(), command); + } // MurmurHash of "foo" is 9631199822919835226U makeSimpleRequest(true, "foo", 9631199822919835226U); @@ -604,10 +609,20 @@ TEST_F(RedisConnPoolImplTest, NoClusterAtConstruction) { // Add a cluster we don't care about. 
NiceMock cluster2; cluster2.cluster_.info_->name_ = "cluster2"; - update_callbacks_->onClusterAddOrUpdate(cluster2); + { + Upstream::ThreadLocalClusterCommand command = [&cluster2]() -> Upstream::ThreadLocalCluster& { + return cluster2; + }; + update_callbacks_->onClusterAddOrUpdate(cluster2.cluster_.info()->name(), command); + } // Add the cluster back. Request to the cluster should succeed. - update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_); + { + Upstream::ThreadLocalClusterCommand command = [this]() -> Upstream::ThreadLocalCluster& { + return cm_.thread_local_cluster_; + }; + update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_.info()->name(), command); + } // MurmurHash of "foo" is 9631199822919835226U makeSimpleRequest(true, "foo", 9631199822919835226U); @@ -619,7 +634,12 @@ TEST_F(RedisConnPoolImplTest, NoClusterAtConstruction) { // Update the cluster. This should count as a remove followed by an add. Request to the cluster // should succeed. EXPECT_CALL(*client_, close()); - update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_); + { + Upstream::ThreadLocalClusterCommand command = [this]() -> Upstream::ThreadLocalCluster& { + return cm_.thread_local_cluster_; + }; + update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_.info()->name(), command); + } // MurmurHash of "foo" is 9631199822919835226U makeSimpleRequest(true, "foo", 9631199822919835226U); @@ -654,7 +674,12 @@ TEST_F(RedisConnPoolImplTest, AuthInfoUpdate) { auth_password_ = ""; // Now add the cluster. Request to the cluster should succeed. 
- update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_); + { + Upstream::ThreadLocalClusterCommand command = [this]() -> Upstream::ThreadLocalCluster& { + return cm_.thread_local_cluster_; + }; + update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_.info()->name(), command); + } // MurmurHash of "foo" is 9631199822919835226U makeSimpleRequest(true, "foo", 9631199822919835226U); @@ -800,7 +825,12 @@ TEST_F(RedisConnPoolImplTest, MakeRequestToHost) { // There is no cluster yet, so makeRequestToHost() should fail. EXPECT_EQ(nullptr, conn_pool_->makeRequestToHost("10.0.0.1:3000", value, callbacks1)); // Add the cluster now. - update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_); + { + Upstream::ThreadLocalClusterCommand command = [this]() -> Upstream::ThreadLocalCluster& { + return cm_.thread_local_cluster_; + }; + update_callbacks_->onClusterAddOrUpdate(cm_.thread_local_cluster_.info()->name(), command); + } EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host1), Return(client1))); EXPECT_CALL(*client1, makeRequest_(Ref(value), Ref(callbacks1))) diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index 22876780059d..d14f5ec93f7b 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -748,15 +748,27 @@ stat_prefix: foo // Add a cluster that we don't care about. 
NiceMock other_thread_local_cluster; other_thread_local_cluster.cluster_.info_->name_ = "other_cluster"; - cluster_update_callbacks_->onClusterAddOrUpdate(other_thread_local_cluster); + { + Upstream::ThreadLocalClusterCommand command = + [&other_thread_local_cluster]() -> Upstream::ThreadLocalCluster& { + return other_thread_local_cluster; + }; + cluster_update_callbacks_->onClusterAddOrUpdate(other_thread_local_cluster.info()->name(), + command); + } recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); EXPECT_EQ(2, config_->stats().downstream_sess_no_route_.value()); EXPECT_EQ(0, config_->stats().downstream_sess_total_.value()); EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); // Now add the cluster we care about. - cluster_update_callbacks_->onClusterAddOrUpdate( - factory_context_.cluster_manager_.thread_local_cluster_); + { + Upstream::ThreadLocalClusterCommand command = [this]() -> Upstream::ThreadLocalCluster& { + return factory_context_.cluster_manager_.thread_local_cluster_; + }; + cluster_update_callbacks_->onClusterAddOrUpdate( + factory_context_.cluster_manager_.thread_local_cluster_.info()->name(), command); + } expectSessionCreate(upstream_address_); test_sessions_[0].expectWriteToUpstream("hello", 0, nullptr, true); recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); diff --git a/test/extensions/tracers/datadog/agent_http_client_test.cc b/test/extensions/tracers/datadog/agent_http_client_test.cc index f4d59eb8b0dc..98e60c971578 100644 --- a/test/extensions/tracers/datadog/agent_http_client_test.cc +++ b/test/extensions/tracers/datadog/agent_http_client_test.cc @@ -499,8 +499,10 @@ TEST_F(DatadogAgentHttpClientTest, SkipReportIfCollectorClusterHasBeenRemoved) { // Simulate addition of an irrelevant cluster. 
NiceMock unrelated_cluster; unrelated_cluster.cluster_.info_->name_ = "unrelated_cluster"; - cluster_update_callbacks->onClusterAddOrUpdate(unrelated_cluster); - + Upstream::ThreadLocalClusterCommand command = + [&unrelated_cluster]() -> Upstream::ThreadLocalCluster& { return unrelated_cluster; }; + cluster_update_callbacks->onClusterAddOrUpdate(unrelated_cluster.cluster_.info_->name_, + command); // Verify that no report will be sent. EXPECT_CALL(cm.thread_local_cluster_, httpAsyncClient()).Times(0); EXPECT_CALL(cm.thread_local_cluster_.async_client_, send_(_, _, _)).Times(0); @@ -519,7 +521,11 @@ TEST_F(DatadogAgentHttpClientTest, SkipReportIfCollectorClusterHasBeenRemoved) { { // Simulate addition of the relevant cluster. - cluster_update_callbacks->onClusterAddOrUpdate(cm.thread_local_cluster_); + Upstream::ThreadLocalClusterCommand command = [&cm]() -> Upstream::ThreadLocalCluster& { + return cm.thread_local_cluster_; + }; + cluster_update_callbacks->onClusterAddOrUpdate(cm.thread_local_cluster_.info()->name(), + command); // Verify that report will be sent. EXPECT_CALL(cm.thread_local_cluster_, httpAsyncClient()) diff --git a/test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc b/test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc index 6dd8a6b1b093..b77dd99abb5d 100644 --- a/test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc +++ b/test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc @@ -316,7 +316,10 @@ TEST_F(ZipkinDriverTest, SkipReportIfCollectorClusterHasBeenRemoved) { // Simulate addition of an irrelevant cluster. 
NiceMock unrelated_cluster; unrelated_cluster.cluster_.info_->name_ = "unrelated_cluster"; - cluster_update_callbacks->onClusterAddOrUpdate(unrelated_cluster); + Upstream::ThreadLocalClusterCommand command = + [&unrelated_cluster]() -> Upstream::ThreadLocalCluster& { return unrelated_cluster; }; + cluster_update_callbacks->onClusterAddOrUpdate(unrelated_cluster.cluster_.info_->name_, + command); // Verify that no report will be sent. EXPECT_CALL(cm_.thread_local_cluster_, httpAsyncClient()).Times(0); @@ -338,7 +341,11 @@ TEST_F(ZipkinDriverTest, SkipReportIfCollectorClusterHasBeenRemoved) { { // Simulate addition of the relevant cluster. - cluster_update_callbacks->onClusterAddOrUpdate(cm_.thread_local_cluster_); + Upstream::ThreadLocalClusterCommand command = [this]() -> Upstream::ThreadLocalCluster& { + return cm_.thread_local_cluster_; + }; + cluster_update_callbacks->onClusterAddOrUpdate(cm_.thread_local_cluster_.info()->name(), + command); // Verify that report will be sent. EXPECT_CALL(cm_.thread_local_cluster_, httpAsyncClient()) diff --git a/test/integration/cds_integration_test.cc b/test/integration/cds_integration_test.cc index 3da6ca2b3c08..1956b21e39c5 100644 --- a/test/integration/cds_integration_test.cc +++ b/test/integration/cds_integration_test.cc @@ -29,7 +29,8 @@ const char ClusterName2[] = "cluster_2"; const int UpstreamIndex1 = 1; const int UpstreamIndex2 = 2; -class CdsIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, public HttpIntegrationTest { +class CdsIntegrationTest : public Grpc::DeltaSotwDeferredClustersIntegrationParamTest, + public HttpIntegrationTest { public: CdsIntegrationTest() : HttpIntegrationTest(Http::CodecType::HTTP2, ipVersion(), @@ -39,6 +40,11 @@ class CdsIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, public Ht ? 
"GRPC" : "DELTA_GRPC")), cluster_creator_(&ConfigHelper::buildStaticCluster) { + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + bootstrap.mutable_cluster_manager()->set_enable_deferred_cluster_creation( + useDeferredCluster()); + }); config_helper_.addRuntimeOverride("envoy.reloadable_features.unified_mux", (sotwOrDelta() == Grpc::SotwOrDelta::UnifiedSotw || sotwOrDelta() == Grpc::SotwOrDelta::UnifiedDelta) @@ -151,8 +157,8 @@ class CdsIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, public Ht cluster_creator_; }; -INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDelta, CdsIntegrationTest, - DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS); +INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDeltaDeferredCluster, CdsIntegrationTest, + DELTA_SOTW_GRPC_CLIENT_DEFERRED_CLUSTERS_INTEGRATION_PARAMS); // 1) Envoy starts up with no static clusters (other than the CDS-over-gRPC server). // 2) Envoy is told of a cluster via CDS. @@ -164,9 +170,24 @@ INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDelta, CdsIntegrationTest, TEST_P(CdsIntegrationTest, CdsClusterUpDownUp) { // Calls our initialize(), which includes establishing a listener, route, and cluster. config_helper_.addConfigModifier(configureProxyStatus()); + initialize(); + + if (useDeferredCluster()) { + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 0); + } else { + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 2); + } + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1"); test_server_->waitForCounterGe("cluster_manager.cluster_added", 1); + if (useDeferredCluster()) { + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 1); + } else { + EXPECT_EQ( + test_server_->gauge("thread_local_cluster_manager.worker_0.clusters_inflated")->value(), 2); + } + // Tell Envoy that cluster_1 is gone. 
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); sendClusterDiscoveryResponse({}, {}, {ClusterName1}, "42"); @@ -174,6 +195,12 @@ TEST_P(CdsIntegrationTest, CdsClusterUpDownUp) { // the DiscoveryResponse that says cluster_1 is gone. test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1); + if (useDeferredCluster()) { + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 0); + } else { + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 1); + } + // Now that cluster_1 is gone, the listener (with its routing to cluster_1) should 503. BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( lookupPort("http"), "GET", "/cluster1", "", downstream_protocol_, version_, "foo.com"); @@ -192,9 +219,18 @@ TEST_P(CdsIntegrationTest, CdsClusterUpDownUp) { // We can continue the test once we're sure that Envoy's ClusterManager has made use of // the DiscoveryResponse describing cluster_1 that we sent. Again, 2 includes CDS server. test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + if (useDeferredCluster()) { + EXPECT_EQ( + test_server_->gauge("thread_local_cluster_manager.worker_0.clusters_inflated")->value(), 0); + } else { + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 2); + } // Does *not* call our initialize(). 
testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1"); + if (useDeferredCluster()) { + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 1); + } cleanupUpstreamAndDownstream(); } @@ -268,7 +304,7 @@ class DeferredCreationClusterStatsTest : public CdsIntegrationTest { }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDelta, DeferredCreationClusterStatsTest, - DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS); + DELTA_SOTW_GRPC_CLIENT_DEFERRED_CLUSTERS_INTEGRATION_PARAMS); // Test that DeferredCreationTrafficStats gets created and updated correctly. TEST_P(DeferredCreationClusterStatsTest, diff --git a/test/integration/eds_integration_test.cc b/test/integration/eds_integration_test.cc index 9b6e142a67f4..b7031214a1d0 100644 --- a/test/integration/eds_integration_test.cc +++ b/test/integration/eds_integration_test.cc @@ -17,14 +17,30 @@ namespace Envoy { namespace { +void validateClusters(const Upstream::ClusterManager::ClusterInfoMap& active_cluster_map, + const std::string& cluster, size_t expected_active_clusters, + size_t hosts_expected, size_t healthy_hosts, size_t degraded_hosts) { + EXPECT_EQ(expected_active_clusters, active_cluster_map.size()); + ASSERT_EQ(1, active_cluster_map.count(cluster)); + const auto& cluster_ref = active_cluster_map.find(cluster)->second; + const auto& hostset_per_priority = cluster_ref.get().prioritySet().hostSetsPerPriority(); + EXPECT_EQ(1, hostset_per_priority.size()); + const Envoy::Upstream::HostSetPtr& host_set = hostset_per_priority[0]; + EXPECT_EQ(hosts_expected, host_set->hosts().size()); + EXPECT_EQ(healthy_hosts, host_set->healthyHosts().size()); + EXPECT_EQ(degraded_hosts, host_set->degradedHosts().size()); +}; + // Integration test for EDS features. EDS is consumed via filesystem // subscription. 
-class EdsIntegrationTest : public testing::TestWithParam, - public HttpIntegrationTest { +class EdsIntegrationTest + : public testing::TestWithParam>, + public HttpIntegrationTest { public: EdsIntegrationTest() - : HttpIntegrationTest(Http::CodecType::HTTP1, GetParam()), - codec_client_type_(envoy::type::v3::HTTP1) {} + : HttpIntegrationTest(Http::CodecType::HTTP1, std::get<0>(GetParam())), + codec_client_type_(envoy::type::v3::HTTP1), + deferred_cluster_creation_(std::get<1>(GetParam())) {} // We need to supply the endpoints via EDS to provide health status. Use a // filesystem delivery to simplify test mechanics. @@ -136,6 +152,8 @@ class EdsIntegrationTest : public testing::TestWithParammutable_path_config_source() ->set_path(cds_helper_.cdsPath()); bootstrap.mutable_static_resources()->clear_clusters(); + bootstrap.mutable_cluster_manager()->set_enable_deferred_cluster_creation( + deferred_cluster_creation_); }); // Set validate_clusters to false to allow us to reference a CDS cluster. 
@@ -178,13 +196,15 @@ class EdsIntegrationTest : public testing::TestWithParamwaitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 0); + + BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( + lookupPort("http"), "GET", "/cluster_0", "", downstream_protocol_, version_, "foo.com"); + ASSERT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + cleanupUpstreamAndDownstream(); + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 1); + + const auto& active_cluster_map = + test_server_->server().clusterManager().clusters().active_clusters_; + { + const size_t expected_active_cluster = 1; + const size_t expected_hosts = 2, healthy_hosts = 2, degraded_hosts = 0; + validateClusters(active_cluster_map, "cluster_0", expected_active_cluster, expected_hosts, + healthy_hosts, degraded_hosts); + } +} + +// Test that a deferred EDS cluster that was created inline can get EDS updates +// and receive traffic after the update. +TEST_P(EdsIntegrationTest, DataplaneTrafficAfterEdsUpdateOfInitializedCluster) { + if (!deferred_cluster_creation_) { + GTEST_SKIP() << "Test depends on deferred cluster creation. 
Skipping."; + } + autonomous_upstream_ = true; + + initializeTest(false); + EndpointSettingOptions options; + options.total_endpoints = 1; + options.healthy_endpoints = 1; + setEndpoints(options); + + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 0); + BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( + lookupPort("http"), "GET", "/cluster_0", "", downstream_protocol_, version_, "foo.com"); + ASSERT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + cleanupUpstreamAndDownstream(); + test_server_->waitForGaugeEq("thread_local_cluster_manager.worker_0.clusters_inflated", 1); + const auto& active_cluster_map = + test_server_->server().clusterManager().clusters().active_clusters_; + { + const size_t expected_active_cluster = 1; + const size_t expected_hosts = 1, healthy_hosts = 1, degraded_hosts = 0; + validateClusters(active_cluster_map, "cluster_0", expected_active_cluster, expected_hosts, + healthy_hosts, degraded_hosts); + } + + options.total_endpoints = 2; + options.healthy_endpoints = 1; + options.degraded_endpoints = 1; + setEndpoints(options); + + response = IntegrationUtil::makeSingleRequest(lookupPort("http"), "GET", "/cluster_0", "", + downstream_protocol_, version_, "foo.com"); + ASSERT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + cleanupUpstreamAndDownstream(); + { + const size_t expected_active_cluster = 1; + const size_t expected_hosts = 2, healthy_hosts = 1, degraded_hosts = 1; + validateClusters(active_cluster_map, "cluster_0", expected_active_cluster, expected_hosts, + healthy_hosts, degraded_hosts); + } +} + } // namespace } // namespace Envoy diff --git a/test/mocks/upstream/cluster_update_callbacks.h b/test/mocks/upstream/cluster_update_callbacks.h index 782c319004c1..2de2fffefb40 100644 --- a/test/mocks/upstream/cluster_update_callbacks.h +++ b/test/mocks/upstream/cluster_update_callbacks.h @@ -14,7 +14,8 
@@ class MockClusterUpdateCallbacks : public ClusterUpdateCallbacks { MockClusterUpdateCallbacks(); ~MockClusterUpdateCallbacks() override; - MOCK_METHOD(void, onClusterAddOrUpdate, (ThreadLocalCluster & cluster)); + MOCK_METHOD(void, onClusterAddOrUpdate, + (absl::string_view cluster_name, ThreadLocalClusterCommand& command)); MOCK_METHOD(void, onClusterRemoval, (const std::string& cluster_name)); }; } // namespace Upstream diff --git a/test/server/server_test.cc b/test/server/server_test.cc index c732c88209ab..8ade9b1d649c 100644 --- a/test/server/server_test.cc +++ b/test/server/server_test.cc @@ -1541,7 +1541,7 @@ class CallbacksStatsSink : public Stats::Sink, public Upstream::ClusterUpdateCal void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} // Upstream::ClusterUpdateCallbacks - void onClusterAddOrUpdate(Upstream::ThreadLocalCluster&) override {} + void onClusterAddOrUpdate(absl::string_view, Upstream::ThreadLocalClusterCommand&) override {} void onClusterRemoval(const std::string&) override {} private: diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index b77f26cb5c30..d40014b9e29d 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -30,6 +30,7 @@ BSON BPF Bdecoded Bencoded +CIO DFP DOM GiB