Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kds): fix the case when webhook/db reject resource #10315

Merged
merged 10 commits into from
May 29, 2024
40 changes: 27 additions & 13 deletions pkg/kds/v2/server/resource_retry_forcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ import (
// We cannot simply invalidate existing snapshot because versions are also set in StreamState
type kdsRetryForcer struct {
util_xds_v3.NoopCallbacks
forceFn func(*envoy_core.Node, model.ResourceType)
log logr.Logger
nodes map[xds.StreamID]*envoy_core.Node
backoff time.Duration
emitter events.Emitter
hasher envoy_cache.NodeHash
forceFn func(*envoy_core.Node, model.ResourceType)
log logr.Logger
nodes map[xds.StreamID]*envoy_core.Node
backoff time.Duration
emitter events.Emitter
hasher envoy_cache.NodeHash
streamToDelay map[xds.StreamID]bool

sync.Mutex
}
Expand All @@ -64,12 +65,13 @@ func newKdsRetryForcer(
hasher envoy_cache.NodeHash,
) *kdsRetryForcer {
return &kdsRetryForcer{
forceFn: forceFn,
log: log,
nodes: map[xds.StreamID]*envoy_core.Node{},
backoff: backoff,
emitter: emitter,
hasher: hasher,
forceFn: forceFn,
log: log,
nodes: map[xds.StreamID]*envoy_core.Node{},
backoff: backoff,
emitter: emitter,
hasher: hasher,
streamToDelay: map[int64]bool{},
}
}

Expand All @@ -95,10 +97,13 @@ func (r *kdsRetryForcer) OnStreamDeltaRequest(streamID xds.StreamID, request *en
if !ok {
node = request.Node
r.nodes[streamID] = node
// store information about NACK resources, to delete force retries once ACK
}
if _, found := r.streamToDelay[streamID]; !found {
r.streamToDelay[streamID] = true
}
r.Unlock()
r.log.Info("received NACK, will retry", "nodeID", r.hasher.ID(request.Node), "type", request.TypeUrl, "err", request.GetErrorDetail().GetMessage(), "backoff", r.backoff)
time.Sleep(r.backoff)
r.forceFn(node, model.ResourceType(request.TypeUrl))
r.emitter.Send(events.TriggerKDSResyncEvent{
Type: model.ResourceType(request.TypeUrl),
Expand All @@ -107,3 +112,12 @@ func (r *kdsRetryForcer) OnStreamDeltaRequest(streamID xds.StreamID, request *en
r.log.V(1).Info("forced the new verion of resources", "nodeID", node.Id, "type", request.TypeUrl)
return nil
}

func (r *kdsRetryForcer) OnStreamDeltaResponse(streamID int64, req *envoy_sd.DeltaDiscoveryRequest, resp *envoy_sd.DeltaDiscoveryResponse) {
r.Lock()
if _, found := r.streamToDelay[streamID]; found {
lukidzi marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(r.backoff)
}
delete(r.streamToDelay, streamID)
r.Unlock()
lukidzi marked this conversation as resolved.
Show resolved Hide resolved
}
Loading