Skip to content

Commit b6d42af

Browse files
committed
Add expired status to assignments fetching params
1 parent 4af37cb commit b6d42af

File tree

5 files changed

+56
-0
lines changed

5 files changed

+56
-0
lines changed

pkg/api/assignmentfetcher/transport.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ func decodeAssignmentsFetcherRequest(_ context.Context, r *http.Request) (interf
5050
as.TaskID = taskID[0]
5151
}
5252

53+
status, ok := params["status"]
54+
if ok && len(taskID) > 0 {
55+
as.Status = assignment.Status(status[0])
56+
}
57+
5358
return as, nil
5459
}
5560

pkg/assignment/assignment.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const (
1414
Pending Status = "pending"
1515
Accepted Status = "accepted"
1616
Rejected Status = "rejected"
17+
Expired Status = "expired"
1718
)
1819

1920
type Params struct {

pkg/datastore/assignment.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ type Storage interface {
1818
GetWhitelist(jobID uint64, workerID uint64) (*whitelist.Whitelist, error)
1919
WorkerAlreadyAssigned(jobID uint64, workerID uint64) (bool, error)
2020
DeleteAssignment(workerID uint64, jobID uint64) (bool, error)
21+
DeleteAssignments(ids []uint64) error
2122
UpdateAssignment(workerID uint64, jobID uint64, status string) (bool, error)
2223
CreateSettings(assignment.Settings) (*assignment.Settings, error)
24+
SelectExpiredAssignments() (assignment.Assignments, error)
2325
}
2426

2527
type AssignmentStore struct {
@@ -33,10 +35,16 @@ func NewAssignmentStore(db *sqlx.DB) *AssignmentStore {
3335
}
3436

3537
func (as *AssignmentStore) GetAssignments(p assignment.Params) (assignment.Assignments, error) {
38+
// If we're looking for expired assignments, then return them
39+
if p.Status == assignment.Expired {
40+
return as.SelectExpiredAssignments()
41+
}
42+
3643
assignments := assignment.Assignments{}
3744
query := "SELECT * FROM assignments"
3845
paramsQuery := []string{}
3946
args := []interface{}{}
47+
4048
if p.WorkerID != "" {
4149
args = append(args, p.WorkerID)
4250
paramsQuery = append(paramsQuery, "worker_id=?")
@@ -200,3 +208,34 @@ func (as *AssignmentStore) CreateSettings(s assignment.Settings) (*assignment.Se
200208

201209
return set, nil
202210
}
211+
212+
func (as *AssignmentStore) SelectExpiredAssignments() (assignment.Assignments, error) {
213+
assignments := assignment.Assignments{}
214+
err := as.DB.Select(
215+
&assignments,
216+
`SELECT * FROM assignments WHERE expires_at <= NOW()`,
217+
)
218+
if err != nil {
219+
return nil, err
220+
}
221+
return assignments, nil
222+
}
223+
224+
func (as *AssignmentStore) DeleteAssignments(ids []uint64) error {
225+
query, args, _ := sqlx.In(`DELETE FROM assignments WHERE id IN (?)`, ids)
226+
query = as.DB.Rebind(query)
227+
228+
res, err := as.DB.Exec(query, args...)
229+
230+
if err != nil {
231+
return err
232+
}
233+
234+
numRows, _ := res.RowsAffected()
235+
236+
if numRows != int64(len(ids)) {
237+
return RecordsMismatch{}
238+
}
239+
240+
return nil
241+
}

pkg/datastore/errors.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,9 @@ type NoRowErr struct{}
2828
func (err NoRowErr) Error() string {
2929
return "No Records"
3030
}
31+
32+
type RecordsMismatch struct{}
33+
34+
func (err RecordsMismatch) Error() string {
35+
return "Some records where not processed properly"
36+
}

pkg/service/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type AssignmentService interface {
1818
DeleteAssignment(workerID uint64, jobID uint64) (bool, error)
1919
UpdateAssignment(workerID uint64, jobID uint64, status string) (bool, error)
2020
CreateSettings(assignment.Settings) (*assignment.Settings, error)
21+
GetStore() datastore.Storage
2122
}
2223

2324
type service struct {
@@ -32,6 +33,10 @@ func New(s datastore.Storage, a authorization.Authorizer) *service {
3233
}
3334
}
3435

36+
func (s *service) GetStore() datastore.Storage {
37+
return s.store
38+
}
39+
3540
func (s *service) Healthy() bool {
3641
return true
3742
}

0 commit comments

Comments
 (0)