Skip to content

Commit c09936f

Browse files
committed
update temp sample to use mqtt5
1 parent 8071941 commit c09936f

File tree

1 file changed

+124
-143
lines changed

1 file changed

+124
-143
lines changed

samples/pubsub.py

Lines changed: 124 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0.
33

4-
from awscrt import mqtt, http
5-
from awsiot import mqtt_connection_builder
6-
import sys
7-
import threading
8-
import time
9-
import json
10-
4+
from awsiot import mqtt5_client_builder
5+
from awscrt import mqtt5
6+
import threading, time
117
# This sample uses the Message Broker for AWS IoT to send and receive messages
128
# through an MQTT connection. On startup, the device connects to the server,
139
# subscribes to a topic, and begins publishing messages to that topic.
@@ -19,7 +15,7 @@
1915
import uuid
2016

2117
parser = argparse.ArgumentParser(
22-
description="PubSub Sample (MQTT3)",
18+
description="MQTT5 X509 Sample (mTLS)",
2319
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
2420
)
2521
required = parser.add_argument_group("required arguments")
@@ -29,176 +25,161 @@
2925
required.add_argument("--endpoint", required=True, metavar="", dest="input_endpoint",
3026
help="IoT endpoint hostname")
3127
required.add_argument("--cert", required=True, metavar="", dest="input_cert",
32-
help="Path to the certificate file to use during mTLS connection establishment")
28+
help="Path to the certificate file to use during mTLS connection establishment")
3329
required.add_argument("--key", required=True, metavar="", dest="input_key",
34-
help="Path to the private key file to use during mTLS connection establishment")
30+
help="Path to the private key file to use during mTLS connection establishment")
3531

