Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set -o xtrace
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
readonly rabbitmq_image=rabbitmq:4.1.0-management
readonly rabbitmq_image=rabbitmq:4.2-rc-management


readonly docker_name_prefix='rabbitmq-amqp-python-client'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def main() -> None:
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
"""

queue_name = "stream-example-with_filtering-queue"
queue_name = "stream-example-with-message-properties-filter-queue"
logger.info("Creating connection")
environment = Environment("amqp://guest:guest@localhost:5672/")
connection = create_connection(environment)
Expand Down
144 changes: 144 additions & 0 deletions examples/streams_with_sql_filters/example_streams_with_sql_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# type: ignore
import logging

from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Converter,
Environment,
Event,
Message,
OffsetSpecification,
StreamConsumerOptions,
StreamFilterOptions,
StreamSpecification,
)

MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):

def __init__(self):
super().__init__()
self._count = 0

def on_amqp_message(self, event: Event):
# only messages with banana filters and with subject yellow
# and application property from = italy get received
self._count = self._count + 1
logger.info(
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
Converter.bytes_to_string(event.message.body),
event.message.subject,
event.message.application_properties,
self._count,
)
)
self.delivery_context.accept(event)

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
print("connection closed")

def on_link_closed(self, event: Event) -> None:
# if you want you can add cleanup operations here
print("link closed")


def create_connection(environment: Environment) -> Connection:
connection = environment.connection()
connection.dial()

return connection


logging.basicConfig()
logger = logging.getLogger("[streams_with_filters]")
logger.setLevel(logging.INFO)


def main() -> None:
"""
In this example we create a stream queue and a consumer with SQL filter

See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
"""

queue_name = "stream-example-with-sql-filter-queue"
logger.info("Creating connection")
environment = Environment("amqp://guest:guest@localhost:5672/")
connection = create_connection(environment)
management = connection.management()
# delete the queue if it exists
management.delete_queue(queue_name)
# create a stream queue
management.declare_queue(StreamSpecification(name=queue_name))

addr_queue = AddressHelper.queue_address(queue_name)

consumer_connection = create_connection(environment)
sql = (
"properties.subject LIKE '%in_the_filter%' "
"AND a_in_the_filter_key = 'a_in_the_filter_value'"
)

consumer = consumer_connection.consumer(
addr_queue,
message_handler=MyMessageHandler(),
stream_consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(sql=sql),
),
)
print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)

# print("create a publisher and publish a test message")
publisher = connection.publisher(addr_queue)

# publish messages won't match the filter
for i in range(MESSAGES_TO_PUBLISH):
publisher.publish(Message(Converter.string_to_bytes(body="apple: " + str(i))))

# publish messages that will match the filter
for i in range(MESSAGES_TO_PUBLISH):
msqMatch = Message(
body=Converter.string_to_bytes("the_right_one_sql"),
# will match due of %
subject="something_in_the_filter_{}".format(i),
application_properties={"a_in_the_filter_key": "a_in_the_filter_value"},
)
publisher.publish(msqMatch)

publisher.close()

while True:
try:
consumer.run()
except KeyboardInterrupt:
pass
except ConnectionClosed:
print("connection closed")
continue
except Exception as e:
print("consumer exited for exception " + str(e))

break

#
logger.info("consumer exited, deleting queue")
management.delete_queue(queue_name)

print("closing connections")
management.close()
print("after management closing")
environment.close()
print("after connection closing")


if __name__ == "__main__":
main()
31 changes: 22 additions & 9 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from .exceptions import ValidationCodeException
from .qpid.proton._data import Described, symbol

SQL_FILTER = "sql-filter"
AMQP_SQL_FILTER = "amqp:sql-filter"
STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
Expand Down Expand Up @@ -159,7 +161,6 @@ class MessageProperties:
Attributes:
message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str).
user_id: Identity of the user responsible for producing the message.
to: Intended destination node of the message.
subject: Summary information about the message content and purpose.
reply_to: Address of the node to send replies to.
correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str).
Expand All @@ -174,7 +175,6 @@ class MessageProperties:

message_id: Optional[Union[int, str, bytes]] = None
user_id: Optional[bytes] = None
to: Optional[str] = None
subject: Optional[str] = None
reply_to: Optional[str] = None
correlation_id: Optional[Union[int, str, bytes]] = None
Expand Down Expand Up @@ -245,20 +245,24 @@ def __init__(
if offset_specification is not None:
self._offset(offset_specification)

if filter_options is not None and filter_options.values is not None:
if filter_options is None:
return

if filter_options.values is not None:
self._filter_values(filter_options.values)

if filter_options is not None and filter_options.match_unfiltered:
if filter_options.match_unfiltered:
self._filter_match_unfiltered(filter_options.match_unfiltered)

if filter_options is not None and filter_options.message_properties is not None:
if filter_options.message_properties is not None:
self._filter_message_properties(filter_options.message_properties)
if (
filter_options is not None
and filter_options.application_properties is not None
):

if filter_options.application_properties is not None:
self._filter_application_properties(filter_options.application_properties)

if filter_options.sql is not None and filter_options.sql != "":
self._filter_sql(filter_options.sql)

def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
"""
Set the offset specification for the stream.
Expand Down Expand Up @@ -334,6 +338,15 @@ def _filter_application_properties(
Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop)
)

def _filter_sql(self, sql: str) -> None:
"""
Set SQL filter for the stream.

Args:
sql: SQL string to apply as a filter
"""
self._filter_set[symbol(SQL_FILTER)] = Described(symbol(AMQP_SQL_FILTER), sql)

def filter_set(self) -> Dict[symbol, Described]:
"""
Get the current filter set configuration.
Expand Down
Loading