
upstream: cluster manager api for dynamic cluster creation #3479

5 changes: 5 additions & 0 deletions include/envoy/thread_local/thread_local.h
@@ -55,6 +55,11 @@ class Slot {
*/
virtual void runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) PURE;

/**
* @return true if the calling thread is the main dispatcher thread, false otherwise.
*/
virtual bool isMainThread() PURE;

/**
* Set thread local data on all threads previously registered via registerThread().
* @param initializeCb supplies the functor that will be called *on each thread*. The functor
57 changes: 57 additions & 0 deletions include/envoy/upstream/cluster_manager.h
@@ -57,6 +57,47 @@ class ClusterUpdateCallbacksHandle {

typedef std::unique_ptr<ClusterUpdateCallbacksHandle> ClusterUpdateCallbacksHandlePtr;

/**
* A cancellable handler returned to the caller who initiates the dynamic cluster creation on the
* request path.
*/
class DynamicClusterHandler {
public:
virtual ~DynamicClusterHandler() {}

/**
* @return Cluster that this handle is created for.
*/
virtual const std::string& cluster() const PURE;

/**
* Cancels the callback.
*/
virtual void cancel() PURE;

/**
* Called when cluster creation is complete.
*/
virtual void onClusterCreationComplete() PURE;
};

typedef std::shared_ptr<DynamicClusterHandler> DynamicClusterHandlerPtr;

/**
* Callback invoked when a cross-thread cluster creation is complete.
*/
typedef std::function<void()> PostClusterCreationCb;

/**
* Response codes returned by the addOrUpdateClusterCrossThread() API.
*/
enum ClusterResponseCode {
Accepted,
DuplicateCluster,
ClusterCreationInProgress,
NonStaticClusterNotAllowed
};

/**
* Manages connection pools and load balancing for upstream clusters. The cluster manager is
* persistent and shared among multiple ongoing requests/connections.
@@ -78,6 +119,22 @@ class ClusterManager {
virtual bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) PURE;

/**
* Add a dynamic cluster via API on the request path. Currently this supports only the STATIC type of

Review comment (Member):

What about removal? Do we need to be thinking about TTL for added clusters? What about safeguards for cluster addition? Do we assume right now that whoever is calling this API must deal with resource usage safety, similar to trusting the management server? For example, if a filter allows clusters to be created via untrusted headers, is it up to the filter to sanity check what it does? Can we add detailed comments on these assumptions and things we might want to do in the future?

Reply (Contributor, Author):

Sure. I thought about removeCluster and here are my thoughts:

  1. We should have some automated way of cleaning up these clusters, e.g. a TTL or no requests for a period of time, that can be supplied along with the cluster creation details in the API. Can we tie this to auto-eviction based on some of the policies/rules that we have been discussing for the incremental xDS API? I see some overlap between the two, but I am not sure how much.
  2. We can change the removeCluster API to work with worker threads and let the filter call it on an as-needed basis.
  3. Apply some configuration that allows only x number of clusters to be created like this on the request path.

For now, for simplicity, I think we should assume that whoever is calling this API must deal with resource usage safety. If you agree with this, I would add these semantics to the docs/release notes as well.
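The idle-TTL cleanup sketched in point 1 could look roughly like this (a purely illustrative sketch with hypothetical names, not code from this PR): track the last time each request-path-created cluster was used, and periodically evict any cluster idle longer than the configured TTL.

```cpp
#include <cassert>
#include <chrono>
#include <string>
#include <unordered_map>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical bookkeeping for clusters created on the request path.
struct DynamicClusterTracker {
  std::chrono::seconds idle_ttl;
  std::unordered_map<std::string, Clock::time_point> last_used;

  // Record that a request hit this cluster.
  void touch(const std::string& cluster, Clock::time_point now) { last_used[cluster] = now; }

  // Return clusters idle for at least idle_ttl and stop tracking them; a real
  // implementation would also invoke something like removeCluster() for each.
  std::vector<std::string> evictIdle(Clock::time_point now) {
    std::vector<std::string> evicted;
    for (auto it = last_used.begin(); it != last_used.end();) {
      if (now - it->second >= idle_ttl) {
        evicted.push_back(it->first);
        it = last_used.erase(it);
      } else {
        ++it;
      }
    }
    return evicted;
  }
};
```

A periodic timer on the main thread could drive evictIdle(); tying this to the eviction policies discussed for incremental xDS would be a natural follow-up.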

* clusters.
*
* @param cluster supplies the cluster configuration.
* @param version_info supplies the xDS version of the cluster.
* @param post_cluster_cb supplies the callback that allows the request to continue after
*                        cluster creation completes; it is invoked immediately if the cluster
*                        already exists.
* @return std::pair<ClusterResponseCode,DynamicClusterHandlerPtr>. ClusterResponseCode provides
* the status of the API and DynamicClusterHandlerPtr allows the caller to cancel if
* needed.
*/
virtual std::pair<ClusterResponseCode, DynamicClusterHandlerPtr>
addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
const std::string& version_info,
PostClusterCreationCb post_cluster_cb) PURE;
/**
* Set a callback that will be invoked when all owned clusters have been initialized.
*/
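For context, a filter on a worker thread would drive this API roughly as follows. This is a sketch against simplified stand-in types: the enum and handler mirror the declarations above, but canProceed and its policy are illustrative assumptions, not code from this PR.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Simplified stand-ins for the types introduced in this PR.
enum ClusterResponseCode {
  Accepted,
  DuplicateCluster,
  ClusterCreationInProgress,
  NonStaticClusterNotAllowed
};

struct DynamicClusterHandler {
  bool cancelled = false;
  void cancel() { cancelled = true; } // e.g. called on request timeout/reset
};
using DynamicClusterHandlerPtr = std::shared_ptr<DynamicClusterHandler>;

// Caller-side policy: proceed immediately if the cluster already exists,
// wait for the post-creation callback when creation was accepted, and fail
// the request for invalid configurations.
bool canProceed(const std::pair<ClusterResponseCode, DynamicClusterHandlerPtr>& result) {
  switch (result.first) {
  case Accepted:         // creation in flight; hold result.second to allow cancel()
  case DuplicateCluster: // cluster already exists; no waiting needed
    return true;
  default:
    return false; // e.g. NonStaticClusterNotAllowed
  }
}
```

On the Accepted path the caller would park the request and resume it from the PostClusterCreationCb, keeping the handle around so it can cancel() if the downstream resets first.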
2 changes: 2 additions & 0 deletions source/common/thread_local/thread_local_impl.cc
@@ -110,6 +110,8 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_c
}
}

bool InstanceImpl::isMainThread() { return (std::this_thread::get_id() == main_thread_id_); }

void InstanceImpl::SlotImpl::set(InitializeCb cb) {
ASSERT(std::this_thread::get_id() == parent_.main_thread_id_);
ASSERT(!parent_.shutdown_);
2 changes: 2 additions & 0 deletions source/common/thread_local/thread_local_impl.h
@@ -38,6 +38,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(cb, main_callback);
}
bool isMainThread() override { return parent_.isMainThread(); }
void set(InitializeCb cb) override;

InstanceImpl& parent_;
@@ -52,6 +53,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
void removeSlot(SlotImpl& slot);
void runOnAllThreads(Event::PostCb cb);
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback);
bool isMainThread();
static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);

static thread_local ThreadLocalData thread_local_data_;
176 changes: 128 additions & 48 deletions source/common/upstream/cluster_manager_impl.cc
@@ -163,6 +163,25 @@ void ClusterManagerInitHelper::setInitializedCb(std::function<void()> callback)
}
}

DynamicClusterHandlerImpl::DynamicClusterHandlerImpl(
const std::string& cluster_name,
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>& pending_clusters,
PostClusterCreationCb post_cluster_cb)
: cluster_name_(cluster_name), pending_clusters_(pending_clusters),
post_cluster_cb_(post_cluster_cb) {}

void DynamicClusterHandlerImpl::cancel() {
ENVOY_LOG(debug, "cancelling dynamic cluster creation callback for cluster {}", cluster_name_);
pending_clusters_.erase(cluster_name_);
}

void DynamicClusterHandlerImpl::onClusterCreationComplete() {
ENVOY_LOG(debug, "cluster {} creation complete. initiating post callback", cluster_name_);
if (pending_clusters_.erase(cluster_name_) != 0) {
post_cluster_cb_();
}
}

ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Bootstrap& bootstrap,
ClusterManagerFactory& factory, Stats::Store& stats,
ThreadLocal::Instance& tls, Runtime::Loader& runtime,
@@ -172,8 +191,9 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
Event::Dispatcher& main_thread_dispatcher,
Server::Admin& admin)
: factory_(factory), runtime_(runtime), stats_(stats), tls_(tls.allocateSlot()),
random_(random), bind_config_(bootstrap.cluster_manager().upstream_bind_config()),
local_info_(local_info), cm_stats_(generateStats(stats)),
main_thread_dispatcher_(main_thread_dispatcher), random_(random),
bind_config_(bootstrap.cluster_manager().upstream_bind_config()), local_info_(local_info),
cm_stats_(generateStats(stats)),
init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }),
config_tracker_entry_(
admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })) {
@@ -339,8 +359,46 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
}
}

std::pair<ClusterResponseCode, DynamicClusterHandlerPtr>
ClusterManagerImpl::addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
const std::string& version_info,
PostClusterCreationCb post_cluster_cb) {
ASSERT(!tls_->isMainThread());

bool should_create_cluster = true;
if (active_clusters_.find(cluster.name()) != active_clusters_.end()) {
ENVOY_LOG(trace, "cluster {} already exists", cluster.name());
should_create_cluster = false;
return std::make_pair(ClusterResponseCode::DuplicateCluster, nullptr);
}
if (cluster.type() != envoy::api::v2::Cluster::STATIC || cluster.hosts_size() < 1) {
ENVOY_LOG(warn, "dynamic cluster {} must be of type STATIC and must have at least one host",
cluster.name());
should_create_cluster = false;
return std::make_pair(ClusterResponseCode::NonStaticClusterNotAllowed, nullptr);
}

DynamicClusterHandlerPtr cluster_handler;
if (should_create_cluster) {
ThreadLocalClusterManagerImpl& cluster_manager =
tls_->getTyped<ThreadLocalClusterManagerImpl>();
cluster_handler = std::make_shared<DynamicClusterHandlerImpl>(
cluster.name(), cluster_manager.pending_cluster_creations_, post_cluster_cb);
cluster_manager.pending_cluster_creations_[cluster.name()] = cluster_handler;

Review comment (Member):

What if two different filters/requests are trying to create a cluster with the same name? What happens? I think this code is not going to work correctly, as you are going to overwrite the other requests. As with the cross-thread case, we also need to handle multiple requests on the same thread.
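One way to address the overwrite (an illustrative sketch, not code from this PR): keep a list of pending handlers per cluster name, so a second request on the same thread appends instead of clobbering, and completion notifies every waiter.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Stand-in for DynamicClusterHandler; records that its request was unblocked.
struct PendingWaiter {
  bool notified = false;
  void onClusterCreationComplete() { notified = true; }
};
using PendingWaiterPtr = std::shared_ptr<PendingWaiter>;

// Per-thread registry: cluster name -> every request waiting on its creation.
struct PendingClusterRegistry {
  std::unordered_map<std::string, std::list<PendingWaiterPtr>> waiters;

  void add(const std::string& cluster, PendingWaiterPtr waiter) {
    waiters[cluster].push_back(std::move(waiter)); // append, never overwrite
  }

  // Invoked when creation of this cluster finishes on this thread.
  void complete(const std::string& cluster) {
    auto it = waiters.find(cluster);
    if (it == waiters.end()) {
      return;
    }
    for (auto& waiter : it->second) {
      waiter->onClusterCreationComplete();
    }
    waiters.erase(it);
  }
};
```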

Event::Dispatcher& thread_local_dispatcher = cluster_manager.thread_local_dispatcher_;
main_thread_dispatcher_.post([this, cluster, version_info, &thread_local_dispatcher]() -> void {
ENVOY_LOG(trace, "initiating dynamic cluster {} creation", cluster.name());
pending_cluster_creations_[cluster.name()].push_back(thread_local_dispatcher);
addOrUpdateCluster(cluster, version_info);
});
}
return std::make_pair(ClusterResponseCode::Accepted, cluster_handler);
}

bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) {
ASSERT(tls_->isMainThread());

// First we need to see if this new config is new or an update to an existing dynamic cluster.
// We don't allow updates to statically configured clusters in the main configuration. We check
// both the warming clusters and the active clusters to see if we need an update or the update
@@ -349,60 +407,77 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& clust
const auto existing_active_cluster = active_clusters_.find(cluster_name);
const auto existing_warming_cluster = warming_clusters_.find(cluster_name);
const uint64_t new_hash = MessageUtil::hash(cluster);
bool cluster_create = true;
if ((existing_active_cluster != active_clusters_.end() &&
existing_active_cluster->second->blockUpdate(new_hash)) ||
(existing_warming_cluster != warming_clusters_.end() &&
existing_warming_cluster->second->blockUpdate(new_hash))) {
return false;
cluster_create = false;
}
if (cluster_create) {
if (existing_active_cluster != active_clusters_.end() ||
existing_warming_cluster != warming_clusters_.end()) {
// The following init manager remove call is a NOP in the case we are already initialized.
// It's just kept here to avoid additional logic.
init_helper_.removeCluster(*existing_active_cluster->second->cluster_);
cm_stats_.cluster_modified_.inc();
} else {
cm_stats_.cluster_added_.inc();
}

// There are two discrete paths here depending on when we are adding/updating a cluster.
// 1) During initial server load we use the init manager which handles complex logic related to
// primary/secondary init, static/CDS init, warming all clusters, etc.
// 2) After initial server load, we handle warming independently for each cluster in the warming
// map.
// Note: It's likely possible that all warming logic could be centralized in the init manager,
// but
// a decision was made to split the logic given how complex the init manager already is.
// In the future we may decide to undergo a refactor to unify the logic but the
// effort/risk to do that right now does not seem worth it given that the logic is
// generally pretty clean and easy to understand.
const bool use_active_map =
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_);

if (use_active_map) {
ENVOY_LOG(info, "add/update cluster {} during init", cluster_name);
auto& cluster_entry = active_clusters_.at(cluster_name);
createOrUpdateThreadLocalCluster(*cluster_entry);
init_helper_.addCluster(*cluster_entry->cluster_);
} else {
auto& cluster_entry = warming_clusters_.at(cluster_name);
ENVOY_LOG(info, "add/update cluster {} starting warming", cluster_name);
cluster_entry->cluster_->initialize([this, cluster_name] {
auto warming_it = warming_clusters_.find(cluster_name);
auto& cluster_entry = *warming_it->second;
active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);

ENVOY_LOG(info, "warming cluster {} complete", cluster_name);
createOrUpdateThreadLocalCluster(cluster_entry);
onClusterInit(*cluster_entry.cluster_);
updateGauges();
});
}

updateGauges();
}

if (existing_active_cluster != active_clusters_.end() ||
existing_warming_cluster != warming_clusters_.end()) {
// The following init manager remove call is a NOP in the case we are already initialized. It's
// just kept here to avoid additional logic.
init_helper_.removeCluster(*existing_active_cluster->second->cluster_);
cm_stats_.cluster_modified_.inc();
} else {
cm_stats_.cluster_added_.inc();
}

// There are two discrete paths here depending on when we are adding/updating a cluster.
// 1) During initial server load we use the init manager which handles complex logic related to
// primary/secondary init, static/CDS init, warming all clusters, etc.
// 2) After initial server load, we handle warming independently for each cluster in the warming
// map.
// Note: It's likely possible that all warming logic could be centralized in the init manager, but
// a decision was made to split the logic given how complex the init manager already is. In
// the future we may decide to undergo a refactor to unify the logic but the effort/risk to
// do that right now does not seem worth it given that the logic is generally pretty clean
// and easy to understand.
const bool use_active_map =
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_);

if (use_active_map) {
ENVOY_LOG(info, "add/update cluster {} during init", cluster_name);
auto& cluster_entry = active_clusters_.at(cluster_name);
createOrUpdateThreadLocalCluster(*cluster_entry);
init_helper_.addCluster(*cluster_entry->cluster_);
} else {
auto& cluster_entry = warming_clusters_.at(cluster_name);
ENVOY_LOG(info, "add/update cluster {} starting warming", cluster_name);
cluster_entry->cluster_->initialize([this, cluster_name] {
auto warming_it = warming_clusters_.find(cluster_name);
auto& cluster_entry = *warming_it->second;
active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);

ENVOY_LOG(info, "warming cluster {} complete", cluster_name);
createOrUpdateThreadLocalCluster(cluster_entry);
onClusterInit(*cluster_entry.cluster_);
updateGauges();
// Post the cluster creation completion message to all registered dispatchers in both the
// success and failure cases so that they can unblock themselves and proceed with the
// request flow.
for (Event::Dispatcher& dispatcher : pending_cluster_creations_[cluster_name]) {
dispatcher.post([this, cluster_name]() -> void {
  ThreadLocalClusterManagerImpl& cluster_manager =
      tls_->getTyped<ThreadLocalClusterManagerImpl>();
  auto it = cluster_manager.pending_cluster_creations_.find(cluster_name);
  if (it != cluster_manager.pending_cluster_creations_.end()) {
    it->second->onClusterCreationComplete();
  }
});
}

updateGauges();
return true;
pending_cluster_creations_.erase(cluster_name);
return cluster_create;
}

void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) {
@@ -775,6 +850,11 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
cluster_entry->lb_ = cluster_entry->lb_factory_->create();
}

if (config.pending_cluster_creations_.find(name) != config.pending_cluster_creations_.end()) {

Review comment (Member):

If two threads race to create this cluster (which can easily happen if you build clusters in the request path), you are going to hang the thread that loses the race. You are still going to have to handle this somehow. Roughly what you need to do is register the per-thread intents on the main thread and make sure a "notify cluster creation" message goes out to all threads no matter what in this case. Please make sure you have a unit test for this. (In general I would like you to remove the Lua stuff from this PR for now. I realize Lua will add very nice integration tests, but for now I would prefer to get some solid unit tests that cover all the interleavings).
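The suggested fix could be modeled like this (an illustrative sketch, not code from this PR): the main thread records one intent per requesting thread, and on completion fires a notification to every registered thread whether creation succeeded or failed, so the thread that loses the race never hangs. Here a plain callback stands in for a dispatcher post to a worker.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

// Main-thread registry of per-thread cluster creation intents.
struct CrossThreadNotifier {
  std::unordered_map<std::string, std::list<std::function<void(bool)>>> intents;

  // Called on the main thread, once per requesting worker thread.
  void registerIntent(const std::string& cluster, std::function<void(bool)> notify) {
    intents[cluster].push_back(std::move(notify));
  }

  // Called when creation finishes; notifies all threads in success and failure
  // cases alike so no waiter is left blocked.
  void notifyAll(const std::string& cluster, bool success) {
    auto it = intents.find(cluster);
    if (it == intents.end()) {
      return;
    }
    for (auto& notify : it->second) {
      notify(success);
    }
    intents.erase(it);
  }
};
```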

Reply (Contributor, Author):

@mattklein123 I agree, and it is on my TODO list to handle both the two-threads race case and the two-requests case. I just wanted to get the high-level view first. I will handle those cases now, and I will definitely add solid unit tests for the new API.
Regarding Lua: if you do not feel very strongly about removing it from this PR, I would prefer to keep it because

  1. It has already been coded, and it provides very nice integration tests as you mentioned.
  2. We will need it anyway, so I would have to follow up with another PR immediately after this one.

If it is OK with you, can we focus on the Lua changes at the end, once we are good with the API, so that we can finish the integration tests in this PR as well? I completely agree on having solid unit tests, which I will do.

auto handler = config.pending_cluster_creations_.find(name)->second;
handler->onClusterCreationComplete();
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
26 changes: 26 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
@@ -139,6 +139,23 @@ struct ClusterManagerStats {
ALL_CLUSTER_MANAGER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

class DynamicClusterHandlerImpl : public DynamicClusterHandler,
Logger::Loggable<Logger::Id::upstream> {
public:
DynamicClusterHandlerImpl(
const std::string& cluster_name,
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>& pending_clusters,
PostClusterCreationCb post_cluster_cb);
const std::string& cluster() const override { return cluster_name_; }
void cancel() override;
void onClusterCreationComplete() override;

private:
std::string cluster_name_;
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>& pending_clusters_;
PostClusterCreationCb post_cluster_cb_;
};

/**
* Implementation of ClusterManager that reads from a proto configuration, maintains a central
* cluster list, as well as thread local caches of each cluster and associated connection pools.
@@ -155,6 +172,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
// Upstream::ClusterManager
bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) override;
std::pair<ClusterResponseCode, DynamicClusterHandlerPtr>
addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
const std::string& version_info,
PostClusterCreationCb post_cluster_cb) override;
void setInitializedCb(std::function<void()> callback) override {
init_helper_.setInitializedCb(callback);
}
@@ -286,6 +307,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
std::unordered_map<HostConstSharedPtr, TcpConnectionsMap> host_tcp_conn_map_;

std::list<Envoy::Upstream::ClusterUpdateCallbacks*> update_callbacks_;
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>
pending_cluster_creations_;
const PrioritySet* local_priority_set_{};
};

@@ -344,6 +367,9 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
Runtime::Loader& runtime_;
Stats::Store& stats_;
ThreadLocal::SlotPtr tls_;
Event::Dispatcher& main_thread_dispatcher_;
std::unordered_map<std::string, std::list<std::reference_wrapper<Event::Dispatcher>>>
pending_cluster_creations_;
Runtime::RandomGenerator& random_;
ClusterMap active_clusters_;
ClusterMap warming_clusters_;