Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce number of srds update callbacks #12118

Merged
merged 25 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions source/common/router/scoped_config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,27 @@ ScopeKeyPtr ScopeKeyBuilderImpl::computeScopeKey(const Http::HeaderMap& headers)
return std::make_unique<ScopeKey>(std::move(key));
}

void ScopedConfigImpl::addOrUpdateRoutingScope(
const ScopedRouteInfoConstSharedPtr& scoped_route_info) {
const auto iter = scoped_route_info_by_name_.find(scoped_route_info->scopeName());
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
void ScopedConfigImpl::addOrUpdateRoutingScopes(
const std::vector<ScopedRouteInfoConstSharedPtr>& scoped_route_infos) {
for (auto& scoped_route_info : scoped_route_infos) {
const auto iter = scoped_route_info_by_name_.find(scoped_route_info->scopeName());
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
}
scoped_route_info_by_name_[scoped_route_info->scopeName()] = scoped_route_info;
scoped_route_info_by_key_[scoped_route_info->scopeKey().hash()] = scoped_route_info;
}
scoped_route_info_by_name_[scoped_route_info->scopeName()] = scoped_route_info;
scoped_route_info_by_key_[scoped_route_info->scopeKey().hash()] = scoped_route_info;
}

void ScopedConfigImpl::removeRoutingScope(const std::string& scope_name) {
const auto iter = scoped_route_info_by_name_.find(scope_name);
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
scoped_route_info_by_name_.erase(iter);
void ScopedConfigImpl::removeRoutingScopes(const std::vector<std::string>& scope_names) {
for (std::string const& scope_name : scope_names) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: for consistency, I suggest using auto here as well.

const auto iter = scoped_route_info_by_name_.find(scope_name);
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
scoped_route_info_by_name_.erase(iter);
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions source/common/router/scoped_config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ class ScopedConfigImpl : public ScopedConfig {
ScopedConfigImpl(ScopedRoutes::ScopeKeyBuilder&& scope_key_builder)
: scope_key_builder_(std::move(scope_key_builder)) {}

void addOrUpdateRoutingScope(const ScopedRouteInfoConstSharedPtr& scoped_route_info);
void removeRoutingScope(const std::string& scope_name);
void
addOrUpdateRoutingScopes(const std::vector<ScopedRouteInfoConstSharedPtr>& scoped_route_infos);

void removeRoutingScopes(const std::vector<std::string>& scope_names);

// Envoy::Router::ScopedConfig
Router::ConfigConstSharedPtr getRouteConfig(const Http::HeaderMap& headers) const override;
Expand Down
176 changes: 98 additions & 78 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/init/manager_impl.h"
#include "common/init/watcher_impl.h"
#include "common/router/rds_impl.h"
#include "common/router/scoped_config_impl.h"

#include "absl/strings/str_join.h"

Expand Down Expand Up @@ -136,56 +137,41 @@ ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProvide

bool ScopedRdsConfigSubscription::addOrUpdateScopes(
const std::vector<Envoy::Config::DecodedResourceRef>& resources, Init::Manager& init_manager,
const std::string& version_info, std::vector<std::string>& exception_msgs) {
const std::string& version_info) {
bool any_applied = false;
envoy::extensions::filters::network::http_connection_manager::v3::Rds rds;
rds.mutable_config_source()->MergeFrom(rds_config_source_);
absl::flat_hash_set<std::string> unique_resource_names;
std::vector<ScopedRouteInfoConstSharedPtr> updated_scopes;
for (const auto& resource : resources) {
try {
// Explicit copy so that we can std::move later.
envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config =
dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
resource.get().resource());
const std::string scope_name = scoped_route_config.name();
if (!unique_resource_names.insert(scope_name).second) {
throw EnvoyException(
fmt::format("duplicate scoped route configuration '{}' found", scope_name));
}
// TODO(stevenzzz): Creating a new RdsRouteConfigProvider likely expensive, migrate RDS to
// config-provider-framework to make it light weight.
rds.set_route_config_name(scoped_route_config.route_configuration_name());
auto rds_config_provider_helper =
std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name, rds, init_manager);
auto scoped_route_info = std::make_shared<ScopedRouteInfo>(
std::move(scoped_route_config), rds_config_provider_helper->routeConfig());
// Detect if there is key conflict between two scopes, in which case Envoy won't be able to
// tell which RouteConfiguration to use. Reject the second scope in the delta form API.
auto iter = scope_name_by_hash_.find(scoped_route_info->scopeKey().hash());
if (iter != scope_name_by_hash_.end()) {
if (iter->second != scoped_route_info->scopeName()) {
throw EnvoyException(
fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'",
iter->second, scoped_route_info->scopeName()));
}
}
// NOTE: delete previous route provider if any.
route_provider_by_scope_.insert({scope_name, std::move(rds_config_provider_helper)});
scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName();
scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info;
applyConfigUpdate([scoped_route_info](ConfigProvider::ConfigConstSharedPtr config)
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->addOrUpdateRoutingScope(scoped_route_info);
return config;
});
any_applied = true;
ENVOY_LOG(debug, "srds: add/update scoped_route '{}', version: {}",
scoped_route_info->scopeName(), version_info);
} catch (const EnvoyException& e) {
exception_msgs.emplace_back(absl::StrCat("", e.what()));
}
// Explicit copy so that we can std::move later.
envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config =
dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
resource.get().resource());
const std::string scope_name = scoped_route_config.name();
// TODO(stevenzzz): Creating a new RdsRouteConfigProvider likely expensive, migrate RDS to
// config-provider-framework to make it light weight.
rds.set_route_config_name(scoped_route_config.route_configuration_name());
auto rds_config_provider_helper =
std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name, rds, init_manager);
auto scoped_route_info = std::make_shared<ScopedRouteInfo>(
std::move(scoped_route_config), rds_config_provider_helper->routeConfig());
route_provider_by_scope_.insert({scope_name, std::move(rds_config_provider_helper)});
scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName();
scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info;
updated_scopes.push_back(scoped_route_info);
any_applied = true;
ENVOY_LOG(debug, "srds: queueing add/update of scoped_route '{}', version: {}",
scoped_route_info->scopeName(), version_info);
}

