Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config: Implement deferred clusters on worker. #28702

Merged
merged 12 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/envoy/config/bootstrap/v3/bootstrap.proto
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ message Admin {
}

// Cluster manager :ref:`architecture overview <arch_overview_cluster_manager>`.
// [#next-free-field: 6]
message ClusterManager {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.bootstrap.v2.ClusterManager";
Expand Down Expand Up @@ -478,6 +479,11 @@ message ClusterManager {
// <envoy_v3_api_field_config.core.v3.ApiConfigSource.api_type>` :ref:`GRPC
// <envoy_v3_api_enum_value_config.core.v3.ApiConfigSource.ApiType.GRPC>`.
core.v3.ApiConfigSource load_stats_config = 4;

// Whether the ClusterManager will create clusters on the worker threads
// inline during requests. This will save memory and CPU cycles in cases where
// there are lots of inactive clusters and > 1 worker thread.
bool enable_deferred_cluster_creation = 5;
}

// Allows you to specify different watchdog configs for different subsystems.
Expand Down
8 changes: 8 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ new_features:
change: |
added %RESPONSE_FLAGS_LONG% substitution string, that will output a pascal case string representing the resonse flags.
The output response flags will correspond with %RESPONSE_FLAGS%, only with a long textual string representation.
- area: config
change: |
Added the capability to defer broadcasting of certain cluster (CDS, EDS) to
worker threads from the main thread. This optimization can save significant
amount of memory in cases where there are (1) a large number of workers and
(2) a large amount of config, most of which is unused. This capability is
guarded by :ref:`enable_deferred_cluster_creation
<envoy_v3_api_field_config.bootstrap.v3.ClusterManager.enable_deferred_cluster_creation>`.
- area: extension_discovery_service
change: |
added ECDS support for :ref:` downstream network filters<envoy_v3_api_field_config.listener.v3.Filter.config_discovery>`.
Expand Down
11 changes: 11 additions & 0 deletions docs/root/configuration/upstream/cluster_manager/cluster_stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ upstreams and control plane xDS clusters.
active_clusters, Gauge, Number of currently active (warmed) clusters
warming_clusters, Gauge, Number of currently warming (not active) clusters


In addition to the cluster manager stats, there are per worker thread local
cluster manager statistics tree rooted at
*thread_local_cluster_manager.<worker_id>.* with the following statistics.

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

clusters_inflated, Gauge, Number of clusters the worker has initialized. If using cluster deferral this number should be <= (cluster_added - clusters_removed).

.. _config_cluster_stats:

Every cluster has a statistics tree rooted at *cluster.<name>.* with the following statistics:
Expand Down
10 changes: 6 additions & 4 deletions envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,20 @@ namespace Upstream {
* ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the
* ClusterManager.
*/
using ThreadLocalClusterCommand = std::function<ThreadLocalCluster&()>;
class ClusterUpdateCallbacks {
public:
virtual ~ClusterUpdateCallbacks() = default;

/**
* onClusterAddOrUpdate is called when a new cluster is added or an existing cluster
* is updated in the ClusterManager.
* @param cluster is the ThreadLocalCluster that represents the updated
* cluster.
* @param cluster_name the name of the changed cluster.
* @param get_cluster is a callable that will provide the ThreadLocalCluster that represents the
* updated cluster. It should be used within the call or discarded.
*/
virtual void onClusterAddOrUpdate(ThreadLocalCluster& cluster) PURE;

virtual void onClusterAddOrUpdate(absl::string_view cluster_name,
ThreadLocalClusterCommand& get_cluster) PURE;
/**
* onClusterRemoval is called when a cluster is removed; the argument is the cluster name.
* @param cluster_name is the name of the removed cluster.
Expand Down
12 changes: 7 additions & 5 deletions source/common/upstream/cluster_discovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ namespace Upstream {

namespace {

using ClusterAddedCb = std::function<void(ThreadLocalCluster&)>;
using ClusterAddedCb = std::function<void(absl::string_view)>;

class ClusterCallbacks : public ClusterUpdateCallbacks {
public:
ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}

void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); };
void onClusterAddOrUpdate(absl::string_view cluster_name, ThreadLocalClusterCommand&) override {
cb_(cluster_name);
};

void onClusterRemoval(const std::string&) override {}

Expand All @@ -28,12 +30,12 @@ class ClusterCallbacks : public ClusterUpdateCallbacks {
ClusterDiscoveryManager::ClusterDiscoveryManager(
std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler)
: thread_name_(std::move(thread_name)) {
callbacks_ = std::make_unique<ClusterCallbacks>([this](ThreadLocalCluster& cluster) {
callbacks_ = std::make_unique<ClusterCallbacks>([this](absl::string_view cluster_name) {
ENVOY_LOG(trace,
"cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle "
"callback in {}",
cluster.info()->name(), enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available);
cluster_name, enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
processClusterName(cluster_name, ClusterDiscoveryStatus::Available);
});
callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_);
}
Expand Down
Loading
Loading