-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patherror_contract_test.go
More file actions
258 lines (233 loc) · 7.79 KB
/
error_contract_test.go
File metadata and controls
258 lines (233 loc) · 7.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
package queue
import (
"context"
"errors"
"strings"
"testing"
"time"
)
func TestQueueErrorContract_DispatchCtxCancellation(t *testing.T) {
newSaturatedQueue := func(t *testing.T) (*Queue, chan struct{}) {
t.Helper()
backend := newLocalQueueWithConfig(DriverWorkerpool, WorkerpoolConfig{
Workers: 1,
QueueCapacity: 1,
})
cfg := (Config{Driver: DriverWorkerpool}).normalize()
rt := &nativeQueueRuntime{
common: &queueCommon{
inner: newObservedQueue(backend, cfg.Driver, cfg.Observer),
cfg: cfg,
driver: cfg.Driver,
},
runtime: backend,
registered: make(map[string]Handler),
}
q, err := newQueueFromRuntime(rt)
if err != nil {
t.Fatalf("new queue from workerpool runtime: %v", err)
}
blockHandler := make(chan struct{})
q.Register("job:error-contract:block", func(context.Context, Message) error {
<-blockHandler
return nil
})
if err := q.StartWorkers(context.Background()); err != nil {
t.Fatalf("start workers: %v", err)
}
t.Cleanup(func() {
close(blockHandler)
_ = q.Shutdown(context.Background())
})
// First dispatch occupies the lone worker and blocks in the handler.
if _, err := q.Dispatch(NewJob("job:error-contract:block").OnQueue("default")); err != nil {
t.Fatalf("dispatch blocking worker job: %v", err)
}
// Second dispatch fills the queue buffer while the worker is blocked.
if _, err := q.Dispatch(NewJob("job:error-contract:block").OnQueue("default")); err != nil {
t.Fatalf("dispatch queued blocker job: %v", err)
}
return q, blockHandler
}
t.Run("precanceled_context", func(t *testing.T) {
q, _ := newSaturatedQueue(t)
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := q.DispatchCtx(ctx, NewJob("job:error-contract:block").OnQueue("default"))
if !errors.Is(err, context.Canceled) {
t.Fatalf("expected context.Canceled, got %v", err)
}
})
t.Run("expired_deadline_context", func(t *testing.T) {
q, _ := newSaturatedQueue(t)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
defer cancel()
_, err := q.DispatchCtx(ctx, NewJob("job:error-contract:block").OnQueue("default"))
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected context.DeadlineExceeded, got %v", err)
}
})
}
func TestQueueErrorContract_UnsupportedCapabilities(t *testing.T) {
q, err := NewNull()
if err != nil {
t.Fatalf("new null queue: %v", err)
}
if err := q.Pause(context.Background(), "default"); !errors.Is(err, ErrPauseUnsupported) {
t.Fatalf("expected ErrPauseUnsupported from Queue.Pause, got %v", err)
}
if err := q.Resume(context.Background(), "default"); !errors.Is(err, ErrPauseUnsupported) {
t.Fatalf("expected ErrPauseUnsupported from Queue.Resume, got %v", err)
}
_, err = q.Stats(context.Background())
if err == nil {
t.Fatal("expected stats unsupported error")
}
if !strings.Contains(err.Error(), "stats provider is not available") {
t.Fatalf("expected unsupported stats error message, got %v", err)
}
if !strings.Contains(err.Error(), string(DriverNull)) {
t.Fatalf("expected driver name in stats unsupported error, got %v", err)
}
}
func TestQueueErrorContract_WorkflowNotFound(t *testing.T) {
q, err := NewSync()
if err != nil {
t.Fatalf("new sync queue: %v", err)
}
if _, err := q.FindChain(context.Background(), "missing-chain"); !errors.Is(err, ErrWorkflowNotFound) {
t.Fatalf("expected ErrWorkflowNotFound for chain lookup, got %v", err)
}
if _, err := q.FindBatch(context.Background(), "missing-batch"); !errors.Is(err, ErrWorkflowNotFound) {
t.Fatalf("expected ErrWorkflowNotFound for batch lookup, got %v", err)
}
}
func TestQueueErrorContract_Constructors(t *testing.T) {
t.Run("unsupported_driver", func(t *testing.T) {
_, err := New(Config{Driver: Driver("nope")})
if err == nil {
t.Fatal("expected unsupported driver error")
}
if !strings.Contains(err.Error(), "unsupported queue driver") {
t.Fatalf("expected unsupported driver error message, got %v", err)
}
if !strings.Contains(err.Error(), `"nope"`) {
t.Fatalf("expected driver name in unsupported driver error, got %v", err)
}
})
t.Run("optional_driver_moved_guidance", func(t *testing.T) {
tests := []struct {
driver Driver
want string
}{
{driver: DriverRedis, want: "driver/redisqueue"},
{driver: DriverNATS, want: "driver/natsqueue"},
{driver: DriverSQS, want: "driver/sqsqueue"},
{driver: DriverRabbitMQ, want: "driver/rabbitmqqueue"},
{driver: DriverDatabase, want: "driver/{mysqlqueue,postgresqueue,sqlitequeue}"},
}
for _, tt := range tests {
tt := tt
t.Run(string(tt.driver), func(t *testing.T) {
_, err := New(Config{Driver: tt.driver})
if err == nil {
t.Fatalf("expected moved-driver guidance error for %q", tt.driver)
}
if !strings.Contains(err.Error(), tt.want) {
t.Fatalf("expected guidance %q in error, got %v", tt.want, err)
}
})
}
})
}
func TestQueueErrorContract_RuntimeDispatchInputValidation(t *testing.T) {
r, err := newRuntime(Config{Driver: DriverSync})
if err != nil {
t.Fatalf("new sync runtime: %v", err)
}
t.Run("nil_job", func(t *testing.T) {
err := r.DispatchCtx(context.Background(), nil)
if err == nil {
t.Fatal("expected nil job error")
}
if !strings.Contains(err.Error(), "dispatch job is nil") {
t.Fatalf("expected nil job error message, got %v", err)
}
})
t.Run("uninferable_job_type", func(t *testing.T) {
err := r.Dispatch(struct{ F func() }{})
if err == nil {
t.Fatal("expected uninferable job type error")
}
if !strings.Contains(err.Error(), "dispatch job type could not be inferred") {
t.Fatalf("expected uninferable job type message, got %v", err)
}
})
}
func TestQueueErrorContract_WorkflowBuilderValidation(t *testing.T) {
q, err := NewSync()
if err != nil {
t.Fatalf("new sync queue: %v", err)
}
t.Run("chain_requires_jobs", func(t *testing.T) {
_, err := q.Chain().Dispatch(context.Background())
if err == nil {
t.Fatal("expected chain requires jobs error")
}
if !strings.Contains(err.Error(), "chain requires at least one job") {
t.Fatalf("expected chain requires jobs message, got %v", err)
}
})
t.Run("batch_requires_jobs", func(t *testing.T) {
_, err := q.Batch().Dispatch(context.Background())
if err == nil {
t.Fatal("expected batch requires jobs error")
}
if !strings.Contains(err.Error(), "batch requires at least one job") {
t.Fatalf("expected batch requires jobs message, got %v", err)
}
})
t.Run("chain_invalid_job", func(t *testing.T) {
_, err := q.Chain(NewJob("")).Dispatch(context.Background())
if err == nil {
t.Fatal("expected chain invalid job error")
}
if !strings.Contains(err.Error(), "job type is required") {
t.Fatalf("expected chain invalid job message, got %v", err)
}
})
t.Run("batch_invalid_job", func(t *testing.T) {
_, err := q.Batch(NewJob("")).Dispatch(context.Background())
if err == nil {
t.Fatal("expected batch invalid job error")
}
if !strings.Contains(err.Error(), "job type is required") {
t.Fatalf("expected batch invalid job message, got %v", err)
}
})
}
func TestQueueErrorContract_HighLevelDispatchValidation(t *testing.T) {
t.Run("nil_queue_receiver", func(t *testing.T) {
var q *Queue
_, err := q.Dispatch(NewJob("job:error-contract:nil-queue"))
if err == nil {
t.Fatal("expected nil queue receiver error")
}
if !strings.Contains(err.Error(), "runtime is nil") {
t.Fatalf("expected nil queue receiver error message, got %v", err)
}
})
t.Run("zero_value_job", func(t *testing.T) {
q, err := NewSync()
if err != nil {
t.Fatalf("new sync queue: %v", err)
}
_, err = q.Dispatch(Job{})
if err == nil {
t.Fatal("expected invalid job error")
}
if !strings.Contains(err.Error(), "job type is required") {
t.Fatalf("expected invalid job message, got %v", err)
}
})
}