Skip to content

Commit

Permalink
reduce number of srds update callbacks (envoyproxy#12118)
Browse files Browse the repository at this point in the history
Currently, in scope route discovery service, when resources are received from a management server, for each added, updated and removed scope route, a callback is fired. If several update callbacks are combined into a single update callback, less performance penalty will be incurred. This can be achieved by moving applyConfigUpdate out of the for loop and applying all the updates in a single callback. In this case, partial update won't be accepted both in sotw and delta srds. Scope key and scope name conflicts will be checked before any config update is applied.

Risk Level: Low
Testing: Modify a unit test related to change.
Docs Changes: The behavior of delta srds will be changed, partial update won't be accepted.

Signed-off-by: chaoqinli <chaoqinli@google.com>
  • Loading branch information
chaoqin-li1123 authored and chaoqinli committed Aug 7, 2020
1 parent 0fcaf06 commit 4c5ebd9
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 141 deletions.
32 changes: 18 additions & 14 deletions source/common/router/scoped_config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,27 @@ ScopeKeyPtr ScopeKeyBuilderImpl::computeScopeKey(const Http::HeaderMap& headers)
return std::make_unique<ScopeKey>(std::move(key));
}

void ScopedConfigImpl::addOrUpdateRoutingScope(
const ScopedRouteInfoConstSharedPtr& scoped_route_info) {
const auto iter = scoped_route_info_by_name_.find(scoped_route_info->scopeName());
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
void ScopedConfigImpl::addOrUpdateRoutingScopes(
const std::vector<ScopedRouteInfoConstSharedPtr>& scoped_route_infos) {
for (auto& scoped_route_info : scoped_route_infos) {
const auto iter = scoped_route_info_by_name_.find(scoped_route_info->scopeName());
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
}
scoped_route_info_by_name_[scoped_route_info->scopeName()] = scoped_route_info;
scoped_route_info_by_key_[scoped_route_info->scopeKey().hash()] = scoped_route_info;
}
scoped_route_info_by_name_[scoped_route_info->scopeName()] = scoped_route_info;
scoped_route_info_by_key_[scoped_route_info->scopeKey().hash()] = scoped_route_info;
}

