|
| 1 | +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 2 | +# SPDX-License-Identifier: Apache-2.0. |
| 3 | + |
| 4 | +from awscrt import mqtt, http |
| 5 | +from awsiot import mqtt_connection_builder |
| 6 | +import sys |
| 7 | +import threading |
| 8 | +import time |
| 9 | +import json |
| 10 | + |
| 11 | +# This sample uses the Message Broker for AWS IoT to send and receive messages |
| 12 | +# through an MQTT connection. On startup, the device connects to the server, |
| 13 | +# subscribes to a topic, and begins publishing messages to that topic. |
| 14 | +# The device should receive those same messages back from the message broker, |
| 15 | +# since it is subscribed to that same topic. |
| 16 | + |
| 17 | +# --------------------------------- ARGUMENT PARSING ----------------------------------------- |
| 18 | +import argparse |
| 19 | +import uuid |
| 20 | + |
| 21 | +parser = argparse.ArgumentParser( |
| 22 | + description="PubSub Sample (MQTT3)", |
| 23 | + formatter_class=argparse.ArgumentDefaultsHelpFormatter, |
| 24 | +) |
| 25 | +required = parser.add_argument_group("required arguments") |
| 26 | +optional = parser.add_argument_group("optional arguments") |
| 27 | + |
| 28 | +# Required Arguments |
| 29 | +required.add_argument("--endpoint", required=True, metavar="", dest="input_endpoint", |
| 30 | + help="IoT endpoint hostname") |
| 31 | +required.add_argument("--cert", required=True, metavar="", dest="input_cert", |
| 32 | + help="Path to the certificate file to use during mTLS connection establishment") |
| 33 | +required.add_argument("--key", required=True, metavar="", dest="input_key", |
| 34 | + help="Path to the private key file to use during mTLS connection establishment") |
| 35 | + |
| 36 | +# Optional Arguments |
| 37 | +optional.add_argument("--client_id", metavar="", dest="input_clientId", default=f"pubsub-sample-{uuid.uuid4().hex[:8]}", |
| 38 | + help="Client ID") |
| 39 | +optional.add_argument("--topic", metavar="", default="test/topic", dest="input_topic", |
| 40 | + help="Topic") |
| 41 | +optional.add_argument("--message", metavar="", default="Hello from pubsub sample", dest="input_message", |
| 42 | + help="Message payload") |
| 43 | +optional.add_argument("--count", type=int, metavar="", default=10, dest="input_count", |
| 44 | + help="Messages to publish (0 = infinite)") |
| 45 | +optional.add_argument("--ca_file", metavar="", dest="input_ca", |
| 46 | + help="Path to root CA file") |
| 47 | +optional.add_argument("--port", type=int, metavar="", default=8883, dest="input_port", |
| 48 | + help="Connection port") |
| 49 | +optional.add_argument("--proxy_host", metavar="", dest="input_proxy_host", |
| 50 | + help="Proxy hostname") |
| 51 | +optional.add_argument("--proxy_port", type=int, metavar="", default=0, dest="input_proxy_port", |
| 52 | + help="Proxy port") |
| 53 | +optional.add_argument("--is_ci", action="store_true", dest="input_is_ci", |
| 54 | + help="CI mode (suppress some output)") |
| 55 | + |
| 56 | +# args contains all the parsed commandline arguments used by the sample |
| 57 | +args = parser.parse_args() |
| 58 | +# --------------------------------- ARGUMENT PARSING END ----------------------------------------- |
| 59 | + |
| 60 | +received_count = 0 |
| 61 | +received_all_event = threading.Event() |
| 62 | + |
| 63 | +# Callback when connection is accidentally lost. |
| 64 | + |
| 65 | + |
| 66 | +def on_connection_interrupted(connection, error, **kwargs): |
| 67 | + print("Connection interrupted. error: {}".format(error)) |
| 68 | + |
| 69 | + |
| 70 | +# Callback when an interrupted connection is re-established. |
| 71 | +def on_connection_resumed(connection, return_code, session_present, **kwargs): |
| 72 | + print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present)) |
| 73 | + |
| 74 | + if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present: |
| 75 | + print("Session did not persist. Resubscribing to existing topics...") |
| 76 | + resubscribe_future, _ = connection.resubscribe_existing_topics() |
| 77 | + |
| 78 | + # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread, |
| 79 | + # evaluate result with a callback instead. |
| 80 | + resubscribe_future.add_done_callback(on_resubscribe_complete) |
| 81 | + |
| 82 | + |
| 83 | +def on_resubscribe_complete(resubscribe_future): |
| 84 | + resubscribe_results = resubscribe_future.result() |
| 85 | + print("Resubscribe results: {}".format(resubscribe_results)) |
| 86 | + |
| 87 | + for topic, qos in resubscribe_results['topics']: |
| 88 | + if qos is None: |
| 89 | + sys.exit("Server rejected resubscribe to topic: {}".format(topic)) |
| 90 | + |
| 91 | + |
| 92 | +# Callback when the subscribed topic receives a message |
| 93 | +def on_message_received(topic, payload, dup, qos, retain, **kwargs): |
| 94 | + print("Received message from topic '{}': {}".format(topic, payload)) |
| 95 | + global received_count |
| 96 | + received_count += 1 |
| 97 | + if received_count == args.input_count: |
| 98 | + received_all_event.set() |
| 99 | + |
| 100 | +# Callback when the connection successfully connects |
| 101 | + |
| 102 | + |
| 103 | +def on_connection_success(connection, callback_data): |
| 104 | + assert isinstance(callback_data, mqtt.OnConnectionSuccessData) |
| 105 | + print("Connection Successful with return code: {} session present: {}".format( |
| 106 | + callback_data.return_code, callback_data.session_present)) |
| 107 | + |
| 108 | +# Callback when a connection attempt fails |
| 109 | + |
| 110 | + |
| 111 | +def on_connection_failure(connection, callback_data): |
| 112 | + assert isinstance(callback_data, mqtt.OnConnectionFailureData) |
| 113 | + print("Connection failed with error code: {}".format(callback_data.error)) |
| 114 | + |
| 115 | +# Callback when a connection has been disconnected or shutdown successfully |
| 116 | + |
| 117 | + |
| 118 | +def on_connection_closed(connection, callback_data): |
| 119 | + print("Connection closed") |
| 120 | + |
| 121 | + |
| 122 | +if __name__ == '__main__': |
| 123 | + # Create the proxy options if the data is present in args |
| 124 | + proxy_options = None |
| 125 | + if args.input_proxy_host is not None and args.input_proxy_port != 0: |
| 126 | + proxy_options = http.HttpProxyOptions( |
| 127 | + host_name=args.input_proxy_host, |
| 128 | + port=args.input_proxy_port) |
| 129 | + |
| 130 | + # Create a MQTT connection from the command line data |
| 131 | + mqtt_connection = mqtt_connection_builder.mtls_from_path( |
| 132 | + endpoint=args.input_endpoint, |
| 133 | + port=args.input_port, |
| 134 | + cert_filepath=args.input_cert, |
| 135 | + pri_key_filepath=args.input_key, |
| 136 | + ca_filepath=args.input_ca, |
| 137 | + on_connection_interrupted=on_connection_interrupted, |
| 138 | + on_connection_resumed=on_connection_resumed, |
| 139 | + client_id=args.input_clientId, |
| 140 | + clean_session=False, |
| 141 | + keep_alive_secs=30, |
| 142 | + http_proxy_options=proxy_options, |
| 143 | + on_connection_success=on_connection_success, |
| 144 | + on_connection_failure=on_connection_failure, |
| 145 | + on_connection_closed=on_connection_closed) |
| 146 | + |
| 147 | + if not args.input_is_ci: |
| 148 | + print(f"Connecting to {args.input_endpoint} with client ID '{args.input_clientId}'...") |
| 149 | + else: |
| 150 | + print("Connecting to endpoint with client ID") |
| 151 | + connect_future = mqtt_connection.connect() |
| 152 | + |
| 153 | + # Future.result() waits until a result is available |
| 154 | + connect_future.result() |
| 155 | + print("Connected!") |
| 156 | + |
| 157 | + message_count = args.input_count |
| 158 | + message_topic = args.input_topic |
| 159 | + message_string = args.input_message |
| 160 | + |
| 161 | + # Subscribe |
| 162 | + print("Subscribing to topic '{}'...".format(message_topic)) |
| 163 | + subscribe_future, packet_id = mqtt_connection.subscribe( |
| 164 | + topic=message_topic, |
| 165 | + qos=mqtt.QoS.AT_LEAST_ONCE, |
| 166 | + callback=on_message_received) |
| 167 | + |
| 168 | + subscribe_result = subscribe_future.result() |
| 169 | + print("Subscribed with {}".format(str(subscribe_result['qos']))) |
| 170 | + |
| 171 | + # Publish message to server desired number of times. |
| 172 | + # This step is skipped if message is blank. |
| 173 | + # This step loops forever if count was set to 0. |
| 174 | + if message_string: |
| 175 | + if message_count == 0: |
| 176 | + print("Sending messages until program killed") |
| 177 | + else: |
| 178 | + print("Sending {} message(s)".format(message_count)) |
| 179 | + |
| 180 | + publish_count = 1 |
| 181 | + while (publish_count <= message_count) or (message_count == 0): |
| 182 | + message = "{} [{}]".format(message_string, publish_count) |
| 183 | + print("Publishing message to topic '{}': {}".format(message_topic, message)) |
| 184 | + message_json = json.dumps(message) |
| 185 | + mqtt_connection.publish( |
| 186 | + topic=message_topic, |
| 187 | + payload=message_json, |
| 188 | + qos=mqtt.QoS.AT_LEAST_ONCE) |
| 189 | + time.sleep(1) |
| 190 | + publish_count += 1 |
| 191 | + |
| 192 | + # Wait for all messages to be received. |
| 193 | + # This waits forever if count was set to 0. |
| 194 | + if message_count != 0 and not received_all_event.is_set(): |
| 195 | + print("Waiting for all messages to be received...") |
| 196 | + |
| 197 | + received_all_event.wait() |
| 198 | + print("{} message(s) received.".format(received_count)) |
| 199 | + |
| 200 | + # Disconnect |
| 201 | + print("Disconnecting...") |
| 202 | + disconnect_future = mqtt_connection.disconnect() |
| 203 | + disconnect_future.result() |
| 204 | + print("Disconnected!") |
0 commit comments