Skip to content

Commit 42048b0

Browse files
implement SQL Filter (#80)
This PR implements SQL filter functionality for RabbitMQ AMQP streams, allowing users to filter messages using SQL-like expressions. The implementation adds support for SQL filters in the stream consumer options and includes comprehensive test coverage and documentation. Added SQL filter support to StreamConsumerOptions and StreamFilterOptions classes Created comprehensive test cases to verify SQL filter functionality Updated documentation with a new example demonstrating SQL filter usage closes: #73 --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 62b4608 commit 42048b0

File tree

5 files changed

+280
-59
lines changed

5 files changed

+280
-59
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.1.0-management
10+
readonly rabbitmq_image=rabbitmq:4.2-rc-management
1111

1212

1313
readonly docker_name_prefix='rabbitmq-amqp-python-client'

examples/streams_with_filters/example_streams_with_filters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def main() -> None:
7272
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
7373
"""
7474

75-
queue_name = "stream-example-with_filtering-queue"
75+
queue_name = "stream-example-with-message-properties-filter-queue"
7676
logger.info("Creating connection")
7777
environment = Environment("amqp://guest:guest@localhost:5672/")
7878
connection = create_connection(environment)
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# type: ignore
2+
import logging
3+
4+
from rabbitmq_amqp_python_client import (
5+
AddressHelper,
6+
AMQPMessagingHandler,
7+
Connection,
8+
ConnectionClosed,
9+
Converter,
10+
Environment,
11+
Event,
12+
Message,
13+
OffsetSpecification,
14+
StreamConsumerOptions,
15+
StreamFilterOptions,
16+
StreamSpecification,
17+
)
18+
19+
MESSAGES_TO_PUBLISH = 100
20+
21+
22+
class MyMessageHandler(AMQPMessagingHandler):
23+
24+
def __init__(self):
25+
super().__init__()
26+
self._count = 0
27+
28+
def on_amqp_message(self, event: Event):
29+
# only messages with banana filters and with subject yellow
30+
# and application property from = italy get received
31+
self._count = self._count + 1
32+
logger.info(
33+
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
34+
Converter.bytes_to_string(event.message.body),
35+
event.message.subject,
36+
event.message.application_properties,
37+
self._count,
38+
)
39+
)
40+
self.delivery_context.accept(event)
41+
42+
def on_connection_closed(self, event: Event):
43+
# if you want you can add cleanup operations here
44+
print("connection closed")
45+
46+
def on_link_closed(self, event: Event) -> None:
47+
# if you want you can add cleanup operations here
48+
print("link closed")
49+
50+
51+
def create_connection(environment: Environment) -> Connection:
52+
connection = environment.connection()
53+
connection.dial()
54+
55+
return connection
56+
57+
58+
logging.basicConfig()
59+
logger = logging.getLogger("[streams_with_filters]")
60+
logger.setLevel(logging.INFO)
61+
62+
63+
def main() -> None:
64+
"""
65+
In this example we create a stream queue and a consumer with SQL filter
66+
67+
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
68+
"""
69+
70+
queue_name = "stream-example-with-sql-filter-queue"
71+
logger.info("Creating connection")
72+
environment = Environment("amqp://guest:guest@localhost:5672/")
73+
connection = create_connection(environment)
74+
management = connection.management()
75+
# delete the queue if it exists
76+
management.delete_queue(queue_name)
77+
# create a stream queue
78+
management.declare_queue(StreamSpecification(name=queue_name))
79+
80+
addr_queue = AddressHelper.queue_address(queue_name)
81+
82+
consumer_connection = create_connection(environment)
83+
sql = (
84+
"properties.subject LIKE '%in_the_filter%' "
85+
"AND a_in_the_filter_key = 'a_in_the_filter_value'"
86+
)
87+
88+
consumer = consumer_connection.consumer(
89+
addr_queue,
90+
message_handler=MyMessageHandler(),
91+
stream_consumer_options=StreamConsumerOptions(
92+
offset_specification=OffsetSpecification.first,
93+
filter_options=StreamFilterOptions(sql=sql),
94+
),
95+
)
96+
print(
97+
"create a consumer and consume the test message - press control + c to terminate to consume"
98+
)
99+
100+
# print("create a publisher and publish a test message")
101+
publisher = connection.publisher(addr_queue)
102+
103+
# publish messages won't match the filter
104+
for i in range(MESSAGES_TO_PUBLISH):
105+
publisher.publish(Message(Converter.string_to_bytes(body="apple: " + str(i))))
106+
107+
# publish messages that will match the filter
108+
for i in range(MESSAGES_TO_PUBLISH):
109+
msqMatch = Message(
110+
body=Converter.string_to_bytes("the_right_one_sql"),
111+
# will match due of %
112+
subject="something_in_the_filter_{}".format(i),
113+
application_properties={"a_in_the_filter_key": "a_in_the_filter_value"},
114+
)
115+
publisher.publish(msqMatch)
116+
117+
publisher.close()
118+
119+
while True:
120+
try:
121+
consumer.run()
122+
except KeyboardInterrupt:
123+
pass
124+
except ConnectionClosed:
125+
print("connection closed")
126+
continue
127+
except Exception as e:
128+
print("consumer exited for exception " + str(e))
129+
130+
break
131+
132+
#
133+
logger.info("consumer exited, deleting queue")
134+
management.delete_queue(queue_name)
135+
136+
print("closing connections")
137+
management.close()
138+
print("after management closing")
139+
environment.close()
140+
print("after connection closing")
141+
142+
143+
if __name__ == "__main__":
144+
main()

rabbitmq_amqp_python_client/entities.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from .exceptions import ValidationCodeException
88
from .qpid.proton._data import Described, symbol
99

10+
SQL_FILTER = "sql-filter"
11+
AMQP_SQL_FILTER = "amqp:sql-filter"
1012
STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
1113
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
1214
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
@@ -159,7 +161,6 @@ class MessageProperties:
159161
Attributes:
160162
message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str).
161163
user_id: Identity of the user responsible for producing the message.
162-
to: Intended destination node of the message.
163164
subject: Summary information about the message content and purpose.
164165
reply_to: Address of the node to send replies to.
165166
correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str).
@@ -174,7 +175,6 @@ class MessageProperties:
174175

175176
message_id: Optional[Union[int, str, bytes]] = None
176177
user_id: Optional[bytes] = None
177-
to: Optional[str] = None
178178
subject: Optional[str] = None
179179
reply_to: Optional[str] = None
180180
correlation_id: Optional[Union[int, str, bytes]] = None
@@ -245,20 +245,24 @@ def __init__(
245245
if offset_specification is not None:
246246
self._offset(offset_specification)
247247

248-
if filter_options is not None and filter_options.values is not None:
248+
if filter_options is None:
249+
return
250+
251+
if filter_options.values is not None:
249252
self._filter_values(filter_options.values)
250253

251-
if filter_options is not None and filter_options.match_unfiltered:
254+
if filter_options.match_unfiltered:
252255
self._filter_match_unfiltered(filter_options.match_unfiltered)
253256

254-
if filter_options is not None and filter_options.message_properties is not None:
257+
if filter_options.message_properties is not None:
255258
self._filter_message_properties(filter_options.message_properties)
256-
if (
257-
filter_options is not None
258-
and filter_options.application_properties is not None
259-
):
259+
260+
if filter_options.application_properties is not None:
260261
self._filter_application_properties(filter_options.application_properties)
261262

263+
if filter_options.sql is not None and filter_options.sql != "":
264+
self._filter_sql(filter_options.sql)
265+
262266
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
263267
"""
264268
Set the offset specification for the stream.
@@ -334,6 +338,15 @@ def _filter_application_properties(
334338
Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop)
335339
)
336340

341+
def _filter_sql(self, sql: str) -> None:
342+
"""
343+
Set SQL filter for the stream.
344+
345+
Args:
346+
sql: SQL string to apply as a filter
347+
"""
348+
self._filter_set[symbol(SQL_FILTER)] = Described(symbol(AMQP_SQL_FILTER), sql)
349+
337350
def filter_set(self) -> Dict[symbol, Described]:
338351
"""
339352
Get the current filter set configuration.

0 commit comments

Comments
 (0)