void ScopedConfigImpl::removeRoutingScope(const std::string& scope_name) {
const auto iter = scoped_route_info_by_name_.find(scope_name);
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
scoped_route_info_by_name_.erase(iter);
void ScopedConfigImpl::removeRoutingScopes(const std::vector<std::string>& scope_names) {
for (std::string const& scope_name : scope_names) {
const auto iter = scoped_route_info_by_name_.find(scope_name);
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
scoped_route_info_by_name_.erase(iter);
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions source/common/router/scoped_config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ class ScopedConfigImpl : public ScopedConfig {
ScopedConfigImpl(ScopedRoutes::ScopeKeyBuilder&& scope_key_builder)
: scope_key_builder_(std::move(scope_key_builder)) {}

void addOrUpdateRoutingScope(const ScopedRouteInfoConstSharedPtr& scoped_route_info);
void removeRoutingScope(const std::string& scope_name);
void
addOrUpdateRoutingScopes(const std::vector<ScopedRouteInfoConstSharedPtr>& scoped_route_infos);

void removeRoutingScopes(const std::vector<std::string>& scope_names);

// Envoy::Router::ScopedConfig
Router::ConfigConstSharedPtr getRouteConfig(const Http::HeaderMap& headers) const override;
Expand Down
176 changes: 98 additions & 78 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/init/manager_impl.h"
#include "common/init/watcher_impl.h"
#include "common/router/rds_impl.h"
#include "common/router/scoped_config_impl.h"

#include "absl/strings/str_join.h"

Expand Down Expand Up @@ -157,56 +158,41 @@ void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::runOnDemandUpdat

bool ScopedRdsConfigSubscription::addOrUpdateScopes(
const std::vector<Envoy::Config::DecodedResourceRef>& resources, Init::Manager& init_manager,
const std::string& version_info, std::vector<std::string>& exception_msgs) {
const std::string& version_info) {
bool any_applied = false;
envoy::extensions::filters::network::http_connection_manager::v3::Rds rds;
rds.mutable_config_source()->MergeFrom(rds_config_source_);
absl::flat_hash_set<std::string> unique_resource_names;
std::vector<ScopedRouteInfoConstSharedPtr> updated_scopes;
for (const auto& resource : resources) {
try {
// Explicit copy so that we can std::move later.
envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config =
dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
resource.get().resource());
const std::string scope_name = scoped_route_config.name();
if (!unique_resource_names.insert(scope_name).second) {
throw EnvoyException(
fmt::format("duplicate scoped route configuration '{}' found", scope_name));
}
// TODO(stevenzzz): Creating a new RdsRouteConfigProvider likely expensive, migrate RDS to
// config-provider-framework to make it light weight.
rds.set_route_config_name(scoped_route_config.route_configuration_name());
auto rds_config_provider_helper =
std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name, rds, init_manager);
auto scoped_route_info = std::make_shared<ScopedRouteInfo>(
std::move(scoped_route_config), rds_config_provider_helper->routeConfig());
// Detect if there is key conflict between two scopes, in which case Envoy won't be able to
// tell which RouteConfiguration to use. Reject the second scope in the delta form API.
auto iter = scope_name_by_hash_.find(scoped_route_info->scopeKey().hash());
if (iter != scope_name_by_hash_.end()) {
if (iter->second != scoped_route_info->scopeName()) {
throw EnvoyException(
fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'",
iter->second, scoped_route_info->scopeName()));
}
}
// NOTE: delete previous route provider if any.
route_provider_by_scope_.insert({scope_name, std::move(rds_config_provider_helper)});
scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName();
scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info;
applyConfigUpdate([scoped_route_info](ConfigProvider::ConfigConstSharedPtr config)
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->addOrUpdateRoutingScope(scoped_route_info);
return config;
});
any_applied = true;
ENVOY_LOG(debug, "srds: add/update scoped_route '{}', version: {}",
scoped_route_info->scopeName(), version_info);
} catch (const EnvoyException& e) {
exception_msgs.emplace_back(absl::StrCat("", e.what()));
}
// Explicit copy so that we can std::move later.
envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config =
dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
resource.get().resource());
const std::string scope_name = scoped_route_config.name();
// TODO(stevenzzz): Creating a new RdsRouteConfigProvider likely expensive, migrate RDS to
// config-provider-framework to make it light weight.
rds.set_route_config_name(scoped_route_config.route_configuration_name());
auto rds_config_provider_helper =
std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name, rds, init_manager);
auto scoped_route_info = std::make_shared<ScopedRouteInfo>(
std::move(scoped_route_config), rds_config_provider_helper->routeConfig());
route_provider_by_scope_.insert({scope_name, std::move(rds_config_provider_helper)});
scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName();
scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info;
updated_scopes.push_back(scoped_route_info);
any_applied = true;
ENVOY_LOG(debug, "srds: queueing add/update of scoped_route '{}', version: {}",
scoped_route_info->scopeName(), version_info);
}

if (!updated_scopes.empty()) {
applyConfigUpdate([updated_scopes](ConfigProvider::ConfigConstSharedPtr config)
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->addOrUpdateRoutingScopes(updated_scopes);
return config;
});
}
return any_applied;
}
Expand All @@ -216,6 +202,7 @@ ScopedRdsConfigSubscription::removeScopes(
const Protobuf::RepeatedPtrField<std::string>& scope_names, const std::string& version_info) {
std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
to_be_removed_rds_providers;
std::vector<std::string> removed_scope_names;
for (const auto& scope_name : scope_names) {
auto iter = scoped_route_map_.find(scope_name);
if (iter != scoped_route_map_.end()) {
Expand All @@ -227,16 +214,20 @@ ScopedRdsConfigSubscription::removeScopes(
}
scope_name_by_hash_.erase(iter->second->scopeKey().hash());
scoped_route_map_.erase(iter);
applyConfigUpdate([scope_name](ConfigProvider::ConfigConstSharedPtr config)
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->removeRoutingScope(scope_name);
return config;
});
ENVOY_LOG(debug, "srds: remove scoped route '{}', version: {}", scope_name, version_info);
removed_scope_names.push_back(scope_name);
ENVOY_LOG(debug, "srds: queueing removal of scoped route '{}', version: {}", scope_name,
version_info);
}
}
if (!removed_scope_names.empty()) {
applyConfigUpdate([removed_scope_names](ConfigProvider::ConfigConstSharedPtr config)
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->removeRoutingScopes(removed_scope_names);
return config;
});
}
return to_be_removed_rds_providers;
}

