
upstream: cluster manager api for dynamic cluster creation #3479

Conversation

ramaraochavali
Contributor

Signed-off-by: Rama rama.rao@salesforce.com

Description:
Enhances the addOrUpdateCluster API so that it can be called from a worker thread, thereby providing the ability to create a cluster on the request path.

Risk Level: Low
Testing: Automated tests and manual testing
Docs Changes: N/A
Release Notes: N/A

Signed-off-by: Rama <rama.rao@salesforce.com>
@ramaraochavali
Contributor Author

@mattklein123 cluster manager API. Can you PTAL?

Thread::CondVar tls_update_;
Thread::MutexBasicLockable tls_lock_;
Thread::ReleasableLockGuard lock(tls_lock_);
main_thread_dispatcher_.post([this, &thread_local_clustercb, &tls_update_]() -> void {
Contributor Author

@ramaraochavali ramaraochavali May 24, 2018

Here I tried to wait till all workers are finished with another variant of runAllThreads, but it does not work because this thread would be holding the lock and the posted callback would not be able to acquire it. Any ideas on how we can wait on all worker threads to finish? This version seems to work fine though, tested from Lua.

Also, thinking aloud here, why should we wait till the workers are done? We do not do that on the main thread when CDS updates come with a new cluster. Is it not sufficient here to trigger updates on all threads and ensure the current thread has it (though it gets the message twice), instead of waiting till all threads are done? It may be a reasonable compromise at the cost of a duplicate message. WDYT?

Contributor Author

Also, any guidance on how I can test this in a unit test? How do I call this from a worker thread? Is there any sample that I can look at?

Member

I would like to step back and discuss the overall design for this feature. Here is how I would recommend we go about implementing it:

virtual bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
                                const std::string& version_info) PURE;

At this API level, I would add a new function called addOrUpdateClusterCrossThread. The idea is that this function can only be called from worker threads; addOrUpdateCluster can only be called from the main thread. We should verify this using asserts.

Essentially, at an API level, we are going to make the caller deal with the fact that if they create a cluster off of the main thread they will need to deal with a continuation for when the cluster actually exists. Note, this is not as simple as passing a lambda that gets called (though this will be part of the call signature). The reason for this is that the callback must be cancelable. This is the reason it will be easier to have a new function, because instead of returning void, the cross thread version should return a handle to the pending cluster creation. This handle should have a cancel() function to cover the case where for example a filter tries to create a cluster but the request is reset/destroyed before cluster creation is complete.

For the actual cross-thread cluster creation, here is how it should work:

  1. Register intent to create the cluster on the worker thread (save continuation, return a handle that can be canceled, etc.). You will need to think about corner cases such as the user creating a cluster that already exists. Maybe others.
  2. Post a message to the main thread to create the cluster, passing all the data required to post back to the worker thread that initiated the call.
  3. Create the cluster on the main thread (dealing with the fact that it might already exist).
  4. Either the creation will post messages to all worker threads, or if it already exists, we must post a message back to the worker thread that initiated the call to unblock it.
  5. On the worker thread, deal with unblocking any waiting creators (and of course if they canceled don't worry about it).

Hopefully that gives you enough to go on at a high level.
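
A minimal sketch of the API shape described above, purely for illustration: the names PostClusterCreationCb, ClusterCreationHandle, and ClusterCreationHandlePtr are assumptions rather than anything from this PR, and PURE is Envoy's macro for = 0.

#include <functional>
#include <memory>
#include <string>

// Hypothetical callback/handle types; real names may differ.
using PostClusterCreationCb = std::function<void()>;

// Handle for a pending creation; cancel() covers the case where the
// originating request is reset before cluster creation completes.
class ClusterCreationHandle {
public:
  virtual ~ClusterCreationHandle() = default;
  virtual void cancel() PURE;
};
using ClusterCreationHandlePtr = std::unique_ptr<ClusterCreationHandle>;

class ClusterManager {
public:
  virtual ~ClusterManager() = default;

  // Existing API: main thread only (verified with an assert).
  virtual bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
                                  const std::string& version_info) PURE;

  // Proposed API: worker threads only (verified with an assert). The callback
  // runs once the cluster exists on the calling worker thread.
  virtual ClusterCreationHandlePtr
  addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
                                const std::string& version_info,
                                PostClusterCreationCb callback) PURE;
};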

In terms of testing, I think probably testing this fully with unit tests will be a little difficult. There are two options:

  1. Actually do some variant of cluster manager tests that use multiple threads.
  2. Implement the Lua API as part of this change, which I think would not be that difficult (since Lua can just pass YAML, convert to proto, etc.), and then write Lua integration tests.

Contributor Author

@ramaraochavali ramaraochavali May 25, 2018

@mattklein123 Thanks for the details. I have looked at it very briefly today and, IIUC, this is what you are proposing (some clarifications below):

  1. Is this roughly how you are thinking the new method would look?
ClusterFuturePtr ClusterManagerImpl::addOrUpdateClusterCrossThread(
    const envoy::api::v2::Cluster& cluster, const std::string& version_info,
    PostClusterCreationCb post_cluster_cb) {
  // Check for duplicates. If the cluster already exists, do not do anything.
  std::async([this, &cluster, version_info, &post_cluster_cb]() {
    main_thread_dispatcher_.post([&cluster, version_info]() -> void {
      // Write the logic of creating the cluster here, especially steps 2, 3, 4 of your proposal.
    });
  });
  return std::make_unique<ClusterFutureImpl>(cluster_name);
}

Are you thinking that we should run the main logic in std::async as shown above and return the handle?
ClusterFuture, I am thinking, is essentially the handle you are talking about. It will have a cancel method which the caller would call.

  2. What should the cancel of the handle be expected to do? Should it remove the cluster?
  3. How do I get the dispatcher of the current worker thread so that it can be passed to the main thread, which can post back when done?

Member

No, not std::async. You should be using dispatcher post. You need to post to the main thread, create the cluster, and then post back to all the workers. Each worker effectively needs to keep a list of pending cluster creation requests that will eventually be satisfied.

There are a few different ways of doing this, but basically you will create an object, insert it into a list, and return a handle which if destroyed/cancelled, removes the thing from the list. We have a few different examples of this in the code base. Have a look at: https://github.com/envoyproxy/envoy/blob/master/source/common/http/http1/conn_pool.h#L107, https://github.com/envoyproxy/envoy/blob/master/source/common/common/callback_impl.h, https://github.com/envoyproxy/envoy/blob/master/source/server/http/config_tracker_impl.h, etc.
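
As a rough, self-contained sketch of that pattern (register an entry, return a handle whose cancel() prevents the continuation from ever firing), using entirely hypothetical type names rather than the ones in the files linked above:

#include <functional>
#include <memory>
#include <vector>

using PendingClusterCb = std::function<void()>;

class PendingClusterCreations {
public:
  class Handle {
  public:
    explicit Handle(std::shared_ptr<PendingClusterCb> cb) : cb_(std::move(cb)) {}
    // Cancel by clearing the stored callback; runAll() will then skip it.
    void cancel() { *cb_ = nullptr; }

  private:
    std::shared_ptr<PendingClusterCb> cb_;
  };
  using HandlePtr = std::unique_ptr<Handle>;

  // Register intent; the returned handle lets the caller cancel later.
  HandlePtr add(PendingClusterCb cb) {
    auto entry = std::make_shared<PendingClusterCb>(std::move(cb));
    entries_.push_back(entry);
    return std::make_unique<Handle>(entry);
  }

  // Called when the "cluster created" notification is posted back to this worker.
  void runAll() {
    for (auto& entry : entries_) {
      if (*entry != nullptr) {
        (*entry)();
      }
    }
    entries_.clear();
  }

private:
  std::vector<std::shared_ptr<PendingClusterCb>> entries_;
};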

Contributor Author

@ramaraochavali ramaraochavali May 26, 2018

Ok. Thanks. Let me repeat what I understand so that there is no confusion.

  1. When a worker thread calls the new API, it will create a callback object (that has a cancel method), store the PostClusterCreationCb, and insert it into a pending cluster creation list.
  2. The PostClusterCreationCb holds the code to continue the request flow.
  3. Post the message to the main thread with the details of cluster creation, which would post back to the worker threads. When the post back comes to the worker thread, we will create the thread local cluster, check if there is a pending request (PostClusterCreationCb), and call it, which ensures the request flow continues after the cluster creation. So the worker thread is not blocked on this request - is that the intent? I was thinking that the worker thread needs to block till the cluster creation happens after calling that API in the filter, so if it has to block, the new API should block and return only once the cluster creation is done. Looks like that is not the case. It seems it would be similar to how luaHttpCall works from the Lua filter's perspective: it initiates the cluster creation call (like the HTTP call), and once it is done a callback is called which proceeds with the request. Please correct me here if my understanding is incorrect.
  4. When we return the handle to the caller, the continuation is triggered when the cluster creation on the worker is done; however, if the request is reset, the caller would use the handle to cancel, which deletes the object from the list so the continuation callback is never fired.
    Is that a correct understanding? Sorry, my knowledge of the lower level request flow in Envoy is pretty poor; that is why I am asking so many questions. Thanks much for your patience :-)
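
A bare-bones illustration of the thread hops in point 3 above: main_thread_dispatcher_ mirrors the earlier snippets in this thread, while the signature, member names, and the single-worker post-back are simplifications, with cancellation, failure handling, and the broadcast to all workers omitted.

// Hypothetical sketch of the post / post-back flow only.
void ClusterManagerImpl::addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
                                                       Event::Dispatcher& worker_dispatcher,
                                                       PostClusterCreationCb callback) {
  // 1. Register the pending creation on the calling worker thread (not shown).
  // 2. Hop to the main thread, where clusters are actually created.
  main_thread_dispatcher_.post([this, cluster, &worker_dispatcher, callback]() {
    addOrUpdateCluster(cluster, "");
    // 3/4. Hop back to the worker that initiated the call...
    worker_dispatcher.post([callback]() {
      // 5. ...and run the stored continuation so the request flow resumes.
      callback();
    });
  });
}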

Member

@mattklein123 mattklein123 May 29, 2018

Yup, that's exactly right. Thank you for checking. :)

Contributor Author

Ok, cool. I am on PTO most of the week. I will make the changes as soon as I am back.

@mattklein123 mattklein123 self-assigned this May 24, 2018
@ramaraochavali
Contributor Author

@mattklein123 implemented the new API as per our discussion. Also added a Lua API filter, as per your suggestion, for testing, and added tests that exercise cluster creation on the request path via Lua integration tests. Can you PTAL?

Member

@mattklein123 mattklein123 left a comment

@ramaraochavali thanks for working on this. I think this is going to be the base of a bunch of important features in the future. I left some high level comments that we can start with, and then we can iterate.

@@ -78,6 +109,20 @@ class ClusterManager {
virtual bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) PURE;

/**
* Add a dynamic cluster via API on the request path. Currently this supports only STATIC type of
Member

What about removal? Do we need to be thinking about TTL for added clusters? What about safeguards for cluster addition? Do we assume right now that whoever is calling this API must deal with resource usage safety, similar to trusting the management server? For example, if a filter allows clusters to be created via untrusted headers, is it up to the filter to sanity check what it does? Can we add detailed comments on these assumptions and the things we might want to do in the future?

Contributor Author

Sure. I thought about removeCluster and here are my thoughts:

  1. We should have some automated way of cleaning up these clusters, with a TTL or after no requests for a period of time, etc., that can be supplied along with the cluster creation details in the API. Can we tie this to the auto eviction based on policies/rules that we have been discussing in the incremental xDS API? I see some overlap between the two but am not sure how much.
  2. We can change the removeCluster API to work with worker threads and let the filter call it on an as-needed basis.
  3. Apply some configuration that allows only x number of clusters to be created like this on the request path.
    But for now, for simplicity, I think we should assume that whoever is calling this API must deal with resource usage safety.
    If you agree with this, I would add these semantics to the docs/release notes as well.

* cluster creation is done.
* @return DynamicClusterHandlerPtr that allows the caller to cancel if needed.
*/
virtual DynamicClusterHandlerPtr
Member

At the API level, there should be some better way to determine if there was an error (e.g., cluster not STATIC) or if the cluster already exists. Can you think about a better way to do this? Perhaps you should return an std::pair<> with some type of status code and either a handle or nullptr? This is useful for filters but also for tests (you will need to add unit tests for cluster manager).
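
One possible shape for that, offered only as an illustration of a declaration inside the ClusterManager interface; ClusterAddStatus is a made-up enum, not an existing Envoy type.

// Hypothetical status enum; a non-Pending status would be paired with a nullptr handle.
enum class ClusterAddStatus { Pending, AlreadyExists, InvalidClusterType };

virtual std::pair<ClusterAddStatus, DynamicClusterHandlerPtr>
addOrUpdateClusterCrossThread(const envoy::api::v2::Cluster& cluster,
                              const std::string& version_info,
                              PostClusterCreationCb post_cluster_cb) PURE;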

@@ -775,6 +828,11 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
cluster_entry->lb_ = cluster_entry->lb_factory_->create();
}

if (config.pending_cluster_creations_.find(name) != config.pending_cluster_creations_.end()) {
Member

If two threads race to create this cluster (which can easily happen if you build clusters in the request path), you are going to hang the thread that loses the race. You are still going to have to handle this somehow. Roughly what you need to do is register the per-thread intents on the main thread and make sure a "notify cluster creation" message goes out to all threads no matter what in this case. Please make sure you have a unit test for this. (In general I would like you to remove the Lua stuff from this PR for now. I realize Lua will add very nice integration tests, but for now I would prefer to get some solid unit tests that cover all the interleavings).

Contributor Author

@mattklein123 I agree, and it is on my TODO list to handle the two-threads race case and the two-requests case. I just wanted to get the high level view first. I will handle those cases now, and I will definitely add solid unit tests for the new API.
Regarding Lua, if you do not feel very strongly about removing it from this PR, I would prefer to keep it because:

  1. It has already been coded and it provides very nice integration tests, as you mentioned.
  2. We will need this; I will have to follow up with another PR immediately after this one.
    If it is OK with you, can we focus on the Lua changes at the end, once we are good with the API, so that we can finish the integration tests in this PR as well? I completely agree on having solid unit tests, which I will do.

ENVOY_LOG(trace, "cluster {} already exists", cluster.name());
should_create_cluster = false;
}
if (cluster.type() != envoy::api::v2::Cluster::STATIC || cluster.hosts_size() < 1) {
Member

Checking cluster.hosts_size() < 1 here is a bit weird. Don't we do this check somewhere else? Isn't this a common check at both startup as well as for CDS? This brings up a larger problem which is, what if cluster creation fails when it goes to the main thread? You will need to post the failure back to the worker threads and handle that. That's related to my "lost the race" comment elsewhere.

tls_->getTyped<ThreadLocalClusterManagerImpl>();
cluster_handler = std::make_shared<DynamicClusterHandlerImpl>(
cluster.name(), cluster_manager.pending_cluster_creations_, post_cluster_cb);
cluster_manager.pending_cluster_creations_[cluster.name()] = cluster_handler;
Member

What if two different filters/requests are trying to create a cluster with the same name? What happens? I think this code is not going to work correctly, as you are going to overwrite the other request's entry. Like the cross-thread case, we also need to handle multiple requests on the same thread.
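
One way the registry could avoid that overwrite, sketched with standard containers and hypothetical stand-in types (the real member would live in ThreadLocalClusterManagerImpl and use the PR's handler type):

#include <functional>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for the PR's types, for illustration only.
struct DynamicClusterHandler {
  std::function<void()> post_cluster_cb;
};
using DynamicClusterHandlerSharedPtr = std::shared_ptr<DynamicClusterHandler>;

// Keyed by cluster name, holding *all* pending requests for that name on this worker.
std::map<std::string, std::vector<DynamicClusterHandlerSharedPtr>> pending_cluster_creations;

void registerPending(const std::string& name, DynamicClusterHandlerSharedPtr handler) {
  // Append instead of assigning, so a second request for the same cluster name
  // does not overwrite the first request's handler.
  pending_cluster_creations[name].push_back(std::move(handler));
}

void onClusterCreated(const std::string& name) {
  auto it = pending_cluster_creations.find(name);
  if (it == pending_cluster_creations.end()) {
    return;
  }
  // Notify every request that was waiting on this cluster, then clear the entry.
  for (const auto& handler : it->second) {
    handler->post_cluster_cb();
  }
  pending_cluster_creations.erase(it);
}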

@rshriram
Member

rshriram commented Jun 5, 2018

What is the expected size of these static clusters? Is it just one single host? IOW, is your use case about dynamically routing to a host extracted from HTTP request headers?

@ramaraochavali
Contributor Author

@rshriram yes. In our use case it is just a single host. You are correct, our use case is just about dynamically routing to a host extracted from request headers.

@ramaraochavali
Contributor Author

@mattklein123
Update: implemented the cross-thread race condition fixes, changed the API return value, and handled the error conditions.
Still need to fix:

  • Single thread, multi request case - need to see how to capture the second request. Still thinking about it.
  • Unit tests
    Will work on these two tomorrow.

@rshriram
Member

rshriram commented Jun 5, 2018

If it's just about routing to a single host extracted from the request, why not repurpose the original destination cluster implementation? The orig_dst_cluster currently forwards the request to the host requested by the client, when the traffic is trapped by an iptables redirect and forwarded to Envoy. We use it a lot in Istio. The use case you have is very similar, except the host is obtained from HTTP headers instead of the kernel. The original dst cluster already has some minimal form of garbage collection to eliminate unused hosts, etc.

If you literally add an API to the original dst cluster to overwrite the host from a filter, then you could cut down on most of this code.

@mattklein123
Member

For the single host case, I agree with @rshriram that the basic use case could be done with the original_dst filter if you know the IP address of the backend. @ramaraochavali do you want to go that direction? I think the only changes there are likely to grab the target host from a header inside the LB context.

However, orthogonally, I still want code like this, as it is generally useful for other cases like generic outbound proxy, cluster lazy load (like what @nt needs), etc. So I'm fine continuing down this path also for the general use case. However, @ramaraochavali per @rshriram I don't think it's necessary for your use case.

@ramaraochavali
Contributor Author

@rshriram @mattklein123 thanks for your suggestion. I just tried it with original dst cluster and it seems simpler there. Please see the PR #3557 with the changes. If the implementation is OK, I will add tests and docs there.

@mattklein123 For our use case since original dst seems to be simpler, I would go with it to unblock our use case quickly so that we can get to production sooner. However, since I have done most of the work on the dynamic cluster creation, I am happy to continue to work with you and iterate on this if that is OK with you whenever I have free cycles.

@mattklein123
Member

@ramaraochavali I won't have time to work on this for a while. I made a comment about this PR in #2500 and I would suggest we close this for now and come back to it when there is more time to work on it? We can use this PR as a base for when we start again?

@ramaraochavali
Contributor Author

@mattklein123 no issues. Closing this.
