From 32745ca0eb9c998a1f262beeb404afa60ca13d3f Mon Sep 17 00:00:00 2001 From: Derek Menteer Date: Fri, 8 Dec 2023 15:46:40 +0000 Subject: [PATCH] backport of commit 7d8764dc0e323904e1517739109dcda41528e685 --- .changelog/19866.txt | 3 + agent/xds/delta.go | 44 ++++++++++-- agent/xds/delta_test.go | 144 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 184 insertions(+), 7 deletions(-) create mode 100644 .changelog/19866.txt diff --git a/.changelog/19866.txt b/.changelog/19866.txt new file mode 100644 index 000000000000..b1fd94f8cb2c --- /dev/null +++ b/.changelog/19866.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: ensure child resources are re-sent to Envoy when the parent is updated even if the child already has pending updates. +``` diff --git a/agent/xds/delta.go b/agent/xds/delta.go index f5d07e2763ae..70b65afe2719 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "errors" "fmt" + "os" "strconv" "sync" "sync/atomic" @@ -43,6 +44,11 @@ import ( var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another") +// xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function. +// This environment variable exists as an escape hatch so that users can disable the behavior, if needed. +// Ideally, this is a flag we can remove in 1.19+ +var xdsProtocolLegacyChildResend = (os.Getenv("XDS_PROTOCOL_LEGACY_CHILD_RESEND") != "") + type deltaRecvResponse int const ( @@ -1080,13 +1086,9 @@ func (t *xDSDeltaType) createDeltaResponse( } func (t *xDSDeltaType) ensureChildResend(parentName, childName string) { - if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist { - return - } if !t.subscribed(childName) { return } - t.logger.Trace( "triggering implicit update of resource", "typeUrl", t.typeURL, @@ -1094,11 +1096,41 @@ func (t *xDSDeltaType) ensureChildResend(parentName, childName string) { "childTypeUrl", t.deltaChild.childType.typeURL, "childResource", childName, ) - // resourceVersions tracks the last known version for this childName that Envoy // has ACKed. By setting this to empty it effectively tells us that Envoy does // not have any data for that child, and we need to re-send. - t.deltaChild.childType.resourceVersions[childName] = "" + if _, exist := t.deltaChild.childType.resourceVersions[childName]; exist { + t.deltaChild.childType.resourceVersions[childName] = "" + } + + if xdsProtocolLegacyChildResend { + return + // TODO: This legacy behavior can be removed in 1.19, provided there are no outstanding issues. + // + // In this legacy mode, there is a confirmed race condition: + // - Send update endpoints + // - Send update cluster + // - Recv ACK endpoints + // - Recv ACK cluster + // + // When this situation happens, Envoy wipes the child endpoints when the cluster is updated, + // but it would never receive new ones. The endpoints would not be resent, because their hash + // never changed since the previous ACK. + // + // Due to ambiguity with the Envoy protocol [https://github.com/envoyproxy/envoy/issues/13009], + // it's difficult to state with certainty that no other unexpected side-effects are possible. + // This legacy escape hatch is left in-place in case some other complex race condition crops up. + // + // Longer-term, we should modify the hash of children to include the parent hash so that this + // behavior is implicitly handled, rather than being an edge case. + } + + // pendingUpdates can contain newer versions that have been sent to Envoy but + // that we haven't processed an ACK for yet. These need to be cleared out, too, + // so that they aren't moved to resourceVersions by ack() + for nonce := range t.deltaChild.childType.pendingUpdates { + delete(t.deltaChild.childType.pendingUpdates[nonce], childName) + } } func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) { diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index d93d06271bac..34150cc30fd3 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -6,7 +6,6 @@ package xds import ( "errors" "fmt" - "github.com/hashicorp/consul/envoyextensions/xdscommon" "strconv" "strings" "sync" @@ -14,6 +13,8 @@ import ( "testing" "time" + "github.com/hashicorp/consul/envoyextensions/xdscommon" + "github.com/armon/go-metrics" envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" @@ -821,6 +822,147 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa } } +func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangeBeforeEndpointAck(t *testing.T) { + // This test ensures that the following race condition does not block indefinitely: + // - Send update endpoints + // - Send update cluster + // - Recv ACK endpoints + // - Recv ACK cluster + // Prior to a bug fix, this would have resulted in the endpoints NOT existing in Envoy. This occurred because + // the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data + // to compensate for the loss because we would incorrectly ACK the invalid old endpoint hash. Since the + // endpoint's hash did not actually change, they would not be resent. + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) + + var snap *proxycfg.ConfigSnapshot + testutil.RunStep(t, "initial setup", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "", nil) + + // Send initial cluster discover. + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot (tcp with one tcp upstream) + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + }) + + var newSnap *proxycfg.ConfigSnapshot + testutil.RunStep(t, "resend cluster immediately", func(t *testing.T) { + // Deliver updated snapshot with new CA roots and leaf certificate. This will not be + // sent to Envoy until the initial set of cluster message is ACKed. + newSnap = newTestSnapshot(t, nil, "", nil) + mgr.DeliverConfig(t, sid, newSnap) + + // Envoy then tries to discover endpoints for clusters. + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + // We should get a response immediately since the config is already present in + // the server for endpoints. Note that this should not be racy if the server + // is behaving well since the Cluster send above should be blocked until we + // deliver a new config version. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.EndpointType, + Nonce: hexString(2), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + // After receiving the endpoints Envoy sends an ACK for the clusters + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // The updated cluster snapshot with new certificates is sent immediately + // after the first is ACKed. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.ClusterType, + Nonce: hexString(3), + Resources: makeTestResources(t, + // SAME makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, newSnap, "tcp:db"), + makeTestCluster(t, newSnap, "tcp:geo-cache"), + ), + }) + }) + + testutil.RunStep(t, "resend endpoints", func(t *testing.T) { + // Envoy requests listeners because it has received endpoints. We won't send listeners + // until Envoy ACKs the second cluster update. + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) + + // Envoy ACKs the endpoints from the first cluster update. + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) + + // Resend endpoints because the clusters changed. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.EndpointType, + Nonce: hexString(4), + Resources: makeTestResources(t, + makeTestEndpoints(t, newSnap, "tcp:db"), + makeTestEndpoints(t, newSnap, "tcp:geo-cache"), + ), + }) + + // Envoy ACKs the new cluster and endpoints. + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 3) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) + + // Listeners are sent after the cluster and endpoints are ACKed. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.ListenerType, + Nonce: hexString(5), + Resources: makeTestResources(t, + makeTestListener(t, newSnap, "tcp:public_listener"), + makeTestListener(t, newSnap, "tcp:db"), + makeTestListener(t, newSnap, "tcp:geo-cache"), + ), + }) + + // We are caught up, so there should be nothing queued to send. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // ACKs the listener + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 5) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) { aclResolve := func(id string) (acl.Authorizer, error) { // Allow all