Using task_done() in multiple threads #89

Open
@kodonnell

I'd like to use Queue to store items to be processed by multiple threads. However, if one thread fails to process its item (and hence never calls task_done), that item can still be removed from the persisted queue, because another thread's task_done call persists the removal. I'd expect the item to stay on disk, as is the usual behaviour when task_done isn't called.

Example:

import threading
import time

from persistqueue import Queue

q = Queue("testq")


def worker1():
    print("getting from worker1")
    x = q.get()
    print("got", x, "from worker1")
    # processing goes here ... takes some time
    time.sleep(2)
    try:
        assert False, "something went wrong"
        q.task_done()
    except Exception:
        print("something went wrong with worker1 in processing", x, "so not calling task_done")


def worker2():
    time.sleep(1)
    print("getting from worker2")
    x = q.get()
    print("got", x, "from worker2")
    # processing would happen here, but finishes sooner than worker1's
    print("finished processing", x, "from worker2 so calling task_done")
    q.task_done()
    print("called task_done from worker2")


if __name__ == "__main__":

    q.put("a")
    q.put("b")

    t1 = threading.Thread(target=worker1)
    t1.start()
    t2 = threading.Thread(target=worker2)
    t2.start()
    t1.join()
    t2.join()
    print("reloading q")
    del q
    q = Queue("testq")
    print("qsize", q.qsize())

Output:

getting from worker1
got a from worker1
getting from worker2
got b from worker2
finished processing b from worker2 so calling task_done
called task_done from worker2
something went wrong with worker1 in processing a so not calling task_done
reloading q
qsize 0

As you can see, 'a' was permanently removed even though task_done was never called for it (only worker2 called it, for 'b'). I'd expect to see qsize 1 as the output. Is there a way to achieve this, i.e. have task_done complete only a specific task rather than every task fetched so far in all threads?
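One way to picture the output above is a queue that persists a single shared read position. The toy model below is only an assumption about the behaviour, not persistqueue's actual code; `OffsetQueue`, `committed`, and `qsize_after_reload` are made-up names. In this model, task_done commits everything fetched so far by any consumer, which reproduces the qsize 0 result:

```python
class OffsetQueue:
    """Toy model (NOT persistqueue's code): one shared, persisted read offset."""

    def __init__(self, items):
        self.items = list(items)  # stands in for the data stored on disk
        self.head = 0             # in-memory read position
        self.committed = 0        # stands in for the persisted read position

    def get(self):
        item = self.items[self.head]
        self.head += 1
        return item

    def task_done(self):
        # Commits every item fetched so far, regardless of which consumer got it.
        self.committed = self.head

    def qsize_after_reload(self):
        # A reload only sees items past the committed position.
        return len(self.items) - self.committed


q = OffsetQueue(["a", "b"])
a = q.get()    # worker1 takes "a", later fails, never calls task_done
b = q.get()    # worker2 takes "b"
q.task_done()  # worker2 succeeds and commits the shared position
print("qsize", q.qsize_after_reload())
```

With one shared position there is no way to record "b is done but a is not", which is why a per-item acknowledgement is needed for the expected qsize 1.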

Bonus question: how do I also add 'a' back onto the in-memory queue (ignoring persistence), i.e. the equivalent of SQLiteAckQueue.nack? The only way I can see is to reload the queue from disk (in which case the get wouldn't have been persisted), but this seems messy.

(Also, yes, I know of the SQLiteAckQueue which seems well-suited, but I'd prefer to use plain files if possible.)
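A minimal sketch of the per-item behaviour being asked for, using plain files and only the standard library. Everything here is hypothetical and not part of persistqueue: the `FileAckQueue` class, the `ready`/`inflight` directory layout, and the `ack`/`nack` methods are illustrative names. For brevity, get is non-blocking and the sketch assumes a single process:

```python
import os
import tempfile
import threading


class FileAckQueue:
    """Hypothetical sketch: one file per item, acknowledged individually."""

    def __init__(self, path):
        self.path = path
        self.ready = os.path.join(path, "ready")
        self.inflight = os.path.join(path, "inflight")
        os.makedirs(self.ready, exist_ok=True)
        os.makedirs(self.inflight, exist_ok=True)
        self.lock = threading.Lock()
        # Items still in flight from a previous run were never acked: requeue.
        for name in os.listdir(self.inflight):
            os.replace(os.path.join(self.inflight, name),
                       os.path.join(self.ready, name))
        ids = [int(n) for n in os.listdir(self.ready)]
        self.next_id = max(ids, default=-1) + 1

    def put(self, text):
        with self.lock:
            name = f"{self.next_id:012d}"  # zero-padded so names sort FIFO
            self.next_id += 1
            tmp = os.path.join(self.path, name + ".tmp")
            with open(tmp, "w", encoding="utf-8") as f:
                f.write(text)
            os.replace(tmp, os.path.join(self.ready, name))

    def get(self):
        """Return (token, item); the item stays on disk until ack(token)."""
        with self.lock:
            names = sorted(os.listdir(self.ready))
            if not names:
                raise LookupError("queue is empty")
            name = names[0]
            os.replace(os.path.join(self.ready, name),
                       os.path.join(self.inflight, name))
        with open(os.path.join(self.inflight, name), encoding="utf-8") as f:
            return name, f.read()

    def ack(self, token):
        # Permanently removes only this item.
        os.remove(os.path.join(self.inflight, token))

    def nack(self, token):
        # Puts this item back at its original position.
        os.replace(os.path.join(self.inflight, token),
                   os.path.join(self.ready, token))

    def qsize(self):
        return len(os.listdir(self.ready))


base = tempfile.mkdtemp()
q = FileAckQueue(base)
q.put("a")
q.put("b")
tok_a, a = q.get()  # worker1's item; processing fails, so it is never acked
tok_b, b = q.get()  # worker2's item
q.ack(tok_b)        # only "b" is removed for good
q = FileAckQueue(base)  # simulate reloading the queue
print("qsize", q.qsize())
```

Here a failed worker could also call `q.nack(tok_a)` to make 'a' available again without a reload, which is the in-memory equivalent asked about in the bonus question.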
