Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 9bf0fb9

Browse files
authored
Mark webapi task failure as retry limit exceeded (#392)
Signed-off-by: Kevin Su <pingsutw@apache.org>
1 parent ff9f55b commit 9bf0fb9

File tree

4 files changed

+58
-10
lines changed

4 files changed

+58
-10
lines changed

go/tasks/pluginmachinery/internal/webapi/cache.go

+15-8
Original file line numberDiff line numberDiff line change
@@ -78,31 +78,38 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) (
7878
logger.Debugf(ctx, "Sync loop - processing resource with cache key [%s]",
7979
resource.GetID())
8080

81-
if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures {
82-
logger.Infof(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.",
83-
cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures)
84-
cacheItem.State.Phase = PhaseSystemFailure
85-
}
86-
8781
if cacheItem.State.Phase.IsTerminal() {
8882
logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]",
8983
resource.GetID())
90-
9184
resp = append(resp, cache.ItemSyncResponse{
9285
ID: resource.GetID(),
93-
Item: resource.GetItem(),
86+
Item: cacheItem,
9487
Action: cache.Unchanged,
9588
})
9689

9790
continue
9891
}
9992

93+
if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures {
94+
logger.Debugf(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.",
95+
cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures)
96+
cacheItem.State.Phase = PhaseSystemFailure
97+
resp = append(resp, cache.ItemSyncResponse{
98+
ID: resource.GetID(),
99+
Item: cacheItem,
100+
Action: cache.Update,
101+
})
102+
103+
continue
104+
}
105+
100106
// Get an updated status
101107
logger.Debugf(ctx, "Querying AsyncPlugin for %s", resource.GetID())
102108
newResource, err := q.client.Get(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", nil))
103109
if err != nil {
104110
logger.Infof(ctx, "Error retrieving resource [%s]. Error: %v", resource.GetID(), err)
105111
cacheItem.SyncFailureCount++
112+
cacheItem.ErrorMessage = err.Error()
106113

107114
// Make sure we don't return nil for the first argument, because that deletes it from the cache.
108115
resp = append(resp, cache.ItemSyncResponse{

go/tasks/pluginmachinery/internal/webapi/cache_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,36 @@ func TestResourceCache_SyncResource(t *testing.T) {
6767
assert.Equal(t, cacheItem, newCacheItem[0].Item)
6868
})
6969

70+
t.Run("Retry limit exceeded", func(t *testing.T) {
71+
mockCache := &cacheMocks.AutoRefresh{}
72+
mockClient := &mocks.Client{}
73+
74+
q := ResourceCache{
75+
AutoRefresh: mockCache,
76+
client: mockClient,
77+
cfg: webapi.CachingConfig{
78+
MaxSystemFailures: 2,
79+
},
80+
}
81+
82+
cacheItem := CacheItem{
83+
State: State{
84+
SyncFailureCount: 5,
85+
ErrorMessage: "some error",
86+
},
87+
}
88+
89+
iw := &cacheMocks.ItemWrapper{}
90+
iw.OnGetItem().Return(cacheItem)
91+
iw.OnGetID().Return("some-id")
92+
93+
newCacheItem, err := q.SyncResource(ctx, []cache.ItemWrapper{iw})
94+
assert.NoError(t, err)
95+
assert.Equal(t, cache.Update, newCacheItem[0].Action)
96+
cacheItem.State.Phase = PhaseSystemFailure
97+
assert.Equal(t, cacheItem, newCacheItem[0].Item)
98+
})
99+
70100
t.Run("move to success", func(t *testing.T) {
71101
mockCache := &cacheMocks.AutoRefresh{}
72102
mockClient := &mocks.Client{}

go/tasks/pluginmachinery/internal/webapi/monitor.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,16 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
2929
errors.CacheFailed, "Failed to cast [%v]", cacheItem)
3030
}
3131

32-
// If the cache has not syncd yet, just return
32+
// If the cache has not synced yet, just return
3333
if cacheItem.Resource == nil {
34+
if cacheItem.Phase.IsTerminal() {
35+
err = cache.DeleteDelayed(cacheItemID)
36+
if err != nil {
37+
logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",
38+
cacheItemID, err)
39+
}
40+
return state, core.PhaseInfoFailure(errors.CacheFailed, cacheItem.ErrorMessage, nil), nil
41+
}
3442
return state, core.PhaseInfoRunning(0, nil), nil
3543
}
3644

@@ -54,7 +62,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
5462
// Queue item for deletion in the cache.
5563
err = cache.DeleteDelayed(cacheItemID)
5664
if err != nil {
57-
logger.Warnf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",
65+
logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",
5866
cacheItemID, err)
5967
}
6068
}

go/tasks/pluginmachinery/internal/webapi/state.go

+3
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,7 @@ type State struct {
5656

5757
// The time at which the execution first requested an allocation token
5858
AllocationTokenRequestStartTime time.Time `json:"allocationTokenRequestStartTime,omitempty"`
59+
60+
// ErrorMessage generated during cache synchronization.
61+
ErrorMessage string `json:"error_message,omitempty"`
5962
}

0 commit comments

Comments
 (0)