Skip to content

Commit

Permalink
xds: safely handle watch removals during update, nested pause/resume.
Browse files Browse the repository at this point in the history
To fix envoyproxy#11877, we need to safely handle the case where two
watches point at the same resource and a WatchMap onConfigUpdate() causes one watch to
remove the other watch from within its own onConfigUpdate() callback.

While working on this, it made sense to fix envoyproxy#11674,
avoiding spurious ClusterLoadAssignment discovery requests in the regression integration test.

Risk level: Medium (this has xDS wire-level implications).
Testing: New unit tests for pause/resume, regression unit and integration tests for watch map
  removal behaviors.

Fixes envoyproxy#11877 envoyproxy#11674

Signed-off-by: Harvey Tuch <htuch@google.com>
Co-authored-by: Sebastian Schepens <sebastian.schepens@mercadolibre.com>
Signed-off-by: Harvey Tuch <htuch@google.com>
  • Loading branch information
htuch and sschepens committed Jul 14, 2020
1 parent 7a83dbb commit 34d8d7c
Show file tree
Hide file tree
Showing 17 changed files with 269 additions and 42 deletions.
1 change: 1 addition & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ envoy_cc_library(
":decoded_resource_lib",
"//include/envoy/config:subscription_interface",
"//source/common/common:assert_lib",
"//source/common/common:cleanup_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/protobuf",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
Expand Down
32 changes: 18 additions & 14 deletions source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
}

