upstream: cluster manager api for dynamic cluster creation #3479
```cpp
// @@ -57,6 +57,37 @@ class ClusterUpdateCallbacksHandle {

typedef std::unique_ptr<ClusterUpdateCallbacksHandle> ClusterUpdateCallbacksHandlePtr;

/**
 * A cancellable handle returned to the caller who initiates dynamic cluster creation on the
 * request path.
 */
class DynamicClusterHandler {
public:
  virtual ~DynamicClusterHandler() {}

  /**
   * @return the name of the cluster this handle was created for.
   */
  virtual const std::string& cluster() const PURE;

  /**
   * Cancels the callback.
   */
  virtual void cancel() PURE;

  /**
   * Called when cluster creation is complete.
   */
  virtual void onClusterCreationComplete() PURE;
};

typedef std::shared_ptr<DynamicClusterHandler> DynamicClusterHandlerPtr;

/**
 * Callback invoked when a cross-thread cluster creation is complete.
 */
typedef std::function<void()> PostClusterCreationCb;

/**
 * Manages connection pools and load balancing for upstream clusters. The cluster manager is
 * persistent and shared among multiple ongoing requests/connections.
```
```cpp
// @@ -78,6 +109,20 @@ class ClusterManager {

  virtual bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
                                  const std::string& version_info) PURE;

  /**
   * Add a dynamic cluster via API on the request path. Currently only STATIC clusters are
   * supported.
   *
   * @param cluster supplies the cluster configuration.
   * @param version_info supplies the xDS version of the cluster.
   * @param post_cluster_cb supplies the callback that allows the request to continue after
   *        cluster creation is done.
   * @return DynamicClusterHandlerPtr that allows the caller to cancel if needed.
   */
  virtual DynamicClusterHandlerPtr
  addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
                                const std::string& version_info,
                                PostClusterCreationCb post_cluster_cb) PURE;

  /**
   * Set a callback that will be invoked when all owned clusters have been initialized.
   */
```

Review comment (on the return type): At the API level, there should be some better way to determine if there was an error (e.g., cluster not STATIC) or if the cluster already exists. Can you think about a better way to do this? Perhaps you should return an `std::pair<>` with some type of status code and either a handle or `nullptr`? This is useful for filters but also for tests (you will need to add unit tests for cluster manager).
|
```cpp
// @@ -163,6 +163,24 @@ void ClusterManagerInitHelper::setInitializedCb(std::function<void()> callback)
  }
}

DynamicClusterHandlerImpl::DynamicClusterHandlerImpl(
    const std::string& cluster_name,
    std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>& pending_clusters,
    PostClusterCreationCb post_cluster_cb)
    : cluster_name_(cluster_name), pending_clusters_(pending_clusters),
      post_cluster_cb_(post_cluster_cb) {}

void DynamicClusterHandlerImpl::cancel() {
  ENVOY_LOG(debug, "cancelling dynamic cluster creation callback for cluster {}", cluster_name_);
  pending_clusters_.erase(cluster_name_);
}

void DynamicClusterHandlerImpl::onClusterCreationComplete() {
  ENVOY_LOG(debug, "cluster {} creation complete. initiating post callback", cluster_name_);
  post_cluster_cb_();
  pending_clusters_.erase(cluster_name_);
}

ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Bootstrap& bootstrap,
                                       ClusterManagerFactory& factory, Stats::Store& stats,
                                       ThreadLocal::Instance& tls, Runtime::Loader& runtime,
```
```diff
@@ -172,8 +190,9 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
                                        Event::Dispatcher& main_thread_dispatcher,
                                        Server::Admin& admin)
     : factory_(factory), runtime_(runtime), stats_(stats), tls_(tls.allocateSlot()),
-      random_(random), bind_config_(bootstrap.cluster_manager().upstream_bind_config()),
-      local_info_(local_info), cm_stats_(generateStats(stats)),
+      main_thread_dispatcher_(main_thread_dispatcher), random_(random),
+      bind_config_(bootstrap.cluster_manager().upstream_bind_config()), local_info_(local_info),
+      cm_stats_(generateStats(stats)),
       init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }),
       config_tracker_entry_(
           admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })) {
```
```cpp
// @@ -339,8 +358,42 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
  }
}

DynamicClusterHandlerPtr
ClusterManagerImpl::addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
                                                  const std::string& version_info,
                                                  PostClusterCreationCb post_cluster_cb) {
  ASSERT(!tls_->isMainThread());

  bool should_create_cluster = true;
  if (active_clusters_.find(cluster.name()) != active_clusters_.end()) {
    ENVOY_LOG(trace, "cluster {} already exists", cluster.name());
    should_create_cluster = false;
  }
  if (cluster.type() != envoy::api::v2::Cluster::STATIC || cluster.hosts_size() < 1) {
    ENVOY_LOG(warn, "dynamic cluster {} should be of type static and should have hosts",
              cluster.name());
    should_create_cluster = false;
  }

  DynamicClusterHandlerPtr cluster_handler;
  if (should_create_cluster) {
    ThreadLocalClusterManagerImpl& cluster_manager =
        tls_->getTyped<ThreadLocalClusterManagerImpl>();
    cluster_handler = std::make_shared<DynamicClusterHandlerImpl>(
        cluster.name(), cluster_manager.pending_cluster_creations_, post_cluster_cb);
    cluster_manager.pending_cluster_creations_[cluster.name()] = cluster_handler;

    main_thread_dispatcher_.post([this, cluster, version_info]() -> void {
      ENVOY_LOG(trace, "initiating dynamic cluster {} creation", cluster.name());
      addOrUpdateCluster(cluster, version_info);
    });
  }
  return cluster_handler;
}
```

