Skip to content

Commit 481857a

Browse files
authored
Restore the pubsub sample for Connect One Device service (#656)
* restore the mqtt3 sample for Connect One Device service * update temp sample to use mqtt5 * add ca_file option
1 parent c95dab8 commit 481857a

File tree

2 files changed

+256
-0
lines changed

2 files changed

+256
-0
lines changed

samples/pubsub.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# PubSub
2+
3+
[**Return to main sample list**](./README.md)
4+
5+
This sample uses the
6+
[Message Broker](https://docs.aws.amazon.com/iot/latest/developerguide/iot-message-broker.html)
7+
for AWS IoT to send and receive messages through an MQTT connection.
8+
9+
On startup, the device connects to the server, subscribes to a topic, and begins publishing messages to that topic. The device should receive those same messages back from the message broker, since it is subscribed to that same topic. Status updates are continually printed to the console. This sample demonstrates how to send and receive messages on designated IoT Core topics, an essential task that is the backbone of many IoT applications that need to send data over the internet. This sample simply subscribes and publishes to a topic, printing the messages it just sent as it is received from AWS IoT Core, but this can be used as a reference point for more complex Pub-Sub applications.
10+
11+
Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended.
12+
13+
<details>
14+
<summary>(see sample policy)</summary>
15+
<pre>
16+
{
17+
"Version": "2012-10-17",
18+
"Statement": [
19+
{
20+
"Effect": "Allow",
21+
"Action": [
22+
"iot:Publish",
23+
"iot:Receive"
24+
],
25+
"Resource": [
26+
"arn:aws:iot:<b>region</b>:<b>account</b>:topic/test/topic"
27+
]
28+
},
29+
{
30+
"Effect": "Allow",
31+
"Action": [
32+
"iot:Subscribe"
33+
],
34+
"Resource": [
35+
"arn:aws:iot:<b>region</b>:<b>account</b>:topicfilter/test/topic"
36+
]
37+
},
38+
{
39+
"Effect": "Allow",
40+
"Action": [
41+
"iot:Connect"
42+
],
43+
"Resource": [
44+
"arn:aws:iot:<b>region</b>:<b>account</b>:client/test-*"
45+
]
46+
}
47+
]
48+
}
49+
</pre>
50+
51+
Replace with the following with the data from your AWS account:
52+
* `<region>`: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`.
53+
* `<account>`: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website.
54+
55+
Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id <client ID here>` to send the client ID your policy supports.
56+
57+
</details>
58+
59+
## How to run
60+
61+
To Run this sample from the `samples` folder, use the following command:
62+
63+
```sh
64+
# For Windows: replace 'python3' with 'python' and '/' with '\'
65+
python3 pubsub.py --endpoint <endpoint> --cert <file> --key <file>
66+
```
67+
68+
You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it:
69+
70+
```sh
71+
# For Windows: replace 'python3' with 'python' and '/' with '\'
72+
python3 pubsub.py --endpoint <endpoint> --cert <file> --key <file> --ca_file <file>
73+
```

samples/pubsub.py

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0.
3+
4+
from awsiot import mqtt5_client_builder
5+
from awscrt import mqtt5
6+
import threading
7+
import time
8+
# This sample uses the Message Broker for AWS IoT to send and receive messages
9+
# through an MQTT connection. On startup, the device connects to the server,
10+
# subscribes to a topic, and begins publishing messages to that topic.
11+
# The device should receive those same messages back from the message broker,
12+
# since it is subscribed to that same topic.
13+
14+
# --------------------------------- ARGUMENT PARSING -----------------------------------------
15+
import argparse
16+
import uuid
17+
18+
parser = argparse.ArgumentParser(
19+
description="MQTT5 X509 Sample (mTLS)",
20+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
21+
)
22+
required = parser.add_argument_group("required arguments")
23+
optional = parser.add_argument_group("optional arguments")
24+
25+
# Required Arguments
26+
required.add_argument("--endpoint", required=True, metavar="", dest="input_endpoint",
27+
help="IoT endpoint hostname")
28+
required.add_argument("--cert", required=True, metavar="", dest="input_cert",
29+
help="Path to the certificate file to use during mTLS connection establishment")
30+
required.add_argument("--key", required=True, metavar="", dest="input_key",
31+
help="Path to the private key file to use during mTLS connection establishment")
32+
33+
# Optional Arguments
34+
optional.add_argument("--client_id", metavar="", dest="input_clientId", default=f"mqtt5-sample-{uuid.uuid4().hex[:8]}",
35+
help="Client ID")
36+
optional.add_argument("--topic", metavar="", default="test/topic", dest="input_topic",
37+
help="Topic")
38+
optional.add_argument("--message", metavar="", default="Hello from mqtt5 sample", dest="input_message",
39+
help="Message payload")
40+
optional.add_argument("--count", type=int, metavar="", default=5, dest="input_count",
41+
help="Messages to publish (0 = infinite)")
42+
optional.add_argument("--ca_file", metavar="", dest="input_ca", default=None,
43+
help="Path to root CA file")
44+
45+
# args contains all the parsed commandline arguments used by the sample
46+
args = parser.parse_args()
47+
# --------------------------------- ARGUMENT PARSING END -----------------------------------------
48+
49+
TIMEOUT = 100
50+
message_count = args.input_count
51+
message_topic = args.input_topic
52+
message_string = args.input_message
53+
# Events used within callbacks to progress sample
54+
connection_success_event = threading.Event()
55+
stopped_event = threading.Event()
56+
received_all_event = threading.Event()
57+
received_count = 0
58+
59+
60+
# Callback when any publish is received
61+
def on_publish_received(publish_packet_data):
62+
publish_packet = publish_packet_data.publish_packet
63+
print("==== Received message from topic '{}': {} ====\n".format(
64+
publish_packet.topic, publish_packet.payload.decode('utf-8')))
65+
66+
# Track number of publishes received
67+
global received_count
68+
received_count += 1
69+
if received_count == args.input_count:
70+
received_all_event.set()
71+
72+
73+
# Callback for the lifecycle event Stopped
74+
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
75+
print("Lifecycle Stopped\n")
76+
stopped_event.set()
77+
78+
79+
# Callback for lifecycle event Attempting Connect
80+
def on_lifecycle_attempting_connect(lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData):
81+
print("Lifecycle Connection Attempt\nConnecting to endpoint: '{}' with client ID'{}'".format(
82+
args.input_endpoint, args.input_clientId))
83+
84+
85+
# Callback for the lifecycle event Connection Success
86+
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
87+
connack_packet = lifecycle_connect_success_data.connack_packet
88+
print("Lifecycle Connection Success with reason code:{}\n".format(
89+
repr(connack_packet.reason_code)))
90+
connection_success_event.set()
91+
92+
93+
# Callback for the lifecycle event Connection Failure
94+
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
95+
print("Lifecycle Connection Failure with exception:{}".format(
96+
lifecycle_connection_failure.exception))
97+
98+
99+
# Callback for the lifecycle event Disconnection
100+
def on_lifecycle_disconnection(lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData):
101+
print("Lifecycle Disconnected with reason code:{}".format(
102+
lifecycle_disconnect_data.disconnect_packet.reason_code if lifecycle_disconnect_data.disconnect_packet else "None"))
103+
104+
105+
if __name__ == '__main__':
106+
print("\nStarting MQTT5 X509 PubSub Sample\n")
107+
message_count = args.input_count
108+
message_topic = args.input_topic
109+
message_string = args.input_message
110+
111+
# Create MQTT5 client using mutual TLS via X509 Certificate and Private Key
112+
print("==== Creating MQTT5 Client ====\n")
113+
client = mqtt5_client_builder.mtls_from_path(
114+
endpoint=args.input_endpoint,
115+
cert_filepath=args.input_cert,
116+
pri_key_filepath=args.input_key,
117+
on_publish_received=on_publish_received,
118+
on_lifecycle_stopped=on_lifecycle_stopped,
119+
on_lifecycle_attempting_connect=on_lifecycle_attempting_connect,
120+
on_lifecycle_connection_success=on_lifecycle_connection_success,
121+
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
122+
on_lifecycle_disconnection=on_lifecycle_disconnection,
123+
client_id=args.input_clientId,
124+
ca_filepath=args.input_ca)
125+
126+
# Start the client, instructing the client to desire a connected state. The client will try to
127+
# establish a connection with the provided settings. If the client is disconnected while in this
128+
# state it will attempt to reconnect automatically.
129+
print("==== Starting client ====")
130+
client.start()
131+
132+
# We await the `on_lifecycle_connection_success` callback to be invoked.
133+
if not connection_success_event.wait(TIMEOUT):
134+
raise TimeoutError("Connection timeout")
135+
136+
# Subscribe
137+
print("==== Subscribing to topic '{}' ====".format(message_topic))
138+
subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
139+
subscriptions=[mqtt5.Subscription(
140+
topic_filter=message_topic,
141+
qos=mqtt5.QoS.AT_LEAST_ONCE)]
142+
))
143+
suback = subscribe_future.result(TIMEOUT)
144+
print("Suback received with reason code:{}\n".format(suback.reason_codes))
145+
146+
# Publish
147+
if message_count == 0:
148+
print("==== Sending messages until program killed ====\n")
149+
else:
150+
print("==== Sending {} message(s) ====\n".format(message_count))
151+
152+
publish_count = 1
153+
while (publish_count <= message_count) or (message_count == 0):
154+
message = f"{message_string} [{publish_count}]"
155+
print(f"Publishing message to topic '{message_topic}': {message}")
156+
publish_future = client.publish(mqtt5.PublishPacket(
157+
topic=message_topic,
158+
payload=message,
159+
qos=mqtt5.QoS.AT_LEAST_ONCE
160+
))
161+
publish_completion_data = publish_future.result(TIMEOUT)
162+
print("PubAck received with {}\n".format(repr(publish_completion_data.puback.reason_code)))
163+
time.sleep(1.5)
164+
publish_count += 1
165+
166+
received_all_event.wait(TIMEOUT)
167+
print("{} message(s) received.\n".format(received_count))
168+
169+
# Unsubscribe
170+
print("==== Unsubscribing from topic '{}' ====".format(message_topic))
171+
unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket(
172+
topic_filters=[message_topic]))
173+
unsuback = unsubscribe_future.result(TIMEOUT)
174+
print("Unsubscribed with {}\n".format(unsuback.reason_codes))
175+
176+
# Stop the client. Instructs the client to disconnect and remain in a disconnected state.
177+
print("==== Stopping Client ====")
178+
client.stop()
179+
180+
if not stopped_event.wait(TIMEOUT):
181+
raise TimeoutError("Stop timeout")
182+
183+
print("==== Client Stopped! ====")

0 commit comments

Comments
 (0)