If a task crashes the consumer process (for example, because of a memory leak), the consumer enters a crash loop that it cannot recover from:
- The consumer receives a message and starts a task that eats up memory.
- The process is killed with SIGKILL because of an out-of-memory (OOM) condition.
- Because SIGKILL cannot be caught, the consumer is unable to acknowledge the message or, if the retry middleware is used, to update the retry count and republish the message with the updated count.
- Instead, the original message is requeued as is. Its redelivered property is set to True, but it does not carry the number of deliveries.
- The consumer picks it up again and the loop continues.
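The loop described in the list can be reproduced without RabbitMQ or taskiq using a toy, single-consumer model. This is a simulation sketch, not RabbitMQ's or taskiq's code: Message, Outcome, run_queue and max_attempts are hypothetical names, and max_attempts only stands in for "forever". The optional delivery_limit models the quorum-queue behaviour discussed below, where the broker counts returns and drops or dead-letters a message once the count exceeds the limit.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, List, Optional


@dataclass
class Message:
    body: int
    redelivered: bool = False  # the only hint a classic queue gives on redelivery
    delivery_count: int = 0    # only maintained in the quorum-style mode


@dataclass
class Outcome:
    completed: List[int] = field(default_factory=list)
    dead_lettered: List[int] = field(default_factory=list)
    attempts: List[int] = field(default_factory=list)
    still_queued: List[int] = field(default_factory=list)


def run_queue(
    bodies: List[int],
    crashes: Callable[[int], bool],
    delivery_limit: Optional[int] = None,
    max_attempts: int = 10,
) -> Outcome:
    """Hypothetical toy model of one consumer reading one queue."""
    queue: Deque[Message] = deque(Message(body) for body in bodies)
    outcome = Outcome()
    while queue and len(outcome.attempts) < max_attempts:
        message = queue.popleft()
        outcome.attempts.append(message.body)
        if not crashes(message.body):
            outcome.completed.append(message.body)  # task finished, message acked
            continue
        # The worker was SIGKILLed: no ack and no retry middleware ran, so the
        # broker requeues the original message with only the flag changed.
        message.redelivered = True
        if delivery_limit is not None:
            # Quorum-style mode: the broker itself counts failed deliveries.
            message.delivery_count += 1
            if message.delivery_count > delivery_limit:
                outcome.dead_lettered.append(message.body)  # dropped or dead-lettered
                continue
        queue.appendleft(message)  # back at its original position, picked up next
    outcome.still_queued = [queued.body for queued in queue]
    return outcome


if __name__ == "__main__":
    classic = run_queue([1, 2, 3], crashes=lambda body: body == 3)
    quorum = run_queue([1, 2, 3], crashes=lambda body: body == 3, delivery_limit=3)
    print(classic.attempts, classic.still_queued)
    print(quorum.attempts, quorum.dead_lettered)
```

In the classic mode, message 3 fills every remaining attempt and is still queued when the cap is hit; with delivery_limit=3 it is attempted four times and then leaves the queue, so the consumer can move on.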
This can be prevented by using quorum queues, which track the number of deliveries in the x-delivery-count header and automatically drop or dead-letter a message once that count exceeds the delivery-limit setting.
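A quorum queue with a delivery limit can be declared directly with aio-pika, the client library taskiq-aio-pika is built on. This is a minimal sketch, not taskiq-aio-pika configuration: quorum_queue_arguments, declare_quorum_queue and the queue name are hypothetical, while x-queue-type, x-delivery-limit and x-dead-letter-exchange are standard RabbitMQ queue arguments. Note that if another client (such as the taskiq broker) later declares the same queue with different arguments, RabbitMQ rejects the redeclaration.

```python
from typing import Any, Dict, Optional


def quorum_queue_arguments(
    delivery_limit: int, dead_letter_exchange: Optional[str] = None
) -> Dict[str, Any]:
    """Build queue arguments for a quorum queue with a delivery limit (hypothetical helper)."""
    arguments: Dict[str, Any] = {
        "x-queue-type": "quorum",            # quorum queues maintain x-delivery-count
        "x-delivery-limit": delivery_limit,  # drop or dead-letter once the count exceeds this
    }
    if dead_letter_exchange is not None:
        # Without a dead-letter exchange the message is simply dropped.
        arguments["x-dead-letter-exchange"] = dead_letter_exchange
    return arguments


async def declare_quorum_queue(url: str, queue_name: str, delivery_limit: int = 3) -> None:
    # Imported here so the argument helper above works without aio-pika installed.
    import aio_pika

    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        # Quorum queues must be declared durable.
        await channel.declare_queue(
            queue_name,
            durable=True,
            arguments=quorum_queue_arguments(delivery_limit),
        )
```

Usage, assuming the broker from the compose file below and a hypothetical queue name: asyncio.run(declare_quorum_queue("amqp://user:password@localhost:5672", "my_tasks")).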
Here's a reproducible example.
Consumer/producer code (taskiq_poison_message.py, the module name used by the worker command and in the logs below):
import asyncio
import time

from taskiq import SimpleRetryMiddleware
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://user:password@localhost:5672").with_middlewares(
    SimpleRetryMiddleware(default_retry_count=3),
)


def eat_memory(max_gb=1.0, chunk_mb=100.0):
    """Allocate memory in chunks until max_gb is reached.

    With the 500MB container limit below, the OOM killer sends SIGKILL
    before max_gb is reached, so no Python exception is ever raised.
    """
    chunk_size = int(chunk_mb * 1024 * 1024)
    max_bytes = int(max_gb * 1024 * 1024 * 1024)
    chunks = []
    total = 0
    try:
        while total < max_bytes:
            chunks.append(bytearray(chunk_size))
            total += chunk_size
            time.sleep(0.1)
    except MemoryError:
        pass
    finally:
        chunks.clear()


@broker.task
async def test(input: int) -> None:
    # Only the third message triggers the out-of-memory crash.
    if input == 3:
        eat_memory()


async def main() -> None:
    await broker.startup()
    await test.kiq(1)
    await test.kiq(2)
    await test.kiq(3)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
docker-compose with a memory limit:
version: '3.8'
services:
  taskiq-consumer:
    build:
      context: .
      dockerfile: Dockerfile_taskiq_poison_message
    volumes:
      - .:/app
    command: poetry run taskiq worker taskiq_poison_message:broker
    restart: unless-stopped
    environment:
      - PYTHONUNBUFFERED=1
    network_mode: "host"
    deploy:
      resources:
        limits:
          memory: 500MB
    depends_on:
      - rabbitmq
  rabbitmq:
    image: 'rabbitmq:4-management'
    ports:
      - '5672:5672'
      - '15672:15672'
    environment:
      - RABBITMQ_DEFAULT_USER=user
      - RABBITMQ_DEFAULT_PASS=password
Logs showing that the workers keep picking up the third message (ID 37f99831dc1e41f78bd54c7c342452fe) and crashing indefinitely:
[2024-12-20 12:51:57,539][taskiq.process-manager][INFO ][MainProcess] Process worker-0 restarted with pid 22
[2024-12-20 12:51:57,619][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
[2024-12-20 12:51:57,641][taskiq.process-manager][INFO ][MainProcess] Process worker-1 restarted with pid 24
[2024-12-20 12:51:57,704][taskiq.receiver.receiver][INFO ][worker-1] Listening started.
[2024-12-20 12:58:17,253][taskiq.receiver.receiver][INFO ][worker-0] Executing task taskiq_poison_message:test with ID: 7dbe4ff8d044446aa9a3fbafec4ad25b
[2024-12-20 12:58:17,255][taskiq.receiver.receiver][INFO ][worker-1] Executing task taskiq_poison_message:test with ID: 87f46763d49b420f8b0b7216055e118f
[2024-12-20 12:58:17,256][taskiq.receiver.receiver][INFO ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:18,858][taskiq.process-manager][INFO ][MainProcess] worker-0 is dead. Scheduling reload.
[2024-12-20 12:58:18,976][taskiq.receiver.receiver][INFO ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:19,963][taskiq.process-manager][INFO ][MainProcess] Process worker-0 restarted with pid 26
[2024-12-20 12:58:21,153][taskiq.process-manager][INFO ][MainProcess] worker-1 is dead. Scheduling reload.
[2024-12-20 12:58:22,083][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
[2024-12-20 12:58:22,111][taskiq.receiver.receiver][INFO ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:22,232][taskiq.process-manager][INFO ][MainProcess] Process worker-1 restarted with pid 28
[2024-12-20 12:58:22,348][taskiq.receiver.receiver][INFO ][worker-1] Listening started.
[2024-12-20 12:58:23,794][taskiq.receiver.receiver][INFO ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:24,365][taskiq.process-manager][INFO ][MainProcess] worker-0 is dead. Scheduling reload.
[2024-12-20 12:58:25,464][taskiq.process-manager][INFO ][MainProcess] Process worker-0 restarted with pid 30
[2024-12-20 12:58:25,573][taskiq.process-manager][INFO ][MainProcess] worker-1 is dead. Scheduling reload.
[2024-12-20 12:58:26,002][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
[2024-12-20 12:58:26,022][taskiq.receiver.receiver][INFO ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:26,631][taskiq.process-manager][INFO ][MainProcess] Process worker-1 restarted with pid 32
[2024-12-20 12:58:27,310][taskiq.receiver.receiver][INFO ][worker-1] Listening started.
[2024-12-20 12:58:27,564][taskiq.receiver.receiver][INFO ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:27,757][taskiq.process-manager][INFO ][MainProcess] worker-0 is dead. Scheduling reload.
[2024-12-20 12:58:28,825][taskiq.process-manager][INFO ][MainProcess] Process worker-0 restarted with pid 34
[2024-12-20 12:58:29,545][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
[2024-12-20 12:58:29,570][taskiq.receiver.receiver][INFO ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:29,931][taskiq.process-manager][INFO ][MainProcess] worker-1 is dead. Scheduling reload.
[2024-12-20 12:58:30,994][taskiq.process-manager][INFO ][MainProcess] Process worker-1 restarted with pid 36
[2024-12-20 12:58:31,687][taskiq.receiver.receiver][INFO ][worker-1] Listening started.
[2024-12-20 12:58:31,713][taskiq.receiver.receiver][INFO ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
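A poison message like this can be spotted by counting how often each task ID is executed. The sketch below assumes the "Executing task <name> with ID: <id>" line format seen in the logs above; execution_counts and suspected_poison are hypothetical helper names, and the threshold is an arbitrary choice.

```python
import re
from collections import Counter
from typing import Iterable, List

# Matches the receiver's "Executing task <name> with ID: <hex id>" log lines.
EXECUTING = re.compile(r"Executing task (?P<task>\S+) with ID: (?P<task_id>[0-9a-f]+)")


def execution_counts(lines: Iterable[str]) -> Counter:
    """Count how many times each task ID was started."""
    counts: Counter = Counter()
    for line in lines:
        match = EXECUTING.search(line)
        if match:
            counts[match.group("task_id")] += 1
    return counts


def suspected_poison(lines: Iterable[str], threshold: int = 2) -> List[str]:
    """Return task IDs started at least `threshold` times, in first-seen order."""
    return [task_id for task_id, n in execution_counts(lines).items() if n >= threshold]
```

Fed the log above, it counts 37f99831dc1e41f78bd54c7c342452fe eight times and each of the other two task IDs once.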