Skip to content

Commit

Permalink
Fix xDS missing endpoint race condition.
Browse files Browse the repository at this point in the history
This fixes the following race condition:
- Send update endpoints
- Send update cluster
- Recv ACK endpoints
- Recv ACK cluster

Prior to this fix, it would have resulted in the endpoints NOT existing in
Envoy. This occurred because the cluster update implicitly clears the endpoints
in Envoy, but we would never re-send the endpoint data to compensate for the
loss, because we would incorrectly ACK the invalid old endpoint hash. Since the
endpoint's hash did not actually change, they would not be resent.

The fix for this is to effectively clear out the invalid pending ACKs for child
resources whenever the parent changes. This ensures that we do not store the
child's hash as accepted when the race occurs.

An escape-hatch environment variable `XDS_PROTOCOL_LEGACY_CHILD_RESEND` was
added so that users can revert back to the old legacy behavior in the event
that this produces unknown side-effects. Visit the following thread for some
extra context on why certainty around these race conditions is difficult:
envoyproxy/envoy#13009

This bug report and fix was mostly implemented by @ksmiley with some minor
tweaks.

Co-authored-by: Keith Smiley <ksmiley@salesforce.com>
  • Loading branch information
hashi-derek and ksmiley committed Dec 8, 2023
1 parent d93f7f7 commit ef786b7
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 7 deletions.
44 changes: 38 additions & 6 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -43,6 +44,11 @@ import (

var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")

// xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function.
// This environment variable exists as an escape hatch so that users can disable the behavior, if needed.
// Ideally, this is a flag we can remove in 1.19+
var xdsProtocolLegacyChildResend = (os.Getenv("XDS_PROTOCOL_LEGACY_CHILD_RESEND") != "")

type deltaRecvResponse int

const (
Expand Down Expand Up @@ -1080,25 +1086,51 @@ func (t *xDSDeltaType) createDeltaResponse(
}

func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
return
}
if !t.subscribed(childName) {
return
}

t.logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", parentName,
"childTypeUrl", t.deltaChild.childType.typeURL,
"childResource", childName,
)

// resourceVersions tracks the last known version for this childName that Envoy
// has ACKed. By setting this to empty it effectively tells us that Envoy does
// not have any data for that child, and we need to re-send.
t.deltaChild.childType.resourceVersions[childName] = ""
if _, exist := t.deltaChild.childType.resourceVersions[childName]; exist {
t.deltaChild.childType.resourceVersions[childName] = ""
}

if xdsProtocolLegacyChildResend {
return
// TODO: This legacy behavior can be removed in 1.19, provided there are no outstanding issues.
//
// In this legacy mode, there is a confirmed race condition:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
//
// When this situation happens, Envoy wipes the child endpoints when the cluster is updated,
// but it would never receive new ones. The endpoints would not be resent, because their hash
// never changed since the previous ACK.
//
// Due to ambiguity with the Envoy protocol [https://github.com/envoyproxy/envoy/issues/13009],
// it's difficult to state with certainty that no other unexpected side-effects are possible.
// This legacy escape hatch is left in-place in case some other complex race condition crops up.
//
// Longer-term, we should modify the hash of children to include the parent hash so that this
// behavior is implicitly handled, rather than being an edge case.
}

// pendingUpdates can contain newer versions that have been sent to Envoy but
// that we haven't processed an ACK for yet. These need to be cleared out, too,
// so that they aren't moved to resourceVersions by ack()
for nonce := range t.deltaChild.childType.pendingUpdates {
delete(t.deltaChild.childType.pendingUpdates[nonce], childName)
}
}

func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
Expand Down
144 changes: 143 additions & 1 deletion agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ package xds
import (
"errors"
"fmt"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/consul/envoyextensions/xdscommon"

"github.com/armon/go-metrics"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
Expand Down Expand Up @@ -821,6 +822,147 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangeBeforeEndpointAck(t *testing.T) {
// This test ensures that the following race condition does not block indefinitely:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
// Prior to a bug fix, this would have resulted in the endpoints NOT existing in Envoy. This occurred because
// the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data
// to compensate for the loss because we would incorrectly ACK the invalid old endpoint hash. Since the
// endpoint's hash did not actually change, they would not be resent.
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

sid := structs.NewServiceID("web-sidecar-proxy", nil)

// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)

var snap *proxycfg.ConfigSnapshot
testutil.RunStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "", nil)

// Send initial cluster discover.
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})

// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)

assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
})

var newSnap *proxycfg.ConfigSnapshot
testutil.RunStep(t, "resend cluster immediately", func(t *testing.T) {
// Deliver updated snapshot with new CA roots and leaf certificate. This will not be
// sent to Envoy until the initial set of cluster message is ACKed.
newSnap = newTestSnapshot(t, nil, "", nil)
mgr.DeliverConfig(t, sid, newSnap)

// Envoy then tries to discover endpoints for clusters.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})

// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})

// After receiving the endpoints Envoy sends an ACK for the clusters
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

// The updated cluster snapshot with new certificates is sent immediately
// after the first is ACKed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(3),
Resources: makeTestResources(t,
// SAME makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, newSnap, "tcp:db"),
makeTestCluster(t, newSnap, "tcp:geo-cache"),
),
})
})

testutil.RunStep(t, "resend endpoints", func(t *testing.T) {
// Envoy requests listeners because it has received endpoints. We won't send listeners
// until Envoy ACKs the second cluster update.
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

// Envoy ACKs the endpoints from the first cluster update.
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

// Resend endpoints because the clusters changed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, newSnap, "tcp:db"),
makeTestEndpoints(t, newSnap, "tcp:geo-cache"),
),
})

// Envoy ACKs the new cluster and endpoints.
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 3)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)

// Listeners are sent after the cluster and endpoints are ACKed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestListener(t, newSnap, "tcp:public_listener"),
makeTestListener(t, newSnap, "tcp:db"),
makeTestListener(t, newSnap, "tcp:geo-cache"),
),
})

// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 5)
})

envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
Expand Down

0 comments on commit ef786b7

Please sign in to comment.