Race condition with retries and multiple workers #482

Open
@eigenein

Symptoms

I'm observing jobs randomly being run sooner than scheduled. As far as I can tell, this occurs with multiple workers and retried jobs.

Investigation

I believe I see a race condition in the code.

Disclaimer: I haven't verified this particular path, as it's extremely difficult to reproduce. I did, however, verify that scaling down to 1 worker makes the issue go away.

Consider this scenario:

  1. Worker A fetches the job X:

job_ids = await self.pool.zrangebyscore(

  2. Worker B fetches multiple jobs, including the job X. Worker B goes ahead and iterates through them, making its way to the job X:

for job_id_b in job_ids:

  3. In the meantime, Worker A finishes the job X and catches a Retry. It increments the job score:

tr.zincrby(self.queue_name, incr_score, job_id)

  4. Now, Worker B gets a chance to run X. It reads the score again:

score = await pipe.zscore(self.queue_name, job_id)

The score is now in the future, yet Worker B continues normally 💥 As far as I can tell, steps 3 and 4 are not protected by any synchronization primitive. Does this sound plausible?
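To make the suspected interleaving concrete, here is a minimal, deterministic sketch of the four steps. It does not use Redis or the library itself: a plain dict stands in for the sorted set, and the helper names (`fetch_due`, `retry`, `start_job_unguarded`) are hypothetical. It also leaves out the in-progress key handling, so it only illustrates the ordering problem and is not a reproduction of the real code path.

```python
# Hypothetical in-memory stand-in for the Redis sorted set: job_id -> score (ms).
queue = {"X": 1_000}
now_ms = 1_000  # pretend current time; job X is due right now


def fetch_due(queue, now_ms):
    """Roughly what zrangebyscore does here: ids whose score is <= now."""
    return [job_id for job_id, score in queue.items() if score <= now_ms]


def retry(queue, job_id, defer_ms):
    """Roughly what zincrby does on Retry: push the score into the future."""
    queue[job_id] += defer_ms


def start_job_unguarded(queue, job_id):
    """Re-read the score as in step 4, but only check that it exists."""
    score = queue.get(job_id)
    return bool(score)  # True means the worker goes on to run the job


job_ids_a = fetch_due(queue, now_ms)  # step 1: Worker A fetches X
job_ids_b = fetch_due(queue, now_ms)  # step 2: Worker B fetches X as well
retry(queue, "X", 5_000)              # step 3: Worker A retries, score moves to the future
b_runs_x = start_job_unguarded(queue, "X")  # step 4: Worker B re-reads the score

# Worker B proceeds even though the job is now scheduled for later.
ran_early = b_runs_x and queue["X"] > now_ms
```

In this simplified model `ran_early` ends up true, which matches the symptom of retried jobs running sooner than scheduled.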

Possible fix

I haven't studied the code well enough, but it looks like an additional check, such as score > timestamp_ms(), could be added around here to prevent the execution of a future retry:

if ongoing_exists or not score:
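As a rough illustration of that idea, the condition could be extended as in the sketch below. The function `should_skip` and its `now_ms` parameter are hypothetical and exist only to make the check testable in isolation; in the real code the comparison would presumably use `timestamp_ms()` inline, and I have not checked how this interacts with the rest of the transaction.

```python
from typing import Optional


def should_skip(ongoing_exists: bool, score: Optional[float], now_ms: int) -> bool:
    """Hypothetical guard: skip the job if it is already running, has no
    score, or its score was moved into the future (e.g. by a retry)."""
    return ongoing_exists or not score or score > now_ms


# A job whose score was pushed to 6000 ms is skipped at now = 1000 ms,
# while a job that is due exactly now is still allowed to start.
skip_future_retry = should_skip(False, 6_000, 1_000)
skip_due_job = should_skip(False, 1_000, 1_000)
```

The `not score` test comes first, so a missing score (`None`) short-circuits before the comparison and never raises a `TypeError`.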
