Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e5fc783
add asyncio wrapper classes
dadodimauro Oct 29, 2025
156aaf6
handle connection close with callbacks
dadodimauro Oct 29, 2025
78db3e7
edit publisher sender handling
dadodimauro Oct 29, 2025
10b988c
safer handling in close
dadodimauro Oct 29, 2025
18e80d6
ignore amqp:resource-deleted when closing management sender and receive
dadodimauro Oct 29, 2025
5320813
fix formatting
dadodimauro Oct 29, 2025
d440429
add tests for async publisher
dadodimauro Oct 29, 2025
7c45ec0
fix publisher tests
dadodimauro Oct 29, 2025
9510cf1
add tests for async consumer
dadodimauro Oct 29, 2025
0b8fb2e
add tests for async management
dadodimauro Oct 31, 2025
2835843
add tests for async connection and format
dadodimauro Oct 31, 2025
aec72f6
add tests for amqp 0.9.1
dadodimauro Oct 31, 2025
2f0736d
add tests for streams with async classes
dadodimauro Oct 31, 2025
96f74fd
Update README with fix for SSLUnavailable error
dadodimauro Oct 31, 2025
e3a397e
fix type hint uncompatible with python 3.9
dadodimauro Nov 3, 2025
a0ee953
better handling of asyncio.Lock and event loop, add docstrings
dadodimauro Nov 8, 2025
83a7cb8
minor fix
dadodimauro Nov 8, 2025
331aebf
add async getting started example
dadodimauro Nov 8, 2025
8187a18
add type: ignore to example
dadodimauro Nov 8, 2025
9dc2bed
fix async consumer
dadodimauro Nov 10, 2025
2c63373
add more async examples
dadodimauro Nov 10, 2025
23cd38d
update readme
dadodimauro Nov 10, 2025
2bb1443
fix black errors for examples
dadodimauro Nov 10, 2025
1bba34f
fix problems after copilot review
dadodimauro Nov 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,48 @@ To run TLS you need to:

Read more about the issue [here](https://stackoverflow.com/questions/44979947/python-qpid-proton-for-mac-using-amqps)

### SSL Problems in local environment

If when running tests, this exceptions is raised by the proton library: `SSLUnavailable`:
``` bash
pip uninstall python-qpid-proton -y

sudo apt-get update
sudo apt-get install -y swig cmake build-essential libssl-dev pkg-config

export PKG_CONFIG_PATH=/usr/lib/x86_64-linux-gnu/pkgconfig
export CFLAGS="-I/usr/include/openssl"
export LDFLAGS="-L/usr/lib/x86_64-linux-gnu"

pip install "python-qpid-proton>=0.39.0,<0.40.0" --no-binary python-qpid-proton --verbose --no-cache-dir
```

### Async Interface (Experimental)

The client provides an async interface via the `rabbitmq_amqp_python_client.asyncio` module. The async classes act as facades that:

- Wrap the corresponding synchronous classes
- Execute blocking operations in a thread pool executor using `run_in_executor`
- Coordinate concurrent access using `asyncio.Lock`
- Implement proper async context managers (`async with`) for resource management
- Maintain API compatibility with the synchronous version

**Key differences from the synchronous interface:**

1. Use `AsyncEnvironment` instead of `Environment`
2. All operations must be awaited with `await`
3. Use `async with` for resource management (connections, publishers, consumers, management)
4. Consumer signal handling uses `asyncio.Event` and `loop.add_signal_handler`

For a complete example showing proper consumer termination and signal handling, refer to:

- [examples/getting_started/getting_started_async.py](./examples/getting_started/getting_started_async.py)

Additional async examples are available in the [examples](./examples) folder:

- OAuth2: [examples/oauth/oAuth2_async.py](./examples/oauth/oAuth2_async.py)
- Reconnection: [examples/reconnection/reconnection_example_async.py](./examples/reconnection/reconnection_example_async.py)
- Streams: [examples/streams/example_with_streams_async.py](./examples/streams/example_with_streams_async.py)
- TLS: [examples/tls/tls_example_async.py](./examples/tls/tls_example_async.py)


149 changes: 149 additions & 0 deletions examples/getting_started/getting_started_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# type: ignore

import asyncio
import signal

from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
AsyncEnvironment,
Converter,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Message,
OutcomeState,
QuorumQueueSpecification,
)

MESSAGES_TO_PUBLISH = 100


class StopConsumerException(Exception):
"""Exception to signal consumer should stop"""

pass


class MyMessageHandler(AMQPMessagingHandler):
def __init__(self):
super().__init__()
self._count = 0

def on_amqp_message(self, event: Event):
print(
"received message: {} ".format(
Converter.bytes_to_string(event.message.body)
)
)

self.delivery_context.accept(event)
self._count = self._count + 1
print("count " + str(self._count))

def on_connection_closed(self, event: Event):
print("connection closed")

def on_link_closed(self, event: Event) -> None:
print("link closed")


async def main():
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"

print("connection to amqp server")
async with AsyncEnvironment(
uri="amqp://guest:guest@localhost:5672/"
) as environment:
async with await environment.connection() as connection:
async with await connection.management() as management:
print("declaring exchange and queue")
await management.declare_exchange(
ExchangeSpecification(name=exchange_name)
)
await management.declare_queue(
QuorumQueueSpecification(name=queue_name)
)

print("binding queue to exchange")
bind_name = await management.bind(
ExchangeToQueueBindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
)
)

addr = AddressHelper.exchange_address(exchange_name, routing_key)
addr_queue = AddressHelper.queue_address(queue_name)

print("create a publisher and publish a test message")
async with await connection.publisher(addr) as publisher:
print("purging the queue")
messages_purged = await management.purge_queue(queue_name)
print("messages purged: " + str(messages_purged))

# publish messages
for i in range(MESSAGES_TO_PUBLISH):
status = await publisher.publish(
Message(
body=Converter.string_to_bytes(
"test message {} ".format(i)
)
)
)
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted")

print(
"create a consumer and consume the test message - press control + c to terminate"
)
handler = MyMessageHandler()
async with await connection.consumer(
addr_queue, message_handler=handler
) as consumer:
# Create stop event and signal handler
stop_event = asyncio.Event()

def handle_sigint():
print("\nCtrl+C detected, stopping consumer gracefully...")
stop_event.set()

# Register signal handler
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, handle_sigint)

try:
# Run consumer in background
consumer_task = asyncio.create_task(consumer.run())

# Wait for stop signal
await stop_event.wait()

# Stop consumer gracefully
print("Stopping consumer...")
await consumer.stop_processing()

# Wait for task to complete
try:
await asyncio.wait_for(consumer_task, timeout=3.0)
except asyncio.TimeoutError:
print("Consumer task timed out")

finally:
loop.remove_signal_handler(signal.SIGINT)

print("unbind")
await management.unbind(bind_name)

print("delete queue")
await management.delete_queue(queue_name)

print("delete exchange")
await management.delete_exchange(exchange_name)


if __name__ == "__main__":
asyncio.run(main())
Loading