Skip to content

Commit dbcd5f0

Browse files
committed
feat(rst): TODO merge - cleanup error messages
1 parent c0f3c14 commit dbcd5f0

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

rst/sync/internal/workmgr/manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -514,12 +514,12 @@ func (m *Manager) resumeWorkRequest(waitQueueSubmissionId string, waitQueueWorkE
514514

515515
_, workEntry, commitJournalEntry, err := m.workJournal.CreateAndLockEntry(submissionID)
516516
if err != nil {
517-
fmt.Printf("unable to create work journal entry for job ID %s work request ID %s: %s\n", jobID, workRequestID, err.Error())
517+
return false, fmt.Errorf("failed to create work journal entry for job ID %s: %w", jobID, err)
518+
518519
}
519520
defer func() {
520521
if err = commitJournalEntry(); err != nil {
521522
m.log.Error("unable to release work journal entry", zap.Error(err), zap.Any("jobID", jobID))
522-
523523
}
524524
}()
525525

@@ -529,7 +529,7 @@ func (m *Manager) resumeWorkRequest(waitQueueSubmissionId string, waitQueueWorkE
529529

530530
_, waitQueueCommitJournalEntry, err := m.workWaitQueueJournal.GetAndLockEntry(waitQueueSubmissionId)
531531
if err != nil {
532-
return false, fmt.Errorf("failed to get waitQueue work journal entry. What should we do?")
532+
return false, fmt.Errorf("failed to get waitQueue work journal entry for job ID %s: %w", jobID, err)
533533
}
534534
defer func() {
535535
if err = waitQueueCommitJournalEntry(kvstore.WithDeleteEntry(true)); err != nil {

rst/sync/internal/workmgr/work.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,22 +301,25 @@ func (w *worker) processWork(work workAssignment) {
301301

302302
baseKey, err := w.workWaitQueueJournal.GenerateNextPK()
303303
if err != nil {
304-
// TODO: // return nil, fmt.Errorf("unable to generate database key for job ID %s: %w", request.GetJobId(), err)
304+
w.log.Error("unable to generate database key for wor request wait queue", zap.Error(err), zap.Any("jobID", request.GetJobId()), zap.Any("workID", request.GetRequestId()))
305+
status.SetState(flex.Work_FAILED)
306+
status.SetMessage("unable to generate database key for work request wait queue for job ID " + request.GetJobId() + ": " + err.Error())
305307
}
306308

307309
key := createSubmissionID(baseKey, submissionIDPriority(work.submissionID))
308310
_, waitEntry, commitAndReleaseWait, err := w.workWaitQueueJournal.CreateAndLockEntry(key)
309311
if err != nil {
310-
// TODO: return nil, fmt.Errorf("unable to create work journal entry for job ID %s work request ID %s: %w", request.GetJobId(), request.GetRequestId(), err)
312+
w.log.Error("unable to create work journal entry for wor request wait queue", zap.Error(err), zap.Any("jobID", request.GetJobId()), zap.Any("workID", request.GetRequestId()))
313+
status.SetState(flex.Work_FAILED)
314+
status.SetMessage("unable to generate database key for work request wait queue for job ID " + request.GetJobId() + ": " + err.Error())
311315
}
312316

313317
waitEntry.Value.ExecutionTime = retryTime.Unix()
314318
waitEntry.Value.SubmissionID = work.submissionID
315319
waitEntry.Value.WorkRequest = &workRequest{WorkRequest: request.WorkRequest}
316320
waitEntry.Value.WorkResult = result
317-
318321
if err := commitAndReleaseWait(); err != nil {
319-
w.log.Error("unable to release work wait queue journal entry", zap.Error(err), zap.Any("jobID", request.GetJobId()))
322+
w.log.Error("unable to release work wait queue journal entry", zap.Error(err), zap.Any("jobID", request.GetJobId()), zap.Any("workID", request.GetRequestId()))
320323
}
321324

322325
return

0 commit comments

Comments
 (0)