-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
reduce number of srds update callbacks #12118
Changes from 11 commits
90526e2
0e31325
adeeefe
db1875a
6b86a52
4b894c4
203370c
be714f3
4aee477
13b2fd3
8fe51ae
6646179
8ee4dfd
6135e4a
7caa6aa
36c691d
16a62d3
250158e
5ab8734
65cef2b
ec50df3
ab4ba97
6e08e2d
c937f08
48ee8a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
#include "common/init/manager_impl.h" | ||
#include "common/init/watcher_impl.h" | ||
#include "common/router/rds_impl.h" | ||
#include "common/router/scoped_config_impl.h" | ||
|
||
#include "absl/strings/str_join.h" | ||
|
||
|
@@ -137,64 +138,52 @@ ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProvide | |
bool ScopedRdsConfigSubscription::addOrUpdateScopes( | ||
const std::vector<Envoy::Config::DecodedResourceRef>& resources, Init::Manager& init_manager, | ||
const std::string& version_info, std::vector<std::string>& exception_msgs) { | ||
ASSERT(exception_msgs.empty()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need to, this parameter is not used in this method now, remove it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, and we don't need any_applied anymore if addOrUpdateScopes() doesn't do validation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think anyapplied is still required to give an signal to whether call the setLastConfigInfo method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes |
||
bool any_applied = false; | ||
envoy::extensions::filters::network::http_connection_manager::v3::Rds rds; | ||
rds.mutable_config_source()->MergeFrom(rds_config_source_); | ||
absl::flat_hash_set<std::string> unique_resource_names; | ||
std::vector<ScopedRouteInfoConstSharedPtr> updated_scopes; | ||
for (const auto& resource : resources) { | ||
try { | ||
// Explicit copy so that we can std::move later. | ||
envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config = | ||
dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>( | ||
resource.get().resource()); | ||
const std::string scope_name = scoped_route_config.name(); | ||
if (!unique_resource_names.insert(scope_name).second) { | ||
throw EnvoyException( | ||
fmt::format("duplicate scoped route configuration '{}' found", scope_name)); | ||
} | ||
// TODO(stevenzzz): Creating a new RdsRouteConfigProvider likely expensive, migrate RDS to | ||
// config-provider-framework to make it light weight. | ||
rds.set_route_config_name(scoped_route_config.route_configuration_name()); | ||
auto rds_config_provider_helper = | ||
std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name, rds, init_manager); | ||
auto scoped_route_info = std::make_shared<ScopedRouteInfo>( | ||
std::move(scoped_route_config), rds_config_provider_helper->routeConfig()); | ||
// Detect if there is key conflict between two scopes, in which case Envoy won't be able to | ||
// tell which RouteConfiguration to use. Reject the second scope in the delta form API. | ||
auto iter = scope_name_by_hash_.find(scoped_route_info->scopeKey().hash()); | ||
if (iter != scope_name_by_hash_.end()) { | ||
if (iter->second != scoped_route_info->scopeName()) { | ||
throw EnvoyException( | ||
fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'", | ||
iter->second, scoped_route_info->scopeName())); | ||
} | ||
} | ||
// NOTE: delete previous route provider if any. | ||
route_provider_by_scope_.insert({scope_name, std::move(rds_config_provider_helper)}); | ||
scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName(); | ||
scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info; | ||
applyConfigUpdate([scoped_route_info](ConfigProvider::ConfigConstSharedPtr config) | ||
-> ConfigProvider::ConfigConstSharedPtr { | ||
auto* thread_local_scoped_config = | ||
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get())); | ||
thread_local_scoped_config->addOrUpdateRoutingScope(scoped_route_info); | ||
return config; | ||
}); | ||
any_applied = true; | ||
ENVOY_LOG(debug, "srds: add/update scoped_route '{}', version: {}", | ||
scoped_route_info->scopeName(), version_info); | ||
} catch (const EnvoyException& e) { | ||
exception_msgs.emplace_back(absl::StrCat("", e.what())); | ||
} | ||
// Explicit copy so that we can std::move later. | ||
envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config = | ||
dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>( | ||
resource.get().resource()); | ||
const std::string scope_name = scoped_route_config.name(); | ||
// TODO(stevenzzz): Creating a new RdsRouteConfigProvider likely expensive, migrate RDS to | ||
// config-provider-framework to make it light weight. | ||
rds.set_route_config_name(scoped_route_config.route_configuration_name()); | ||
auto rds_config_provider_helper = | ||
std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name, rds, init_manager); | ||
auto scoped_route_info = std::make_shared<ScopedRouteInfo>( | ||
std::move(scoped_route_config), rds_config_provider_helper->routeConfig()); | ||
// NOTE: delete previous route provider if any. | ||
route_provider_by_scope_.insert({scope_name, std::move(rds_config_provider_helper)}); | ||
scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName(); | ||
scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info; | ||
updated_scopes.push_back(scoped_route_info); | ||
any_applied = true; | ||
ENVOY_LOG(debug, "srds: add/update scoped_route '{}', version: {}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. out of date comment, follow Andres' suggestion here as well. |
||
scoped_route_info->scopeName(), version_info); | ||
} | ||
|
||
if (!updated_scopes.empty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this changes the logic such that partial application of an SRDS config update is no longer possible. For example, if there is a duplicate scoped route, all preceding routes would have been applied prior to this change. I actually think the new behavior is preferable; I suggest adding a test to explicitly validate this logic. FYI @stevenzzzz There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, currently the srds uses SotW. A test to validate that partial update is not accepted will be good. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Andres is saying in delta XDS within a response handling, we could have a portion of the list applied, as the previous scope info is distributed to workers. We need to get this behavior tested. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even for a regular SOTW config update, the existing logic could partially apply config since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for the SotW api, the two errors has been checked before the Delta interface is called. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, thanks Xin. This only applies to delta updates. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So if the new change is applied, in delta api, partial updates is accepted in main thread when exception happens, but the partial updates are not applied to worker threads. We want the update to be fully rejected in both main thread and worker thread, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can store all the incoming scoped_route_config in a vector, pass the reference of this vector to a helper function to detect any scope key or scope name conflict. If there is any conflict, no change will be applied, otherwise all changes will be applied by creating corresponding RdsRouteConfigProviderHelper and push scope_route_info into the maps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A helper function has been added to detect key and scope name conflict. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks |
||
applyConfigUpdate([updated_scopes](ConfigProvider::ConfigConstSharedPtr config) | ||
-> ConfigProvider::ConfigConstSharedPtr { | ||
auto* thread_local_scoped_config = | ||
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get())); | ||
thread_local_scoped_config->addOrUpdateRoutingScopes(updated_scopes); | ||
return config; | ||
}); | ||
} | ||
return any_applied; | ||
} | ||
} // namespace Router | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, I will remove it. |
||
|
||
std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr> | ||
ScopedRdsConfigSubscription::removeScopes( | ||
const Protobuf::RepeatedPtrField<std::string>& scope_names, const std::string& version_info) { | ||
std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr> | ||
to_be_removed_rds_providers; | ||
std::vector<std::string> removed_scope_names; | ||
for (const auto& scope_name : scope_names) { | ||
auto iter = scoped_route_map_.find(scope_name); | ||
if (iter != scoped_route_map_.end()) { | ||
|
@@ -206,16 +195,19 @@ ScopedRdsConfigSubscription::removeScopes( | |
} | ||
scope_name_by_hash_.erase(iter->second->scopeKey().hash()); | ||
scoped_route_map_.erase(iter); | ||
applyConfigUpdate([scope_name](ConfigProvider::ConfigConstSharedPtr config) | ||
-> ConfigProvider::ConfigConstSharedPtr { | ||
auto* thread_local_scoped_config = | ||
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get())); | ||
thread_local_scoped_config->removeRoutingScope(scope_name); | ||
return config; | ||
}); | ||
removed_scope_names.push_back(scope_name); | ||
ENVOY_LOG(debug, "srds: remove scoped route '{}', version: {}", scope_name, version_info); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as prior comment, I suggest modifying the wording to clarify that the update has not yet been applied. |
||
} | ||
} | ||
if (!removed_scope_names.empty()) { | ||
applyConfigUpdate([removed_scope_names](ConfigProvider::ConfigConstSharedPtr config) | ||
-> ConfigProvider::ConfigConstSharedPtr { | ||
auto* thread_local_scoped_config = | ||
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get())); | ||
thread_local_scoped_config->removeRoutingScopes(removed_scope_names); | ||
return config; | ||
}); | ||
} | ||
return to_be_removed_rds_providers; | ||
} | ||
|
||
|
@@ -224,7 +216,6 @@ void ScopedRdsConfigSubscription::onConfigUpdate( | |
const Protobuf::RepeatedPtrField<std::string>& removed_resources, | ||
const std::string& version_info) { | ||
// NOTE: deletes are done before adds/updates. | ||
|
||
absl::flat_hash_map<std::string, ScopedRouteInfoConstSharedPtr> to_be_removed_scopes; | ||
// Destruction of resume_rds will lift the floodgate for new RDS subscriptions. | ||
// Note in the case of partial acceptance, accepted RDS subscriptions should be started | ||
|
@@ -264,10 +255,23 @@ void ScopedRdsConfigSubscription::onConfigUpdate( | |
} | ||
|
||
std::vector<std::string> exception_msgs; | ||
Protobuf::RepeatedPtrField<std::string> clean_removed_resources; | ||
try { | ||
clean_removed_resources = | ||
detectUpdateConflictAndCleanupRemoved(added_resources, removed_resources); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest avoiding the use of an exception to return the conflicting resources. It complicates the error handling logic for an event that is not really "exceptional" (i.e., this function is explicitly checking for that condition). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree, I have changed the validation function to return normally instead of throwing an exception when detecting conflict. |
||
} catch (const EnvoyException& e) { | ||
exception_msgs.emplace_back(absl::StrCat("", e.what())); | ||
} | ||
if (!exception_msgs.empty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whynot throw in the above catch clause? |
||
throw EnvoyException(fmt::format("Error adding/updating scoped route(s): {}", | ||
absl::StrJoin(exception_msgs, ", "))); | ||
} | ||
// Do not delete RDS config providers just yet, in case the to be deleted RDS subscriptions could | ||
// be reused by some to be added scopes. | ||
std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr> | ||
to_be_removed_rds_providers = removeScopes(removed_resources, version_info); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extra blank lines |
||
std::list<std::unique_ptr<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper>> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unintentional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
to_be_removed_rds_providers = removeScopes(clean_removed_resources, version_info); | ||
|
||
bool any_applied = | ||
addOrUpdateScopes(added_resources, | ||
(srds_init_mgr == nullptr ? localInitManager() : *srds_init_mgr), | ||
|
@@ -278,10 +282,6 @@ void ScopedRdsConfigSubscription::onConfigUpdate( | |
setLastConfigInfo(absl::optional<LastConfigInfo>({absl::nullopt, version_info})); | ||
} | ||
stats_.config_reload_.inc(); | ||
if (!exception_msgs.empty()) { | ||
throw EnvoyException(fmt::format("Error adding/updating scoped route(s): {}", | ||
absl::StrJoin(exception_msgs, ", "))); | ||
} | ||
} | ||
|
||
void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_name, | ||
|
@@ -298,7 +298,7 @@ void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_nam | |
-> ConfigProvider::ConfigConstSharedPtr { | ||
auto* thread_local_scoped_config = | ||
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get())); | ||
thread_local_scoped_config->addOrUpdateRoutingScope(new_scoped_route_info); | ||
thread_local_scoped_config->addOrUpdateRoutingScopes({new_scoped_route_info}); | ||
return config; | ||
}); | ||
} | ||
|
@@ -308,6 +308,22 @@ void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_nam | |
void ScopedRdsConfigSubscription::onConfigUpdate( | ||
const std::vector<Envoy::Config::DecodedResourceRef>& resources, | ||
const std::string& version_info) { | ||
Protobuf::RepeatedPtrField<std::string> to_remove_repeated; | ||
for (const auto& scoped_route : scoped_route_map_) { | ||
*to_remove_repeated.Add() = scoped_route.first; | ||
} | ||
onConfigUpdate(resources, to_remove_repeated, version_info); | ||
} | ||
|
||
Protobuf::RepeatedPtrField<std::string> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add some comment here explaining what this does? What kind of update conflict? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both scope name and scope key conflicts. The conflict can be between new scopes and old scopes, and it can also be between new scopes. |
||
ScopedRdsConfigSubscription::detectUpdateConflictAndCleanupRemoved( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the order is problematic here: also remember deletions are done ahead of add/updates, let's add a final-state hash, apply deletions first, apply ALL the add/updates, and then count the hash conflict. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The update won't be rejected in this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the new procedure, in an sotw update, we mark all old scopes as removed scopes from the beginning, and conflict key and scope name with removed scopes will be ignored. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not following. There might be some confusion, but the case I was proposing is like "an incoming resource has a hash conflict with an existing scope, which will be updated by the same update but haven't yet". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, in Delta APi, this is still a probelm. I missed your last reply, sorry. |
||
const std::vector<Envoy::Config::DecodedResourceRef>& resources, | ||
const Protobuf::RepeatedPtrField<std::string>& removed_resources) { | ||
absl::flat_hash_set<std::string> removed_scopes; | ||
// all the scope names to be removed. | ||
for (const std::string& removed_resource : removed_resources) { | ||
removed_scopes.insert(removed_resource); | ||
} | ||
absl::flat_hash_map<std::string, envoy::config::route::v3::ScopedRouteConfiguration> | ||
scoped_routes; | ||
absl::flat_hash_map<uint64_t, std::string> scope_name_by_key_hash; | ||
|
@@ -330,17 +346,24 @@ void ScopedRdsConfigSubscription::onConfigUpdate( | |
fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'", | ||
scope_name_by_key_hash[key_fingerprint], scope_name)); | ||
} | ||
// if new scope have key conflict with scope that is not going to be removed | ||
if (scope_name_by_hash_.find(key_fingerprint) != scope_name_by_hash_.end() && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: using a iterator to avoid multiple look up. |
||
scope_name_by_hash_[key_fingerprint] != scope_name && | ||
removed_scopes.find(scope_name_by_hash_[key_fingerprint]) == removed_scopes.end()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use contains instead of comparing the iterator. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for delta API, lets say previously it's {scope1:h1}, the update is ({scope2:h1, scope1:h2},removed={}) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, it will gets rejected because scope2:h1 has key conflict with old scope with different name scope1:h1. This can be fixed easily though. Just marked all updated scopes as removed scope. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I should add a unit test to cover this corner case? |
||
throw EnvoyException( | ||
fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'", | ||
scope_name_by_hash_[key_fingerprint], scope_name)); | ||
} | ||
} | ||
ScopedRouteMap scoped_routes_to_remove = scoped_route_map_; | ||
Protobuf::RepeatedPtrField<std::string> to_remove_repeated; | ||
for (auto& iter : scoped_routes) { | ||
const std::string& scope_name = iter.first; | ||
scoped_routes_to_remove.erase(scope_name); | ||
} | ||
for (const auto& scoped_route : scoped_routes_to_remove) { | ||
*to_remove_repeated.Add() = scoped_route.first; | ||
|
||
Protobuf::RepeatedPtrField<std::string> clean_removed_resources; | ||
// only remove resources that is not going to be updated. | ||
for (const std::string& removed_resource : removed_resources) { | ||
if (scoped_routes.find(removed_resource) == scoped_routes.end()) { | ||
*clean_removed_resources.Add() = removed_resource; | ||
} | ||
} | ||
onConfigUpdate(resources, to_remove_repeated, version_info); | ||
return clean_removed_resources; | ||
} | ||
|
||
ScopedRdsConfigProvider::ScopedRdsConfigProvider( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: for consistency, I suggest using
auto
here as well.