Review comment (on the STATIC/hosts check): Checking [comment truncated in the page capture]

Review comment (on `pending_cluster_creations_[cluster.name()] = cluster_handler;`): What if two different filters/requests are trying to create the cluster with the same name? What happens? I think this code is not going to work correctly as you are going to overwrite the other requests. Like the cross-thread case, we also need to handle multiple requests on the same thread.
```cpp
bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
                                            const std::string& version_info) {
  ASSERT(tls_->isMainThread());

  // First we need to see if this new config is new or an update to an existing dynamic cluster.
  // We don't allow updates to statically configured clusters in the main configuration. We check
  // both the warming clusters and the active clusters to see if we need an update or the update
```
```cpp
// @@ -775,6 +828,11 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
    ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
    cluster_entry->lb_ = cluster_entry->lb_factory_->create();
  }

  if (config.pending_cluster_creations_.find(name) != config.pending_cluster_creations_.end()) {
    auto handler = config.pending_cluster_creations_.find(name)->second;
    handler->onClusterCreationComplete();
  }
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
```

Review comment (on the pending-creation lookup): If two threads race to create this cluster (which can easily happen if you build clusters in the request path), you are going to hang the thread that loses the race. You are still going to have to handle this somehow. Roughly what you need to do is register the per-thread intents on the main thread and make sure a "notify cluster creation" message goes out to all threads no matter what in this case. Please make sure you have a unit test for this. (In general I would like you to remove the Lua stuff from this PR for now. I realize Lua will add very nice integration tests, but for now I would prefer to get some solid unit tests that cover all the interleavings.)

Author reply: @mattklein123 I agree, and it is on my TODO list to handle the two-threads race case and the two-requests case. I just wanted to get the high-level view first. I will handle those cases now. I will definitely add solid unit tests for the new API.
Review comment: What about removal? Do we need to be thinking about TTL for added clusters? What about safeguards for cluster addition? Do we assume right now that whoever is calling this API must deal with resource-usage safety, similar to trusting the management server? For example, if a filter allows clusters to be created via untrusted headers, is it up to the filter to sanity-check what it does? Can we add detailed comments on these assumptions and things we might want to do in the future?

Author reply: Sure. I thought about removeCluster and here are my thoughts. But for now, for simplicity, I think we should assume that whoever is calling this API must deal with resource-usage safety. If you agree with this, I would add these semantics to the docs/release notes as well.