Skip to content

Commit

Permalink
backport of commit 7d8764d
Browse files Browse the repository at this point in the history
  • Loading branch information
hashi-derek committed Dec 8, 2023
1 parent 94fd096 commit 32745ca
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .changelog/19866.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: ensure child resources are re-sent to Envoy when the parent is updated even if the child already has pending updates.
```
44 changes: 38 additions & 6 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -43,6 +44,11 @@ import (

var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")

// xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function.
// This environment variable exists as an escape hatch so that users can disable the behavior, if needed.
// Ideally, this is a flag we can remove in 1.19+
var xdsProtocolLegacyChildResend = (os.Getenv("XDS_PROTOCOL_LEGACY_CHILD_RESEND") != "")

type deltaRecvResponse int

const (
Expand Down Expand Up @@ -1080,25 +1086,51 @@ func (t *xDSDeltaType) createDeltaResponse(
}

func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
return
}
if !t.subscribed(childName) {
return
}

t.logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", parentName,
"childTypeUrl", t.deltaChild.childType.typeURL,
"childResource", childName,
)

// resourceVersions tracks the last known version for this childName that Envoy
// has ACKed. By setting this to empty it effectively tells us that Envoy does
// not have any data for that child, and we need to re-send.
t.deltaChild.childType.resourceVersions[childName] = ""
if _, exist := t.deltaChild.childType.resourceVersions[childName]; exist {
t.deltaChild.childType.resourceVersions[childName] = ""
}

if xdsProtocolLegacyChildResend {
return
// TODO: This legacy behavior can be removed in 1.19, provided there are no outstanding issues.
//
// In this legacy mode, there is a confirmed race condition:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
//
// When this situation happens, Envoy wipes the child endpoints when the cluster is updated,
// but it would never receive new ones. The endpoints would not be resent, because their hash
// never changed since the previous ACK.
//
// Due to ambiguity with the Envoy protocol [https://github.com/envoyproxy/envoy/issues/13009],
// it's difficult to state with certainty that no other unexpected side-effects are possible.
// This legacy escape hatch is left in-place in case some other complex race condition crops up.
//
// Longer-term, we should modify the hash of children to include the parent hash so that this
// behavior is implicitly handled, rather than being an edge case.
}

// pendingUpdates can contain newer versions that have been sent to Envoy but
// that we haven't processed an ACK for yet. These need to be cleared out, too,
// so that they aren't moved to resourceVersions by ack()
for nonce := range t.deltaChild.childType.pendingUpdates {
delete(t.deltaChild.childType.pendingUpdates[nonce], childName)
}
}

func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
Expand Down
144 changes: 143 additions & 1 deletion agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ package xds
import (
"errors"
"fmt"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/consul/envoyextensions/xdscommon"

"github.com/armon/go-metrics"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
Expand Down Expand Up @@ -821,6 +822,147 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangeBeforeEndpointAck(t *testing.T) {
// This test ensures that the following race condition does not block indefinitely:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
// Prior to a bug fix, this would have resulted in the endpoints NOT existing in Envoy. This occurred because
// the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data
// to compensate for the loss because we would incorrectly ACK the invalid old endpoint hash. Since the
// endpoint's hash did not actually change, they would not be resent.
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

sid := structs.NewServiceID("web-sidecar-proxy", nil)

// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)

var snap *proxycfg.ConfigSnapshot
testutil.RunStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "", nil)

// Send initial cluster discover.
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})

// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)

assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
})

var newSnap *proxycfg.ConfigSnapshot
testutil.RunStep(t, "resend cluster immediately", func(t *testing.T) {
// Deliver updated snapshot with new CA roots and leaf certificate. This will not be
// sent to Envoy until the initial set of cluster message is ACKed.
newSnap = newTestSnapshot(t, nil, "", nil)
mgr.DeliverConfig(t, sid, newSnap)

// Envoy then tries to discover endpoints for clusters.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})

// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})

// After receiving the endpoints Envoy sends an ACK for the clusters
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

// The updated cluster snapshot with new certificates is sent immediately
// after the first is ACKed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(3),
Resources: makeTestResources(t,
// SAME makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, newSnap, "tcp:db"),
makeTestCluster(t, newSnap, "tcp:geo-cache"),
),
})
})

testutil.RunStep(t, "resend endpoints", func(t *testing.T) {
// Envoy requests listeners because it has received endpoints. We won't send listeners
// until Envoy ACKs the second cluster update.
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

// Envoy ACKs the endpoints from the first cluster update.
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

// Resend endpoints because the clusters changed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, newSnap, "tcp:db"),
makeTestEndpoints(t, newSnap, "tcp:geo-cache"),
),
})

// Envoy ACKs the new cluster and endpoints.
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 3)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)

// Listeners are sent after the cluster and endpoints are ACKed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestListener(t, newSnap, "tcp:public_listener"),
makeTestListener(t, newSnap, "tcp:db"),
makeTestListener(t, newSnap, "tcp:geo-cache"),
),
})

// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 5)
})

envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
Expand Down

0 comments on commit 32745ca

Please sign in to comment.