upstream: cluster manager api for dynamic cluster creation #3479

5 changes: 5 additions & 0 deletions include/envoy/thread_local/thread_local.h
@@ -55,6 +55,11 @@ class Slot {
*/
virtual void runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) PURE;

/**
* Returns true if the calling thread is the main dispatcher thread, false otherwise.
*/
virtual bool isMainThread() PURE;

/**
* Set thread local data on all threads previously registered via registerThread().
* @param initializeCb supplies the functor that will be called *on each thread*. The functor
45 changes: 45 additions & 0 deletions include/envoy/upstream/cluster_manager.h
@@ -57,6 +57,37 @@ class ClusterUpdateCallbacksHandle {

typedef std::unique_ptr<ClusterUpdateCallbacksHandle> ClusterUpdateCallbacksHandlePtr;

/**
* A cancellable handle returned to the caller that initiates dynamic cluster creation on the
* request path.
*/
class DynamicClusterHandler {
public:
virtual ~DynamicClusterHandler() {}

/**
* @return the name of the cluster that this handle was created for.
*/
virtual const std::string& cluster() const PURE;

/**
* Cancels the callback.
*/
virtual void cancel() PURE;

/**
* Called when cluster creation is complete.
*/
virtual void onClusterCreationComplete() PURE;
};

typedef std::shared_ptr<DynamicClusterHandler> DynamicClusterHandlerPtr;

/**
* Callback invoked when a cross-thread cluster creation is complete.
*/
typedef std::function<void()> PostClusterCreationCb;

/**
* Manages connection pools and load balancing for upstream clusters. The cluster manager is
* persistent and shared among multiple ongoing requests/connections.
@@ -78,6 +109,20 @@ class ClusterManager {
virtual bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) PURE;

/**
* Add a dynamic cluster via the API on the request path. Currently this supports only the STATIC type of
Member: What about removal? Do we need to be thinking about TTL for added clusters? What about safeguards for cluster addition? Do we assume right now that whoever is calling this API must deal with resource usage safety, similar to trusting the management server? For example, if a filter allows clusters to be created via untrusted headers, is it up to the filter to sanity-check what it does? Can we add detailed comments on these assumptions and on things we might want to do in the future?

Contributor Author: Sure. I thought about removeCluster, and here are my thoughts:

  1. We should have some automated way of cleaning up these clusters (a TTL, no requests for a period of time, etc.) that can be supplied along with the cluster creation details in the API. Can we tie this to auto-eviction based on some of the policies/rules we have been discussing for the incremental xDS API? I see some overlap between the two, but I am not sure how much.
  2. We can change the removeCluster API to work with worker threads and let the filter call it on an as-needed basis.
  3. Apply some configuration that allows only a bounded number of clusters to be created like this on the request path.

  For now, for simplicity, I think we should assume that whoever is calling this API must deal with resource usage safety. If you agree, I will add these semantics to the docs/release notes as well.

* clusters.
*
* @param cluster supplies the cluster configuration.
* @param version_info supplies the xDS version of the cluster.
* @param post_cluster_cb supplies the callback that allows the request to continue after the
* cluster creation is done.
* @return DynamicClusterHandlerPtr that allows the caller to cancel if needed.
*/
virtual DynamicClusterHandlerPtr
Member: At the API level, there should be some better way to determine whether there was an error (e.g., cluster not STATIC) or whether the cluster already exists. Can you think about a better way to do this? Perhaps you should return an std::pair<> with some type of status code and either a handle or nullptr? This is useful for filters but also for tests (you will need to add unit tests for the cluster manager).

addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
const std::string& version_info,
PostClusterCreationCb post_cluster_cb) PURE;
/**
* Set a callback that will be invoked when all owned clusters have been initialized.
*/
2 changes: 2 additions & 0 deletions source/common/thread_local/thread_local_impl.cc
@@ -110,6 +110,8 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_c
}
}

bool InstanceImpl::isMainThread() { return (std::this_thread::get_id() == main_thread_id_); }

void InstanceImpl::SlotImpl::set(InitializeCb cb) {
ASSERT(std::this_thread::get_id() == parent_.main_thread_id_);
ASSERT(!parent_.shutdown_);
2 changes: 2 additions & 0 deletions source/common/thread_local/thread_local_impl.h
@@ -38,6 +38,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(cb, main_callback);
}
bool isMainThread() override { return parent_.isMainThread(); }
void set(InitializeCb cb) override;

InstanceImpl& parent_;
@@ -52,6 +53,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
void removeSlot(SlotImpl& slot);
void runOnAllThreads(Event::PostCb cb);
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback);
bool isMainThread();
static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);

