From 33479c2c68048d1839d2b4d8a28617b734b875e4 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 28 Nov 2025 12:30:38 +0100 Subject: [PATCH 01/13] work in progress Signed-off-by: Gabriele Santomaggio --- pyproject.toml | 4 +-- rabbitmq_amqp_python_client/consumer.py | 29 ++++++++++++++----- rabbitmq_amqp_python_client/options.py | 21 ++++++++++++-- .../qpid/proton/_utils.py | 8 +++++ 4 files changed, 50 insertions(+), 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3479f1a..16de4ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.9" -python-qpid-proton = "^0.39.0" +python-qpid-proton = "^0.40.0" typing-extensions = "^4.13.0" packaging = "^23.0" @@ -21,7 +21,7 @@ isort = "^5.9.3" mypy = "^0.910" pytest = "^8.3.4" black = "^24.3.0" -python-qpid-proton = "^0.39.0" +python-qpid-proton = "^0.40.0" requests = "^2.31.0" pytest-asyncio = "^1.2.0" diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 3efd8f3..b11d119 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -34,12 +34,13 @@ class Consumer: """ def __init__( - self, - conn: BlockingConnection, - addr: str, - handler: Optional[AMQPMessagingHandler] = None, - stream_options: Optional[ConsumerOptions] = None, - credit: Optional[int] = None, + self, + conn: BlockingConnection, + addr: str, + handler: Optional[AMQPMessagingHandler] = None, + stream_options: Optional[ConsumerOptions] = None, + credit: Optional[int] = None, + direct_reply_to: Optional[bool] = None, ): """ Initialize a new Consumer instance. @@ -58,6 +59,7 @@ def __init__( self._stream_options = stream_options self._credit = credit self._consumers: list[Consumer] = [] + self._direct_reply_to = direct_reply_to self._open() def _open(self) -> None: @@ -144,16 +146,27 @@ def stop(self) -> None: def _create_receiver(self, addr: str) -> BlockingReceiver: logger.debug("Creating the receiver") + if self._direct_reply_to is None: + self._direct_reply_to = True + + if self._direct_reply_to: + x = self._conn.create_dynamic_receiver() + # print(x.link.remote_source.address) + return x + if self._stream_options is None: receiver = self._conn.create_receiver( - addr, options=ReceiverOptionUnsettled(addr), handler=self._handler + addr, + options=ReceiverOptionUnsettled(addr), + handler=self._handler, + dynamic=self._direct_reply_to, ) - else: receiver = self._conn.create_receiver( addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler, + dynamic=self._direct_reply_to, ) if self._credit is not None: diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 9d49ebe..d8ed444 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,9 +1,9 @@ from .entities import ConsumerOptions from .qpid.proton._data import ( # noqa: E402 PropertyDict, - symbol, + symbol, Data, ) -from .qpid.proton._endpoints import Link # noqa: E402 +from .qpid.proton._endpoints import Link, Terminus # noqa: E402 from .qpid.proton.reactor import ( # noqa: E402 Filter, LinkOption, @@ -52,6 +52,23 @@ def apply(self, link: Link) -> None: link.source.dynamic = False +class DynamicReceiverOption(LinkOption): # type: ignore + def __init__(self): + pass + + def apply(self, link: Link) -> None: + link.snd_settle_mode = Link.SND_SETTLED + link.rcv_settle_mode = Link.RCV_FIRST + link.source.expiry_policy = Terminus.EXPIRE_WITH_LINK + link.properties = PropertyDict({symbol("paired"): True}) + link.source.dynamic = True + data = link.source.capabilities + data.put_array(False, Data.SYMBOL) + data.enter() + data.put_string("rabbitmq:volatile-queue") + data.exit() + + class ReceiverOptionUnsettled(LinkOption): # type: ignore def __init__(self, addr: str): self._addr = addr diff --git a/rabbitmq_amqp_python_client/qpid/proton/_utils.py b/rabbitmq_amqp_python_client/qpid/proton/_utils.py index 25435a7..946bdf3 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_utils.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_utils.py @@ -44,6 +44,7 @@ ) from ._reactor import Container from ._url import Url +from ...options import DynamicReceiverOption try: from typing import Literal @@ -505,6 +506,11 @@ def create_sender( ), ) + def create_dynamic_receiver(self, credit: Optional[int] = None): + return self.create_receiver( + credit=credit, dynamic=True, options=DynamicReceiverOption(), name="dynamic-receiver" + ) + def create_receiver( self, address: Optional[str] = None, @@ -514,6 +520,8 @@ def create_receiver( name: Optional[str] = None, options: Optional[ Union[ + "DynamicReceiverOption", + List["DynamicReceiverOption"], "ReceiverOption", List["ReceiverOption"], "LinkOption", From 2ab1c3ab6ff459a287b59df75792eacbd58d1f24 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Sun, 30 Nov 2025 21:59:30 +0100 Subject: [PATCH 02/13] direct reply to Signed-off-by: Gabriele Santomaggio --- .../direct_reply_queue/direct_reply_to.py | 88 ++++++++++++++++++ rabbitmq_amqp_python_client/__init__.py | 2 + rabbitmq_amqp_python_client/connection.py | 11 ++- rabbitmq_amqp_python_client/consumer.py | 92 ++++++++++++------- rabbitmq_amqp_python_client/entities.py | 18 ++++ rabbitmq_amqp_python_client/options.py | 12 ++- rabbitmq_amqp_python_client/publisher.py | 2 +- .../qpid/proton/_utils.py | 10 +- tests/direct_reply_to/__init__.py | 0 tests/direct_reply_to/test_direct_reply.py | 9 ++ 10 files changed, 199 insertions(+), 45 deletions(-) create mode 100644 examples/direct_reply_queue/direct_reply_to.py create mode 100644 tests/direct_reply_to/__init__.py create mode 100644 tests/direct_reply_to/test_direct_reply.py diff --git a/examples/direct_reply_queue/direct_reply_to.py b/examples/direct_reply_queue/direct_reply_to.py new file mode 100644 index 0000000..6338be4 --- /dev/null +++ b/examples/direct_reply_queue/direct_reply_to.py @@ -0,0 +1,88 @@ +# type: ignore + + +from rabbitmq_amqp_python_client import ( # PosixSSlConfigurationContext,; PosixClientCert, + AMQPMessagingHandler, + Connection, + Converter, + DirectReplyToConsumerOptions, + Environment, + Event, + Message, + OutcomeState, +) + +MESSAGES_TO_PUBLISH = 200 + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_amqp_message(self, event: Event): + print( + "received message: {} ".format( + Converter.bytes_to_string(event.message.body) + ) + ) + + # accepting + self.delivery_context.accept(event) + + self._count = self._count + 1 + print("count " + str(self._count)) + + if self._count == MESSAGES_TO_PUBLISH: + print("received all messages") + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection(environment: Environment) -> Connection: + connection = environment.connection() + connection.dial() + return connection + + +def main() -> None: + print("connection to amqp server") + environment = Environment(uri="amqp://guest:guest@localhost:5672/") + connection = create_connection(environment) + consumer = connection.consumer(message_handler=MyMessageHandler(), + consumer_options=DirectReplyToConsumerOptions()) + addr = consumer.get_queue_address() + print("connecting to address: {}".format(addr)) + publisher = create_connection(environment).publisher(addr) + + for i in range(MESSAGES_TO_PUBLISH): + msg = Message( + body=Converter.string_to_bytes("test message {} ".format(i))) + status = publisher.publish(msg) + if status.remote_state == OutcomeState.ACCEPTED: + print("message accepted") + elif status.remote_state == OutcomeState.RELEASED: + print("message not routed") + elif status.remote_state == OutcomeState.REJECTED: + print("message not rejected") + + try: + consumer.run() + except KeyboardInterrupt: + pass + + consumer.close() + + connection.close() + print("after connection closing") + + +if __name__ == "__main__": + main() diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 0ec9cb4..6c1c30e 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -14,6 +14,7 @@ from .consumer import Consumer from .entities import ( ConsumerOptions, + DirectReplyToConsumerOptions, ExchangeCustomSpecification, ExchangeSpecification, ExchangeToExchangeBindingSpecification, @@ -72,6 +73,7 @@ "QuorumQueueSpecification", "ClassicQueueSpecification", "StreamSpecification", + "DirectReplyToConsumerOptions", "ExchangeToQueueBindingSpecification", "ExchangeToExchangeBindingSpecification", "QueueType", diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index e5f347d..2d45cfc 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -17,6 +17,7 @@ from .consumer import Consumer from .entities import ( ConsumerOptions, + DirectReplyToConsumerOptions, OAuth2Options, RecoveryConfiguration, ) @@ -397,9 +398,13 @@ def consumer( Consumer: A new consumer instance Raises: - ArgumentOutOfRangeException: If destination address format is invalid + ArgumentOutOfRangeException: If destination address format is invalid. + Only applies if not using Direct Reply-to. + The server will provide the queue name in that case. """ - if not validate_address(destination): + if not validate_address(destination) and not isinstance( + consumer_options, DirectReplyToConsumerOptions + ): raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) @@ -438,9 +443,7 @@ def _on_disconnection(self) -> None: time.sleep(delay.total_seconds()) try: - self._open_connections(reconnect_handlers=True) - self._connections.append(self) except ConnectionException as e: diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index b11d119..ed0a646 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,8 +1,14 @@ import logging from typing import Literal, Optional, Union, cast +from pika.exceptions import AMQPError + from .amqp_consumer_handler import AMQPMessagingHandler -from .entities import ConsumerOptions +from .entities import ( + ConsumerOptions, + DirectReplyToConsumerOptions, + StreamConsumerOptions, +) from .options import ( ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters, @@ -29,18 +35,17 @@ class Consumer: _conn (BlockingConnection): The underlying connection to RabbitMQ _addr (str): The address to consume from _handler (Optional[MessagingHandler]): Optional message handling callback - _stream_options (Optional[StreamConsumerOptions]): Configuration for stream consumption + _consumer_options (Optional[StreamConsumerOptions]): Configuration for stream consumption _credit (Optional[int]): Flow control credit value """ def __init__( - self, - conn: BlockingConnection, - addr: str, - handler: Optional[AMQPMessagingHandler] = None, - stream_options: Optional[ConsumerOptions] = None, - credit: Optional[int] = None, - direct_reply_to: Optional[bool] = None, + self, + conn: BlockingConnection, + addr: str, + handler: Optional[AMQPMessagingHandler] = None, + consumer_options: Optional[ConsumerOptions] = None, + credit: Optional[int] = None, ): """ Initialize a new Consumer instance. @@ -49,17 +54,16 @@ def __init__( conn: The blocking connection to use for consuming addr: The address to consume from handler: Optional message handler for processing received messages - stream_options: Optional configuration for stream-based consumption + consumer_options: Optional configuration for stream-based consumption credit: Optional credit value for flow control """ self._receiver: Optional[BlockingReceiver] = None self._conn = conn self._addr = addr self._handler = handler - self._stream_options = stream_options + self._consumer_options = consumer_options self._credit = credit self._consumers: list[Consumer] = [] - self._direct_reply_to = direct_reply_to self._open() def _open(self) -> None: @@ -67,9 +71,21 @@ def _open(self) -> None: logger.debug("Creating Receiver") self._receiver = self._create_receiver(self._addr) + def get_queue_address(self) -> Optional[str]: + """ + Get the name of the queue from the address. + + Returns: + str: The name of the queue. + """ + if self._receiver is not None: + return cast(Optional[str], self._receiver.link.remote_source.address) + else: + raise AMQPError("Receiver is not initialized") + def _update_connection(self, conn: BlockingConnection) -> None: self._conn = conn - if self._stream_options is None: + if self._consumer_options is None: logger.debug("creating new receiver without stream") self._receiver = self._conn.create_receiver( self._addr, @@ -78,11 +94,11 @@ def _update_connection(self, conn: BlockingConnection) -> None: ) else: logger.debug("creating new stream receiver") - self._stream_options.offset(self._handler.offset - 1) # type: ignore + self._consumer_options.offset(self._handler.offset - 1) # type: ignore self._receiver = self._conn.create_receiver( self._addr, options=ReceiverOptionUnsettledWithFilters( - self._addr, self._stream_options + self._addr, self._consumer_options ), handler=self._handler, ) @@ -145,34 +161,44 @@ def stop(self) -> None: self._receiver.container.stop() def _create_receiver(self, addr: str) -> BlockingReceiver: - logger.debug("Creating the receiver") - if self._direct_reply_to is None: - self._direct_reply_to = True + credit = 100 + if self._credit is not None: + credit = self._credit - if self._direct_reply_to: - x = self._conn.create_dynamic_receiver() - # print(x.link.remote_source.address) - return x + if self._consumer_options is not None: + logger.debug( + "Creating the receiver, with options: %s", + type(self._consumer_options).__name__, + ) + else: + logger.debug("Creating the receiver, without options") - if self._stream_options is None: - receiver = self._conn.create_receiver( + if self._consumer_options is None: + return self._conn.create_receiver( addr, options=ReceiverOptionUnsettled(addr), handler=self._handler, - dynamic=self._direct_reply_to, + credit=credit, ) - else: - receiver = self._conn.create_receiver( + + if isinstance(self._consumer_options, DirectReplyToConsumerOptions): + print("Creating dynamic receiver for direct reply-to") + x = self._conn.create_dynamic_receiver(100, handler=self._handler) + x.credit = credit + return x + + if isinstance(self._consumer_options, StreamConsumerOptions): + return self._conn.create_receiver( addr, - options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), + options=ReceiverOptionUnsettledWithFilters( + addr, self._consumer_options + ), handler=self._handler, - dynamic=self._direct_reply_to, ) - if self._credit is not None: - receiver.credit = self._credit - - return receiver + raise AMQPError( + "Receiver is not initialized. No valid consumer options provided." + ) @property def address(self) -> str: diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 8b82e4f..d5f8632 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -160,6 +160,9 @@ def validate(self, versions: Dict[str, bool]) -> None: def filter_set(self) -> Dict[symbol, Described]: raise NotImplementedError("Subclasses should implement this method") + def direct_reply_to(self) -> bool: + return False + @dataclass class MessageProperties: @@ -400,6 +403,21 @@ def validate(self, versions: Dict[str, bool]) -> None: ) +class DirectReplyToConsumerOptions(ConsumerOptions): + + def validate(self, versions: Dict[str, bool]) -> None: + if not versions.get("4.2.0", False): + raise ValidationCodeException( + "Direct Reply-To requires RabbitMQ 4.2.0 or higher" + ) + + def filter_set(self) -> Dict[symbol, Described]: + return {} + + def direct_reply_to(self) -> bool: + return True + + @dataclass class RecoveryConfiguration: """ diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index d8ed444..11857a7 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,9 +1,13 @@ from .entities import ConsumerOptions from .qpid.proton._data import ( # noqa: E402 + Data, PropertyDict, - symbol, Data, + symbol, +) +from .qpid.proton._endpoints import ( # noqa: E402 + Link, + Terminus, ) -from .qpid.proton._endpoints import Link, Terminus # noqa: E402 from .qpid.proton.reactor import ( # noqa: E402 Filter, LinkOption, @@ -53,12 +57,10 @@ def apply(self, link: Link) -> None: class DynamicReceiverOption(LinkOption): # type: ignore - def __init__(self): - pass def apply(self, link: Link) -> None: link.snd_settle_mode = Link.SND_SETTLED - link.rcv_settle_mode = Link.RCV_FIRST + # link.rcv_settle_mode = Link.RCV_FIRST link.source.expiry_policy = Terminus.EXPIRE_WITH_LINK link.properties = PropertyDict({symbol("paired"): True}) link.source.dynamic = True diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index ccb4486..455d9ed 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -94,7 +94,7 @@ def publish(self, message: Message) -> Delivery: return self._sender.send(message) else: if message.address != "": - if validate_address(message.address) is False: + if not validate_address(message.address): raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_utils.py b/rabbitmq_amqp_python_client/qpid/proton/_utils.py index 946bdf3..b1f1089 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_utils.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_utils.py @@ -506,9 +506,15 @@ def create_sender( ), ) - def create_dynamic_receiver(self, credit: Optional[int] = None): + def create_dynamic_receiver( + self, credit: Optional[int] = None, handler: Optional[Handler] = None + ): return self.create_receiver( - credit=credit, dynamic=True, options=DynamicReceiverOption(), name="dynamic-receiver" + credit=credit, + dynamic=True, + options=DynamicReceiverOption(), + handler=handler, + name="dynamic-receiver_" + str(id(self)), ) def create_receiver( diff --git a/tests/direct_reply_to/__init__.py b/tests/direct_reply_to/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/direct_reply_to/test_direct_reply.py b/tests/direct_reply_to/test_direct_reply.py new file mode 100644 index 0000000..668f243 --- /dev/null +++ b/tests/direct_reply_to/test_direct_reply.py @@ -0,0 +1,9 @@ +from rabbitmq_amqp_python_client import ( + Connection, + DirectReplyToConsumerOptions, +) + + +def test_consumer_create_reply_name(connection: Connection) -> None: + consumer = connection.consumer("", consumer_options=DirectReplyToConsumerOptions()) + assert "/queues/amq.rabbitmq.reply-to." in consumer.get_queue_address() From cee2566ddcb395b7383d9d2240d3b96f84d27dcc Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Sun, 30 Nov 2025 22:00:54 +0100 Subject: [PATCH 03/13] direct reply to Signed-off-by: Gabriele Santomaggio --- poetry.lock | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/poetry.lock b/poetry.lock index b3b615c..48cbdf1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. [[package]] name = "backports-asyncio-runner" @@ -602,22 +602,22 @@ testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] [[package]] name = "python-qpid-proton" -version = "0.39.0" +version = "0.40.0" description = "An AMQP based messaging library." optional = false python-versions = "*" groups = ["main", "dev"] files = [ - {file = "python-qpid-proton-0.39.0.tar.gz", hash = "sha256:362055ae6ab4c7f1437247c602757f30328d55c0a6986d5b68ca9798de9fce02"}, - {file = "python_qpid_proton-0.39.0-cp38-abi3-macosx_11_0_x86_64.whl", hash = "sha256:f69da296ffa9e3b22f88a53fe9e27c4f4844e088a9f041061bd4f75f74f2a0af"}, - {file = "python_qpid_proton-0.39.0-cp38-abi3-win_amd64.whl", hash = "sha256:d052e85ffbc817d4db973fae230d8a80732d444e0abbac55360ad4beb181cb43"}, + {file = "python_qpid_proton-0.40.0-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:fe56211c6dcc7ea1fb9d78a017208a4c08043cd901780b6602a74ff70f38bf1f"}, + {file = "python_qpid_proton-0.40.0-cp313-cp313-win_amd64.whl", hash = "sha256:a19d8c71c908700ceb38f6cbc1eb4a039428570f96bfc2caeeafdfec804fb94f"}, + {file = "python_qpid_proton-0.40.0.tar.gz", hash = "sha256:7680d607cf6e9684f97bf5b2ba16cda7d8512aab9e4ff78f98d44a4644fc819a"}, ] [package.dependencies] cffi = ">=1.0.0" [package.extras] -opentracing = ["jaeger-client", "opentracing"] +opentracing = ["jaeger_client", "opentracing"] [[package]] name = "requests" @@ -729,4 +729,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "6855640542dddf03775cf0ecc647aa2e277b471618471e31a382012117ea76ce" +content-hash = "46cb7621d2b4b109705a4bbfc39c5c06150581644a4d3a480f59f41abd3e928c" From aacd98223e6c50d1cfe12c6e5158eb7b51d9a235 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Sun, 30 Nov 2025 22:05:01 +0100 Subject: [PATCH 04/13] direct reply to Signed-off-by: Gabriele Santomaggio --- examples/direct_reply_queue/direct_reply_to.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/direct_reply_queue/direct_reply_to.py b/examples/direct_reply_queue/direct_reply_to.py index 6338be4..5f4383b 100644 --- a/examples/direct_reply_queue/direct_reply_to.py +++ b/examples/direct_reply_queue/direct_reply_to.py @@ -56,15 +56,16 @@ def main() -> None: print("connection to amqp server") environment = Environment(uri="amqp://guest:guest@localhost:5672/") connection = create_connection(environment) - consumer = connection.consumer(message_handler=MyMessageHandler(), - consumer_options=DirectReplyToConsumerOptions()) + consumer = connection.consumer( + message_handler=MyMessageHandler(), + consumer_options=DirectReplyToConsumerOptions(), + ) addr = consumer.get_queue_address() print("connecting to address: {}".format(addr)) publisher = create_connection(environment).publisher(addr) for i in range(MESSAGES_TO_PUBLISH): - msg = Message( - body=Converter.string_to_bytes("test message {} ".format(i))) + msg = Message(body=Converter.string_to_bytes("test message {} ".format(i))) status = publisher.publish(msg) if status.remote_state == OutcomeState.ACCEPTED: print("message accepted") From d9c90737852d0186f1f5627e5b6bbf23d7f63ecc Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Sun, 30 Nov 2025 23:08:08 +0100 Subject: [PATCH 05/13] Update rabbitmq_amqp_python_client/consumer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rabbitmq_amqp_python_client/consumer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index ed0a646..a106255 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -183,9 +183,9 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: if isinstance(self._consumer_options, DirectReplyToConsumerOptions): print("Creating dynamic receiver for direct reply-to") - x = self._conn.create_dynamic_receiver(100, handler=self._handler) - x.credit = credit - return x + dynamic_receiver = self._conn.create_dynamic_receiver(100, handler=self._handler) + dynamic_receiver.credit = credit + return dynamic_receiver if isinstance(self._consumer_options, StreamConsumerOptions): return self._conn.create_receiver( From fd7a06f92c296b286a6ae2764a20795fafc8776f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Dec 2025 10:03:56 +0100 Subject: [PATCH 06/13] Update rabbitmq_amqp_python_client/consumer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rabbitmq_amqp_python_client/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index a106255..8e2dc40 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -182,7 +182,7 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: ) if isinstance(self._consumer_options, DirectReplyToConsumerOptions): - print("Creating dynamic receiver for direct reply-to") + logger.debug("Creating dynamic receiver for direct reply-to") dynamic_receiver = self._conn.create_dynamic_receiver(100, handler=self._handler) dynamic_receiver.credit = credit return dynamic_receiver From 643cec06b7910952ed8f56d60401111034a3ba97 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Dec 2025 10:05:13 +0100 Subject: [PATCH 07/13] Update examples/direct_reply_queue/direct_reply_to.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/direct_reply_queue/direct_reply_to.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/direct_reply_queue/direct_reply_to.py b/examples/direct_reply_queue/direct_reply_to.py index 5f4383b..326556d 100644 --- a/examples/direct_reply_queue/direct_reply_to.py +++ b/examples/direct_reply_queue/direct_reply_to.py @@ -1,7 +1,7 @@ # type: ignore -from rabbitmq_amqp_python_client import ( # PosixSSlConfigurationContext,; PosixClientCert, +from rabbitmq_amqp_python_client import ( AMQPMessagingHandler, Connection, Converter, From acaf44d2c44813e182c6b097a471bb48e7b5bb5d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Dec 2025 10:05:31 +0100 Subject: [PATCH 08/13] Update examples/direct_reply_queue/direct_reply_to.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/direct_reply_queue/direct_reply_to.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/direct_reply_queue/direct_reply_to.py b/examples/direct_reply_queue/direct_reply_to.py index 326556d..5ee6107 100644 --- a/examples/direct_reply_queue/direct_reply_to.py +++ b/examples/direct_reply_queue/direct_reply_to.py @@ -72,7 +72,7 @@ def main() -> None: elif status.remote_state == OutcomeState.RELEASED: print("message not routed") elif status.remote_state == OutcomeState.REJECTED: - print("message not rejected") + print("message rejected") try: consumer.run() From 9f8f31f89a745ca024b7eea505633b126ecd6e3f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Dec 2025 10:08:09 +0100 Subject: [PATCH 09/13] remove pika Exception Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/consumer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index ed0a646..4b1018e 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,7 +1,6 @@ import logging from typing import Literal, Optional, Union, cast -from pika.exceptions import AMQPError from .amqp_consumer_handler import AMQPMessagingHandler from .entities import ( @@ -81,7 +80,7 @@ def get_queue_address(self) -> Optional[str]: if self._receiver is not None: return cast(Optional[str], self._receiver.link.remote_source.address) else: - raise AMQPError("Receiver is not initialized") + raise Exception("Receiver is not initialized") def _update_connection(self, conn: BlockingConnection) -> None: self._conn = conn From 4757a78a75c913985fe7074d8bdb3efbe511e885 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Dec 2025 10:39:17 +0100 Subject: [PATCH 10/13] Address optional Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/connection.py | 9 ++--- rabbitmq_amqp_python_client/consumer.py | 43 ++++++++++------------ rabbitmq_amqp_python_client/options.py | 6 ++- tests/direct_reply_to/test_direct_reply.py | 4 +- 4 files changed, 28 insertions(+), 34 deletions(-) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 2d45cfc..7fbdbee 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -17,7 +17,6 @@ from .consumer import Consumer from .entities import ( ConsumerOptions, - DirectReplyToConsumerOptions, OAuth2Options, RecoveryConfiguration, ) @@ -380,7 +379,7 @@ def publisher(self, destination: str = "") -> Publisher: def consumer( self, - destination: str, + destination: Optional[str] = None, message_handler: Optional[MessagingHandler] = None, consumer_options: Optional[ConsumerOptions] = None, credit: Optional[int] = None, @@ -389,7 +388,7 @@ def consumer( Create a new consumer instance. Args: - destination: The address to consume from + destination: Optional The address to consume from message_handler: Optional handler for processing messages consumer_options: Optional configuration for queue consumption. Each queue has its own consumer options. credit: Optional credit value for flow control @@ -402,9 +401,7 @@ def consumer( Only applies if not using Direct Reply-to. The server will provide the queue name in that case. """ - if not validate_address(destination) and not isinstance( - consumer_options, DirectReplyToConsumerOptions - ): + if destination is not None and not validate_address(destination): raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 5975a7e..994ae68 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,7 +1,6 @@ import logging from typing import Literal, Optional, Union, cast - from .amqp_consumer_handler import AMQPMessagingHandler from .entities import ( ConsumerOptions, @@ -41,7 +40,7 @@ class Consumer: def __init__( self, conn: BlockingConnection, - addr: str, + addr: Optional[str] = None, handler: Optional[AMQPMessagingHandler] = None, consumer_options: Optional[ConsumerOptions] = None, credit: Optional[int] = None, @@ -70,34 +69,26 @@ def _open(self) -> None: logger.debug("Creating Receiver") self._receiver = self._create_receiver(self._addr) - def get_queue_address(self) -> Optional[str]: - """ - Get the name of the queue from the address. - - Returns: - str: The name of the queue. - """ - if self._receiver is not None: - return cast(Optional[str], self._receiver.link.remote_source.address) - else: - raise Exception("Receiver is not initialized") - def _update_connection(self, conn: BlockingConnection) -> None: + addr = "" + if self._addr is not None: + addr = self._addr + self._conn = conn if self._consumer_options is None: logger.debug("creating new receiver without stream") self._receiver = self._conn.create_receiver( - self._addr, - options=ReceiverOptionUnsettled(self._addr), + addr, + options=ReceiverOptionUnsettled(addr), handler=self._handler, ) else: logger.debug("creating new stream receiver") self._consumer_options.offset(self._handler.offset - 1) # type: ignore self._receiver = self._conn.create_receiver( - self._addr, + addr, options=ReceiverOptionUnsettledWithFilters( - self._addr, self._consumer_options + addr, self._consumer_options ), handler=self._handler, ) @@ -159,7 +150,7 @@ def stop(self) -> None: self._receiver.container.stop_events() self._receiver.container.stop() - def _create_receiver(self, addr: str) -> BlockingReceiver: + def _create_receiver(self, addr: Optional[str] = None) -> BlockingReceiver: credit = 100 if self._credit is not None: credit = self._credit @@ -182,7 +173,9 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: if isinstance(self._consumer_options, DirectReplyToConsumerOptions): logger.debug("Creating dynamic receiver for direct reply-to") - dynamic_receiver = self._conn.create_dynamic_receiver(100, handler=self._handler) + dynamic_receiver = self._conn.create_dynamic_receiver( + 100, handler=self._handler + ) dynamic_receiver.credit = credit return dynamic_receiver @@ -195,14 +188,16 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: handler=self._handler, ) - raise AMQPError( + raise Exception( "Receiver is not initialized. No valid consumer options provided." ) @property - def address(self) -> str: - """Get the current publisher address.""" - return self._addr + def address(self) -> Optional[str]: + if self._receiver is not None: + return cast(Optional[str], self._receiver.link.remote_source.address) + else: + raise Exception("Receiver is not initialized") @property def handler(self) -> Optional[AMQPMessagingHandler]: diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 11857a7..a8a2134 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,3 +1,5 @@ +from typing import Optional + from .entities import ConsumerOptions from .qpid.proton._data import ( # noqa: E402 Data, @@ -72,7 +74,7 @@ def apply(self, link: Link) -> None: class ReceiverOptionUnsettled(LinkOption): # type: ignore - def __init__(self, addr: str): + def __init__(self, addr: Optional[str]): self._addr = addr def apply(self, link: Link) -> None: @@ -87,7 +89,7 @@ def test(self, link: Link) -> bool: class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore - def __init__(self, addr: str, consumer_options: ConsumerOptions): + def __init__(self, addr: Optional[str], consumer_options: ConsumerOptions): super().__init__(consumer_options.filter_set()) self._addr = addr diff --git a/tests/direct_reply_to/test_direct_reply.py b/tests/direct_reply_to/test_direct_reply.py index 668f243..c95a0f9 100644 --- a/tests/direct_reply_to/test_direct_reply.py +++ b/tests/direct_reply_to/test_direct_reply.py @@ -5,5 +5,5 @@ def test_consumer_create_reply_name(connection: Connection) -> None: - consumer = connection.consumer("", consumer_options=DirectReplyToConsumerOptions()) - assert "/queues/amq.rabbitmq.reply-to." in consumer.get_queue_address() + consumer = connection.consumer(consumer_options=DirectReplyToConsumerOptions()) + assert "/queues/amq.rabbitmq.reply-to." in consumer.address From 0ec35bb6b14d755cf2d9a029933005352dbefefa Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Dec 2025 11:12:03 +0100 Subject: [PATCH 11/13] test Signed-off-by: Gabriele Santomaggio --- .../direct_reply_queue/direct_reply_to.py | 2 +- tests/direct_reply_to/test_direct_reply.py | 48 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/examples/direct_reply_queue/direct_reply_to.py b/examples/direct_reply_queue/direct_reply_to.py index 5ee6107..6647ebc 100644 --- a/examples/direct_reply_queue/direct_reply_to.py +++ b/examples/direct_reply_queue/direct_reply_to.py @@ -60,7 +60,7 @@ def main() -> None: message_handler=MyMessageHandler(), consumer_options=DirectReplyToConsumerOptions(), ) - addr = consumer.get_queue_address() + addr = consumer.address print("connecting to address: {}".format(addr)) publisher = create_connection(environment).publisher(addr) diff --git a/tests/direct_reply_to/test_direct_reply.py b/tests/direct_reply_to/test_direct_reply.py index c95a0f9..d77363e 100644 --- a/tests/direct_reply_to/test_direct_reply.py +++ b/tests/direct_reply_to/test_direct_reply.py @@ -1,9 +1,57 @@ from rabbitmq_amqp_python_client import ( Connection, + Converter, DirectReplyToConsumerOptions, + Environment, + OutcomeState, ) +from rabbitmq_amqp_python_client.qpid.proton import Message def test_consumer_create_reply_name(connection: Connection) -> None: consumer = connection.consumer(consumer_options=DirectReplyToConsumerOptions()) assert "/queues/amq.rabbitmq.reply-to." in consumer.address + + +def create_connection(environment: Environment) -> Connection: + connection = environment.connection() + connection.dial() + return connection + + +def test_direct_reply_to_send_and_receive(environment: Environment) -> None: + """Test that messages can be published to and consumed from a direct reply-to queue.""" + messages_to_send = 10 + + # Create a consumer using DirectReplyToConsumerOptions + consumer = create_connection(environment).consumer( + consumer_options=DirectReplyToConsumerOptions() + ) + + # Get the queue address from the consumer + addr = consumer.address + assert addr is not None + assert "/queues/amq.rabbitmq.reply-to." in addr + + # Create a new connection and publisher to publish to the reply-to address + publisher = create_connection(environment).publisher(addr) + + # Publish messages to the direct reply-to queue + for i in range(messages_to_send): + msg = Message(body=Converter.string_to_bytes("test message {}".format(i))) + status = publisher.publish(msg) + assert status.remote_state == OutcomeState.ACCEPTED + + # Consume messages synchronously + consumed = 0 + for i in range(messages_to_send): + message = consumer.consume() + if Converter.bytes_to_string(message.body) == "test message {}".format(i): + consumed = consumed + 1 + + # Clean up + publisher.close() + consumer.close() + + # Verify all messages were received + assert consumed == messages_to_send From 2d0d6388ae96441115aaf3ea3a453463dd96ffa1 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Dec 2025 11:34:24 +0100 Subject: [PATCH 12/13] test Signed-off-by: Gabriele Santomaggio --- examples/direct_reply_queue/direct_reply_to.py | 15 ++++++++------- rabbitmq_amqp_python_client/consumer.py | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/examples/direct_reply_queue/direct_reply_to.py b/examples/direct_reply_queue/direct_reply_to.py index 6647ebc..53c6924 100644 --- a/examples/direct_reply_queue/direct_reply_to.py +++ b/examples/direct_reply_queue/direct_reply_to.py @@ -53,16 +53,17 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - print("connection to amqp server") + print("connection_consumer to amqp server") environment = Environment(uri="amqp://guest:guest@localhost:5672/") - connection = create_connection(environment) - consumer = connection.consumer( + connection_consumer = create_connection(environment) + consumer = connection_consumer.consumer( message_handler=MyMessageHandler(), consumer_options=DirectReplyToConsumerOptions(), ) addr = consumer.address print("connecting to address: {}".format(addr)) - publisher = create_connection(environment).publisher(addr) + connection_publisher = create_connection(environment) + publisher = connection_publisher.publisher(addr) for i in range(MESSAGES_TO_PUBLISH): msg = Message(body=Converter.string_to_bytes("test message {} ".format(i))) @@ -80,9 +81,9 @@ def main() -> None: pass consumer.close() - - connection.close() - print("after connection closing") + publisher.close() + connection_consumer.close() + connection_publisher.close() if __name__ == "__main__": diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 994ae68..7cfb5c8 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -151,7 +151,7 @@ def stop(self) -> None: self._receiver.container.stop() def _create_receiver(self, addr: Optional[str] = None) -> BlockingReceiver: - credit = 100 + credit = 2 if self._credit is not None: credit = self._credit @@ -174,7 +174,7 @@ def _create_receiver(self, addr: Optional[str] = None) -> BlockingReceiver: if isinstance(self._consumer_options, DirectReplyToConsumerOptions): logger.debug("Creating dynamic receiver for direct reply-to") dynamic_receiver = self._conn.create_dynamic_receiver( - 100, handler=self._handler + credit, handler=self._handler ) dynamic_receiver.credit = credit return dynamic_receiver From b74e2bd0558b112b834417452c4825b92b44bee3 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Dec 2025 11:52:34 +0100 Subject: [PATCH 13/13] add credits Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/consumer.py | 2 +- tests/direct_reply_to/test_direct_reply.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 7cfb5c8..f63dd2b 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -151,7 +151,7 @@ def stop(self) -> None: self._receiver.container.stop() def _create_receiver(self, addr: Optional[str] = None) -> BlockingReceiver: - credit = 2 + credit = 10 if self._credit is not None: credit = self._credit diff --git a/tests/direct_reply_to/test_direct_reply.py b/tests/direct_reply_to/test_direct_reply.py index d77363e..a92e8be 100644 --- a/tests/direct_reply_to/test_direct_reply.py +++ b/tests/direct_reply_to/test_direct_reply.py @@ -21,11 +21,11 @@ def create_connection(environment: Environment) -> Connection: def test_direct_reply_to_send_and_receive(environment: Environment) -> None: """Test that messages can be published to and consumed from a direct reply-to queue.""" - messages_to_send = 10 + messages_to_send = 100 # Create a consumer using DirectReplyToConsumerOptions consumer = create_connection(environment).consumer( - consumer_options=DirectReplyToConsumerOptions() + credit=100, consumer_options=DirectReplyToConsumerOptions() ) # Get the queue address from the consumer