Skip to content

feat: add maxlen param to RedisStreamBroker #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion taskiq_redis/redis_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def __init__(
consumer_id: str = "$",
mkstream: bool = True,
xread_block: int = 10000,
maxlen: Optional[int] = None,
additional_streams: Optional[Dict[str, str]] = None,
**connection_kwargs: Any,
) -> None:
Expand All @@ -184,6 +185,8 @@ def __init__(
:param mkstream: create stream if it does not exist.
:param xread_block: block time in ms for xreadgroup.
Better to set it to a bigger value, to avoid unnecessary calls.
:param maxlen: sets the maximum length of the stream
trims (the old values of) the stream each time a new element is added
:param additional_streams: additional streams to read from.
Each key is a stream name, value is a consumer id.
"""
Expand All @@ -200,6 +203,7 @@ def __init__(
self.consumer_id = consumer_id
self.mkstream = mkstream
self.block = xread_block
self.maxlen = maxlen
self.additional_streams = additional_streams or {}

async def _declare_consumer_group(self) -> None:
Expand Down Expand Up @@ -235,7 +239,11 @@ async def kick(self, message: BrokerMessage) -> None:
:param message: message to append.
"""
async with Redis(connection_pool=self.connection_pool) as redis_conn:
await redis_conn.xadd(self.queue_name, {b"data": message.message})
await redis_conn.xadd(
self.queue_name,
{b"data": message.message},
maxlen=self.maxlen,
)

def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
async def _ack() -> None:
Expand Down
Loading