Expand All @@ -245,7 +236,6 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) {
// NOTE: deletes are done before adds/updates.

absl::flat_hash_map<std::string, ScopedRouteInfoConstSharedPtr> to_be_removed_scopes;
// Destruction of resume_rds will lift the floodgate for new RDS subscriptions.
// Note in the case of partial acceptance, accepted RDS subscriptions should be started
Expand Down Expand Up @@ -284,25 +274,28 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
});
}

std::vector<std::string> exception_msgs;
std::string exception_msg;
Protobuf::RepeatedPtrField<std::string> clean_removed_resources =
detectUpdateConflictAndCleanupRemoved(added_resources, removed_resources, exception_msg);
if (!exception_msg.empty()) {
throw EnvoyException(fmt::format("Error adding/updating scoped route(s): {}", exception_msg));
}

// Do not delete RDS config providers just yet, in case the to be deleted RDS subscriptions could
// be reused by some to be added scopes.
std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
to_be_removed_rds_providers = removeScopes(removed_resources, version_info);
to_be_removed_rds_providers = removeScopes(clean_removed_resources, version_info);

bool any_applied =
addOrUpdateScopes(added_resources,
(srds_init_mgr == nullptr ? localInitManager() : *srds_init_mgr),
version_info, exception_msgs) ||
version_info) ||
!to_be_removed_rds_providers.empty();
ConfigSubscriptionCommonBase::onConfigUpdate();
if (any_applied) {
setLastConfigInfo(absl::optional<LastConfigInfo>({absl::nullopt, version_info}));
}
stats_.config_reload_.inc();
if (!exception_msgs.empty()) {
throw EnvoyException(fmt::format("Error adding/updating scoped route(s): {}",
absl::StrJoin(exception_msgs, ", ")));
}
}

void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_name,
Expand All @@ -319,7 +312,7 @@ void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_nam
-> ConfigProvider::ConfigConstSharedPtr {
auto* thread_local_scoped_config =
const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
thread_local_scoped_config->addOrUpdateRoutingScope(new_scoped_route_info);
thread_local_scoped_config->addOrUpdateRoutingScopes({new_scoped_route_info});
return config;
});
route_provider_by_scope_[scope_name]->runOnDemandUpdateCallback();
Expand All @@ -330,9 +323,37 @@ void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_nam
void ScopedRdsConfigSubscription::onConfigUpdate(
const std::vector<Envoy::Config::DecodedResourceRef>& resources,
const std::string& version_info) {
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (const auto& scoped_route : scoped_route_map_) {
*to_remove_repeated.Add() = scoped_route.first;
}
onConfigUpdate(resources, to_remove_repeated, version_info);
}

Protobuf::RepeatedPtrField<std::string>
ScopedRdsConfigSubscription::detectUpdateConflictAndCleanupRemoved(
const std::vector<Envoy::Config::DecodedResourceRef>& resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources, std::string& exception_msg) {
Protobuf::RepeatedPtrField<std::string> clean_removed_resources;
// All the scope names to be removed or updated.
absl::flat_hash_set<std::string> updated_or_removed_scopes;
for (const std::string& removed_resource : removed_resources) {
updated_or_removed_scopes.insert(removed_resource);
}
for (const auto& resource : resources) {
const auto& scoped_route =
dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
resource.get().resource());
updated_or_removed_scopes.insert(scoped_route.name());
}

