Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix xDS missing endpoint race condition. #19866

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/19866.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: ensure child resources are re-sent to Envoy when the parent is updated even if the child already has pending updates.
```
44 changes: 38 additions & 6 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -43,6 +44,11 @@ import (

var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")

// xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function.
// This environment variable exists as an escape hatch so that users can disable the behavior, if needed.
// Ideally, this is a flag we can remove in 1.19+
var xdsProtocolLegacyChildResend = (os.Getenv("XDS_PROTOCOL_LEGACY_CHILD_RESEND") != "")

type deltaRecvResponse int

const (
Expand Down Expand Up @@ -1080,25 +1086,51 @@ func (t *xDSDeltaType) createDeltaResponse(
}

func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
return
}
if !t.subscribed(childName) {
return
}

t.logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", parentName,
"childTypeUrl", t.deltaChild.childType.typeURL,
"childResource", childName,
)

// resourceVersions tracks the last known version for this childName that Envoy
// has ACKed. By setting this to empty it effectively tells us that Envoy does
// not have any data for that child, and we need to re-send.
t.deltaChild.childType.resourceVersions[childName] = ""
if _, exist := t.deltaChild.childType.resourceVersions[childName]; exist {
t.deltaChild.childType.resourceVersions[childName] = ""
}

if xdsProtocolLegacyChildResend {
return
// TODO: This legacy behavior can be removed in 1.19, provided there are no outstanding issues.
//
// In this legacy mode, there is a confirmed race condition:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
//
// When this situation happens, Envoy wipes the child endpoints when the cluster is updated,
// but it would never receive new ones. The endpoints would not be resent, because their hash
// never changed since the previous ACK.
//
// Due to ambiguity with the Envoy protocol [https://github.com/envoyproxy/envoy/issues/13009],
// it's difficult to state with certainty that no other unexpected side-effects are possible.
// This legacy escape hatch is left in-place in case some other complex race condition crops up.
//
// Longer-term, we should modify the hash of children to include the parent hash so that this
// behavior is implicitly handled, rather than being an edge case.
}

// pendingUpdates can contain newer versions that have been sent to Envoy but
// that we haven't processed an ACK for yet. These need to be cleared out, too,
// so that they aren't moved to resourceVersions by ack()
for nonce := range t.deltaChild.childType.pendingUpdates {
delete(t.deltaChild.childType.pendingUpdates[nonce], childName)
}
}

func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
Expand Down
144 changes: 143 additions & 1 deletion agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ package xds
import (
"errors"
"fmt"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/consul/envoyextensions/xdscommon"

"github.com/armon/go-metrics"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
Expand Down Expand Up @@ -821,6 +822,147 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangeBeforeEndpointAck(t *testing.T) {
// This test ensures that the following race condition does not block indefinitely:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
// Prior to a bug fix, this would have resulted in the endpoints NOT existing in Envoy. This occurred because
// the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data
// to compensate for the loss because we would incorrectly ACK the invalid old endpoint hash. Since the
// endpoint's hash did not actually change, they would not be resent.
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

sid := structs.NewServiceID("web-sidecar-proxy", nil)

// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)

var snap *proxycfg.ConfigSnapshot
testutil.RunStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "", nil)

// Send initial cluster discover.
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})

// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)

assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
})

var newSnap *proxycfg.ConfigSnapshot
testutil.RunStep(t, "resend cluster immediately", func(t *testing.T) {
// Deliver updated snapshot with new CA roots and leaf certificate. This will not be
// sent to Envoy until the initial set of cluster message is ACKed.
newSnap = newTestSnapshot(t, nil, "", nil)
mgr.DeliverConfig(t, sid, newSnap)

// Envoy then tries to discover endpoints for clusters.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})

// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})

// After receiving the endpoints Envoy sends an ACK for the clusters
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

// The updated cluster snapshot with new certificates is sent immediately
// after the first is ACKed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(3),
Resources: makeTestResources(t,
// SAME makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, newSnap, "tcp:db"),
makeTestCluster(t, newSnap, "tcp:geo-cache"),
),
})
})

testutil.RunStep(t, "resend endpoints", func(t *testing.T) {
// Envoy requests listeners because it has received endpoints. We won't send listeners
// until Envoy ACKs the second cluster update.
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

// Envoy ACKs the endpoints from the first cluster update.
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

// Resend endpoints because the clusters changed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, newSnap, "tcp:db"),
makeTestEndpoints(t, newSnap, "tcp:geo-cache"),
),
})

// Envoy ACKs the new cluster and endpoints.
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 3)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)

// Listeners are sent after the cluster and endpoints are ACKed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestListener(t, newSnap, "tcp:public_listener"),
makeTestListener(t, newSnap, "tcp:db"),
makeTestListener(t, newSnap, "tcp:geo-cache"),
),
})

// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 5)
})

envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
Expand Down
Loading