From 671d5f8c5cba37fc876db45f5052c9b13d3e5115 Mon Sep 17 00:00:00 2001 From: John Peters Date: Wed, 20 Dec 2023 19:10:06 -0600 Subject: [PATCH 01/16] "added to the readme to have a quickstart section" --- README.md | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/README.md b/README.md index 50a0f0d..3da3c3d 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,62 @@ from memphis.types import Retention, Storage import asyncio ``` +### Quickstart - Producing and Consuming + +The most basic functionaly of memphis is the ability to produce messages to a station and to consume those messages. + +First, a connection to Memphis must be made: + +```python + from memphis import Memphis + + # Connecting to the broker + memphis = Memphis() + + await memphis.connect( + host = "aws-us-east-1.cloud.memphis.dev", + username = "test_user", + password = os.environ.get("memphis_pass"), + account_id = os.environ.get("memphis_account_id") # For cloud users on, at the top of the overview page + ) +``` + +Then, to produce a message, simple create a producer and call `produce`: + +```python + # Creating a producer and producing a message. You can also use the memphis.producer function + producer = await memphis.producer( + station_name = "test_station", # Matches the station name in memphis cloud + producer_name = "producer" + ) + + await producer.produce(message={ + "id": i, + "chocolates_to_eat": 3 + }) +``` + +Lastly, to consume this message, create a consumer and call `fetch`: + +```python + # Creating a consumer and consuming the message we just produced + consumer = await memphis.consumer( + station_name="test_station", + consumer_name="consumer", + ) + + messages: list[Message] = await consumer.fetch() # Type-hint the return here for LSP integration + for consumed_message in messages: + msg_as_dict = json.loads(consumed_message.get_data()) + + # Do something here with the msg_as_dict + if msg_as_dict["chocolates_to_eat"] > 2: + print("That's a lot of chocolate!") + + # Ack the message to tell the broker we're done with it + await consumed_message.ack() +``` + ### Connecting to Memphis First, we need to create Memphis `object` and then connect with Memphis by using `memphis.connect`. From 54b893d12fb142515382489949a4d49b9065c339 Mon Sep 17 00:00:00 2001 From: John-Memphis <136013599+John-Memphis@users.noreply.github.com> Date: Wed, 20 Dec 2023 19:15:05 -0600 Subject: [PATCH 02/16] Update README.md, spelling mistake --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3da3c3d..4684a6e 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ First, a connection to Memphis must be made: host = "aws-us-east-1.cloud.memphis.dev", username = "test_user", password = os.environ.get("memphis_pass"), - account_id = os.environ.get("memphis_account_id") # For cloud users on, at the top of the overview page + account_id = os.environ.get("memphis_account_id") # For cloud users, at the top of the overview page ) ``` From 324660d9f1943856dd4c46684379094dcc3f80df Mon Sep 17 00:00:00 2001 From: John-Memphis <136013599+John-Memphis@users.noreply.github.com> Date: Wed, 20 Dec 2023 19:16:36 -0600 Subject: [PATCH 03/16] Update README.md, added import for type hinting --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 4684a6e..034a332 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,8 @@ Then, to produce a message, simple create a producer and call `produce`: Lastly, to consume this message, create a consumer and call `fetch`: ```python + from memphis.message import Message + # Creating a consumer and consuming the message we just produced consumer = await memphis.consumer( station_name="test_station", From 1405f7b772a85ba1376a26211f3f6087a966eada Mon Sep 17 00:00:00 2001 From: John Peters Date: Wed, 20 Dec 2023 19:17:25 -0600 Subject: [PATCH 04/16] "Removed extra indenting" --- README.md | 68 +++++++++++++++++++++++++++---------------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 034a332..49adde6 100644 --- a/README.md +++ b/README.md @@ -48,55 +48,55 @@ The most basic functionaly of memphis is the ability to produce messages to a st First, a connection to Memphis must be made: ```python - from memphis import Memphis +from memphis import Memphis - # Connecting to the broker - memphis = Memphis() +# Connecting to the broker +memphis = Memphis() - await memphis.connect( - host = "aws-us-east-1.cloud.memphis.dev", - username = "test_user", - password = os.environ.get("memphis_pass"), - account_id = os.environ.get("memphis_account_id") # For cloud users, at the top of the overview page - ) +await memphis.connect( + host = "aws-us-east-1.cloud.memphis.dev", + username = "test_user", + password = os.environ.get("memphis_pass"), + account_id = os.environ.get("memphis_account_id") # For cloud users, at the top of the overview page +) ``` Then, to produce a message, simple create a producer and call `produce`: ```python - # Creating a producer and producing a message. You can also use the memphis.producer function - producer = await memphis.producer( - station_name = "test_station", # Matches the station name in memphis cloud - producer_name = "producer" - ) +# Creating a producer and producing a message. You can also use the memphis.producer function +producer = await memphis.producer( + station_name = "test_station", # Matches the station name in memphis cloud + producer_name = "producer" +) - await producer.produce(message={ - "id": i, - "chocolates_to_eat": 3 - }) +await producer.produce(message={ + "id": i, + "chocolates_to_eat": 3 +}) ``` Lastly, to consume this message, create a consumer and call `fetch`: ```python - from memphis.message import Message +from memphis.message import Message - # Creating a consumer and consuming the message we just produced - consumer = await memphis.consumer( - station_name="test_station", - consumer_name="consumer", - ) +# Creating a consumer and consuming the message we just produced +consumer = await memphis.consumer( + station_name="test_station", + consumer_name="consumer", +) - messages: list[Message] = await consumer.fetch() # Type-hint the return here for LSP integration - for consumed_message in messages: - msg_as_dict = json.loads(consumed_message.get_data()) - - # Do something here with the msg_as_dict - if msg_as_dict["chocolates_to_eat"] > 2: - print("That's a lot of chocolate!") - - # Ack the message to tell the broker we're done with it - await consumed_message.ack() +messages: list[Message] = await consumer.fetch() # Type-hint the return here for LSP integration +for consumed_message in messages: + msg_as_dict = json.loads(consumed_message.get_data()) + + # Do something here with the msg_as_dict + if msg_as_dict["chocolates_to_eat"] > 2: + print("That's a lot of chocolate!") + + # Ack the message to tell the broker we're done with it + await consumed_message.ack() ``` ### Connecting to Memphis From 02049f3255080fa0324313e3827e292de9e7bd44 Mon Sep 17 00:00:00 2001 From: John Peters Date: Wed, 20 Dec 2023 19:21:42 -0600 Subject: [PATCH 05/16] "Added comment on asyncio" --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 49adde6..4bfae04 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,16 @@ import asyncio The most basic functionaly of memphis is the ability to produce messages to a station and to consume those messages. +> The Memphis.py SDK uses asyncio for many functions. Make sure to call the following code in an async function: + +```python +async def main: + ... + +if __name__ == '__main__': + asyncio.run(main()) +``` + First, a connection to Memphis must be made: ```python From 179fd4aaa5a750edd1c144fee48a45b820f88334 Mon Sep 17 00:00:00 2001 From: John Peters Date: Wed, 20 Dec 2023 19:22:38 -0600 Subject: [PATCH 06/16] "Forgot ()" --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4bfae04..31d2ca9 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ The most basic functionaly of memphis is the ability to produce messages to a st > The Memphis.py SDK uses asyncio for many functions. Make sure to call the following code in an async function: ```python -async def main: +async def main(): ... if __name__ == '__main__': From 6ca03784ec8cae925e6f2e4f1bc8d338f1164441 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 21 Dec 2023 11:49:28 -0600 Subject: [PATCH 07/16] "Changed to use memphis.produce and memphis.fetch_messages in order to get the examples shorter" --- README.md | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 31d2ca9..5d685a2 100644 --- a/README.md +++ b/README.md @@ -71,41 +71,33 @@ await memphis.connect( ) ``` -Then, to produce a message, simple create a producer and call `produce`: +Then, to produce a message, call the `memphis.produce` function or create a producer and call its `producer.produce` function: ```python -# Creating a producer and producing a message. You can also use the memphis.producer function -producer = await memphis.producer( - station_name = "test_station", # Matches the station name in memphis cloud - producer_name = "producer" -) - -await producer.produce(message={ +await memphis.produce( + station_name="test_station", + producer_name="producer", + message={ "id": i, "chocolates_to_eat": 3 -}) + } +) ``` -Lastly, to consume this message, create a consumer and call `fetch`: +Lastly, to consume this message, call the `memphis.fetch_messages` function or create a consumer and call its `consumer.fetch` function: ```python from memphis.message import Message -# Creating a consumer and consuming the message we just produced -consumer = await memphis.consumer( +messages: list[Message] = await memphis.fetch_messages( station_name="test_station", consumer_name="consumer", -) +) # Type-hint the return here for LSP integration -messages: list[Message] = await consumer.fetch() # Type-hint the return here for LSP integration for consumed_message in messages: - msg_as_dict = json.loads(consumed_message.get_data()) - - # Do something here with the msg_as_dict - if msg_as_dict["chocolates_to_eat"] > 2: - print("That's a lot of chocolate!") - - # Ack the message to tell the broker we're done with it + msg_data = json.loads(consumed_message.get_data()) + print(f"Ate {msg_data['chocolates_to_eat']} chocolates... Yum") + await consumed_message.ack() ``` From 739778b7516344fe50d6a92f5425f5ae405e3928 Mon Sep 17 00:00:00 2001 From: John-Memphis <136013599+John-Memphis@users.noreply.github.com> Date: Wed, 27 Dec 2023 18:38:05 -0600 Subject: [PATCH 08/16] Added close reminder --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 5d685a2..7256e65 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,8 @@ for consumed_message in messages: await consumed_message.ack() ``` +> Remember to call `memphis.close()` to close the connection. + ### Connecting to Memphis First, we need to create Memphis `object` and then connect with Memphis by using `memphis.connect`. From 44f49916030db85877fe839a81767fcaae83cfc6 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 28 Dec 2023 20:38:58 -0600 Subject: [PATCH 09/16] "Updated examples for python" --- examples/consumer.py | 56 +++++++++++++++++++------------------------- examples/producer.py | 56 ++++++++++++++++++-------------------------- 2 files changed, 47 insertions(+), 65 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index 39d0c9f..00b0717 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -1,47 +1,39 @@ -from __future__ import annotations - +from memphis import Memphis +from memphis.message import Message import asyncio - -from memphis import Memphis, MemphisConnectError, MemphisError, MemphisHeaderError - +import os +import json async def main(): - async def msg_handler(msgs, error, _): - try: - for msg in msgs: - print("message: ", msg.get_data()) - await msg.ack() - if error: - print(error) - except (MemphisError, MemphisConnectError, MemphisHeaderError) as e: - print(e) - return - try: + # Connecting to the broker memphis = Memphis() + await memphis.connect( - host="", - username="", - connection_token="", - ) + host = "aws-us-east-1.cloud.memphis.dev", + username = "test_user", + password = os.environ.get("memphis_pass"), + account_id = os.environ.get("memphis_account_id") # For cloud users on, at the top of the overview page + ) consumer = await memphis.consumer( - station_name="", - consumer_name="", - consumer_group="", + station_name="test_station", + consumer_name="consumer", ) - consumer.set_context({"key": "value"}) - consumer.consume(msg_handler) - # Keep your main thread alive so the consumer will keep receiving data - await asyncio.Event().wait() + messages: list[Message] = await consumer.fetch() # Type-hint the return here for LSP integration + + for consumed_message in messages: + msg_data = json.loads(consumed_message.get_data()) - except (MemphisError, MemphisConnectError) as e: - print(e) + # Do something with the message data + + await consumed_message.ack() + except Exception as e: + print(e) finally: await memphis.close() - -if __name__ == "__main__": - asyncio.run(main()) +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/examples/producer.py b/examples/producer.py index 01dd381..f646833 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -1,48 +1,38 @@ -from __future__ import annotations - +from memphis import Memphis +from memphis.message import Message import asyncio - -from memphis import ( - Headers, - Memphis, - MemphisConnectError, - MemphisError, - MemphisHeaderError, - MemphisSchemaError, -) - +import os async def main(): try: + # Connecting to the broker memphis = Memphis() + await memphis.connect( - host="", - username="", - connection_token="", - ) + host = "aws-us-east-1.cloud.memphis.dev", + username = "test_user", + password = os.environ.get("memphis_pass"), + account_id = os.environ.get("memphis_account_id") # For cloud users on, at the top of the overview page + ) + # Creating a producer and producing a message. You can also use the memphis.producer function producer = await memphis.producer( - station_name="", producer_name="" + station_name = "test_station", # Matches the station name in memphis cloud + producer_name = "producer" ) - headers = Headers() - headers.add("key", "value") - for i in range(5): + + for i in range(10): await producer.produce( - bytearray("Message #" + str(i) + ": Hello world", "utf-8"), - headers=headers, - ) # you can send the message parameter as dict as well + message={ + "id": i, + "chocolates_to_eat": 3 + } + ) - except ( - MemphisError, - MemphisConnectError, - MemphisHeaderError, - MemphisSchemaError, - ) as e: + except Exception as e: print(e) - finally: await memphis.close() - -if __name__ == "__main__": - asyncio.run(main()) +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file From 9b35f1d0512783c7315bc6880716f8521dcae9ab Mon Sep 17 00:00:00 2001 From: John Peters Date: Sat, 30 Dec 2023 11:40:21 -0600 Subject: [PATCH 10/16] "touched up the formatting a little bit" --- examples/consumer.py | 32 +++++++++++++++++++------------- examples/producer.py | 34 ++++++++++++++++------------------ 2 files changed, 35 insertions(+), 31 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index 00b0717..523d39a 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -1,30 +1,35 @@ -from memphis import Memphis -from memphis.message import Message import asyncio import os import json +from memphis import Memphis +from memphis.message import Message + async def main(): try: # Connecting to the broker memphis = Memphis() - + await memphis.connect( - host = "aws-us-east-1.cloud.memphis.dev", - username = "test_user", - password = os.environ.get("memphis_pass"), - account_id = os.environ.get("memphis_account_id") # For cloud users on, at the top of the overview page - ) + host="aws-us-east-1.cloud.memphis.dev", + username="test_user", + password=os.environ.get("memphis_pass"), + account_id=os.environ.get( + "memphis_account_id" + ), # For cloud users on, at the top of the overview page + ) consumer = await memphis.consumer( station_name="test_station", consumer_name="consumer", ) - messages: list[Message] = await consumer.fetch() # Type-hint the return here for LSP integration - + messages: list[ + Message + ] = await consumer.fetch() # Type-hint the return here for LSP integration + for consumed_message in messages: - msg_data = json.loads(consumed_message.get_data()) + _msg_data = json.loads(consumed_message.get_data()) # Do something with the message data @@ -35,5 +40,6 @@ async def main(): finally: await memphis.close() -if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/producer.py b/examples/producer.py index f646833..70411c1 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -1,38 +1,36 @@ -from memphis import Memphis -from memphis.message import Message import asyncio import os +from memphis import Memphis + async def main(): try: # Connecting to the broker memphis = Memphis() - + await memphis.connect( - host = "aws-us-east-1.cloud.memphis.dev", - username = "test_user", - password = os.environ.get("memphis_pass"), - account_id = os.environ.get("memphis_account_id") # For cloud users on, at the top of the overview page - ) + host="aws-us-east-1.cloud.memphis.dev", + username="test_user", + password=os.environ.get("memphis_pass"), + account_id=os.environ.get( + "memphis_account_id" + ), # For cloud users on, at the top of the overview page + ) # Creating a producer and producing a message. You can also use the memphis.producer function producer = await memphis.producer( - station_name = "test_station", # Matches the station name in memphis cloud - producer_name = "producer" + station_name="test_station", # Matches the station name in memphis cloud + producer_name="producer", ) for i in range(10): - await producer.produce( - message={ - "id": i, - "chocolates_to_eat": 3 - } - ) + await producer.produce(message={"id": i, "chocolates_to_eat": 3}) except Exception as e: print(e) finally: await memphis.close() -if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + +if __name__ == "__main__": + asyncio.run(main()) From db5dbadb3190d7517c10c278a38afe1998b4a278 Mon Sep 17 00:00:00 2001 From: John Peters Date: Sat, 30 Dec 2023 14:34:34 -0600 Subject: [PATCH 11/16] "fixed all linting/formatting issues" --- examples/consumer.py | 11 +++++++++-- examples/producer.py | 14 +++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index 523d39a..14803d0 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -1,11 +1,18 @@ +""" +An example consumer for the Memphis.dev python SDK. +""" + import asyncio import os import json -from memphis import Memphis +from memphis import Memphis, MemphisConnectError, MemphisError from memphis.message import Message async def main(): + """ + Async main function used for the asyncio runtime. + """ try: # Connecting to the broker memphis = Memphis() @@ -35,7 +42,7 @@ async def main(): await consumed_message.ack() - except Exception as e: + except (MemphisError, MemphisConnectError) as e: print(e) finally: await memphis.close() diff --git a/examples/producer.py b/examples/producer.py index 70411c1..1e90140 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -1,9 +1,16 @@ +""" +An example producer for the Memphis.dev python SDK. +""" + import asyncio import os -from memphis import Memphis +from memphis import Memphis, MemphisConnectError, MemphisError async def main(): + """ + Async main function used for the asyncio runtime. + """ try: # Connecting to the broker memphis = Memphis() @@ -17,7 +24,8 @@ async def main(): ), # For cloud users on, at the top of the overview page ) - # Creating a producer and producing a message. You can also use the memphis.producer function + # Creating a producer and producing a message. + # You can also use the memphis.producer function producer = await memphis.producer( station_name="test_station", # Matches the station name in memphis cloud producer_name="producer", @@ -26,7 +34,7 @@ async def main(): for i in range(10): await producer.produce(message={"id": i, "chocolates_to_eat": 3}) - except Exception as e: + except (MemphisError, MemphisConnectError) as e: print(e) finally: await memphis.close() From 1e10c8b148cf6f4f2f68a8722c352755ba9ee525 Mon Sep 17 00:00:00 2001 From: John Peters Date: Tue, 2 Jan 2024 09:43:10 -0600 Subject: [PATCH 12/16] "Changed params to generic params" --- README.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 7256e65..253723c 100644 --- a/README.md +++ b/README.md @@ -64,10 +64,10 @@ from memphis import Memphis memphis = Memphis() await memphis.connect( - host = "aws-us-east-1.cloud.memphis.dev", - username = "test_user", - password = os.environ.get("memphis_pass"), - account_id = os.environ.get("memphis_account_id") # For cloud users, at the top of the overview page + host = "", + username = "", + password = "", + account_id = # For cloud users, at the top of the overview page ) ``` @@ -75,8 +75,8 @@ Then, to produce a message, call the `memphis.produce` function or create a prod ```python await memphis.produce( - station_name="test_station", - producer_name="producer", + station_name="", + producer_name="", message={ "id": i, "chocolates_to_eat": 3 @@ -90,8 +90,8 @@ Lastly, to consume this message, call the `memphis.fetch_messages` function or c from memphis.message import Message messages: list[Message] = await memphis.fetch_messages( - station_name="test_station", - consumer_name="consumer", + station_name="", + consumer_name="", ) # Type-hint the return here for LSP integration for consumed_message in messages: From 00131379b517b57aed891a853e4ac6c871e73a14 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 4 Jan 2024 13:40:59 -0600 Subject: [PATCH 13/16] "Made examples use generic names" --- examples/consumer.py | 14 ++++++-------- examples/producer.py | 14 ++++++-------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index 14803d0..ed40d3d 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -18,17 +18,15 @@ async def main(): memphis = Memphis() await memphis.connect( - host="aws-us-east-1.cloud.memphis.dev", - username="test_user", - password=os.environ.get("memphis_pass"), - account_id=os.environ.get( - "memphis_account_id" - ), # For cloud users on, at the top of the overview page + host="", + username="", + password="", + account_id= , # For cloud users on, at the top of the overview page ) consumer = await memphis.consumer( - station_name="test_station", - consumer_name="consumer", + station_name="", + consumer_name="", ) messages: list[ diff --git a/examples/producer.py b/examples/producer.py index 1e90140..661a272 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -16,19 +16,17 @@ async def main(): memphis = Memphis() await memphis.connect( - host="aws-us-east-1.cloud.memphis.dev", - username="test_user", - password=os.environ.get("memphis_pass"), - account_id=os.environ.get( - "memphis_account_id" - ), # For cloud users on, at the top of the overview page + host="", + username="", + password="", + account_id= , # For cloud users on, at the top of the overview page ) # Creating a producer and producing a message. # You can also use the memphis.producer function producer = await memphis.producer( - station_name="test_station", # Matches the station name in memphis cloud - producer_name="producer", + station_name="", # Matches the station name in memphis cloud + producer_name="", ) for i in range(10): From ff857ef880663da87965b606a7d5845f76d96097 Mon Sep 17 00:00:00 2001 From: John Peters Date: Tue, 9 Jan 2024 15:34:49 -0600 Subject: [PATCH 14/16] "Added while true loop" --- examples/consumer.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index ed40d3d..2aa4fa6 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -21,7 +21,7 @@ async def main(): host="", username="", password="", - account_id= , # For cloud users on, at the top of the overview page + account_id=, # For cloud users on, at the top of the overview page ) consumer = await memphis.consumer( @@ -29,21 +29,26 @@ async def main(): consumer_name="", ) - messages: list[ - Message - ] = await consumer.fetch() # Type-hint the return here for LSP integration + while True: + messages: list[ + Message + ] = await consumer.fetch() # Type-hint the return here for LSP integration - for consumed_message in messages: - _msg_data = json.loads(consumed_message.get_data()) + if len(messages) == 0: + continue - # Do something with the message data + for consumed_message in messages: + msg_data = json.loads(consumed_message.get_data()) - await consumed_message.ack() + # Do something with the message data + print(msg_data) + await consumed_message.ack() - except (MemphisError, MemphisConnectError) as e: + except Exception as e: print(e) finally: - await memphis.close() + if memphis != None: + await memphis.close() if __name__ == "__main__": From a611076a2d5fafb7e1c36911c3176adc144debc7 Mon Sep 17 00:00:00 2001 From: John Peters Date: Sat, 13 Jan 2024 13:26:21 -0600 Subject: [PATCH 15/16] "Added comment on accountID so build completes" --- examples/consumer.py | 2 +- examples/producer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index 2aa4fa6..dd38756 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -21,7 +21,7 @@ async def main(): host="", username="", password="", - account_id=, # For cloud users on, at the top of the overview page + # account_id=, # For cloud users on, at the top of the overview page ) consumer = await memphis.consumer( diff --git a/examples/producer.py b/examples/producer.py index 661a272..997beeb 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -19,7 +19,7 @@ async def main(): host="", username="", password="", - account_id= , # For cloud users on, at the top of the overview page + # account_id= , # For cloud users on, at the top of the overview page ) # Creating a producer and producing a message. From 4b9f5b210ee05d081e7c73207f0354356a42bffe Mon Sep 17 00:00:00 2001 From: John Peters Date: Sat, 13 Jan 2024 13:28:15 -0600 Subject: [PATCH 16/16] "removed unsused imports" --- examples/consumer.py | 3 +-- examples/producer.py | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index dd38756..92a4b9e 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -3,9 +3,8 @@ """ import asyncio -import os import json -from memphis import Memphis, MemphisConnectError, MemphisError +from memphis import Memphis from memphis.message import Message diff --git a/examples/producer.py b/examples/producer.py index 997beeb..9b0b2b9 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -3,7 +3,6 @@ """ import asyncio -import os from memphis import Memphis, MemphisConnectError, MemphisError