Description
I'd like to use Queue to store items to be processed by threads. However, if one of the items fails to get processed (and task_done is hence not called for it), the item can still be removed from the queue persistently, whereas one would expect it to remain, as is the usual behaviour.
Example:
import threading
import time

from persistqueue import Queue

q = Queue("testq")


def worker1():
    print("getting from worker1")
    x = q.get()
    print("got", x, "from worker1")
    # processing goes here ... takes some time
    time.sleep(2)
    try:
        # simulate a processing failure, so task_done is never reached
        assert False, "something went wrong"
        q.task_done()
    except Exception:
        print("something went wrong with worker1 in processing", x, "so not calling task_done")


def worker2():
    time.sleep(1)
    print("getting from worker2")
    x = q.get()
    print("got", x, "from worker2")
    # processing would happen here - but finishes quicker than worker1
    print("finished processing", x, "from worker2 so calling task_done")
    q.task_done()
    print("called task_done from worker2")


if __name__ == "__main__":
    q.put("a")
    q.put("b")
    t1 = threading.Thread(target=worker1)
    t1.start()
    t2 = threading.Thread(target=worker2)
    t2.start()
    t1.join()
    t2.join()
    # drop the in-memory queue and reload it from disk
    print("reloading q")
    del q
    q = Queue("testq")
    print("qsize", q.qsize())
Output:
getting from worker1
got a from worker1
getting from worker2
got b from worker2
finished processing b from worker2 so calling task_done
called task_done from worker2
something went wrong with worker1 in processing a so not calling task_done
reloading q
qsize 0
As you can see, 'a' was permanently removed, even though task_done was never called for it. In other words, I'd expect to see qsize 1 as the output. Is there a way to achieve this, i.e. to have task_done complete only a specific task rather than all tasks in all threads?
Bonus question: how do I also add 'a' back onto the in-memory queue (ignoring persistence), i.e. the equivalent of SQLiteAckQueue.nack? The only way I can see is to reload the queue from disk (in which case the get wouldn't have been persisted), but this seems messy.
(Also, yes, I know of SQLiteAckQueue, which seems well-suited, but I'd prefer to use plain files if possible.)
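To make the semantics I'm after concrete, here is a minimal in-memory sketch. AckQueue and its get/ack/nack/pending methods are hypothetical names made up for illustration; they are not persistqueue's API, and nothing here is written to disk. The point is only that acknowledging 'b' should leave 'a' pending, and that a nack should make 'a' available to get again.

```python
import queue
import threading


class AckQueue:
    """Toy in-memory queue with per-item acknowledgement.

    Hypothetical illustration only: not part of persistqueue, no persistence.
    """

    def __init__(self):
        self._ready = queue.Queue()   # items waiting to be handed out
        self._unacked = {}            # token -> item handed out but not yet acked
        self._lock = threading.Lock()
        self._next_token = 0

    def put(self, item):
        self._ready.put(item)

    def get(self):
        """Hand out an item together with a token identifying this delivery."""
        item = self._ready.get()
        with self._lock:
            token = self._next_token
            self._next_token += 1
            self._unacked[token] = item
        return token, item

    def ack(self, token):
        """Mark one specific delivery as done; other deliveries are unaffected."""
        with self._lock:
            del self._unacked[token]

    def nack(self, token):
        """Give up on one delivery and put its item back on the ready queue."""
        with self._lock:
            item = self._unacked.pop(token)
        self._ready.put(item)

    def pending(self):
        """Count items a persistent implementation would still have to keep."""
        with self._lock:
            return self._ready.qsize() + len(self._unacked)


# Same scenario as the example above, without threads for brevity.
demo = AckQueue()
demo.put("a")
demo.put("b")
token_a, item_a = demo.get()      # "worker1" takes 'a'
token_b, item_b = demo.get()      # "worker2" takes 'b'
demo.ack(token_b)                 # only 'b' is completed
remaining = demo.pending()        # 'a' is still outstanding
demo.nack(token_a)                # "worker1" failed, so 'a' goes back
_, requeued = demo.get()          # 'a' can be fetched again
```

With file-based persistence, the equivalent would be that only acknowledged items are dropped from disk on reload, which is the behaviour I was hoping for from task_done.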