static thread_local ThreadLocalData thread_local_data_;
62 changes: 60 additions & 2 deletions source/common/upstream/cluster_manager_impl.cc
@@ -163,6 +163,24 @@ void ClusterManagerInitHelper::setInitializedCb(std::function<void()> callback)
}
}

DynamicClusterHandlerImpl::DynamicClusterHandlerImpl(
const std::string& cluster_name,
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>& pending_clusters,
PostClusterCreationCb post_cluster_cb)
: cluster_name_(cluster_name), pending_clusters_(pending_clusters),
post_cluster_cb_(post_cluster_cb) {}

void DynamicClusterHandlerImpl::cancel() {
ENVOY_LOG(debug, "cancelling dynamic cluster creation callback for cluster {}", cluster_name_);
pending_clusters_.erase(cluster_name_);
}

void DynamicClusterHandlerImpl::onClusterCreationComplete() {
ENVOY_LOG(debug, "cluster {} creation complete. initiating post callback", cluster_name_);
post_cluster_cb_();
pending_clusters_.erase(cluster_name_);
}

ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Bootstrap& bootstrap,
ClusterManagerFactory& factory, Stats::Store& stats,
ThreadLocal::Instance& tls, Runtime::Loader& runtime,
@@ -172,8 +190,9 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
Event::Dispatcher& main_thread_dispatcher,
Server::Admin& admin)
: factory_(factory), runtime_(runtime), stats_(stats), tls_(tls.allocateSlot()),
random_(random), bind_config_(bootstrap.cluster_manager().upstream_bind_config()),
local_info_(local_info), cm_stats_(generateStats(stats)),
main_thread_dispatcher_(main_thread_dispatcher), random_(random),
bind_config_(bootstrap.cluster_manager().upstream_bind_config()), local_info_(local_info),
cm_stats_(generateStats(stats)),
init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }),
config_tracker_entry_(
admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })) {
@@ -339,8 +358,42 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
}
}

DynamicClusterHandlerPtr
ClusterManagerImpl::addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
const std::string& version_info,
PostClusterCreationCb post_cluster_cb) {
ASSERT(!tls_->isMainThread());

bool should_create_cluster = true;
if (active_clusters_.find(cluster.name()) != active_clusters_.end()) {
ENVOY_LOG(trace, "cluster {} already exists", cluster.name());
should_create_cluster = false;
}
if (cluster.type() != envoy::api::v2::Cluster::STATIC || cluster.hosts_size() < 1) {
Member: Checking cluster.hosts_size() < 1 here is a bit weird. Don't we do this check somewhere else? Isn't this a common check at both startup and for CDS? This brings up a larger problem: what if cluster creation fails when it goes to the main thread? You will need to post the failure back to the worker threads and handle it. That's related to my "lost the race" comment elsewhere.

ENVOY_LOG(warn, "dynamic cluster {} should be of type static and should have hosts",
cluster.name());
should_create_cluster = false;
}

DynamicClusterHandlerPtr cluster_handler;
if (should_create_cluster) {
ThreadLocalClusterManagerImpl& cluster_manager =
tls_->getTyped<ThreadLocalClusterManagerImpl>();
cluster_handler = std::make_shared<DynamicClusterHandlerImpl>(
cluster.name(), cluster_manager.pending_cluster_creations_, post_cluster_cb);
cluster_manager.pending_cluster_creations_[cluster.name()] = cluster_handler;
Member: What if two different filters/requests try to create a cluster with the same name? What happens? I think this code is not going to work correctly, as you are going to overwrite the other requests. Like the cross-thread case, we also need to handle multiple requests on the same thread.

main_thread_dispatcher_.post([this, cluster, version_info]() -> void {
ENVOY_LOG(trace, "initiating dynamic cluster {} creation", cluster.name());
addOrUpdateCluster(cluster, version_info);
});
}
return cluster_handler;
}

bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) {
ASSERT(tls_->isMainThread());

// First we need to see if this new config is new or an update to an existing dynamic cluster.
// We don't allow updates to statically configured clusters in the main configuration. We check
// both the warming clusters and the active clusters to see if we need an update or the update
@@ -775,6 +828,11 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
cluster_entry->lb_ = cluster_entry->lb_factory_->create();
}

