Skip to content

Commit bb54051

Browse files
improving stream options API (#55)
* improving stream options API * name convention
1 parent eb5c2ac commit bb54051

File tree

3 files changed

+50
-46
lines changed

3 files changed

+50
-46
lines changed

examples/streams/example_with_streams.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
StreamSpecification,
1414
)
1515

16-
MESSAGES_TO_PUBLISH = 1
16+
MESSAGES_TO_PUBLISH = 100
1717

1818

1919
class MyMessageHandler(AMQPMessagingHandler):
@@ -87,7 +87,7 @@ def main() -> None:
8787
queue_name = "example-queue"
8888

8989
print("connection to amqp server")
90-
environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True)
90+
environment = Environment("amqp://guest:guest@localhost:5672/")
9191
connection = create_connection(environment)
9292

9393
management = connection.management()
@@ -98,16 +98,14 @@ def main() -> None:
9898

9999
consumer_connection = create_connection(environment)
100100

101-
stream_filter_options = StreamOptions()
102-
# can be first, last, next or an offset long
103-
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
104-
stream_filter_options.offset(OffsetSpecification.first)
105-
stream_filter_options.filter_values(["banana"])
106-
107101
consumer = consumer_connection.consumer(
108102
addr_queue,
109103
message_handler=MyMessageHandler(),
110-
stream_filter_options=stream_filter_options,
104+
# can be first, last, next or an offset long
105+
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
106+
stream_filter_options=StreamOptions(
107+
offset_specification=OffsetSpecification.first, filters=["banana"]
108+
),
111109
)
112110
print(
113111
"create a consumer and consume the test message - press control + c to terminate to consume"

rabbitmq_amqp_python_client/entities.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Any, Dict, Optional, Union
55

66
from .common import ExchangeType, QueueType
7+
from .exceptions import ValidationCodeException
78
from .qpid.proton._data import Described, symbol
89

910
STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
@@ -156,12 +157,35 @@ class StreamOptions:
156157
157158
Attributes:
158159
_filter_set: Dictionary of stream filter specifications
160+
161+
Args:
162+
offset_specification: Either an OffsetSpecification enum value or
163+
an integer offset
164+
filters: List of filter strings to apply to the stream
159165
"""
160166

161-
def __init__(self): # type: ignore
167+
def __init__(
168+
self,
169+
offset_specification: Optional[Union[OffsetSpecification, int]] = None,
170+
filters: Optional[list[str]] = None,
171+
filter_match_unfiltered: bool = False,
172+
):
173+
174+
if offset_specification is None and filters is None:
175+
raise ValidationCodeException(
176+
"At least one between offset_specification and filters must be set when setting up filtering"
177+
)
162178
self._filter_set: Dict[symbol, Described] = {}
179+
if offset_specification is not None:
180+
self._offset(offset_specification)
181+
182+
if filters is not None:
183+
self._filter_values(filters)
184+
185+
if filter_match_unfiltered is True:
186+
self._filter_match_unfiltered(filter_match_unfiltered)
163187

164-
def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
188+
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
165189
"""
166190
Set the offset specification for the stream.
167191
@@ -178,7 +202,7 @@ def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
178202
symbol(STREAM_OFFSET_SPEC), offset_specification.name
179203
)
180204

181-
def filter_values(self, filters: list[str]) -> None:
205+
def _filter_values(self, filters: list[str]) -> None:
182206
"""
183207
Set the filter values for the stream.
184208
@@ -189,7 +213,7 @@ def filter_values(self, filters: list[str]) -> None:
189213
symbol(STREAM_FILTER_SPEC), filters
190214
)
191215

192-
def filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None:
216+
def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None:
193217
"""
194218
Set whether to match unfiltered messages.
195219