if (!updated_scopes.empty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this changes the logic such that partial application of an SRDS config update is no longer possible. For example, if there is a duplicate scoped route, all preceding routes would have been applied prior to this change.

I actually think the new behavior is preferable; I suggest adding a test to explicitly validate this logic.

FYI @stevenzzzz

Copy link
Member Author

@chaoqin-li1123 chaoqin-li1123 Jul 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, currently the srds uses SotW. A test to validate that partial update is not accepted will be good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Andres is saying in delta XDS within a response handling, we could have a portion of the list applied, as the previous scope info is distributed to workers. We need to get this behavior tested.
I actually dont have a strong opinion of it, delta xDS cant send per-resource status back to management server anyway, so go ahead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even for a regular SOTW config update, the existing logic could partially apply config since applyConfigUpdate() is called inline for each scoped route distributed in the DiscoveryResponse.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the SotW api, the two errors has been checked before the Delta interface is called.
Actually a good refactoring could be moving the verification code into a helper fuction. @chaoqin-li1123

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, thanks Xin. This only applies to delta updates.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if the new change is applied, in delta api, partial updates is accepted in main thread when exception happens, but the partial updates are not applied to worker threads. We want the update to be fully rejected in both main thread and worker thread, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can store all the incoming scoped_route_config in a vector, pass the reference of this vector to a helper function to detect any scope key or scope name conflict. If there is any conflict, no change will be applied, otherwise all changes will be applied by creating corresponding RdsRouteConfigProviderHelper and push scope_route_info into the maps.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A helper function has been added to detect key and scope name conflict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

applyConfigUpdate([updated_scopes](ConfigProvider::ConfigConstSharedPtr config)
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->addOrUpdateRoutingScopes(updated_scopes);
return config;
});
}
return any_applied;
}
Expand All @@ -195,6 +181,7 @@ ScopedRdsConfigSubscription::removeScopes(
const Protobuf::RepeatedPtrField<std::string>& scope_names, const std::string& version_info) {
std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
to_be_removed_rds_providers;
std::vector<std::string> removed_scope_names;
for (const auto& scope_name : scope_names) {
auto iter = scoped_route_map_.find(scope_name);
if (iter != scoped_route_map_.end()) {
Expand All @@ -206,16 +193,20 @@ ScopedRdsConfigSubscription::removeScopes(
}
scope_name_by_hash_.erase(iter->second->scopeKey().hash());
scoped_route_map_.erase(iter);
applyConfigUpdate([scope_name](ConfigProvider::ConfigConstSharedPtr config)
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->removeRoutingScope(scope_name);
return config;
});
ENVOY_LOG(debug, "srds: remove scoped route '{}', version: {}", scope_name, version_info);
removed_scope_names.push_back(scope_name);
ENVOY_LOG(debug, "srds: queueing removal of scoped route '{}', version: {}", scope_name,
version_info);
}
}
if (!removed_scope_names.empty()) {
applyConfigUpdate([removed_scope_names](ConfigProvider::ConfigConstSharedPtr config)
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->removeRoutingScopes(removed_scope_names);
return config;
});
}
return to_be_removed_rds_providers;
}

