-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
upstream: cluster manager api for dynamic cluster creation #3479
Changes from all commits
70b81b9
eaa6971
4477dae
80cb88c
ea3a5ba
0dc129d
6c23b26
ebccdff
292271b
c44656a
60a7411
7222ca8
e668f53
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,6 +163,25 @@ void ClusterManagerInitHelper::setInitializedCb(std::function<void()> callback) | |
} | ||
} | ||
|
||
DynamicClusterHandlerImpl::DynamicClusterHandlerImpl( | ||
const std::string& cluster_name, | ||
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>& pending_clusters, | ||
PostClusterCreationCb post_cluster_cb) | ||
: cluster_name_(cluster_name), pending_clusters_(pending_clusters), | ||
post_cluster_cb_(post_cluster_cb) {} | ||
|
||
void DynamicClusterHandlerImpl::cancel() { | ||
ENVOY_LOG(debug, "cancelling dynamic cluster creation callback for cluster {}", cluster_name_); | ||
pending_clusters_.erase(cluster_name_); | ||
} | ||
|
||
void DynamicClusterHandlerImpl::onClusterCreationComplete() { | ||
ENVOY_LOG(debug, "cluster {} creation complete. initiating post callback", cluster_name_); | ||
if (pending_clusters_.erase(cluster_name_) != 0) { | ||
post_cluster_cb_(); | ||
} | ||
} | ||
|
||
ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Bootstrap& bootstrap, | ||
ClusterManagerFactory& factory, Stats::Store& stats, | ||
ThreadLocal::Instance& tls, Runtime::Loader& runtime, | ||
|
@@ -172,8 +191,9 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots | |
Event::Dispatcher& main_thread_dispatcher, | ||
Server::Admin& admin) | ||
: factory_(factory), runtime_(runtime), stats_(stats), tls_(tls.allocateSlot()), | ||
random_(random), bind_config_(bootstrap.cluster_manager().upstream_bind_config()), | ||
local_info_(local_info), cm_stats_(generateStats(stats)), | ||
main_thread_dispatcher_(main_thread_dispatcher), random_(random), | ||
bind_config_(bootstrap.cluster_manager().upstream_bind_config()), local_info_(local_info), | ||
cm_stats_(generateStats(stats)), | ||
init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }), | ||
config_tracker_entry_( | ||
admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })) { | ||
|
@@ -339,8 +359,46 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { | |
} | ||
} | ||
|
||
std::pair<ClusterResponseCode, DynamicClusterHandlerPtr> | ||
ClusterManagerImpl::addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster, | ||
const std::string& version_info, | ||
PostClusterCreationCb post_cluster_cb) { | ||
ASSERT(!tls_->isMainThread()); | ||
|
||
bool should_create_cluster = true; | ||
if (active_clusters_.find(cluster.name()) != active_clusters_.end()) { | ||
ENVOY_LOG(trace, "cluster {} already exists", cluster.name()); | ||
should_create_cluster = false; | ||
return std::make_pair(ClusterResponseCode::DuplicateCluster, nullptr); | ||
} | ||
if (cluster.type() != envoy::api::v2::Cluster::STATIC || cluster.hosts_size() < 1) { | ||
ENVOY_LOG(warn, "dynamic cluster {} should be of type static and should have hosts", | ||
cluster.name()); | ||
should_create_cluster = false; | ||
return std::make_pair(ClusterResponseCode::NonStaticClusterNotAllowed, nullptr); | ||
} | ||
|
||
DynamicClusterHandlerPtr cluster_handler; | ||
if (should_create_cluster) { | ||
ThreadLocalClusterManagerImpl& cluster_manager = | ||
tls_->getTyped<ThreadLocalClusterManagerImpl>(); | ||
cluster_handler = std::make_shared<DynamicClusterHandlerImpl>( | ||
cluster.name(), cluster_manager.pending_cluster_creations_, post_cluster_cb); | ||
cluster_manager.pending_cluster_creations_[cluster.name()] = cluster_handler; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What if two different filters/requests are trying to create the cluster with the same name? What happens? I think this code is not going to work correctly as you are going to overwrite the other requests. Like the cross thread case, we also need to handle multiple requests on the same thread. |
||
Event::Dispatcher& thread_local_dispatcher = cluster_manager.thread_local_dispatcher_; | ||
main_thread_dispatcher_.post([this, cluster, version_info, &thread_local_dispatcher]() -> void { | ||
ENVOY_LOG(trace, "initating dynamic cluster {} creation", cluster.name()); | ||
pending_cluster_creations_[cluster.name()].push_back(thread_local_dispatcher); | ||
addOrUpdateCluster(cluster, version_info); | ||
}); | ||
} | ||
return std::make_pair(ClusterResponseCode::Accepted, cluster_handler); | ||
} | ||
|
||
bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster, | ||
const std::string& version_info) { | ||
ASSERT(tls_->isMainThread()); | ||
|
||
// First we need to see if this new config is new or an update to an existing dynamic cluster. | ||
// We don't allow updates to statically configured clusters in the main configuration. We check | ||
// both the warming clusters and the active clusters to see if we need an update or the update | ||
|
@@ -349,60 +407,77 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& clust | |
const auto existing_active_cluster = active_clusters_.find(cluster_name); | ||
const auto existing_warming_cluster = warming_clusters_.find(cluster_name); | ||
const uint64_t new_hash = MessageUtil::hash(cluster); | ||
bool cluster_create = true; | ||
if ((existing_active_cluster != active_clusters_.end() && | ||
existing_active_cluster->second->blockUpdate(new_hash)) || | ||
(existing_warming_cluster != warming_clusters_.end() && | ||
existing_warming_cluster->second->blockUpdate(new_hash))) { | ||
return false; | ||
cluster_create = false; | ||
} | ||
if (cluster_create) { | ||
if (existing_active_cluster != active_clusters_.end() || | ||
existing_warming_cluster != warming_clusters_.end()) { | ||
// The following init manager remove call is a NOP in the case we are already initialized. | ||
// It's just kept here to avoid additional logic. | ||
init_helper_.removeCluster(*existing_active_cluster->second->cluster_); | ||
cm_stats_.cluster_modified_.inc(); | ||
} else { | ||
cm_stats_.cluster_added_.inc(); | ||
} | ||
|
||
// There are two discrete paths here depending on when we are adding/updating a cluster. | ||
// 1) During initial server load we use the init manager which handles complex logic related to | ||
// primary/secondary init, static/CDS init, warming all clusters, etc. | ||
// 2) After initial server load, we handle warming independently for each cluster in the warming | ||
// map. | ||
// Note: It's likely possible that all warming logic could be centralized in the init manager, | ||
// but | ||
// a decision was made to split the logic given how complex the init manager already is. | ||
// In the future we may decide to undergo a refactor to unify the logic but the | ||
// effort/risk to do that right now does not seem worth it given that the logic is | ||
// generally pretty clean and easy to understand. | ||
const bool use_active_map = | ||
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized; | ||
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_); | ||
|
||
if (use_active_map) { | ||
ENVOY_LOG(info, "add/update cluster {} during init", cluster_name); | ||
auto& cluster_entry = active_clusters_.at(cluster_name); | ||
createOrUpdateThreadLocalCluster(*cluster_entry); | ||
init_helper_.addCluster(*cluster_entry->cluster_); | ||
} else { | ||
auto& cluster_entry = warming_clusters_.at(cluster_name); | ||
ENVOY_LOG(info, "add/update cluster {} starting warming", cluster_name); | ||
cluster_entry->cluster_->initialize([this, cluster_name] { | ||
auto warming_it = warming_clusters_.find(cluster_name); | ||
auto& cluster_entry = *warming_it->second; | ||
active_clusters_[cluster_name] = std::move(warming_it->second); | ||
warming_clusters_.erase(warming_it); | ||
|
||
ENVOY_LOG(info, "warming cluster {} complete", cluster_name); | ||
createOrUpdateThreadLocalCluster(cluster_entry); | ||
onClusterInit(*cluster_entry.cluster_); | ||
updateGauges(); | ||
}); | ||
} | ||
|
||
updateGauges(); | ||
} | ||
|
||
if (existing_active_cluster != active_clusters_.end() || | ||
existing_warming_cluster != warming_clusters_.end()) { | ||
// The following init manager remove call is a NOP in the case we are already initialized. It's | ||
// just kept here to avoid additional logic. | ||
init_helper_.removeCluster(*existing_active_cluster->second->cluster_); | ||
cm_stats_.cluster_modified_.inc(); | ||
} else { | ||
cm_stats_.cluster_added_.inc(); | ||
} | ||
|
||
// There are two discrete paths here depending on when we are adding/updating a cluster. | ||
// 1) During initial server load we use the init manager which handles complex logic related to | ||
// primary/secondary init, static/CDS init, warming all clusters, etc. | ||
// 2) After initial server load, we handle warming independently for each cluster in the warming | ||
// map. | ||
// Note: It's likely possible that all warming logic could be centralized in the init manager, but | ||
// a decision was made to split the logic given how complex the init manager already is. In | ||
// the future we may decide to undergo a refactor to unify the logic but the effort/risk to | ||
// do that right now does not seem worth it given that the logic is generally pretty clean | ||
// and easy to understand. | ||
const bool use_active_map = | ||
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized; | ||
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_); | ||
|
||
if (use_active_map) { | ||
ENVOY_LOG(info, "add/update cluster {} during init", cluster_name); | ||
auto& cluster_entry = active_clusters_.at(cluster_name); | ||
createOrUpdateThreadLocalCluster(*cluster_entry); | ||
init_helper_.addCluster(*cluster_entry->cluster_); | ||
} else { | ||
auto& cluster_entry = warming_clusters_.at(cluster_name); | ||
ENVOY_LOG(info, "add/update cluster {} starting warming", cluster_name); | ||
cluster_entry->cluster_->initialize([this, cluster_name] { | ||
auto warming_it = warming_clusters_.find(cluster_name); | ||
auto& cluster_entry = *warming_it->second; | ||
active_clusters_[cluster_name] = std::move(warming_it->second); | ||
warming_clusters_.erase(warming_it); | ||
|
||
ENVOY_LOG(info, "warming cluster {} complete", cluster_name); | ||
createOrUpdateThreadLocalCluster(cluster_entry); | ||
onClusterInit(*cluster_entry.cluster_); | ||
updateGauges(); | ||
// post the cluster create completion message to all registered dispathers both in success and failure | ||
// cases so that they can unblock themselves and proceed with the request flow. | ||
for (Event::Dispatcher& dispatcher : pending_cluster_creations_[cluster_name]) { | ||
dispatcher.post([this,cluster_name]()->void { | ||
ThreadLocalClusterManagerImpl& cluster_manager = | ||
tls_->getTyped<ThreadLocalClusterManagerImpl>(); | ||
if (cluster_manager.pending_cluster_creations_.find(cluster_name) != cluster_manager.pending_cluster_creations_.end()) { | ||
auto handler = cluster_manager.pending_cluster_creations_[cluster_name]; | ||
handler->onClusterCreationComplete(); | ||
} | ||
}); | ||
} | ||
|
||
updateGauges(); | ||
return true; | ||
pending_cluster_creations_.erase(cluster_name); | ||
return cluster_create; | ||
} | ||
|
||
void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) { | ||
|
@@ -775,6 +850,11 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( | |
ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name); | ||
cluster_entry->lb_ = cluster_entry->lb_factory_->create(); | ||
} | ||
|
||
if (config.pending_cluster_creations_.find(name) != config.pending_cluster_creations_.end()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If two threads race to create this cluster (which can easily happen if you build clusters in the request path), you are going to hang the thread that loses the race. You are still going to have to handle this somehow. Roughly what you need to do is register the per-thread intents on the main thread and make sure a "notify cluster creation" message goes out to all threads no matter what in this case. Please make sure you have a unit test for this. (In general I would like you to remove the Lua stuff from this PR for now. I realize Lua will add very nice integration tests, but for now I would prefer to get some solid unit tests that cover all the interleavings). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @mattklein123 I agree and it is in my TODO list to handle the two-threads race case and the two-requests case. I just wanted to get the high level view first. I will handle those cases now. I will definitely add solid unit tests for the new API.
|
||
auto handler = config.pending_cluster_creations_.find(name)->second; | ||
handler->onClusterCreationComplete(); | ||
} | ||
} | ||
|
||
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about removal? Do we need to be thinking about TTL for added clusters? What about safeguards for cluster addition? Do we assume right now that whoever is calling this API must deal with resource usage safety similar to trusting the management server? For example, if a filter allows clusters to be created via untrusted headers, it's up to the filter to sanity check what it does? Can we add detailed comments on these assumptions and things we might want to do in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I thought about removeCluster and here are my thoughts.
But for now, for simplicity, I think we should assume that whoever is calling this API must deal with resource usage safety.
If you agree with this, I would add these semantics to the docs/release notes as well.