From 6a08cb2634f28e4c3b441279e421d24f2c6320d5 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 16:39:03 +0200 Subject: [PATCH 1/6] implement SQL Filter closes: https://github.com/rabbitmq/rabbitmq-amqp-python-client/issues/73 Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/entities.py | 32 +++++++++++++++++++------ 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 5423006..eb08b11 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -7,6 +7,8 @@ from .exceptions import ValidationCodeException from .qpid.proton._data import Described, symbol +SQL_FILTER = "sql-filter" +AMQP_SQL_FILTER = "amqp:sql-filter" STREAM_FILTER_SPEC = "rabbitmq:stream-filter" STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" @@ -245,20 +247,24 @@ def __init__( if offset_specification is not None: self._offset(offset_specification) - if filter_options is not None and filter_options.values is not None: + if filter_options is None: + return + + if filter_options.values is not None: self._filter_values(filter_options.values) - if filter_options is not None and filter_options.match_unfiltered: + if filter_options.match_unfiltered: self._filter_match_unfiltered(filter_options.match_unfiltered) - if filter_options is not None and filter_options.message_properties is not None: + if filter_options.message_properties is not None: self._filter_message_properties(filter_options.message_properties) - if ( - filter_options is not None - and filter_options.application_properties is not None - ): + + if filter_options.application_properties is not None: self._filter_application_properties(filter_options.application_properties) + if filter_options.sql is not None and filter_options.sql != "": + self._filter_sql(filter_options.sql) + def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ Set the offset specification for the stream. @@ -334,6 +340,18 @@ def _filter_application_properties( Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop) ) + def _filter_sql(self, sql: str) -> None: + """ + Set SQL filter for the stream. + + Args: + sql: SQL string to apply as a filter + """ + if sql != "": + self._filter_set[symbol(SQL_FILTER)] = Described( + symbol(AMQP_SQL_FILTER), sql + ) + def filter_set(self) -> Dict[symbol, Described]: """ Get the current filter set configuration. From 4023804fc8938e25d2e521d60a03149e8bfeac3b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 17:06:52 +0200 Subject: [PATCH 2/6] implement SQL Filter closes: https://github.com/rabbitmq/rabbitmq-amqp-python-client/issues/73 Signed-off-by: Gabriele Santomaggio --- .ci/ubuntu/gha-setup.sh | 2 +- rabbitmq_amqp_python_client/entities.py | 2 - tests/test_streams.py | 76 +++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index ffb6760..619ccae 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -7,7 +7,7 @@ set -o xtrace script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" -readonly rabbitmq_image=rabbitmq:4.1.0-management +readonly rabbitmq_image=rabbitmq:4.2-rc-management readonly docker_name_prefix='rabbitmq-amqp-python-client' diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index eb08b11..f0ea81a 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -161,7 +161,6 @@ class MessageProperties: Attributes: message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str). user_id: Identity of the user responsible for producing the message. - to: Intended destination node of the message. subject: Summary information about the message content and purpose. reply_to: Address of the node to send replies to. correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str). @@ -176,7 +175,6 @@ class MessageProperties: message_id: Optional[Union[int, str, bytes]] = None user_id: Optional[bytes] = None - to: Optional[str] = None subject: Optional[str] = None reply_to: Optional[str] = None correlation_id: Optional[Union[int, str, bytes]] = None diff --git a/tests/test_streams.py b/tests/test_streams.py index a3b49f2..0a16d58 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -536,6 +536,82 @@ def test_stream_filter_application_properties( management.delete_queue(stream_name) +class MyMessageHandlerSQLFilter(AMQPMessagingHandler): + def __init__( + self, + ): + super().__init__() + + def on_message(self, event: Event): + self.delivery_context.accept(event) + assert event.message.body == Converter.string_to_bytes("the_right_one_sql") + assert event.message.subject == "something_in_the_filter" + assert event.message.reply_to == "the_reply_to" + assert ( + event.message.application_properties["a_in_the_filter_key"] + == "a_in_the_filter_value" + ) + + raise ConsumerTestException("consumed") + + +def test_stream_filter_sql(connection: Connection, environment: Environment) -> None: + consumer = None + stream_name = "test_stream_filter_sql" + messages_to_send = 30 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.delete_queue(stream_name) + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + sql = ( + "properties.subject LIKE '%in_the_filter%' AND properties.reply_to = 'the_reply_to' " + "AND a_in_the_filter_key = 'a_in_the_filter_value'" + ) + try: + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + message_handler=MyMessageHandlerSQLFilter(), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions(sql=sql) + ), + ) + publisher = connection.publisher(addr_queue) + # won't match + for i in range(messages_to_send): + msg = Message( + body=Converter.string_to_bytes("hello_{}".format(i)), + ) + publisher.publish(msg) + + msqMatch = Message( + body=Converter.string_to_bytes("the_right_one_sql"), + subject="something_in_the_filter", + reply_to="the_reply_to", + application_properties={"a_in_the_filter_key": "a_in_the_filter_value"}, + ) + + publisher.publish(msqMatch) + + publisher.close() + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + if consumer is not None: + consumer.close() + + management.delete_queue(stream_name) + + class MyMessageHandlerMixingDifferentFilters(AMQPMessagingHandler): def __init__( self, From 70621dfd064933c17ef54f06ef2b1cdb6df19ac6 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 17:28:44 +0200 Subject: [PATCH 3/6] formatting Signed-off-by: Gabriele Santomaggio --- tests/test_streams.py | 99 +++++++++++++++++++------------------------ 1 file changed, 43 insertions(+), 56 deletions(-) diff --git a/tests/test_streams.py b/tests/test_streams.py index 0a16d58..231f316 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -54,10 +54,9 @@ def test_stream_read_from_last_default( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_read_from_last( @@ -91,10 +90,9 @@ def test_stream_read_from_last( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_read_from_offset_zero( @@ -128,10 +126,9 @@ def test_stream_read_from_offset_zero( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_read_from_offset_first( @@ -165,10 +162,9 @@ def test_stream_read_from_offset_first( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_read_from_offset_ten( @@ -203,10 +199,9 @@ def test_stream_read_from_offset_ten( # this will finish after 10 messages read except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_filtering(connection: Connection, environment: Environment) -> None: @@ -240,10 +235,9 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_filtering_mixed( @@ -281,10 +275,9 @@ def test_stream_filtering_mixed( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_filtering_not_present( @@ -362,10 +355,9 @@ def test_stream_match_unfiltered( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_reconnection( @@ -403,10 +395,9 @@ def test_stream_reconnection( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) class MyMessageHandlerMessagePropertiesFilter(AMQPMessagingHandler): @@ -468,11 +459,10 @@ def test_stream_filter_message_properties( # ack to terminate the consumer except ConsumerTestException: pass - - if consumer is not None: - consumer.close() - - management.delete_queue(stream_name) + finally: + if consumer is not None: + consumer.close() + management.delete_queue(stream_name) class MyMessageHandlerApplicationPropertiesFilter(AMQPMessagingHandler): @@ -529,11 +519,10 @@ def test_stream_filter_application_properties( # ack to terminate the consumer except ConsumerTestException: pass - - if consumer is not None: - consumer.close() - - management.delete_queue(stream_name) + finally: + if consumer is not None: + consumer.close() + management.delete_queue(stream_name) class MyMessageHandlerSQLFilter(AMQPMessagingHandler): @@ -605,11 +594,10 @@ def test_stream_filter_sql(connection: Connection, environment: Environment) -> # ack to terminate the consumer except ConsumerTestException: pass - - if consumer is not None: - consumer.close() - - management.delete_queue(stream_name) + finally: + if consumer is not None: + consumer.close() + management.delete_queue(stream_name) class MyMessageHandlerMixingDifferentFilters(AMQPMessagingHandler): @@ -681,8 +669,7 @@ def test_stream_filter_mixing_different( # ack to terminate the consumer except ConsumerTestException: pass - - if consumer is not None: - consumer.close() - - management.delete_queue(stream_name) + finally: + if consumer is not None: + consumer.close() + management.delete_queue(stream_name) From 404f1179c7f825f5bdb12585fae1275d156c4d25 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 15 Sep 2025 09:42:55 +0200 Subject: [PATCH 4/6] example Signed-off-by: Gabriele Santomaggio --- .../example_streams_with_filters.py | 2 +- .../example_streams_with_sql_filters.py | 144 ++++++++++++++++++ tests/test_streams.py | 1 + 3 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 examples/streams_with_sql_filters/example_streams_with_sql_filters.py diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index 9f5c2dc..1365866 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -72,7 +72,7 @@ def main() -> None: See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions """ - queue_name = "stream-example-with_filtering-queue" + queue_name = "stream-example-with-message-properties-filter-queue" logger.info("Creating connection") environment = Environment("amqp://guest:guest@localhost:5672/") connection = create_connection(environment) diff --git a/examples/streams_with_sql_filters/example_streams_with_sql_filters.py b/examples/streams_with_sql_filters/example_streams_with_sql_filters.py new file mode 100644 index 0000000..435b8f9 --- /dev/null +++ b/examples/streams_with_sql_filters/example_streams_with_sql_filters.py @@ -0,0 +1,144 @@ +# type: ignore +import logging + +from rabbitmq_amqp_python_client import ( + AddressHelper, + AMQPMessagingHandler, + Connection, + ConnectionClosed, + Converter, + Environment, + Event, + Message, + OffsetSpecification, + StreamConsumerOptions, + StreamFilterOptions, + StreamSpecification, +) + +MESSAGES_TO_PUBLISH = 100 + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_amqp_message(self, event: Event): + # only messages with banana filters and with subject yellow + # and application property from = italy get received + self._count = self._count + 1 + logger.info( + "Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format( + Converter.bytes_to_string(event.message.body), + event.message.subject, + event.message.application_properties, + self._count, + ) + ) + self.delivery_context.accept(event) + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection(environment: Environment) -> Connection: + connection = environment.connection() + connection.dial() + + return connection + + +logging.basicConfig() +logger = logging.getLogger("[streams_with_filters]") +logger.setLevel(logging.INFO) + + +def main() -> None: + """ + In this example we create a stream queue and a consumer with SQL filter + + See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions + """ + + queue_name = "stream-example-with-sql-filter-queue" + logger.info("Creating connection") + environment = Environment("amqp://guest:guest@localhost:5672/") + connection = create_connection(environment) + management = connection.management() + # delete the queue if it exists + management.delete_queue(queue_name) + # create a stream queue + management.declare_queue(StreamSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + consumer_connection = create_connection(environment) + sql = ( + "properties.subject LIKE '%in_the_filter%' " + "AND a_in_the_filter_key = 'a_in_the_filter_value'" + ) + + consumer = consumer_connection.consumer( + addr_queue, + message_handler=MyMessageHandler(), + stream_consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions(sql=sql), + ), + ) + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + + # print("create a publisher and publish a test message") + publisher = connection.publisher(addr_queue) + + # publish messages won't match the filter + for i in range(MESSAGES_TO_PUBLISH): + publisher.publish(Message(Converter.string_to_bytes(body="apple: " + str(i)))) + + # publish messages that will match the filter + for i in range(MESSAGES_TO_PUBLISH): + msqMatch = Message( + body=Converter.string_to_bytes("the_right_one_sql"), + # will match due of % + subject="something_in_the_filter_{}".format(i), + application_properties={"a_in_the_filter_key": "a_in_the_filter_value"}, + ) + publisher.publish(msqMatch) + + publisher.close() + + while True: + try: + consumer.run() + except KeyboardInterrupt: + pass + except ConnectionClosed: + print("connection closed") + continue + except Exception as e: + print("consumer exited for exception " + str(e)) + + break + + # + logger.info("consumer exited, deleting queue") + management.delete_queue(queue_name) + + print("closing connections") + management.close() + print("after management closing") + environment.close() + print("after connection closing") + + +if __name__ == "__main__": + main() diff --git a/tests/test_streams.py b/tests/test_streams.py index 231f316..2e503fb 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -579,6 +579,7 @@ def test_stream_filter_sql(connection: Connection, environment: Environment) -> ) publisher.publish(msg) + # the only one that will match msqMatch = Message( body=Converter.string_to_bytes("the_right_one_sql"), subject="something_in_the_filter", From 2ac69ffe8b97c7b654bbe9b5c72708cf1402cb9c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 15 Sep 2025 09:45:11 +0200 Subject: [PATCH 5/6] Update rabbitmq_amqp_python_client/entities.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rabbitmq_amqp_python_client/entities.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index f0ea81a..338263e 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -345,10 +345,9 @@ def _filter_sql(self, sql: str) -> None: Args: sql: SQL string to apply as a filter """ - if sql != "": - self._filter_set[symbol(SQL_FILTER)] = Described( - symbol(AMQP_SQL_FILTER), sql - ) + self._filter_set[symbol(SQL_FILTER)] = Described( + symbol(AMQP_SQL_FILTER), sql + ) def filter_set(self) -> Dict[symbol, Described]: """ From 7f18a55a355352f1ed70d8b57dfab778ec452ad5 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 15 Sep 2025 09:47:53 +0200 Subject: [PATCH 6/6] formatting Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/entities.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 338263e..54491ba 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -345,9 +345,7 @@ def _filter_sql(self, sql: str) -> None: Args: sql: SQL string to apply as a filter """ - self._filter_set[symbol(SQL_FILTER)] = Described( - symbol(AMQP_SQL_FILTER), sql - ) + self._filter_set[symbol(SQL_FILTER)] = Described(symbol(AMQP_SQL_FILTER), sql) def filter_set(self) -> Dict[symbol, Described]: """