Skip to content
This repository was archived by the owner on Sep 24, 2024. It is now read-only.

Commit bae4268

Browse files
More resilient to duplicate jobs in the queue.
Still, we need to stop a bit and rethink how the "reevaluate", "find jobs to do" and so on cooperate to add jobs to the queue.
1 parent 09b7b8e commit bae4268

File tree

1 file changed

+33
-25
lines changed

1 file changed

+33
-25
lines changed

cms/cms/service/EvaluationService.py

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -502,17 +502,10 @@ def search_jobs_not_done(self):
502502
new_submission_ids_to_check = \
503503
[x.id for x in contest.get_submissions()
504504
if (not x.compiled() and x.compilation_tries <
505-
EvaluationService.MAX_COMPILATION_TRIES and
506-
(EvaluationService.JOB_TYPE_COMPILATION, x.id) not in
507-
self.queue and
508-
(EvaluationService.JOB_TYPE_COMPILATION, x.id) not in
509-
self.pool)
510-
or (not x.evaluated() and x.evaluation_tries <
511-
EvaluationService.MAX_EVALUATION_TRIES and
512-
(EvaluationService.JOB_TYPE_EVALUATION, x.id) not in
513-
self.queue and
514-
(EvaluationService.JOB_TYPE_EVALUATION, x.id)
515-
not in self.pool)]
505+
EvaluationService.MAX_COMPILATION_TRIES)
506+
or (x.compilation_outcome == "ok" and
507+
not x.evaluated() and x.evaluation_tries <
508+
EvaluationService.MAX_EVALUATION_TRIES)]
516509

517510
new = len(new_submission_ids_to_check)
518511
old = len(self.submission_ids_to_check)
@@ -661,7 +654,7 @@ def check_workers_timeout(self):
661654
for priority, timestamp, job in lost_jobs:
662655
logger.info("Job %s for submission %s put again in the queue "
663656
"because of timeout worker." % (job[0], job[1]))
664-
self.queue.push(job, priority, timestamp)
657+
self.push_in_queue(job, priority, timestamp)
665658
return True
666659

667660
def check_workers_connection(self):
@@ -673,9 +666,24 @@ def check_workers_connection(self):
673666
for priority, timestamp, job in lost_jobs:
674667
logger.info("Job %s for submission %s put again in the queue "
675668
"because of disconnected worker." % (job[0], job[1]))
676-
self.queue.push(job, priority, timestamp)
669+
self.push_in_queue(job, priority, timestamp)
677670
return True
678671

672+
def push_in_queue(self, job, priority, timestamp):
673+
"""Push a job in the job queue if the job is not already in
674+
the queue or assigned to a worker.
675+
676+
job (job): a couple (job_type, submission_id) to push.
677+
678+
return (bool): True if pushed, False if not.
679+
680+
"""
681+
if job in self.queue or job in self.pool:
682+
return False
683+
else:
684+
self.queue.push(job, priority, timestamp)
685+
return True
686+
679687
@rpc_callback
680688
def action_finished(self, data, plus, error=None):
681689
"""Callback from a worker, to signal that is finished some
@@ -765,8 +773,8 @@ def compilation_ended(self, submission_id,
765773
priority = EvaluationService.JOB_PRIORITY_LOW
766774
if tokened:
767775
priority = EvaluationService.JOB_PRIORITY_MEDIUM
768-
self.queue.push((EvaluationService.JOB_TYPE_EVALUATION,
769-
submission_id), priority, timestamp)
776+
self.push_in_queue((EvaluationService.JOB_TYPE_EVALUATION,
777+
submission_id), priority, timestamp)
770778
# If instead submission failed compilation, we don't evaluate.
771779
elif compilation_outcome == "fail":
772780
logger.info("Submission %s did not compile. Not going "
@@ -780,10 +788,10 @@ def compilation_ended(self, submission_id,
780788
else:
781789
# Note: lower priority (MEDIUM instead of HIGH) for
782790
# compilation that are probably failing again.
783-
self.queue.push((EvaluationService.JOB_TYPE_COMPILATION,
784-
submission_id),
785-
EvaluationService.JOB_PRIORITY_MEDIUM,
786-
timestamp)
791+
self.push_in_queue((EvaluationService.JOB_TYPE_COMPILATION,
792+
submission_id),
793+
EvaluationService.JOB_PRIORITY_MEDIUM,
794+
timestamp)
787795
# Otherwise, error.
788796
else:
789797
logger.error("Compilation outcome %r not recognized." %
@@ -813,8 +821,8 @@ def evaluation_ended(self, submission_id,
813821
priority = EvaluationService.JOB_PRIORITY_LOW
814822
if tokened:
815823
priority = EvaluationService.JOB_PRIORITY_MEDIUM
816-
self.queue.push((EvaluationService.JOB_TYPE_EVALUATION,
817-
submission_id), priority, timestamp)
824+
self.push_in_queue((EvaluationService.JOB_TYPE_EVALUATION,
825+
submission_id), priority, timestamp)
818826
else:
819827
logger.error("Maximum tries reached for the "
820828
"evaluation of submission %s. I will "
@@ -845,10 +853,10 @@ def new_submission(self, submission_id):
845853
# If not compiled, I compile. Note that we give here a
846854
# chance for submissions that already have too many
847855
# compilation tries.
848-
self.queue.push((EvaluationService.JOB_TYPE_COMPILATION,
849-
submission_id),
850-
EvaluationService.JOB_PRIORITY_HIGH,
851-
timestamp)
856+
self.push_in_queue((EvaluationService.JOB_TYPE_COMPILATION,
857+
submission_id),
858+
EvaluationService.JOB_PRIORITY_HIGH,
859+
timestamp)
852860
else:
853861
if not evaluated:
854862
self.compilation_ended(submission_id,

0 commit comments

Comments
 (0)