ApiState& api_state = api_state_[type_url];
if (api_state.paused_) {
if (api_state.pauses_ > 0) {
ENVOY_LOG(trace, "API {} paused during sendDiscoveryRequest(), setting pending.", type_url);
api_state.pending_ = true;
return; // Drop this request; the unpause will enqueue a new one.
Expand Down Expand Up @@ -94,7 +94,7 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
// TODO(htuch): For RDS/EDS, this will generate a new DiscoveryRequest on each resource we added.
// Consider in the future adding some kind of collation/batching during CDS/LDS updates so that we
// only send a single RDS/EDS update after the CDS/LDS update.
queueDiscoveryRequest(type_url);
sendDiscoveryRequest(type_url);

return watch;
}
Expand All @@ -105,21 +105,19 @@ ScopedResume GrpcMuxImpl::pause(const std::string& type_url) {

ScopedResume GrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
ENVOY_LOG(debug, "Pausing discovery requests for {}", type_url);
ApiState& api_state = api_state_[type_url];
ASSERT(!api_state.paused_);
ASSERT(!api_state.pending_);
api_state.paused_ = true;
ENVOY_LOG(debug, "Pausing discovery requests for {} (previous count {})", type_url,
api_state.pauses_);
++api_state.pauses_;
}
return std::make_unique<Cleanup>([this, type_urls]() {
for (const auto& type_url : type_urls) {
ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url);
ApiState& api_state = api_state_[type_url];
ASSERT(api_state.paused_);
api_state.paused_ = false;
ENVOY_LOG(debug, "Resuming discovery requests for {} (previous count {})", type_url,
api_state.pauses_);
ASSERT(api_state.pauses_ > 0);

if (api_state.pending_) {
ASSERT(api_state.subscribed_);
if (--api_state.pauses_ == 0 && api_state.pending_ && api_state.subscribed_) {
queueDiscoveryRequest(type_url);
api_state.pending_ = false;
}
Expand All @@ -132,7 +130,7 @@ bool GrpcMuxImpl::paused(const std::string& type_url) const {
if (entry == api_state_.end()) {
return false;
}
return entry->second.paused_;
return entry->second.pauses_ > 0;
}

bool GrpcMuxImpl::paused(const std::vector<std::string> type_urls) const {
Expand Down Expand Up @@ -173,10 +171,16 @@ void GrpcMuxImpl::onDiscoveryResponse(
// No watches and we have resources - this should not happen. send a NACK (by not
// updating the version).
ENVOY_LOG(warn, "Ignoring unwatched type URL {}", type_url);
queueDiscoveryRequest(type_url);
sendDiscoveryRequest(type_url);
}
return;
}
ScopedResume same_type_resume;
// We pause updates of the same type. This is necessary for SotW and GrpcMuxImpl, since unlike
// delta and NewGRpcMuxImpl, independent watch additions/removals trigger updates regardless of
// the delta state. The proper fix for this is to converge these implementations,
// see https://github.com/envoyproxy/envoy/issues/11477.
same_type_resume = pause(type_url);
try {
// To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we
// build a map here from resource name to resource and then walk watches_.
Expand Down Expand Up @@ -234,7 +238,7 @@ void GrpcMuxImpl::onDiscoveryResponse(
error_detail->set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
}
api_state_[type_url].request_.set_response_nonce(message->nonce());
queueDiscoveryRequest(type_url);
sendDiscoveryRequest(type_url);
}

void GrpcMuxImpl::onWriteable() { drainRequests(); }
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <cstdint>
#include <queue>
#include <unordered_map>

Expand Down Expand Up @@ -113,8 +114,8 @@ class GrpcMuxImpl : public GrpcMux,
std::list<GrpcMuxWatchImpl*> watches_;
// Current DiscoveryRequest for API.
envoy::service::discovery::v3::DiscoveryRequest request_;
// Paused via pause()?
bool paused_{};
// Count of unresumed pause() invocations.
uint32_t pauses_{};
// Was a DiscoveryRequest elided during a pause?
bool pending_{};
// Has this API been tracked in subscriptions_?
Expand Down
4 changes: 3 additions & 1 deletion source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ ScopedResume NewGrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
return std::make_unique<Cleanup>([this, type_urls]() {
for (const auto& type_url : type_urls) {
pausable_ack_queue_.resume(type_url);
trySendDiscoveryRequests();
if (!pausable_ack_queue_.paused(type_url)) {
trySendDiscoveryRequests();
}
}
});
}
Expand Down
23 changes: 11 additions & 12 deletions source/common/config/pausable_ack_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ size_t PausableAckQueue::size() const { return storage_.size(); }

bool PausableAckQueue::empty() {
for (const auto& entry : storage_) {
if (!paused_[entry.type_url_]) {
if (pauses_[entry.type_url_] == 0) {
return false;
}
}
Expand All @@ -22,7 +22,7 @@ bool PausableAckQueue::empty() {

const UpdateAck& PausableAckQueue::front() {
for (const auto& entry : storage_) {
if (!paused_[entry.type_url_]) {
if (pauses_[entry.type_url_] == 0) {
return entry;
}
}
Expand All @@ -32,7 +32,7 @@ const UpdateAck& PausableAckQueue::front() {

UpdateAck PausableAckQueue::popFront() {
for (auto it = storage_.begin(); it != storage_.end(); ++it) {
if (!paused_[it->type_url_]) {
if (pauses_[it->type_url_] == 0) {
UpdateAck ret = *it;
storage_.erase(it);
return ret;
Expand All @@ -44,23 +44,22 @@ UpdateAck PausableAckQueue::popFront() {

void PausableAckQueue::pause(const std::string& type_url) {
// It's ok to pause a subscription that doesn't exist yet.
auto& pause_entry = paused_[type_url];
ASSERT(!pause_entry);
pause_entry = true;
auto& pause_entry = pauses_[type_url];
++pause_entry;
}

void PausableAckQueue::resume(const std::string& type_url) {
auto& pause_entry = paused_[type_url];
ASSERT(pause_entry);
pause_entry = false;
auto& pause_entry = pauses_[type_url];
ASSERT(pause_entry > 0);
--pause_entry;
}

bool PausableAckQueue::paused(const std::string& type_url) const {
auto entry = paused_.find(type_url);
if (entry == paused_.end()) {
auto entry = pauses_.find(type_url);
if (entry == pauses_.end()) {
return false;
}
return entry->second;
return entry->second > 0;
}

} // namespace Config
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/pausable_ack_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PausableAckQueue {
private:
// It's ok for non-existent subs to be paused/resumed. The cleanest way to support that is to give
// the pause state its own map. (Map key is type_url.)
absl::flat_hash_map<std::string, bool> paused_;
absl::flat_hash_map<std::string, uint32_t> pauses_;
std::list<UpdateAck> storage_;
};

Expand Down
36 changes: 32 additions & 4 deletions source/common/config/watch_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/service/discovery/v3/discovery.pb.h"

#include "common/common/cleanup.h"
#include "common/config/decoded_resource_impl.h"

namespace Envoy {
Expand All @@ -17,8 +18,20 @@ Watch* WatchMap::addWatch(SubscriptionCallbacks& callbacks,
}

void WatchMap::removeWatch(Watch* watch) {
wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
watches_.erase(watch);
if (deferred_removed_during_update_ != nullptr) {
deferred_removed_during_update_->insert(watch);
} else {
wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
watches_.erase(watch);
}
}

void WatchMap::removeDeferredWatches() {
for (auto& watch : *deferred_removed_during_update_) {
wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
watches_.erase(watch);
}
deferred_removed_during_update_ = nullptr;
}

AddedRemoved WatchMap::updateWatchInterest(Watch* watch,
Expand Down Expand Up @@ -62,6 +75,9 @@ void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>
return;
}

// Track any removals triggered by earlier watch updates.
deferred_removed_during_update_ = std::make_unique<absl::flat_hash_set<Watch*>>();
Cleanup cleanup([this] { removeDeferredWatches(); });
// Build a map from watches, to the set of updated resources that each watch cares about. Each
// entry in the map is then a nice little bundle that can be fed directly into the individual
// onConfigUpdate()s.
Expand All @@ -81,6 +97,9 @@ void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>
// We just bundled up the updates into nice per-watch packages. Now, deliver them.
for (auto& watch : watches_) {
const auto this_watch_updates = per_watch_updates.find(watch);
if (deferred_removed_during_update_->count(watch.get()) > 0) {
continue;
}
if (this_watch_updates == per_watch_updates.end()) {
// This update included no resources this watch cares about.
// 1) If there is only a single, wildcard watch (i.e. Cluster or Listener), always call
Expand All @@ -90,12 +109,12 @@ void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>
// of this watch's resources, so the watch must be informed with an onConfigUpdate.
// 3) Otherwise, we can skip onConfigUpdate for this watch.
if (map_is_single_wildcard || !watch->state_of_the_world_empty_) {
watch->callbacks_.onConfigUpdate({}, version_info);
watch->state_of_the_world_empty_ = true;
watch->callbacks_.onConfigUpdate({}, version_info);
}
} else {
watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info);
watch->state_of_the_world_empty_ = false;
watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info);
}
}
}
Expand Down Expand Up @@ -130,6 +149,9 @@ void WatchMap::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) {
// Track any removals triggered by earlier watch updates.
deferred_removed_during_update_ = std::make_unique<absl::flat_hash_set<Watch*>>();
Cleanup cleanup([this] { removeDeferredWatches(); });
// Build a pair of maps: from watches, to the set of resources {added,removed} that each watch
// cares about. Each entry in the map-pair is then a nice little bundle that can be fed directly
// into the individual onConfigUpdate()s.
Expand Down Expand Up @@ -159,6 +181,9 @@ void WatchMap::onConfigUpdate(
// We just bundled up the updates into nice per-watch packages. Now, deliver them.
for (const auto& added : per_watch_added) {
const Watch* cur_watch = added.first;
if (deferred_removed_during_update_->count(cur_watch) > 0) {
continue;
}
const auto removed = per_watch_removed.find(cur_watch);
if (removed == per_watch_removed.end()) {
// additions only, no removals
Expand All @@ -172,6 +197,9 @@ void WatchMap::onConfigUpdate(
}
// Any removals-only updates will not have been picked up in the per_watch_added loop.
for (auto& removed : per_watch_removed) {
if (deferred_removed_during_update_->count(removed.first) > 0) {
continue;
}
removed.first->callbacks_.onConfigUpdate({}, removed.second, system_version_info);
}
}
Expand Down
7 changes: 7 additions & 0 deletions source/common/config/watch_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable<Lo
WatchMap& operator=(const WatchMap&) = delete;

private:
void removeDeferredWatches();

// Given a list of names that are new to an individual watch, returns those names that are in fact
// new to the entire subscription.
std::set<std::string> findAdditions(const std::vector<std::string>& newly_added_to_watch,
Expand All @@ -114,6 +116,11 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable<Lo
// Watches whose interest set is currently empty, which is interpreted as "everything".
absl::flat_hash_set<Watch*> wildcard_watches_;

// Watches that have been removed inside the call stack of the WatchMap's onConfigUpdate(). This
// can happen when a watch's onConfigUpdate() results in another watch being removed via
// removeWatch().
std::unique_ptr<absl::flat_hash_set<Watch*>> deferred_removed_during_update_;

// Maps a resource name to the set of watches interested in that resource. Has two purposes:
// 1) Acts as a reference count; no watches care anymore ==> the resource can be removed.
// 2) Enables efficient lookup of all interested watches when a resource has been updated.
Expand Down
3 changes: 3 additions & 0 deletions test/common/config/delta_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ TEST_F(DeltaSubscriptionImplTest, UpdateResourcesCausesRequest) {
TEST_F(DeltaSubscriptionImplTest, PauseHoldsRequest) {
startSubscription({"name1", "name2", "name3"});
auto resume_sub = subscription_->pause();
// If nested pause wasn't handled correctly, the single expectedSendMessage below would be
// insufficient.
auto nested_resume_sub = subscription_->pause();

expectSendMessage({"name4"}, {"name1", "name2"}, Grpc::Status::WellKnownGrpcStatus::Ok, "", {});
// If not for the pause, these updates would make the expectSendMessage fail due to too many
Expand Down
13 changes: 10 additions & 3 deletions test/common/config/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ TEST_F(GrpcMuxImplTest, PauseResume) {
foo_zz_sub = grpc_mux_->addWatch("foo", {"zz"}, callbacks_, resource_decoder_);
expectSendMessage("foo", {"zz", "z", "x", "y"}, "");
}
// When nesting, we only have a single resumption.
{
ScopedResume a = grpc_mux_->pause("foo");
ScopedResume b = grpc_mux_->pause("foo");
foo_zz_sub = grpc_mux_->addWatch("foo", {"zz"}, callbacks_, resource_decoder_);
expectSendMessage("foo", {"zz", "z", "x", "y"}, "");
}
grpc_mux_->pause("foo")->cancel();
}

Expand Down Expand Up @@ -527,7 +534,7 @@ TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithEmptyRateLimitSetti

// Validate that drain_request_timer is enabled when there are no tokens.
EXPECT_CALL(*drain_request_timer, enableTimer(std::chrono::milliseconds(100), _));
onReceiveMessage(99);
onReceiveMessage(100);
EXPECT_EQ(1, stats_.counter("control_plane.rate_limit_enforced").value());
EXPECT_EQ(
1,
Expand Down Expand Up @@ -588,10 +595,10 @@ TEST_F(GrpcMuxImplTest, TooManyRequestsWithCustomRateLimitSettings) {
EXPECT_CALL(*drain_request_timer, enableTimer(std::chrono::milliseconds(500), _))
.Times(AtLeast(1));
onReceiveMessage(160);
EXPECT_EQ(12, stats_.counter("control_plane.rate_limit_enforced").value());
EXPECT_EQ(11, stats_.counter("control_plane.rate_limit_enforced").value());
Stats::Gauge& pending_requests =
stats_.gauge("control_plane.pending_requests", Stats::Gauge::ImportMode::Accumulate);
EXPECT_EQ(12, pending_requests.value());
EXPECT_EQ(11, pending_requests.value());

// Validate that drain requests call when there are multiple requests in queue.
time_system_.setMonotonicTime(std::chrono::seconds(10));
Expand Down
9 changes: 9 additions & 0 deletions test/common/config/pausable_ack_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ TEST(PausableAckQueueTest, TestPauseResume) {
EXPECT_EQ("nonce2", p.front().nonce_);
EXPECT_EQ("type2", p.front().type_url_);

// validate the above result is invariant even if we nest pauses.
p.pause("type1");
EXPECT_EQ(4, p.size());
EXPECT_EQ("nonce2", p.front().nonce_);
EXPECT_EQ("type2", p.front().type_url_);
p.resume("type1");
EXPECT_EQ("nonce2", p.front().nonce_);
EXPECT_EQ("type2", p.front().type_url_);

UpdateAck ack = p.popFront();
EXPECT_EQ("nonce2", ack.nonce_);
EXPECT_EQ("type2", ack.type_url_);
Expand Down
Loading

0 comments on commit 34d8d7c

Please sign in to comment.