Skip to content

Commit 82d66cc

Browse files
committed
Merge remote-tracking branch 'origin/main' into set_durable_true
2 parents 3fdd959 + d3ca042 commit 82d66cc

File tree

19 files changed

+1605
-256
lines changed

19 files changed

+1605
-256
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.1.0-management
10+
readonly rabbitmq_image=rabbitmq:4.2-rc-management
1111

1212

1313
readonly docker_name_prefix='rabbitmq-amqp-python-client'

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ format:
1212
poetry run black rabbitmq_amqp_python_client/
1313
poetry run black tests/
1414
poetry run flake8 --exclude=venv,.venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
15+
poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid .
1516

1617
test: format
1718
poetry run pytest .

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
## RabbitMQ AMQP 1.0 Python Client
2-
This library is meant to be used with RabbitMQ 4.0. Suitable for testing in pre-production environments.
2+
This library is meant to be used with RabbitMQ `4.x`. Suitable for testing in pre-production environments.
33

44
The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-python-client/):
55
```bash

examples/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ Client examples
44
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
55
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
66
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
7-
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
7+
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
8+
- [Streams with filters](./streams_with_filters/example_streams_with_filters.py) - Example supporting stream capabilities with filters
9+
- [Streams With Sql filters](./streams_with_sql_filters) - Example supporting stream capabilities with SQL filters

examples/streams/example_with_streams.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
Event,
1111
Message,
1212
OffsetSpecification,
13-
StreamOptions,
13+
StreamConsumerOptions,
1414
StreamSpecification,
1515
)
1616

@@ -104,8 +104,8 @@ def main() -> None:
104104
message_handler=MyMessageHandler(),
105105
# can be first, last, next or an offset long
106106
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
107-
stream_filter_options=StreamOptions(
108-
offset_specification=OffsetSpecification.first, filters=["banana"]
107+
consumer_options=StreamConsumerOptions(
108+
offset_specification=OffsetSpecification.first
109109
),
110110
)
111111
print(
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# type: ignore
2+
import logging
3+
import time
4+
5+
from rabbitmq_amqp_python_client import (
6+
AddressHelper,
7+
AMQPMessagingHandler,
8+
Connection,
9+
ConnectionClosed,
10+
Converter,
11+
Environment,
12+
Event,
13+
Message,
14+
MessageProperties,
15+
OffsetSpecification,
16+
StreamConsumerOptions,
17+
StreamFilterOptions,
18+
StreamSpecification,
19+
)
20+
21+
MESSAGES_TO_PUBLISH = 100
22+
23+
24+
class MyMessageHandler(AMQPMessagingHandler):
25+
26+
def __init__(self):
27+
super().__init__()
28+
self._count = 0
29+
30+
def on_amqp_message(self, event: Event):
31+
# only messages with banana filters and with subject yellow
32+
# and application property from = italy get received
33+
self._count = self._count + 1
34+
logger.info(
35+
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
36+
Converter.bytes_to_string(event.message.body),
37+
event.message.subject,
38+
event.message.application_properties,
39+
self._count,
40+
)
41+
)
42+
self.delivery_context.accept(event)
43+
44+
def on_connection_closed(self, event: Event):
45+
# if you want you can add cleanup operations here
46+
print("connection closed")
47+
48+
def on_link_closed(self, event: Event) -> None:
49+
# if you want you can add cleanup operations here
50+
print("link closed")
51+
52+
53+
def create_connection(environment: Environment) -> Connection:
54+
connection = environment.connection()
55+
connection.dial()
56+
57+
return connection
58+
59+
60+
logging.basicConfig()
61+
logger = logging.getLogger("[streams_with_filters]")
62+
logger.setLevel(logging.INFO)
63+
64+
65+
def main() -> None:
66+
"""
67+
In this example we create a stream queue and a consumer with filtering options.
68+
The example combines two filters:
69+
- filter value: banana
70+
- subject: yellow
71+
72+
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
73+
"""
74+
75+
queue_name = "stream-example-with-message-properties-filter-queue"
76+
logger.info("Creating connection")
77+
environment = Environment("amqp://guest:guest@localhost:5672/")
78+
connection = create_connection(environment)
79+
management = connection.management()
80+
# delete the queue if it exists
81+
management.delete_queue(queue_name)
82+
# create a stream queue
83+
management.declare_queue(StreamSpecification(name=queue_name))
84+
85+
addr_queue = AddressHelper.queue_address(queue_name)
86+
87+
consumer_connection = create_connection(environment)
88+
89+
consumer = consumer_connection.consumer(
90+
addr_queue,
91+
message_handler=MyMessageHandler(),
92+
# the consumer will only receive messages with filter value banana and subject yellow
93+
# and application property from = italy
94+
consumer_options=StreamConsumerOptions(
95+
offset_specification=OffsetSpecification.first,
96+
filter_options=StreamFilterOptions(
97+
values=["banana"],
98+
message_properties=MessageProperties(
99+
subject="yellow",
100+
),
101+
application_properties={"from": "italy"},
102+
),
103+
),
104+
)
105+
print(
106+
"create a consumer and consume the test message - press control + c to terminate to consume"
107+
)
108+
109+
# print("create a publisher and publish a test message")
110+
publisher = connection.publisher(addr_queue)
111+
112+
# publish with a filter of apple
113+
for i in range(MESSAGES_TO_PUBLISH):
114+
color = "green" if i % 2 == 0 else "yellow"
115+
from_value = "italy" if i % 3 == 0 else "spain"
116+
publisher.publish(
117+
Message(
118+
Converter.string_to_bytes(body="apple: " + str(i)),
119+
annotations={"x-stream-filter-value": "apple"},
120+
subject=color,
121+
application_properties={"from": from_value},
122+
)
123+
)
124+
125+
time.sleep(0.5) # wait a bit to ensure messages are published in different chunks
126+
127+
# publish with a filter of banana
128+
for i in range(MESSAGES_TO_PUBLISH):
129+
color = "green" if i % 2 == 0 else "yellow"
130+
from_value = "italy" if i % 3 == 0 else "spain"
131+
publisher.publish(
132+
Message(
133+
body=Converter.string_to_bytes("banana: " + str(i)),
134+
annotations={"x-stream-filter-value": "banana"},
135+
subject=color,
136+
application_properties={"from": from_value},
137+
)
138+
)
139+
140+
publisher.close()
141+
142+
while True:
143+
try:
144+
consumer.run()
145+
except KeyboardInterrupt:
146+
pass
147+
except ConnectionClosed:
148+
print("connection closed")
149+
continue
150+
except Exception as e:
151+
print("consumer exited for exception " + str(e))
152+
153+
break
154+
155+
#
156+
logger.info("consumer exited, deleting queue")
157+
management.delete_queue(queue_name)
158+
159+
print("closing connections")
160+
management.close()
161+
print("after management closing")
162+
environment.close()
163+
print("after connection closing")
164+
165+
166+
if __name__ == "__main__":
167+
main()
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# type: ignore
2+
import logging
3+
4+
from rabbitmq_amqp_python_client import (
5+
AddressHelper,
6+
AMQPMessagingHandler,
7+
Connection,
8+
ConnectionClosed,
9+
Converter,
10+
Environment,
11+
Event,
12+
Message,
13+
OffsetSpecification,
14+
StreamConsumerOptions,
15+
StreamFilterOptions,
16+
StreamSpecification,
17+
)
18+
19+
MESSAGES_TO_PUBLISH = 100
20+
21+
22+
class MyMessageHandler(AMQPMessagingHandler):
23+
24+
def __init__(self):
25+
super().__init__()
26+
self._count = 0
27+
28+
def on_amqp_message(self, event: Event):
29+
# only messages with banana filters and with subject yellow
30+
# and application property from = italy get received
31+
self._count = self._count + 1
32+
logger.info(
33+
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
34+
Converter.bytes_to_string(event.message.body),
35+
event.message.subject,
36+
event.message.application_properties,
37+
self._count,
38+
)
39+
)
40+
self.delivery_context.accept(event)
41+
42+
def on_connection_closed(self, event: Event):
43+
# if you want you can add cleanup operations here
44+
print("connection closed")
45+
46+
def on_link_closed(self, event: Event) -> None:
47+
# if you want you can add cleanup operations here
48+
print("link closed")
49+
50+
51+
def create_connection(environment: Environment) -> Connection:
52+
connection = environment.connection()
53+
connection.dial()
54+
55+
return connection
56+
57+
58+
logging.basicConfig()
59+
logger = logging.getLogger("[streams_with_filters]")
60+
logger.setLevel(logging.INFO)
61+
62+
63+
def main() -> None:
64+
"""
65+
In this example we create a stream queue and a consumer with SQL filter
66+
67+
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
68+
"""
69+
70+
queue_name = "stream-example-with-sql-filter-queue"
71+
logger.info("Creating connection")
72+
environment = Environment("amqp://guest:guest@localhost:5672/")
73+
connection = create_connection(environment)
74+
management = connection.management()
75+
# delete the queue if it exists
76+
management.delete_queue(queue_name)
77+
# create a stream queue
78+
management.declare_queue(StreamSpecification(name=queue_name))
79+
80+
addr_queue = AddressHelper.queue_address(queue_name)
81+
82+
consumer_connection = create_connection(environment)
83+
sql = (
84+
"properties.subject LIKE '%in_the_filter%' "
85+
"AND a_in_the_filter_key = 'a_in_the_filter_value'"
86+
)
87+
88+
consumer = consumer_connection.consumer(
89+
addr_queue,
90+
message_handler=MyMessageHandler(),
91+
consumer_options=StreamConsumerOptions(
92+
offset_specification=OffsetSpecification.first,
93+
filter_options=StreamFilterOptions(sql=sql),
94+
),
95+
)
96+
print(
97+
"create a consumer and consume the test message - press control + c to terminate to consume"
98+
)
99+
100+
# print("create a publisher and publish a test message")
101+
publisher = connection.publisher(addr_queue)
102+
103+
# publish messages won't match the filter
104+
for i in range(MESSAGES_TO_PUBLISH):
105+
publisher.publish(Message(Converter.string_to_bytes(body="apple: " + str(i))))
106+
107+
# publish messages that will match the filter
108+
for i in range(MESSAGES_TO_PUBLISH):
109+
msqMatch = Message(
110+
body=Converter.string_to_bytes("the_right_one_sql"),
111+
# will match due of %
112+
subject="something_in_the_filter_{}".format(i),
113+
application_properties={"a_in_the_filter_key": "a_in_the_filter_value"},
114+
)
115+
publisher.publish(msqMatch)
116+
117+
publisher.close()
118+
119+
while True:
120+
try:
121+
consumer.run()
122+
except KeyboardInterrupt:
123+
pass
124+
except ConnectionClosed:
125+
print("connection closed")
126+
continue
127+
except Exception as e:
128+
print("consumer exited for exception " + str(e))
129+
130+
break
131+
132+
#
133+
logger.info("consumer exited, deleting queue")
134+
management.delete_queue(queue_name)
135+
136+
print("closing connections")
137+
management.close()
138+
print("after management closing")
139+
environment.close()
140+
print("after connection closing")
141+
142+
143+
if __name__ == "__main__":
144+
main()

0 commit comments

Comments
 (0)