Expand All @@ -224,7 +215,6 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) {
// NOTE: deletes are done before adds/updates.

absl::flat_hash_map<std::string, ScopedRouteInfoConstSharedPtr> to_be_removed_scopes;
// Destruction of resume_rds will lift the floodgate for new RDS subscriptions.
// Note in the case of partial acceptance, accepted RDS subscriptions should be started
Expand Down Expand Up @@ -263,25 +253,29 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
});
}

std::vector<std::string> exception_msgs;
std::string exception_msg;
Protobuf::RepeatedPtrField<std::string> clean_removed_resources;
detectUpdateConflictAndCleanupRemoved(added_resources, removed_resources, clean_removed_resources,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's generally preferable for readability to use a return arg as opposed to output parameters.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I have changed clean_removed_resources to be the explicitly returned value.

exception_msg);
if (!exception_msg.empty()) {
throw EnvoyException(fmt::format("Error adding/updating scoped route(s): {}", exception_msg));
}

// Do not delete RDS config providers just yet, in case the to be deleted RDS subscriptions could
// be reused by some to be added scopes.
std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
to_be_removed_rds_providers = removeScopes(removed_resources, version_info);
to_be_removed_rds_providers = removeScopes(clean_removed_resources, version_info);

bool any_applied =
addOrUpdateScopes(added_resources,
(srds_init_mgr == nullptr ? localInitManager() : *srds_init_mgr),
version_info, exception_msgs) ||
version_info) ||
!to_be_removed_rds_providers.empty();
ConfigSubscriptionCommonBase::onConfigUpdate();
if (any_applied) {
setLastConfigInfo(absl::optional<LastConfigInfo>({absl::nullopt, version_info}));
}
stats_.config_reload_.inc();
if (!exception_msgs.empty()) {
throw EnvoyException(fmt::format("Error adding/updating scoped route(s): {}",
absl::StrJoin(exception_msgs, ", ")));
}
}

void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_name,
Expand All @@ -298,7 +292,7 @@ void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_nam
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->addOrUpdateRoutingScope(new_scoped_route_info);
thread_local_scoped_config->addOrUpdateRoutingScopes({new_scoped_route_info});
return config;
});
}
Expand All @@ -308,9 +302,37 @@ void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_nam
void ScopedRdsConfigSubscription::onConfigUpdate(
const std::vector<Envoy::Config::DecodedResourceRef>& resources,
const std::string& version_info) {
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (const auto& scoped_route : scoped_route_map_) {
*to_remove_repeated.Add() = scoped_route.first;
}
onConfigUpdate(resources, to_remove_repeated, version_info);
}

void ScopedRdsConfigSubscription::detectUpdateConflictAndCleanupRemoved(
const std::vector<Envoy::Config::DecodedResourceRef>& resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
Protobuf::RepeatedPtrField<std::string>& clean_removed_resources, std::string& exception_msg) {
// all the scope names to be removed or updated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit capitalize

absl::flat_hash_set<std::string> updated_or_removed_scopes;
for (const std::string& removed_resource : removed_resources) {
updated_or_removed_scopes.insert(removed_resource);
}
for (const auto& resource : resources) {
const auto& scoped_route =
dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
resource.get().resource());
updated_or_removed_scopes.insert(scoped_route.name());
}

absl::flat_hash_map<uint64_t, std::string> scope_name_by_hash = scope_name_by_hash_;
// don't check key conflict with scopes to be updated or removed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is also confusing, I think we checked the conflict later in the for loops.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only check conflict with scopes that are not updated or removed. Maybe we can change the wording a little bit to make it less confusing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's confusing as it says "we dont check conflict within the updated scopes."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about "Don't check key conflict with old scopes to be updated or removed."?

