Description
Tell us about the bug
During the recovery process of a Quixstreams worker, we repeatedly observed log messages showing negative offsets such as -1002 in the recovery progress output. One specific worker remained stuck in the recovery phase for more than 65 minutes without switching to normal operation, while other workers completed recovery successfully. The log continuously displayed entries like:
Recovery progress for <RecoveryPartition "changelog__consumer-group--topic--default[53]">: -1002 / 93518
Recovery progress for <RecoveryPartition "changelog__consumer-group--topic--default[53]">: -1002 / 323624
Recovery progress for <RecoveryPartition "changelog__consumer-group--topic--default[53]">: -1002 / 217138
Only after we restarted the agent again did the worker transition to normal operation, roughly 50 seconds after the restart.
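To make the pattern easier to spot across many workers, here is a minimal sketch that scans log lines for recovery progress entries with a negative current offset. It assumes the log line format shown above; the regex and the helper name `find_suspect_partitions` are illustrative and not part of Quixstreams.

```python
import re
from collections import defaultdict

# Matches lines such as:
#   Recovery progress for <RecoveryPartition "name[53]">: -1002 / 93518
# The pattern is an assumption based on the log excerpts in this report.
PROGRESS_RE = re.compile(
    r'Recovery progress for <RecoveryPartition "(?P<name>[^"]+)">: '
    r'(?P<current>-?\d+) / (?P<target>\d+)'
)


def find_suspect_partitions(log_lines):
    """Return {partition_name: [(current, target), ...]} for every
    recovery partition that reported a negative current offset."""
    suspects = defaultdict(list)
    for line in log_lines:
        match = PROGRESS_RE.search(line)
        if match is None:
            continue  # not a recovery progress line
        current = int(match.group("current"))
        target = int(match.group("target"))
        if current < 0:
            suspects[match.group("name")].append((current, target))
    return dict(suspects)


# The three log lines quoted in this report.
sample_lines = [
    'Recovery progress for <RecoveryPartition "changelog__consumer-group--topic--default[53]">: -1002 / 93518',
    'Recovery progress for <RecoveryPartition "changelog__consumer-group--topic--default[53]">: -1002 / 323624',
    'Recovery progress for <RecoveryPartition "changelog__consumer-group--topic--default[53]">: -1002 / 217138',
]

suspects = find_suspect_partitions(sample_lines)
for name, readings in suspects.items():
    print(name, readings)
```

In our case this reports only partition 53, with the current offset fixed at -1002 while the target offset changes between entries.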
What did you expect to see?
We expected the recovery phase to complete reliably and the worker to transition to normal operation within a reasonable amount of time, especially since the other workers recovered successfully and the Kafka infrastructure is low-latency and co-located.
What version of the library are you using?
We are using Quixstreams 3.21.0, running on-premise in Docker.
The Kafka cluster consists of 3 controllers and 9 brokers, all in the same data center with ~0.114 ms ping.
A persistent state directory is configured and mounted outside the container.
Workaround?
Restarting the affected Quixstreams agent appears to resolve the issue temporarily. After a restart, the worker completed recovery within ~50 seconds instead of remaining stuck for more than an hour.
Anything else we should know?
Already discussed this issue with Peter Nagy in Slack: https://stream-processing.slack.com/archives/C033BGFD1L6/p1763405962304739
Additional files if needed:
# Dockerfile
FROM python:3.11
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONIOENCODING=UTF-8 \
    PYTHONPATH="/app"
ARG MAINAPPPATH=.
WORKDIR /app
# copy requirements file first
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"
# install custom tools and python packages
RUN apt-get update \
    && apt-get install -y jq vim \
    && rm -rf /var/lib/apt/lists/* \
    && pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"
# copy the rest of the data
COPY . .
# adjust workdir
WORKDIR "/app/${MAINAPPPATH}"

# requirements.txt
Authlib==1.6.1
avro==1.12.0
cachetools==6.1.0
cassandra-driver==3.29.2
fastavro==1.12.0
googleapis-common-protos==1.70.0
httpx==0.28.1
mysql-connector-python==9.4.0
numpy==2.3.2
pip==25.2
pyrsistent==0.20.0
quixstreams==3.21.0

# app.py
from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig
import os
# define the kafka broker connection settings
kafka_connect = ConnectionConfig(
    bootstrap_servers="kafka-controller:9092",
    security_protocol="ssl",
    ssl_ca_location="/ca-cert.pem",
    ssl_certificate_location="/kafka.crt",
    ssl_key_location="/kafka.key",
    enable_ssl_certificate_verification=True,
    ssl_endpoint_identification_algorithm="none",
)
# define the app and the kafka broker
app = Application(
    broker_address=kafka_connect,
    consumer_group=os.getenv("QUIX_CONSUMER_GROUP"),
    state_dir=os.getenv("QUIX_STATE_DIR"),
    processing_guarantee="exactly-once",
    # commit_interval=3,
    # commit_every=1,
)