-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtask.go
144 lines (124 loc) · 3.28 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package tasq
import (
	"bytes"
	"encoding/gob"
	"fmt"
	"reflect"
	"time"

	"github.com/google/uuid"
)
// TaskStatus is an enum type describing the status a task is currently in.
// Its underlying type is string; the values defined by this package are the
// Status* constants.
type TaskStatus string
// The collection of possible task statuses.
//
// GetTaskStatuses classifies StatusNew, StatusEnqueued and StatusInProgress as
// open statuses, and StatusSuccessful and StatusFailed as finished statuses.
const (
// StatusNew is the status NewTask assigns to a freshly created task.
StatusNew TaskStatus = "NEW"
// StatusEnqueued is an open status.
// NOTE(review): presumably set once the task has been put on a queue — confirm.
StatusEnqueued TaskStatus = "ENQUEUED"
// StatusInProgress is an open status.
// NOTE(review): presumably set while a consumer is handling the task — confirm.
StatusInProgress TaskStatus = "IN_PROGRESS"
// StatusSuccessful is a finished status.
StatusSuccessful TaskStatus = "SUCCESSFUL"
// StatusFailed is a finished status.
StatusFailed TaskStatus = "FAILED"
)
// TaskStatusGroup is an enum type identifying a named group of TaskStatuses.
// It is the argument GetTaskStatuses uses to select which statuses to return.
type TaskStatusGroup int
// The collection of possible task status groupings.
const (
// AllTasks selects every task status defined by this package.
AllTasks TaskStatusGroup = iota
// OpenTasks selects StatusNew, StatusEnqueued and StatusInProgress.
OpenTasks
// FinishedTasks selects StatusSuccessful and StatusFailed.
FinishedTasks
)
// GetTaskStatuses returns the TaskStatuses that belong to the TaskStatusGroup
// passed as an argument.
//
// AllTasks yields every status, OpenTasks yields StatusNew, StatusEnqueued and
// StatusInProgress, and FinishedTasks yields StatusSuccessful and StatusFailed.
// Any other group value yields nil. Every call returns a newly allocated
// slice, so callers may modify the result without affecting later calls.
func GetTaskStatuses(taskStatusGroup TaskStatusGroup) []TaskStatus {
	// A switch only builds the requested slice instead of allocating a whole
	// lookup map (and all of its slices) on every call.
	switch taskStatusGroup {
	case AllTasks:
		return []TaskStatus{
			StatusNew,
			StatusEnqueued,
			StatusInProgress,
			StatusSuccessful,
			StatusFailed,
		}
	case OpenTasks:
		return []TaskStatus{
			StatusNew,
			StatusEnqueued,
			StatusInProgress,
		}
	case FinishedTasks:
		return []TaskStatus{
			StatusSuccessful,
			StatusFailed,
		}
	default:
		return nil
	}
}
// Task is the struct used to represent an atomic task managed by tasq.
// Use NewTask to create one with its ID, encoded arguments, initial status
// and creation time filled in.
type Task struct {
// ID identifies the task; NewTask fills it with a randomly generated UUID.
ID uuid.UUID
// Type is the caller-supplied task type passed to NewTask.
Type string
// Args holds the gob-encoded task arguments; decode them with UnmarshalArgs.
Args []byte
// Queue is the caller-supplied queue value passed to NewTask.
Queue string
// Priority is the caller-supplied priority passed to NewTask.
// NOTE(review): whether higher or lower values are served first is not
// visible in this file — confirm.
Priority int16
// Status is the current status of the task; NewTask starts it at StatusNew.
Status TaskStatus
// ReceiveCount is compared against MaxReceives by IsLastReceive and starts at 0.
// NOTE(review): presumably the number of times the task was received — confirm.
ReceiveCount int32
// MaxReceives is the caller-supplied limit IsLastReceive checks ReceiveCount against.
MaxReceives int32
// LastError is nil for a task created by NewTask.
// NOTE(review): presumably the message of the most recent failure — confirm.
LastError *string
// CreatedAt is set by NewTask to the current time.
CreatedAt time.Time
// StartedAt is nil for a task created by NewTask.
// NOTE(review): presumably set when processing starts — confirm.
StartedAt *time.Time
// FinishedAt is nil for a task created by NewTask.
// NOTE(review): presumably set when processing ends — confirm.
FinishedAt *time.Time
// VisibleAt is the time at which the task becomes visible again; it is
// changed through SetVisibility and is the zero time for a new task.
VisibleAt time.Time
}
// NewTask creates a new Task struct based on the supplied arguments required to define it.
//
// The task receives a randomly generated UUID as its ID, taskArgs is
// serialized through encodeTaskArgs, the status is StatusNew and CreatedAt is
// the current time. An error is returned when the ID cannot be generated or
// when the arguments cannot be encoded; no Task is returned in that case.
func NewTask(taskType string, taskArgs any, queue string, priority int16, maxReceives int32) (*Task, error) {
	id, err := uuid.NewRandom()
	if err != nil {
		return nil, fmt.Errorf("failed to generate new task ID: %w", err)
	}

	args, err := encodeTaskArgs(taskArgs)
	if err != nil {
		return nil, err
	}

	// ReceiveCount, LastError, StartedAt, FinishedAt and VisibleAt are
	// deliberately left at their zero values for a brand-new task.
	task := &Task{
		ID:          id,
		Type:        taskType,
		Args:        args,
		Queue:       queue,
		Priority:    priority,
		Status:      StatusNew,
		MaxReceives: maxReceives,
		CreatedAt:   time.Now(),
	}

	return task, nil
}
// IsLastReceive returns true if the task has reached its maximum number of receives,
// that is, when ReceiveCount is greater than or equal to MaxReceives.
func (t *Task) IsLastReceive() bool {
return t.ReceiveCount >= t.MaxReceives
}
// SetVisibility sets the time at which the task will become visible again.
// It only overwrites the VisibleAt field of this Task value.
func (t *Task) SetVisibility(visibleAt time.Time) {
t.VisibleAt = visibleAt
}
// UnmarshalArgs decodes the task arguments into the passed target interface.
//
// The bytes stored in Args are read as a gob stream and decoded into target.
// Any decoding failure is returned wrapped with additional context.
func (t *Task) UnmarshalArgs(target any) error {
	if err := gob.NewDecoder(bytes.NewReader(t.Args)).Decode(target); err != nil {
		return fmt.Errorf("failed to decode task arguments: %w", err)
	}

	return nil
}
// encodeTaskArgs serializes taskArgs into its gob representation.
//
// On success it returns the encoded bytes. On failure it returns a nil slice
// together with an error describing why the arguments could not be encoded.
// A nil pointer argument is rejected with an error up front, because the gob
// encoder panics on a top-level nil pointer instead of reporting an error.
func encodeTaskArgs(taskArgs any) ([]byte, error) {
	// reflect.ValueOf(nil) has Kind Invalid, so an untyped nil falls through
	// to the encoder, which reports it as a regular error.
	if value := reflect.ValueOf(taskArgs); value.Kind() == reflect.Pointer && value.IsNil() {
		return nil, fmt.Errorf("failed to encode task arguments: nil pointer of type %T", taskArgs)
	}

	var buffer bytes.Buffer
	if err := gob.NewEncoder(&buffer).Encode(taskArgs); err != nil {
		return nil, fmt.Errorf("failed to encode task arguments: %w", err)
	}

	return buffer.Bytes(), nil
}