absl::erase_if(scope_name_by_hash, [&updated_or_removed_scopes](const auto& key_name) {
auto const& [key, name] = key_name;
return (updated_or_removed_scopes.contains(name));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: superfluous parens.

});
absl::flat_hash_map<std::string, envoy::config::route::v3::ScopedRouteConfiguration>
scoped_routes;
absl::flat_hash_map<uint64_t, std::string> scope_name_by_key_hash;
for (const auto& resource : resources) {
// Throws (thus rejects all) on any error.
const auto& scoped_route =
Expand All @@ -319,28 +341,26 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
const std::string& scope_name = scoped_route.name();
auto scope_config_inserted = scoped_routes.try_emplace(scope_name, std::move(scoped_route));
if (!scope_config_inserted.second) {
throw EnvoyException(
fmt::format("duplicate scoped route configuration '{}' found", scope_name));
exception_msg = fmt::format("duplicate scoped route configuration '{}' found", scope_name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we no longer throwing and doing error-by-value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Andres think it is not a good style to throw exception in a validation function. "I suggest avoiding the use of an exception to return the conflicting resources. It complicates the error handling logic for an event that is not really "exceptional" (i.e., this function is explicitly checking for that condition)."

return;
}
const envoy::config::route::v3::ScopedRouteConfiguration& scoped_route_config =
scope_config_inserted.first->second;
const uint64_t key_fingerprint = MessageUtil::hash(scoped_route_config.key());
if (!scope_name_by_key_hash.try_emplace(key_fingerprint, scope_name).second) {
throw EnvoyException(
if (!scope_name_by_hash.try_emplace(key_fingerprint, scope_name).second) {
exception_msg =
fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'",
scope_name_by_key_hash[key_fingerprint], scope_name));
scope_name_by_hash[key_fingerprint], scope_name);
return;
}
}
ScopedRouteMap scoped_routes_to_remove = scoped_route_map_;
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (auto& iter : scoped_routes) {
const std::string& scope_name = iter.first;
scoped_routes_to_remove.erase(scope_name);
}
for (const auto& scoped_route : scoped_routes_to_remove) {
*to_remove_repeated.Add() = scoped_route.first;

// only remove resources that is not going to be updated.
for (const std::string& removed_resource : removed_resources) {
if (!scoped_routes.contains(removed_resource)) {
*clean_removed_resources.Add() = removed_resource;
}
}
onConfigUpdate(resources, to_remove_repeated, version_info);
}

ScopedRdsConfigProvider::ScopedRdsConfigProvider(
Expand Down
14 changes: 8 additions & 6 deletions source/common/router/scoped_rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ class ScopedRdsConfigSubscription
// during updating, the exception message is collected via the exception messages vector.
// Returns true if any scope updated, false otherwise.
bool addOrUpdateScopes(const std::vector<Envoy::Config::DecodedResourceRef>& resources,
Init::Manager& init_manager, const std::string& version_info,
std::vector<std::string>& exception_msgs);
Init::Manager& init_manager, const std::string& version_info);
// Removes given scopes from the managed set of scopes.
// Returns a list of to be removed helpers which is temporally held in the onConfigUpdate method,
// to make sure new scopes sharing the same RDS source configs could reuse the subscriptions.
Expand All @@ -148,12 +147,15 @@ class ScopedRdsConfigSubscription
// Envoy::Config::DeltaConfigSubscriptionInstance
void start() override { subscription_->start({}); }

void detectUpdateConflictAndCleanupRemoved(
const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
Protobuf::RepeatedPtrField<std::string>& clean_removed_resources, std::string& exception_msg);

// Envoy::Config::SubscriptionCallbacks

// NOTE: state-of-the-world form onConfigUpdate(resources, version_info) will throw an
// EnvoyException on any error and essentially reject an update. While the Delta form
// onConfigUpdate(added_resources, removed_resources, version_info) by design will partially
// accept correct RouteConfiguration from management server.
// NOTE: both delta form and state-of-the-world form onConfigUpdate(resources, version_info) will
// throw an EnvoyException on any error and essentially reject an update.
void onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& resources,
const std::string& version_info) override;
void onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
Expand Down
Loading