Skip to content

Commit

Permalink
xds-sotw: reset nonce when reconnecting (#30206)
Browse files Browse the repository at this point in the history
Similar to #16037, this PR resets the nonce after reconnection for SotW.
This change is a bug fix that ensures SotW behaves according to the xDS protocol.

Risk Level: low
Testing: Added a test.
Docs Changes: N/A.
Release Notes: Added.

Fixes: #30155

Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa authored Oct 18, 2023
1 parent fc9046f commit 525b1d2
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 0 deletions.
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ bug_fixes:
- area: docker/publishing
change: |
Update base images to resolve various glibc vulnerabilities.
- area: xds
change: |
Fix a bug where the nonce was not reset after reconnecting to the xDS server, when using State-of-the-World.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ void GrpcMuxImpl::sendDiscoveryRequest(absl::string_view type_url) {
}
}

void GrpcMuxImpl::clearNonce() {
// Iterate over all api_states (for each type_url), and clear its nonce.
for (auto& [type_url, api_state] : api_state_) {
if (api_state) {
api_state->request_.clear_response_nonce();
}
}
}

void GrpcMuxImpl::loadConfigFromDelegate(const std::string& type_url,
const absl::flat_hash_set<std::string>& resource_names) {
if (!xds_resources_delegate_.has_value()) {
Expand Down Expand Up @@ -451,6 +460,7 @@ void GrpcMuxImpl::onWriteable() { drainRequests(); }
void GrpcMuxImpl::onStreamEstablished() {
first_stream_request_ = true;
grpc_stream_.maybeUpdateQueueSizeStat(0);
clearNonce();
request_queue_ = std::make_unique<std::queue<std::string>>();
for (const auto& type_url : subscriptions_) {
queueDiscoveryRequest(type_url);
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class GrpcMuxImpl : public GrpcMux,
void drainRequests();
void setRetryTimer();
void sendDiscoveryRequest(absl::string_view type_url);
// Clears the nonces of all subscribed types in this gRPC mux.
void clearNonce();

struct GrpcMuxWatchImpl : public GrpcMuxWatch {
GrpcMuxWatchImpl(const absl::flat_hash_set<std::string>& resources,
Expand Down
57 changes: 57 additions & 0 deletions test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ using testing::IsSubstring;
using testing::NiceMock;
using testing::Return;
using testing::ReturnRef;
using testing::SaveArg;

namespace Envoy {
namespace Config {
Expand Down Expand Up @@ -276,6 +277,62 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
expectSendMessage("foo", {}, "");
}

// Validate cached nonces are cleared on reconnection.
// Flow: subscribe to {"x", "y"}, receive a response carrying nonce "111" while the type is
// paused, drop the stream, let the retry timer reconnect, and check that the request sent on
// the new stream carries version "3000" with an empty nonce.
TEST_F(GrpcMuxImplTest, ReconnectionResetsNonceAndAcks) {
OpaqueResourceDecoderSharedPtr resource_decoder(
std::make_shared<TestUtility::TestOpaqueResourceDecoderImpl<
envoy::config::endpoint::v3::ClusterLoadAssignment>>("cluster_name"));
// Create the retry timer that will invoke the callback that will trigger
// reconnection when the gRPC connection is closed.
Event::MockTimer* grpc_stream_retry_timer{new Event::MockTimer()};
Event::MockTimer* ttl_mgr_timer{new NiceMock<Event::MockTimer>()};
Event::TimerCb grpc_stream_retry_timer_cb;
// The first createTimer_() call returns the retry timer and its callback is captured in
// grpc_stream_retry_timer_cb so the test can fire it by hand; every later call returns
// ttl_mgr_timer.
EXPECT_CALL(dispatcher_, createTimer_(_))
.WillOnce(
testing::DoAll(SaveArg<0>(&grpc_stream_retry_timer_cb), Return(grpc_stream_retry_timer)))
// Happens when adding a type url watch.
.WillRepeatedly(Return(ttl_mgr_timer));
setup();
// From here on, expectations must be satisfied in the order they are declared.
InSequence s;
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
auto foo_sub = grpc_mux_->addWatch(type_url, {"x", "y"}, callbacks_, resource_decoder, {});
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
// Send on connection.
expectSendMessage(type_url, {"x", "y"}, {}, true);
grpc_mux_->start();

// Create a reply with some nonce.
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
response->set_type_url(type_url);
response->set_version_info("3000");
response->set_nonce("111");
// Helper: packs a ClusterLoadAssignment whose cluster_name is `name` into `response`.
auto add_response_resource = [](const std::string& name,
envoy::service::discovery::v3::DiscoveryResponse& response) {
envoy::config::endpoint::v3::ClusterLoadAssignment cla;
cla.set_cluster_name(name);
auto res = response.add_resources();
res->PackFrom(cla);
};
add_response_resource("x", *response);
add_response_resource("y", *response);
{
// Pause EDS to allow the ACK to be cached.
// NOTE(review): resume_eds looks like a scoped guard that resumes the type when this block
// ends - confirm against GrpcMux::pause().
auto resume_eds = grpc_mux_->pause(type_url);
// Send the reply.
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response));
// Now disconnect, gRPC stream retry timer will kick in and reconnection will happen.
// enableTimer() is mocked to invoke the captured retry callback immediately, so the
// reconnection attempt (the second startRaw() below) happens inside onRemoteClose().
EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(_, _))
.WillOnce(Invoke(grpc_stream_retry_timer_cb));
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, "");

// Unpausing will initiate a new request, with the same resources, version,
// but empty nonce.
expectSendMessage(type_url, {"x", "y"}, "3000", true, "");
}
// NOTE(review): presumably the request emitted when foo_sub is released at the end of the
// test (empty resource list, version still "3000") - confirm against expectSendMessage().
expectSendMessage(type_url, {}, "3000", false);
}

// Validate pause-resume behavior.
TEST_F(GrpcMuxImplTest, PauseResume) {
setup();
Expand Down

0 comments on commit 525b1d2

Please sign in to comment.