|
| 1 | +import time |
| 2 | + |
1 | 3 | from rabbitmq_amqp_python_client import (
|
2 | 4 | AddressHelper,
|
3 | 5 | AMQPMessagingHandler,
|
@@ -471,3 +473,140 @@ def test_stream_filter_message_properties(
|
471 | 473 | consumer.close()
|
472 | 474 |
|
473 | 475 | management.delete_queue(stream_name)
|
| 476 | + |
| 477 | + |
| 478 | +class MyMessageHandlerApplicationPropertiesFilter(AMQPMessagingHandler): |
| 479 | + def __init__( |
| 480 | + self, |
| 481 | + ): |
| 482 | + super().__init__() |
| 483 | + |
| 484 | + def on_message(self, event: Event): |
| 485 | + self.delivery_context.accept(event) |
| 486 | + assert event.message.application_properties == {"key": "value_17"} |
| 487 | + raise ConsumerTestException("consumed") |
| 488 | + |
| 489 | + |
| 490 | +def test_stream_filter_application_properties( |
| 491 | + connection: Connection, environment: Environment |
| 492 | +) -> None: |
| 493 | + consumer = None |
| 494 | + stream_name = "test_stream_application_message_properties" |
| 495 | + messages_to_send = 30 |
| 496 | + |
| 497 | + queue_specification = StreamSpecification( |
| 498 | + name=stream_name, |
| 499 | + ) |
| 500 | + management = connection.management() |
| 501 | + management.declare_queue(queue_specification) |
| 502 | + |
| 503 | + addr_queue = AddressHelper.queue_address(stream_name) |
| 504 | + |
| 505 | + # consume and then publish |
| 506 | + try: |
| 507 | + connection_consumer = environment.connection() |
| 508 | + connection_consumer.dial() |
| 509 | + consumer = connection_consumer.consumer( |
| 510 | + addr_queue, |
| 511 | + message_handler=MyMessageHandlerApplicationPropertiesFilter(), |
| 512 | + stream_consumer_options=StreamConsumerOptions( |
| 513 | + filter_options=StreamFilterOptions( |
| 514 | + application_properties={"key": "value_17"}, |
| 515 | + ) |
| 516 | + ), |
| 517 | + ) |
| 518 | + publisher = connection.publisher(addr_queue) |
| 519 | + for i in range(messages_to_send): |
| 520 | + msg = Message( |
| 521 | + body=Converter.string_to_bytes("hello_{}".format(i)), |
| 522 | + application_properties={"key": "value_{}".format(i)}, |
| 523 | + ) |
| 524 | + publisher.publish(msg) |
| 525 | + |
| 526 | + publisher.close() |
| 527 | + |
| 528 | + consumer.run() |
| 529 | + # ack to terminate the consumer |
| 530 | + except ConsumerTestException: |
| 531 | + pass |
| 532 | + |
| 533 | + if consumer is not None: |
| 534 | + consumer.close() |
| 535 | + |
| 536 | + management.delete_queue(stream_name) |
| 537 | + |
| 538 | + |
| 539 | +class MyMessageHandlerMixingDifferentFilters(AMQPMessagingHandler): |
| 540 | + def __init__( |
| 541 | + self, |
| 542 | + ): |
| 543 | + super().__init__() |
| 544 | + |
| 545 | + def on_message(self, event: Event): |
| 546 | + self.delivery_context.accept(event) |
| 547 | + assert event.message.annotations["x-stream-filter-value"] == "the_value_filter" |
| 548 | + assert event.message.application_properties == {"key": "app_value_9999"} |
| 549 | + assert event.message.subject == "important_9999" |
| 550 | + assert event.message.body == Converter.string_to_bytes("the_right_one_9999") |
| 551 | + raise ConsumerTestException("consumed") |
| 552 | + |
| 553 | + |
| 554 | +def test_stream_filter_mixing_different( |
| 555 | + connection: Connection, environment: Environment |
| 556 | +) -> None: |
| 557 | + consumer = None |
| 558 | + stream_name = "test_stream_filter_mixing_different" |
| 559 | + messages_to_send = 30 |
| 560 | + |
| 561 | + queue_specification = StreamSpecification( |
| 562 | + name=stream_name, |
| 563 | + ) |
| 564 | + management = connection.management() |
| 565 | + management.delete_queue(stream_name) |
| 566 | + management.declare_queue(queue_specification) |
| 567 | + |
| 568 | + addr_queue = AddressHelper.queue_address(stream_name) |
| 569 | + |
| 570 | + # consume and then publish |
| 571 | + try: |
| 572 | + connection_consumer = environment.connection() |
| 573 | + connection_consumer.dial() |
| 574 | + consumer = connection_consumer.consumer( |
| 575 | + addr_queue, |
| 576 | + message_handler=MyMessageHandlerMixingDifferentFilters(), |
| 577 | + stream_consumer_options=StreamConsumerOptions( |
| 578 | + filter_options=StreamFilterOptions( |
| 579 | + values=["the_value_filter"], |
| 580 | + application_properties={"key": "app_value_9999"}, |
| 581 | + message_properties=MessageProperties(subject="important_9999"), |
| 582 | + ) |
| 583 | + ), |
| 584 | + ) |
| 585 | + publisher = connection.publisher(addr_queue) |
| 586 | + # all these messages will be filtered out |
| 587 | + for i in range(messages_to_send): |
| 588 | + msg = Message( |
| 589 | + body=Converter.string_to_bytes("hello_{}".format(i)), |
| 590 | + ) |
| 591 | + publisher.publish(msg) |
| 592 | + |
| 593 | + time.sleep(1) # wait a bit to ensure messages are published in different chunks |
| 594 | + msg = Message( |
| 595 | + body=Converter.string_to_bytes("the_right_one_9999"), |
| 596 | + annotations={"x-stream-filter-value": "the_value_filter"}, |
| 597 | + application_properties={"key": "app_value_9999"}, |
| 598 | + subject="important_9999", |
| 599 | + ) |
| 600 | + publisher.publish(msg) |
| 601 | + |
| 602 | + publisher.close() |
| 603 | + |
| 604 | + consumer.run() |
| 605 | + # ack to terminate the consumer |
| 606 | + except ConsumerTestException: |
| 607 | + pass |
| 608 | + |
| 609 | + if consumer is not None: |
| 610 | + consumer.close() |
| 611 | + |
| 612 | + management.delete_queue(stream_name) |
0 commit comments