Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set -o xtrace
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
readonly rabbitmq_image=rabbitmq:4.1.0-management
readonly rabbitmq_image=rabbitmq:4.2-rc-management


readonly docker_name_prefix='rabbitmq-amqp-python-client'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def main() -> None:
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
"""

queue_name = "stream-example-with_filtering-queue"
queue_name = "stream-example-with-message-properties-filter-queue"
logger.info("Creating connection")
environment = Environment("amqp://guest:guest@localhost:5672/")
connection = create_connection(environment)
Expand Down
144 changes: 144 additions & 0 deletions examples/streams_with_sql_filters/example_streams_with_sql_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# type: ignore
import logging

from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Converter,
Environment,
Event,
Message,
OffsetSpecification,
StreamConsumerOptions,
StreamFilterOptions,
StreamSpecification,
)

MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):

def __init__(self):
super().__init__()
self._count = 0

def on_amqp_message(self, event: Event):
# only messages with banana filters and with subject yellow
# and application property from = italy get received
self._count = self._count + 1
logger.info(
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
Converter.bytes_to_string(event.message.body),
event.message.subject,
event.message.application_properties,
self._count,
)
)
self.delivery_context.accept(event)

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
print("connection closed")

def on_link_closed(self, event: Event) -> None:
# if you want you can add cleanup operations here
print("link closed")


def create_connection(environment: Environment) -> Connection:
connection = environment.connection()
connection.dial()

return connection


logging.basicConfig()
logger = logging.getLogger("[streams_with_filters]")
logger.setLevel(logging.INFO)


def main() -> None:
"""
In this example we create a stream queue and a consumer with SQL filter

See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
"""

queue_name = "stream-example-with-sql-filter-queue"
logger.info("Creating connection")
environment = Environment("amqp://guest:guest@localhost:5672/")
connection = create_connection(environment)
management = connection.management()
# delete the queue if it exists
management.delete_queue(queue_name)
# create a stream queue
management.declare_queue(StreamSpecification(name=queue_name))

addr_queue = AddressHelper.queue_address(queue_name)

consumer_connection = create_connection(environment)
sql = (
"properties.subject LIKE '%in_the_filter%' "
"AND a_in_the_filter_key = 'a_in_the_filter_value'"
)

consumer = consumer_connection.consumer(
addr_queue,
message_handler=MyMessageHandler(),
stream_consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first,
filter_options=StreamFilterOptions(sql=sql),
),
)
print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)

# print("create a publisher and publish a test message")
publisher = connection.publisher(addr_queue)

# publish messages won't match the filter
for i in range(MESSAGES_TO_PUBLISH):
publisher.publish(Message(Converter.string_to_bytes(body="apple: " + str(i))))

# publish messages that will match the filter
for i in range(MESSAGES_TO_PUBLISH):
msqMatch = Message(
body=Converter.string_to_bytes("the_right_one_sql"),
# will match due of %
subject="something_in_the_filter_{}".format(i),
application_properties={"a_in_the_filter_key": "a_in_the_filter_value"},
)
publisher.publish(msqMatch)

publisher.close()

while True:
try:
consumer.run()
except KeyboardInterrupt:
pass
except ConnectionClosed:
print("connection closed")
continue
except Exception as e:
print("consumer exited for exception " + str(e))

break

#
logger.info("consumer exited, deleting queue")
management.delete_queue(queue_name)

print("closing connections")
management.close()
print("after management closing")
environment.close()
print("after connection closing")


if __name__ == "__main__":
main()
34 changes: 25 additions & 9 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from .exceptions import ValidationCodeException
from .qpid.proton._data import Described, symbol

SQL_FILTER = "sql-filter"
AMQP_SQL_FILTER = "amqp:sql-filter"
STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
Expand Down Expand Up @@ -159,7 +161,6 @@ class MessageProperties:
Attributes:
message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str).
user_id: Identity of the user responsible for producing the message.
to: Intended destination node of the message.
subject: Summary information about the message content and purpose.
reply_to: Address of the node to send replies to.
correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str).
Expand All @@ -174,7 +175,6 @@ class MessageProperties:

message_id: Optional[Union[int, str, bytes]] = None
user_id: Optional[bytes] = None
to: Optional[str] = None
subject: Optional[str] = None
reply_to: Optional[str] = None
correlation_id: Optional[Union[int, str, bytes]] = None
Expand Down Expand Up @@ -245,20 +245,24 @@ def __init__(
if offset_specification is not None:
self._offset(offset_specification)

if filter_options is not None and filter_options.values is not None:
if filter_options is None:
return

if filter_options.values is not None:
self._filter_values(filter_options.values)

if filter_options is not None and filter_options.match_unfiltered:
if filter_options.match_unfiltered:
self._filter_match_unfiltered(filter_options.match_unfiltered)

if filter_options is not None and filter_options.message_properties is not None:
if filter_options.message_properties is not None:
self._filter_message_properties(filter_options.message_properties)
if (
filter_options is not None
and filter_options.application_properties is not None
):

if filter_options.application_properties is not None:
self._filter_application_properties(filter_options.application_properties)

if filter_options.sql is not None and filter_options.sql != "":
self._filter_sql(filter_options.sql)

def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
"""
Set the offset specification for the stream.
Expand Down Expand Up @@ -334,6 +338,18 @@ def _filter_application_properties(
Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop)
)

def _filter_sql(self, sql: str) -> None:
"""
Set SQL filter for the stream.

Args:
sql: SQL string to apply as a filter
"""
if sql != "":
self._filter_set[symbol(SQL_FILTER)] = Described(
symbol(AMQP_SQL_FILTER), sql
)

def filter_set(self) -> Dict[symbol, Described]:
"""
Get the current filter set configuration.
Expand Down
Loading