Config: Implement deferred clusters on worker. #28702

Merged
merged 12 commits into from
Aug 8, 2023
6 changes: 6 additions & 0 deletions api/envoy/config/bootstrap/v3/bootstrap.proto
@@ -438,6 +438,7 @@ message Admin {
}

// Cluster manager :ref:`architecture overview <arch_overview_cluster_manager>`.
+// [#next-free-field: 6]
message ClusterManager {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.bootstrap.v2.ClusterManager";
@@ -478,6 +479,11 @@ message ClusterManager {
// <envoy_v3_api_field_config.core.v3.ApiConfigSource.api_type>` :ref:`GRPC
// <envoy_v3_api_enum_value_config.core.v3.ApiConfigSource.ApiType.GRPC>`.
core.v3.ApiConfigSource load_stats_config = 4;

+// Whether the ClusterManager will create clusters on the worker threads
+// inline during requests. This will save memory and CPU cycles in cases where
+// there are lots of inactive clusters and > 1 worker thread.
+bool enable_deferred_cluster_creation = 5;
}

// Allows you to specify different watchdog configs for different subsystems.
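
The effect of `enable_deferred_cluster_creation` can be pictured with a toy model. The following is a minimal sketch, not Envoy's implementation: `ToyCluster`, `ToyWorkerClusters`, `addCluster`, `getCluster` and `clustersInflated` are hypothetical names invented for illustration. It assumes that, with deferral on, a worker stores only a cheap factory when the main thread publishes a cluster and builds the real object the first time a request asks for it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a fully initialized per-worker cluster.
struct ToyCluster {
  std::string name;
};

// Hypothetical per-worker cluster table; not an Envoy class.
class ToyWorkerClusters {
public:
  using Factory = std::function<std::unique_ptr<ToyCluster>()>;

  explicit ToyWorkerClusters(bool defer) : defer_(defer) {}

  // Called when the main thread publishes a cluster to this worker.
  void addCluster(const std::string& name, Factory factory) {
    if (defer_) {
      deferred_[name] = std::move(factory); // Keep only the recipe.
    } else {
      inflated_[name] = factory(); // Eager mode: build immediately.
    }
  }

  // Called inline during a request; builds a deferred cluster on first use.
  ToyCluster* getCluster(const std::string& name) {
    auto it = inflated_.find(name);
    if (it != inflated_.end()) {
      return it->second.get();
    }
    auto pending = deferred_.find(name);
    if (pending == deferred_.end()) {
      return nullptr; // Unknown cluster.
    }
    std::unique_ptr<ToyCluster> cluster = pending->second();
    deferred_.erase(pending);
    ToyCluster* raw = cluster.get();
    inflated_[name] = std::move(cluster);
    return raw;
  }

  // Number of clusters this worker has actually built.
  std::size_t clustersInflated() const { return inflated_.size(); }

private:
  bool defer_;
  std::unordered_map<std::string, Factory> deferred_;
  std::unordered_map<std::string, std::unique_ptr<ToyCluster>> inflated_;
};
```

In this model, a worker that receives many clusters but serves traffic for only a few of them builds only those few, which is the memory saving the field comment describes.
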
3 changes: 3 additions & 0 deletions changelogs/current.yaml
@@ -13,5 +13,8 @@ removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`

new_features:
+- area: config
+  change: |
+    Added the capability to defer broadcasting of certain clusters (CDS, EDS) from the main thread to worker threads. This optimization can save a significant amount of memory when there are (1) a large number of workers and (2) a large amount of config, most of which is unused. This capability is guarded by :ref:`enable_deferred_cluster_creation <envoy_v3_api_field_config.bootstrap.v3.ClusterManager.enable_deferred_cluster_creation>`.

deprecated:
11 changes: 11 additions & 0 deletions docs/root/configuration/upstream/cluster_manager/cluster_stats.rst
@@ -28,6 +28,17 @@ upstreams and control plane xDS clusters.
active_clusters, Gauge, Number of currently active (warmed) clusters
warming_clusters, Gauge, Number of currently warming (not active) clusters


+In addition to the cluster manager stats, there is a per-worker, thread-local
+cluster manager statistics tree rooted at
+*thread_local_cluster_manager.<worker_id>.* with the following statistics:
+
+.. csv-table::
+:header: Name, Type, Description
+:widths: 1, 1, 2
+
+clusters_inflated, Gauge, Number of clusters the worker has initialized. If using cluster deferral this number should be <= cluster_added.

.. _config_cluster_stats:

Every cluster has a statistics tree rooted at *cluster.<name>.* with the following statistics:
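
The relationship between the per-worker `clusters_inflated` gauge and `cluster_added` described in this file can be checked mechanically from a stats dump. The sketch below is illustrative only. It assumes a plain-text dump with one `name: value` pair per line, and it assumes the full stat names `cluster_manager.cluster_added` and `thread_local_cluster_manager.<worker_id>.clusters_inflated`; `parseStats` and `inflatedWithinAdded` are hypothetical helpers, not part of Envoy.

```cpp
#include <cassert>
#include <cstdint>
#include <exception>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>

// Parses "name: value" lines into a map. Lines without ": " or with a
// non-numeric value are skipped.
std::map<std::string, std::uint64_t> parseStats(const std::string& text) {
  std::map<std::string, std::uint64_t> stats;
  std::istringstream in(text);
  std::string line;
  while (std::getline(in, line)) {
    const std::string::size_type sep = line.find(": ");
    if (sep == std::string::npos) {
      continue;
    }
    try {
      stats[line.substr(0, sep)] = std::stoull(line.substr(sep + 2));
    } catch (const std::exception&) {
      // Not a plain integer; ignore this line.
    }
  }
  return stats;
}

// Returns true if cluster_added is present and no per-worker
// clusters_inflated gauge exceeds it.
bool inflatedWithinAdded(const std::map<std::string, std::uint64_t>& stats) {
  const auto added = stats.find("cluster_manager.cluster_added"); // Assumed full name.
  if (added == stats.end()) {
    return false;
  }
  const std::string prefix = "thread_local_cluster_manager.";
  const std::string suffix = ".clusters_inflated";
  for (const auto& entry : stats) {
    const std::string& name = entry.first;
    const bool is_worker_gauge =
        name.size() > prefix.size() + suffix.size() &&
        name.compare(0, prefix.size(), prefix) == 0 &&
        name.compare(name.size() - suffix.size(), suffix.size(), suffix) == 0;
    if (is_worker_gauge && entry.second > added->second) {
      return false;
    }
  }
  return true;
}
```

A dump in which one worker reports more inflated clusters than were ever added fails the check, which would point to a bookkeeping problem rather than a normal deferral state.
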
10 changes: 6 additions & 4 deletions envoy/upstream/cluster_manager.h
@@ -43,18 +43,20 @@ namespace Upstream {
* ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the
* ClusterManager.
*/
+using ThreadLocalClusterCommand = std::function<ThreadLocalCluster&()>;
class ClusterUpdateCallbacks {
public:
virtual ~ClusterUpdateCallbacks() = default;

/**
* onClusterAddOrUpdate is called when a new cluster is added or an existing cluster
* is updated in the ClusterManager.
-* @param cluster is the ThreadLocalCluster that represents the updated
-* cluster.
+* @param cluster_name the name of the changed cluster.
+* @param get_cluster is a callable that will provide the ThreadLocalCluster that represents the
+* updated cluster. It should be used within the call or discarded.
*/
-virtual void onClusterAddOrUpdate(ThreadLocalCluster& cluster) PURE;
-
+virtual void onClusterAddOrUpdate(const std::string& cluster_name,
+ThreadLocalClusterCommand& get_cluster) PURE;
/**
* onClusterRemoval is called when a cluster is removed; the argument is the cluster name.
* @param cluster_name is the name of the removed cluster.
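
To illustrate how a subscriber might adapt to the new `onClusterAddOrUpdate` signature, here is a minimal, self-contained sketch. The `ThreadLocalCluster` struct and the reduced `ClusterUpdateCallbacks` class are toy stand-ins (the real interfaces are larger), and `NameRecorder` and `ClusterInspector` are hypothetical subscribers. The sketch assumes that a subscriber needing only the name can ignore `get_cluster`, while one needing the cluster invokes it inside the callback and keeps neither the callable nor the returned reference.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Toy stand-in for the real ThreadLocalCluster interface.
struct ThreadLocalCluster {
  std::string name;
};

// Alias as declared in the diff.
using ThreadLocalClusterCommand = std::function<ThreadLocalCluster&()>;

// Reduced copy of the interface: only the method changed in this diff.
class ClusterUpdateCallbacks {
public:
  virtual ~ClusterUpdateCallbacks() = default;
  virtual void onClusterAddOrUpdate(const std::string& cluster_name,
                                    ThreadLocalClusterCommand& get_cluster) = 0;
};

// Hypothetical subscriber that only needs names, so it never calls get_cluster.
class NameRecorder : public ClusterUpdateCallbacks {
public:
  void onClusterAddOrUpdate(const std::string& cluster_name,
                            ThreadLocalClusterCommand&) override {
    names.push_back(cluster_name);
  }
  std::vector<std::string> names;
};

// Hypothetical subscriber that needs the cluster itself. It calls the getter
// within the callback and copies what it needs instead of storing the getter.
class ClusterInspector : public ClusterUpdateCallbacks {
public:
  void onClusterAddOrUpdate(const std::string&,
                            ThreadLocalClusterCommand& get_cluster) override {
    ThreadLocalCluster& cluster = get_cluster();
    last_seen = cluster.name;
  }
  std::string last_seen;
};
```

Passing a callable instead of a reference presumably lets name-only subscribers avoid forcing a deferred cluster to be built; that reading is an inference from the parameter documentation, not something the diff states.
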
12 changes: 7 additions & 5 deletions source/common/upstream/cluster_discovery_manager.cc
@@ -9,13 +9,15 @@ namespace Upstream {

namespace {

-using ClusterAddedCb = std::function<void(ThreadLocalCluster&)>;
+using ClusterAddedCb = std::function<void(const std::string&)>;

class ClusterCallbacks : public ClusterUpdateCallbacks {
public:
ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}

-void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); };
+void onClusterAddOrUpdate(const std::string& cluster_name, ThreadLocalClusterCommand&) override {
+cb_(cluster_name);
+};

void onClusterRemoval(const std::string&) override {}

@@ -28,12 +30,12 @@ class ClusterCallbacks : public ClusterUpdateCallbacks {
ClusterDiscoveryManager::ClusterDiscoveryManager(
std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler)
: thread_name_(std::move(thread_name)) {
-callbacks_ = std::make_unique<ClusterCallbacks>([this](ThreadLocalCluster& cluster) {
+callbacks_ = std::make_unique<ClusterCallbacks>([this](const std::string& cluster_name) {
ENVOY_LOG(trace,
"cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle "
"callback in {}",
-cluster.info()->name(), enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
-processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available);
+cluster_name, enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
+processClusterName(cluster_name, ClusterDiscoveryStatus::Available);
});
callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_);
}