absl::flat_hash_map<uint64_t, std::string> scope_name_by_hash = scope_name_by_hash_;
absl::erase_if(scope_name_by_hash, [&updated_or_removed_scopes](const auto& key_name) {
auto const& [key, name] = key_name;
return updated_or_removed_scopes.contains(name);
});
absl::flat_hash_map<std::string, envoy::config::route::v3::ScopedRouteConfiguration>
scoped_routes;
absl::flat_hash_map<uint64_t, std::string> scope_name_by_key_hash;
for (const auto& resource : resources) {
// Throws (thus rejects all) on any error.
const auto& scoped_route =
Expand All @@ -341,28 +362,27 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
const std::string& scope_name = scoped_route.name();
auto scope_config_inserted = scoped_routes.try_emplace(scope_name, std::move(scoped_route));
if (!scope_config_inserted.second) {
throw EnvoyException(
fmt::format("duplicate scoped route configuration '{}' found", scope_name));
exception_msg = fmt::format("duplicate scoped route configuration '{}' found", scope_name);
return clean_removed_resources;
}
const envoy::config::route::v3::ScopedRouteConfiguration& scoped_route_config =
scope_config_inserted.first->second;
const uint64_t key_fingerprint = MessageUtil::hash(scoped_route_config.key());
if (!scope_name_by_key_hash.try_emplace(key_fingerprint, scope_name).second) {
throw EnvoyException(
if (!scope_name_by_hash.try_emplace(key_fingerprint, scope_name).second) {
exception_msg =
fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'",
scope_name_by_key_hash[key_fingerprint], scope_name));
scope_name_by_hash[key_fingerprint], scope_name);
return clean_removed_resources;
}
}
ScopedRouteMap scoped_routes_to_remove = scoped_route_map_;
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (auto& iter : scoped_routes) {
const std::string& scope_name = iter.first;
scoped_routes_to_remove.erase(scope_name);
}
for (const auto& scoped_route : scoped_routes_to_remove) {
*to_remove_repeated.Add() = scoped_route.first;

// only remove resources that is not going to be updated.
for (const std::string& removed_resource : removed_resources) {
if (!scoped_routes.contains(removed_resource)) {
*clean_removed_resources.Add() = removed_resource;
}
}
onConfigUpdate(resources, to_remove_repeated, version_info);
return clean_removed_resources;
}

void ScopedRdsConfigSubscription::onDemandRdsUpdate(
Expand Down
17 changes: 11 additions & 6 deletions source/common/router/scoped_rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ class ScopedRdsConfigSubscription
// during updating, the exception message is collected via the exception messages vector.
// Returns true if any scope updated, false otherwise.
bool addOrUpdateScopes(const std::vector<Envoy::Config::DecodedResourceRef>& resources,
Init::Manager& init_manager, const std::string& version_info,
std::vector<std::string>& exception_msgs);
Init::Manager& init_manager, const std::string& version_info);
// Removes given scopes from the managed set of scopes.
// Returns a list of to be removed helpers which is temporally held in the onConfigUpdate method,
// to make sure new scopes sharing the same RDS source configs could reuse the subscriptions.
Expand All @@ -160,12 +159,18 @@ class ScopedRdsConfigSubscription
// Envoy::Config::DeltaConfigSubscriptionInstance
void start() override { subscription_->start({}); }

// Detect scope name and scope key conflict between added scopes or between added scopes and old
// scopes. Some removed scopes may be in added resources list, instead of being removed, they
// should be updated, so only return scope names that will disappear after update. If conflict
// detected, fill exception_msg with information about scope conflict and return.
Protobuf::RepeatedPtrField<std::string> detectUpdateConflictAndCleanupRemoved(
const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources, std::string& exception_msg);

// Envoy::Config::SubscriptionCallbacks

// NOTE: state-of-the-world form onConfigUpdate(resources, version_info) will throw an
// EnvoyException on any error and essentially reject an update. While the Delta form
// onConfigUpdate(added_resources, removed_resources, version_info) by design will partially
// accept correct RouteConfiguration from management server.
// NOTE: both delta form and state-of-the-world form onConfigUpdate(resources, version_info) will
// throw an EnvoyException on any error and essentially reject an update.
void onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& resources,
const std::string& version_info) override;
void onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
Expand Down
Loading

0 comments on commit 4c5ebd9

Please sign in to comment.