if (config.pending_cluster_creations_.find(name) != config.pending_cluster_creations_.end()) {
Member: If two threads race to create this cluster (which can easily happen if you build clusters in the request path), you are going to hang the thread that loses the race. You are still going to have to handle this somehow. Roughly, what you need to do is register the per-thread intents on the main thread and make sure a "notify cluster creation" message goes out to all threads no matter what. Please make sure you have a unit test for this. (In general, I would like you to remove the Lua stuff from this PR for now. I realize Lua will add very nice integration tests, but for now I would prefer to get some solid unit tests that cover all the interleavings.)

Contributor Author: @mattklein123 I agree; handling the two-threads race and the two-requests case is on my TODO list. I just wanted to get the high-level view first. I will handle those cases now, and I will definitely add solid unit tests for the new API. Regarding Lua: if you do not feel strongly about removing it from this PR, I would prefer to keep it, because (1) it has already been coded and provides very nice integration tests, as you mentioned, and (2) we will need it anyway, so I would have to follow up with another PR immediately after this one. If it is OK with you, can we focus on the Lua changes at the end, once we are good with the API, so that we can finish the integration tests in this PR as well? I completely agree on having solid unit tests, which I will do.

auto handler = config.pending_cluster_creations_.find(name)->second;
handler->onClusterCreationComplete();
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
25 changes: 25 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
@@ -3,6 +3,7 @@
#include <array>
#include <cstdint>
#include <functional>
#include <future>
#include <list>
#include <map>
#include <memory>
@@ -139,6 +140,23 @@ struct ClusterManagerStats {
ALL_CLUSTER_MANAGER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

class DynamicClusterHandlerImpl : public DynamicClusterHandler,
Logger::Loggable<Logger::Id::upstream> {
public:
DynamicClusterHandlerImpl(
const std::string& cluster_name,
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>& pending_clusters,
PostClusterCreationCb post_cluster_cb);
const std::string& cluster() const override { return cluster_name_; }
void cancel() override;
void onClusterCreationComplete() override;

private:
std::string cluster_name_;
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>& pending_clusters_;
PostClusterCreationCb post_cluster_cb_;
};

/**
* Implementation of ClusterManager that reads from a proto configuration, maintains a central
* cluster list, as well as thread local caches of each cluster and associated connection pools.
@@ -155,6 +173,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
// Upstream::ClusterManager
bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) override;
DynamicClusterHandlerPtr
addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
const std::string& version_info,
PostClusterCreationCb post_cluster_cb) override;
void setInitializedCb(std::function<void()> callback) override {
init_helper_.setInitializedCb(callback);
}
@@ -286,6 +308,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
std::unordered_map<HostConstSharedPtr, TcpConnectionsMap> host_tcp_conn_map_;

std::list<Envoy::Upstream::ClusterUpdateCallbacks*> update_callbacks_;
std::unordered_map<std::string, Envoy::Upstream::DynamicClusterHandlerPtr>
pending_cluster_creations_;
const PrioritySet* local_priority_set_{};
};

@@ -344,6 +368,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
Runtime::Loader& runtime_;
Stats::Store& stats_;
ThreadLocal::SlotPtr tls_;
Event::Dispatcher& main_thread_dispatcher_;
Runtime::RandomGenerator& random_;
ClusterMap active_clusters_;
ClusterMap warming_clusters_;
49 changes: 42 additions & 7 deletions source/extensions/filters/http/lua/lua_filter.cc
@@ -25,10 +25,10 @@ StreamHandleWrapper::StreamHandleWrapper(Filters::Common::Lua::CoroutinePtr&& co
Http::FilterHeadersStatus StreamHandleWrapper::start(int function_ref) {
// We are on the top of the stack.
coroutine_->start(function_ref, 1, yield_callback_);
Http::FilterHeadersStatus status =
(state_ == State::WaitForBody || state_ == State::HttpCall || state_ == State::Responded)
? Http::FilterHeadersStatus::StopIteration
: Http::FilterHeadersStatus::Continue;
Http::FilterHeadersStatus status = (state_ == State::WaitForBody || state_ == State::HttpCall ||
state_ == State::Responded || state_ == State::DynamicCluster)
? Http::FilterHeadersStatus::StopIteration
: Http::FilterHeadersStatus::Continue;

if (status == Http::FilterHeadersStatus::Continue) {
headers_continued_ = true;
@@ -62,6 +62,9 @@ Http::FilterDataStatus StreamHandleWrapper::onData(Buffer::Instance& data, bool
if (state_ == State::HttpCall || state_ == State::WaitForBody) {
ENVOY_LOG(trace, "buffering body");
return Http::FilterDataStatus::StopIterationAndBuffer;
} else if (state_ == State::DynamicCluster) {
ENVOY_LOG(trace, "buffering body while waiting for dynamic cluster creation");
return Http::FilterDataStatus::StopIterationAndBuffer;
} else if (state_ == State::Responded) {
return Http::FilterDataStatus::StopIterationNoBuffer;
} else {
@@ -91,9 +94,10 @@ Http::FilterTrailersStatus StreamHandleWrapper::onTrailers(Http::HeaderMap& trai
coroutine_->resume(luaTrailers(coroutine_->luaState()), yield_callback_);
}

Http::FilterTrailersStatus status = (state_ == State::HttpCall || state_ == State::Responded)
? Http::FilterTrailersStatus::StopIteration
: Http::FilterTrailersStatus::Continue;
Http::FilterTrailersStatus status =
(state_ == State::HttpCall || state_ == State::Responded || state_ == State::DynamicCluster)
? Http::FilterTrailersStatus::StopIteration
: Http::FilterTrailersStatus::Continue;

if (status == Http::FilterTrailersStatus::Continue) {
headers_continued_ = true;
@@ -200,6 +204,37 @@ int StreamHandleWrapper::luaHttpCall(lua_State* state) {
}
}

int StreamHandleWrapper::luaAddorUpdateCluster(lua_State* state) {
ASSERT(state_ == State::Running);
const std::string cluster_template_yaml = luaL_checkstring(state, 2);
envoy::api::v2::Cluster cluster;
MessageUtil::loadFromYaml(cluster_template_yaml, cluster);
cluster_handler_ =
filter_.clusterManager().addOrUpdateClusterCrossThread(cluster, "", [this]() -> void {
if (state_ == State::DynamicCluster) {
state_ = State::Running;
markLive();
try {
coroutine_->resume(0, yield_callback_);
markDead();
} catch (const Filters::Common::Lua::LuaException& e) {
filter_.scriptError(e);
}

if (state_ == State::Running) {
headers_continued_ = true;
callbacks_.continueIteration();
}
}
});
if (cluster_handler_) {
state_ = State::DynamicCluster;
return lua_yield(state, 0);
} else {
return 1;
}
}

void StreamHandleWrapper::onSuccess(Http::MessagePtr&& response) {
ASSERT(state_ == State::HttpCall || state_ == State::Running);
ENVOY_LOG(debug, "async HTTP response complete");
37 changes: 30 additions & 7 deletions source/extensions/filters/http/lua/lua_filter.h
@@ -97,6 +97,8 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan
WaitForTrailers,
// Lua script is blocked waiting for the result of an HTTP call.
HttpCall,
// Lua script is blocked waiting for the result of a dynamic cluster creation call.
DynamicCluster,
// Lua script has done a direct response.
Responded
};
@@ -113,16 +115,27 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan
http_request_->cancel();
http_request_ = nullptr;
}
if (cluster_handler_) {
cluster_handler_->cancel();
cluster_handler_ = nullptr;
}
}

static ExportedFunctions exportedFunctions() {
return {{"headers", static_luaHeaders}, {"body", static_luaBody},
{"bodyChunks", static_luaBodyChunks}, {"trailers", static_luaTrailers},
{"metadata", static_luaMetadata}, {"logTrace", static_luaLogTrace},
{"logDebug", static_luaLogDebug}, {"logInfo", static_luaLogInfo},
{"logWarn", static_luaLogWarn}, {"logErr", static_luaLogErr},
{"logCritical", static_luaLogCritical}, {"httpCall", static_luaHttpCall},
{"respond", static_luaRespond}};
return {{"headers", static_luaHeaders},
{"body", static_luaBody},
{"bodyChunks", static_luaBodyChunks},
{"trailers", static_luaTrailers},
{"metadata", static_luaMetadata},
{"logTrace", static_luaLogTrace},
{"logDebug", static_luaLogDebug},
{"logInfo", static_luaLogInfo},
{"logWarn", static_luaLogWarn},
{"logErr", static_luaLogErr},
{"logCritical", static_luaLogCritical},
{"httpCall", static_luaHttpCall},
{"respond", static_luaRespond},
{"addOrUpdateCluster", static_luaAddorUpdateCluster}};
}

private:
@@ -176,6 +189,15 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan
*/
DECLARE_LUA_FUNCTION(StreamHandleWrapper, luaMetadata);

/**
* Add or update a cluster on the request path. This call yields the script until the cluster
* has been created.
* @param 1 (string): YAML representation of a Cluster definition per the v2 API.
*/
DECLARE_LUA_FUNCTION(StreamHandleWrapper, luaAddorUpdateCluster);

/**
* Log a message to the Envoy log.
* @param 1 (string): The log message.
@@ -224,6 +246,7 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan
State state_{State::Running};
std::function<void()> yield_callback_;
Http::AsyncClient::Request* http_request_{};
Upstream::DynamicClusterHandlerPtr cluster_handler_;
};

/**