1+ # type: ignore
2+
3+ import asyncio
4+ import base64
5+ import signal
6+ from datetime import datetime , timedelta
7+
8+ import jwt
9+
10+ from rabbitmq_amqp_python_client import (
11+ AddressHelper ,
12+ AMQPMessagingHandler ,
13+ AsyncEnvironment ,
14+ Converter ,
15+ Event ,
16+ ExchangeSpecification ,
17+ ExchangeToQueueBindingSpecification ,
18+ Message ,
19+ OAuth2Options ,
20+ OutcomeState ,
21+ QuorumQueueSpecification ,
22+ )
23+
24+ MESSAGES_TO_PUBLISH = 100
25+
26+
27+ class MyMessageHandler (AMQPMessagingHandler ):
28+ def __init__ (self ):
29+ super ().__init__ ()
30+ self ._count = 0
31+
32+ def on_amqp_message (self , event : Event ):
33+ print ("received message: " + Converter .bytes_to_string (event .message .body ))
34+
35+ # accepting
36+ self .delivery_context .accept (event )
37+
38+ # in case of rejection (+eventually deadlettering)
39+ # self.delivery_context.discard(event)
40+
41+ # in case of requeuing
42+ # self.delivery_context.requeue(event)
43+
44+ # annotations = {}
45+ # annotations[symbol('x-opt-string')] = 'x-test1'
46+ # in case of requeuing with annotations added
47+ # self.delivery_context.requeue_with_annotations(event, annotations)
48+
49+ # in case of rejection with annotations added
50+ # self.delivery_context.discard_with_annotations(event)
51+
52+ print ("count " + str (self ._count ))
53+
54+ self ._count = self ._count + 1
55+
56+ if self ._count == MESSAGES_TO_PUBLISH :
57+ print ("received all messages" )
58+
59+ def on_connection_closed (self , event : Event ):
60+ # if you want you can add cleanup operations here
61+ print ("connection closed" )
62+
63+ def on_link_closed (self , event : Event ) -> None :
64+ # if you want you can add cleanup operations here
65+ print ("link closed" )
66+
67+
68+ async def main () -> None :
69+ exchange_name = "oAuth2-test-exchange"
70+ queue_name = "oAuth2-example-queue"
71+ routing_key = "oAuth2-routing-key"
72+
73+ print ("connection to amqp server" )
74+ oauth_token = token (
75+ datetime .now () + timedelta (milliseconds = 10000 ),
76+ "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH" ,
77+ )
78+
79+ async with AsyncEnvironment (
80+ uri = "amqp://localhost:5672" , oauth2_options = OAuth2Options (token = oauth_token )
81+ ) as environment :
82+ async with await environment .connection () as connection :
83+ async with await connection .management () as management :
84+ # you can refresh the oauth token with the connection api
85+ oauth_token = token (
86+ datetime .now () + timedelta (milliseconds = 10000 ),
87+ "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH" ,
88+ )
89+ await connection .refresh_token (oauth_token )
90+
91+ print ("declaring exchange and queue" )
92+ await management .declare_exchange (
93+ ExchangeSpecification (name = exchange_name )
94+ )
95+
96+ await management .declare_queue (
97+ QuorumQueueSpecification (name = queue_name )
98+ # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
99+ )
100+
101+ print ("binding queue to exchange" )
102+ bind_name = await management .bind (
103+ ExchangeToQueueBindingSpecification (
104+ source_exchange = exchange_name ,
105+ destination_queue = queue_name ,
106+ binding_key = routing_key ,
107+ )
108+ )
109+
110+ addr = AddressHelper .exchange_address (exchange_name , routing_key )
111+ addr_queue = AddressHelper .queue_address (queue_name )
112+
113+ print ("create a publisher and publish a test message" )
114+ async with await connection .publisher (addr ) as publisher :
115+ print ("purging the queue" )
116+ messages_purged = await management .purge_queue (queue_name )
117+ print ("messages purged: " + str (messages_purged ))
118+
119+ # publish messages
120+ for i in range (MESSAGES_TO_PUBLISH ):
121+ status = await publisher .publish (
122+ Message (body = Converter .string_to_bytes ("test_{}" .format (i )))
123+ )
124+ if status .remote_state == OutcomeState .ACCEPTED :
125+ print ("message: test_{} accepted" .format (i ))
126+ elif status .remote_state == OutcomeState .RELEASED :
127+ print ("message: test_{} not routed" .format (i ))
128+ elif status .remote_state == OutcomeState .REJECTED :
129+ print ("message: test_{} rejected" .format (i ))
130+
131+ print (
132+ "create a consumer and consume the test message - press control + c to terminate"
133+ )
134+ handler = MyMessageHandler ()
135+ async with await connection .consumer (
136+ addr_queue , message_handler = handler
137+ ) as consumer :
138+ # Create stop event and signal handler
139+ stop_event = asyncio .Event ()
140+
141+ def handle_sigint ():
142+ print ("\n Ctrl+C detected, stopping consumer gracefully..." )
143+ stop_event .set ()
144+
145+ # Register signal handler
146+ loop = asyncio .get_running_loop ()
147+ loop .add_signal_handler (signal .SIGINT , handle_sigint )
148+
149+ try :
150+ # Run consumer in background
151+ consumer_task = asyncio .create_task (consumer .run ())
152+
153+ # Wait for stop signal
154+ await stop_event .wait ()
155+
156+ # Stop consumer gracefully
157+ print ("Stopping consumer..." )
158+ await consumer .stop_processing ()
159+
160+ # Wait for task to complete
161+ try :
162+ await asyncio .wait_for (consumer_task , timeout = 3.0 )
163+ except asyncio .TimeoutError :
164+ print ("Consumer task timed out" )
165+
166+ finally :
167+ loop .remove_signal_handler (signal .SIGINT )
168+
169+ print ("cleanup" )
170+ print ("unbind" )
171+ await management .unbind (bind_name )
172+
173+ print ("delete queue" )
174+ await management .delete_queue (queue_name )
175+
176+ print ("delete exchange" )
177+ await management .delete_exchange (exchange_name )
178+
179+
180+ def token (duration : datetime , token : str ) -> str :
181+ # Decode the base64 key
182+ decoded_key = base64 .b64decode (token )
183+
184+ # Define the claims
185+ claims = {
186+ "iss" : "unit_test" ,
187+ "aud" : "rabbitmq" ,
188+ "exp" : int (duration .timestamp ()),
189+ "scope" : ["rabbitmq.configure:*/*" , "rabbitmq.write:*/*" , "rabbitmq.read:*/*" ],
190+ "random" : random_string (6 ),
191+ }
192+
193+ # Create the token with the claims and sign it
194+ jwt_token = jwt .encode (
195+ claims , decoded_key , algorithm = "HS256" , headers = {"kid" : "token-key" }
196+ )
197+
198+ return jwt_token
199+
200+
201+ # Helper function to generate a random string (replace with your implementation)
202+ def random_string (length : int ) -> str :
203+ import random
204+ import string
205+
206+ return "" .join (random .choices (string .ascii_letters + string .digits , k = length ))
207+
208+
209+ if __name__ == "__main__" :
210+ asyncio .run (main ())
0 commit comments