3632
# Optional Arguments
37-
optional.add_argument("--client_id", metavar="", dest="input_clientId", default=f"pubsub-sample-{uuid.uuid4().hex[:8]}",
33+
optional.add_argument("--client_id", metavar="",dest="input_clientId", default=f"mqtt5-sample-{uuid.uuid4().hex[:8]}",
3834
help="Client ID")
39-
optional.add_argument("--topic", metavar="", default="test/topic", dest="input_topic",
35+
optional.add_argument("--topic", metavar="",default="test/topic", dest="input_topic",
4036
help="Topic")
41-
optional.add_argument("--message", metavar="", default="Hello from pubsub sample", dest="input_message",
37+
optional.add_argument("--message", metavar="",default="Hello from mqtt5 sample", dest="input_message",
4238
help="Message payload")
43-
optional.add_argument("--count", type=int, metavar="", default=10, dest="input_count",
39+
optional.add_argument("--count", type=int, metavar="",default=5, dest="input_count",
4440
help="Messages to publish (0 = infinite)")
4541
optional.add_argument("--ca_file", metavar="", dest="input_ca",
4642
help="Path to root CA file")
47-
optional.add_argument("--port", type=int, metavar="", default=8883, dest="input_port",
48-
help="Connection port")
49-
optional.add_argument("--proxy_host", metavar="", dest="input_proxy_host",
50-
help="Proxy hostname")
51-
optional.add_argument("--proxy_port", type=int, metavar="", default=0, dest="input_proxy_port",
52-
help="Proxy port")
53-
optional.add_argument("--is_ci", action="store_true", dest="input_is_ci",
54-
help="CI mode (suppress some output)")
5543

5644
# args contains all the parsed commandline arguments used by the sample
5745
args = parser.parse_args()
5846
# --------------------------------- ARGUMENT PARSING END -----------------------------------------
5947

60-
received_count = 0
48+
TIMEOUT = 100
49+
message_count = args.input_count
50+
message_topic = args.input_topic
51+
message_string = args.input_message
52+
# Events used within callbacks to progress sample
53+
connection_success_event = threading.Event()
54+
stopped_event = threading.Event()
6155
received_all_event = threading.Event()
56+
received_count = 0
6257

63-
# Callback when connection is accidentally lost.
64-
65-
66-
def on_connection_interrupted(connection, error, **kwargs):
67-
print("Connection interrupted. error: {}".format(error))
68-
69-
70-
# Callback when an interrupted connection is re-established.
71-
def on_connection_resumed(connection, return_code, session_present, **kwargs):
72-
print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))
73-
74-
if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
75-
print("Session did not persist. Resubscribing to existing topics...")
76-
resubscribe_future, _ = connection.resubscribe_existing_topics()
77-
78-
# Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
79-
# evaluate result with a callback instead.
80-
resubscribe_future.add_done_callback(on_resubscribe_complete)
81-
82-
83-
def on_resubscribe_complete(resubscribe_future):
84-
resubscribe_results = resubscribe_future.result()
85-
print("Resubscribe results: {}".format(resubscribe_results))
86-
87-
for topic, qos in resubscribe_results['topics']:
88-
if qos is None:
89-
sys.exit("Server rejected resubscribe to topic: {}".format(topic))
9058

59+
# Callback when any publish is received
60+
def on_publish_received(publish_packet_data):
61+
publish_packet = publish_packet_data.publish_packet
62+
print("==== Received message from topic '{}': {} ====\n".format(
63+
publish_packet.topic, publish_packet.payload.decode('utf-8')))
9164

92-
# Callback when the subscribed topic receives a message
93-
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
94-
print("Received message from topic '{}': {}".format(topic, payload))
65+
# Track number of publishes received
9566
global received_count
9667
received_count += 1
9768
if received_count == args.input_count:
9869
received_all_event.set()
9970

100-
# Callback when the connection successfully connects
10171

72+
# Callback for the lifecycle event Stopped
73+
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
74+
print("Lifecycle Stopped\n")
75+
stopped_event.set()
10276

103-
def on_connection_success(connection, callback_data):
104-
assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
105-
print("Connection Successful with return code: {} session present: {}".format(
106-
callback_data.return_code, callback_data.session_present))
10777

108-
# Callback when a connection attempt fails
78+
# Callback for lifecycle event Attempting Connect
79+
def on_lifecycle_attempting_connect(lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData):
80+
print("Lifecycle Connection Attempt\nConnecting to endpoint: '{}' with client ID'{}'".format(
81+
args.input_endpoint, args.input_clientId))
10982

11083

111-
def on_connection_failure(connection, callback_data):
112-
assert isinstance(callback_data, mqtt.OnConnectionFailureData)
113-
print("Connection failed with error code: {}".format(callback_data.error))
84+
# Callback for the lifecycle event Connection Success
85+
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
86+
connack_packet = lifecycle_connect_success_data.connack_packet
87+
print("Lifecycle Connection Success with reason code:{}\n".format(
88+
repr(connack_packet.reason_code)))
89+
connection_success_event.set()
11490

115-
# Callback when a connection has been disconnected or shutdown successfully
11691

92+
# Callback for the lifecycle event Connection Failure
93+
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
94+
print("Lifecycle Connection Failure with exception:{}".format(
95+
lifecycle_connection_failure.exception))
11796

118-
def on_connection_closed(connection, callback_data):
119-
print("Connection closed")
12097

98+
# Callback for the lifecycle event Disconnection
99+
def on_lifecycle_disconnection(lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData):
100+
print("Lifecycle Disconnected with reason code:{}".format(
101+
lifecycle_disconnect_data.disconnect_packet.reason_code if lifecycle_disconnect_data.disconnect_packet else "None"))
121102

122-
if __name__ == '__main__':
123-
# Create the proxy options if the data is present in args
124-
proxy_options = None
125-
if args.input_proxy_host is not None and args.input_proxy_port != 0:
126-
proxy_options = http.HttpProxyOptions(
127-
host_name=args.input_proxy_host,
128-
port=args.input_proxy_port)
129-
130-
# Create a MQTT connection from the command line data
131-
mqtt_connection = mqtt_connection_builder.mtls_from_path(
132-
endpoint=args.input_endpoint,
133-
port=args.input_port,
134-
cert_filepath=args.input_cert,
135-
pri_key_filepath=args.input_key,
136-
ca_filepath=args.input_ca,
137-
on_connection_interrupted=on_connection_interrupted,
138-
on_connection_resumed=on_connection_resumed,
139-
client_id=args.input_clientId,
140-
clean_session=False,
141-
keep_alive_secs=30,
142-
http_proxy_options=proxy_options,
143-
on_connection_success=on_connection_success,
144-
on_connection_failure=on_connection_failure,
145-
on_connection_closed=on_connection_closed)
146-
147-
if not args.input_is_ci:
148-
print(f"Connecting to {args.input_endpoint} with client ID '{args.input_clientId}'...")
149-
else:
150-
print("Connecting to endpoint with client ID")
151-
connect_future = mqtt_connection.connect()
152-
153-
# Future.result() waits until a result is available
154-
connect_future.result()
155-
print("Connected!")
156103

104+
if __name__ == '__main__':
105+
print("\nStarting MQTT5 X509 PubSub Sample\n")
157106
message_count = args.input_count
158107
message_topic = args.input_topic
159108
message_string = args.input_message
160109

161-
# Subscribe
162-
print("Subscribing to topic '{}'...".format(message_topic))
163-
subscribe_future, packet_id = mqtt_connection.subscribe(
164-
topic=message_topic,
165-
qos=mqtt.QoS.AT_LEAST_ONCE,
166-
callback=on_message_received)
167-
168-
subscribe_result = subscribe_future.result()
169-
print("Subscribed with {}".format(str(subscribe_result['qos'])))
170-
171-
# Publish message to server desired number of times.
172-
# This step is skipped if message is blank.
173-
# This step loops forever if count was set to 0.
174-
if message_string:
175-
if message_count == 0:
176-
print("Sending messages until program killed")
177-
else:
178-
print("Sending {} message(s)".format(message_count))
179-
180-
publish_count = 1
181-
while (publish_count <= message_count) or (message_count == 0):
182-
message = "{} [{}]".format(message_string, publish_count)
183-
print("Publishing message to topic '{}': {}".format(message_topic, message))
184-
message_json = json.dumps(message)
185-
mqtt_connection.publish(
186-
topic=message_topic,
187-
payload=message_json,
188-
qos=mqtt.QoS.AT_LEAST_ONCE)
189-
time.sleep(1)
190-
publish_count += 1
191-
192-
# Wait for all messages to be received.
193-
# This waits forever if count was set to 0.
194-
if message_count != 0 and not received_all_event.is_set():
195-
print("Waiting for all messages to be received...")
196-
197-
received_all_event.wait()
198-
print("{} message(s) received.".format(received_count))
199-
200-
# Disconnect
201-
print("Disconnecting...")
202-
disconnect_future = mqtt_connection.disconnect()
203-
disconnect_future.result()
204-
print("Disconnected!")
110+
# Create MQTT5 client using mutual TLS via X509 Certificate and Private Key
111+
print("==== Creating MQTT5 Client ====\n")
112+
client = mqtt5_client_builder.mtls_from_path(
113+
endpoint=args.input_endpoint,
114+
cert_filepath=args.input_cert,
115+
pri_key_filepath=args.input_key,
116+
on_publish_received=on_publish_received,
117+
on_lifecycle_stopped=on_lifecycle_stopped,
118+
on_lifecycle_attempting_connect=on_lifecycle_attempting_connect,
119+
on_lifecycle_connection_success=on_lifecycle_connection_success,
120+
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
121+
on_lifecycle_disconnection=on_lifecycle_disconnection,
122+
client_id=args.input_clientId)
123+
124+
125+
# Start the client, instructing the client to desire a connected state. The client will try to
126+
# establish a connection with the provided settings. If the client is disconnected while in this
127+
# state it will attempt to reconnect automatically.
128+
print("==== Starting client ====")
129+
client.start()
130+
131+
# We await the `on_lifecycle_connection_success` callback to be invoked.
132+
if not connection_success_event.wait(TIMEOUT):
133+
raise TimeoutError("Connection timeout")
134+
135+
136+
# Subscribe
137+
print("==== Subscribing to topic '{}' ====".format(message_topic))
138+
subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
139+
subscriptions=[mqtt5.Subscription(
140+
topic_filter=message_topic,
141+
qos=mqtt5.QoS.AT_LEAST_ONCE)]
142+
))
143+
suback = subscribe_future.result(TIMEOUT)
144+
print("Suback received with reason code:{}\n".format(suback.reason_codes))
145+
146+
147+
# Publish
148+
if message_count == 0:
149+
print("==== Sending messages until program killed ====\n")
150+
else:
151+
print("==== Sending {} message(s) ====\n".format(message_count))
152+
153+
publish_count = 1
154+
while (publish_count <= message_count) or (message_count == 0):
155+
message = f"{message_string} [{publish_count}]"
156+
print(f"Publishing message to topic '{message_topic}': {message}")
157+
publish_future = client.publish(mqtt5.PublishPacket(
158+
topic=message_topic,
159+
payload=message,
160+
qos=mqtt5.QoS.AT_LEAST_ONCE
161+
))
162+
publish_completion_data = publish_future.result(TIMEOUT)
163+
print("PubAck received with {}\n".format(repr(publish_completion_data.puback.reason_code)))
164+
time.sleep(1.5)
165+
publish_count += 1
166+
167+
received_all_event.wait(TIMEOUT)
168+
print("{} message(s) received.\n".format(received_count))
169+
170+
# Unsubscribe
171+
print("==== Unsubscribing from topic '{}' ====".format(message_topic))
172+
unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket(
173+
topic_filters=[message_topic]))
174+
unsuback = unsubscribe_future.result(TIMEOUT)
175+
print("Unsubscribed with {}\n".format(unsuback.reason_codes))
176+
177+
178+
# Stop the client. Instructs the client to disconnect and remain in a disconnected state.
179+
print("==== Stopping Client ====")
180+
client.stop()
181+
182+
if not stopped_event.wait(TIMEOUT):
183+
raise TimeoutError("Stop timeout")
184+
185+
print("==== Client Stopped! ====")

0 commit comments

Comments
 (0)