tests/test_streams.py

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,16 @@ def test_stream_read_from_last(
6565

6666
addr_queue = AddressHelper.queue_address(stream_name)
6767

68-
stream_filter_options = StreamOptions()
69-
stream_filter_options.offset(OffsetSpecification.last)
70-
7168
# consume and then publish
7269
try:
7370
connection_consumer = environment.connection()
7471
connection_consumer.dial()
7572
consumer = connection_consumer.consumer(
7673
addr_queue,
7774
message_handler=MyMessageHandlerAcceptStreamOffset(),
78-
stream_filter_options=stream_filter_options,
75+
stream_filter_options=StreamOptions(
76+
offset_specification=OffsetSpecification.last
77+
),
7978
)
8079
publish_messages(connection, messages_to_send, stream_name)
8180
consumer.run()
@@ -107,16 +106,13 @@ def test_stream_read_from_offset_zero(
107106
# publish and then consume
108107
publish_messages(connection, messages_to_send, stream_name)
109108

110-
stream_filter_options = StreamOptions()
111-
stream_filter_options.offset(0)
112-
113109
try:
114110
connection_consumer = environment.connection()
115111
connection_consumer.dial()
116112
consumer = connection_consumer.consumer(
117113
addr_queue,
118114
message_handler=MyMessageHandlerAcceptStreamOffset(0),
119-
stream_filter_options=stream_filter_options,
115+
stream_filter_options=StreamOptions(offset_specification=0),
120116
)
121117

122118
consumer.run()
@@ -148,16 +144,13 @@ def test_stream_read_from_offset_first(
148144
# publish and then consume
149145
publish_messages(connection, messages_to_send, stream_name)
150146

151-
stream_filter_options = StreamOptions()
152-
stream_filter_options.offset(OffsetSpecification.first)
153-
154147
try:
155148
connection_consumer = environment.connection()
156149
connection_consumer.dial()
157150
consumer = connection_consumer.consumer(
158151
addr_queue,
159152
message_handler=MyMessageHandlerAcceptStreamOffset(0),
160-
stream_filter_options=stream_filter_options,
153+
stream_filter_options=StreamOptions(OffsetSpecification.first),
161154
)
162155

163156
consumer.run()
@@ -189,16 +182,13 @@ def test_stream_read_from_offset_ten(
189182
# publish and then consume
190183
publish_messages(connection, messages_to_send, stream_name)
191184

192-
stream_filter_options = StreamOptions()
193-
stream_filter_options.offset(10)
194-
195185
try:
196186
connection_consumer = environment.connection()
197187
connection_consumer.dial()
198188
consumer = connection_consumer.consumer(
199189
addr_queue,
200190
message_handler=MyMessageHandlerAcceptStreamOffset(10),
201-
stream_filter_options=stream_filter_options,
191+
stream_filter_options=StreamOptions(offset_specification=10),
202192
)
203193

204194
consumer.run()
@@ -228,15 +218,13 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N
228218

229219
# consume and then publish
230220
try:
231-
stream_filter_options = StreamOptions()
232-
stream_filter_options.filter_values(["banana"])
233221
connection_consumer = environment.connection()
234222
connection_consumer.dial()
235223

236224
consumer = connection_consumer.consumer(
237225
addr_queue,
238226
message_handler=MyMessageHandlerAcceptStreamOffset(),
239-
stream_filter_options=stream_filter_options,
227+
stream_filter_options=StreamOptions(filters=["banana"]),
240228
)
241229
# send with annotations filter banana
242230
publish_messages(connection, messages_to_send, stream_name, ["banana"])
@@ -268,15 +256,13 @@ def test_stream_filtering_mixed(
268256

269257
# consume and then publish
270258
try:
271-
stream_filter_options = StreamOptions()
272-
stream_filter_options.filter_values(["banana"])
273259
connection_consumer = environment.connection()
274260
connection_consumer.dial()
275261
consumer = connection_consumer.consumer(
276262
addr_queue,
277263
# check we are reading just from offset 10 as just banana filtering applies
278264
message_handler=MyMessageHandlerAcceptStreamOffset(10),
279-
stream_filter_options=stream_filter_options,
265+
stream_filter_options=StreamOptions(filters=["banana"]),
280266
)
281267
# send with annotations filter apple and then banana
282268
# consumer will read just from offset 10
@@ -309,13 +295,11 @@ def test_stream_filtering_not_present(
309295
addr_queue = AddressHelper.queue_address(stream_name)
310296

311297
# consume and then publish
312-
stream_filter_options = StreamOptions()
313-
stream_filter_options.filter_values(["apple"])
314298
connection_consumer = environment.connection()
315299
connection_consumer.dial()
316300

317301
consumer = connection_consumer.consumer(
318-
addr_queue, stream_filter_options=stream_filter_options
302+
addr_queue, stream_filter_options=StreamOptions(filters=["apple"])
319303
)
320304
# send with annotations filter banana
321305
publish_messages(connection, messages_to_send, stream_name, ["banana"])
@@ -351,15 +335,14 @@ def test_stream_match_unfiltered(
351335

352336
# consume and then publish
353337
try:
354-
stream_filter_options = StreamOptions()
355-
stream_filter_options.filter_values(["banana"])
356-
stream_filter_options.filter_match_unfiltered(True)
357338
connection_consumer = environment.connection()
358339
connection_consumer.dial()
359340
consumer = connection_consumer.consumer(
360341
addr_queue,
361342
message_handler=MyMessageHandlerAcceptStreamOffset(),
362-
stream_filter_options=stream_filter_options,
343+
stream_filter_options=StreamOptions(
344+
filters=["banana"], filter_match_unfiltered=True
345+
),
363346
)
364347
# send with annotations filter banana
365348
publish_messages(connection, messages_to_send, stream_name)
@@ -391,16 +374,15 @@ def test_stream_reconnection(
391374

392375
# consume and then publish
393376
try:
394-
stream_filter_options = StreamOptions()
395-
stream_filter_options.filter_values(["banana"])
396-
stream_filter_options.filter_match_unfiltered(True)
397377
connection_consumer = environment.connection()
398378
connection_consumer.dial()
399379
consumer = connection_consumer.consumer(
400380
addr_queue,
401381
# disconnection and check happens here
402382
message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(),
403-
stream_filter_options=stream_filter_options,
383+
stream_filter_options=StreamOptions(
384+
filters=["banana"], filter_match_unfiltered=True
385+
),
404386
)
405387
# send with annotations filter banana
406388
publish_messages(connection_with_reconnect, messages_to_send, stream_name)

0 commit comments

Comments
 (0)