Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions python/destinations/MQTT/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@ You'll need to have a MQTT either locally or in the cloud
The connector uses the following environment variables:

- **input**: Name of the input topic to listen to.
- **mqtt_topic_root**: The root for messages in MQTT, this can be anything.
- **mqtt_server**: The address of your MQTT server.
- **mqtt_port**: The port of your MQTT server.
- **mqtt_username**: Username of your MQTT user.
- **mqtt_password**: Password for the MQTT user.
- **MQTT_CLIENT_ID**: A client ID for the sink.
**Default**: `mqtt-sink`
- **MQTT_TOPIC_ROOT**: The root for messages in MQTT, this can be anything.
- **MQTT_SERVER**: The address of your MQTT server.
- **MQTT_PORT**: The port of your MQTT server.
**Default**: `8883`
- **MQTT_USERNAME**: Username of your MQTT user.
- **MQTT_PASSWORD**: Password for the MQTT user.
- **MQTT_VERSION**: MQTT protocol version; choose 3.1, 3.1.1, or 5.
**Default**: `3.1.1`
- **MQTT_USE_TLS**: Set to true if the server uses TLS.
**Default**: `True`

## Contribute

Expand Down
36 changes: 25 additions & 11 deletions python/destinations/MQTT/library.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,65 @@
"Type": "EnvironmentVariable",
"InputType": "InputTopic",
"Description": "Name of the input topic to listen to.",
"DefaultValue": "",
"Required": true
},
{
"Name": "mqtt_topic_root",
"Name": "MQTT_CLIENT_ID",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "A client ID for the sink",
"DefaultValue": "mqtt-sink",
"Required": true
},
{
"Name": "MQTT_TOPIC_ROOT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The root for messages in MQTT, this can be anything",
"Required": true
},
{
"Name": "mqtt_server",
"Name": "MQTT_SERVER",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The address of your MQTT server",
"Required": true
},
{
"Name": "mqtt_port",
"Name": "MQTT_PORT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The port of your MQTT server",
"DefaultValue": "8883",
"Required": true
},
{
"Name": "mqtt_username",
"Name": "MQTT_USERNAME",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Username of your MQTT user",
"Required": false
"Required": true
},
{
"Name": "mqtt_password",
"Name": "MQTT_PASSWORD",
"Type": "EnvironmentVariable",
"InputType": "Secret",
"Description": "Password for the MQTT user",
"Required": false
"Required": true
},
{
"Name": "MQTT_VERSION",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "MQTT protocol version; choose 3.1, 3.1.1, or 5",
"Required": true
},
{
"Name": "mqtt_version",
"Name": "MQTT_USE_TLS",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "MQTT protocol version: 3.1, 3.1.1, 5",
"DefaultValue": "3.1.1",
"Description": "Set to true if the server uses TLS",
"DefaultValue": "true",
"Required": true
}
],
Expand Down
107 changes: 20 additions & 87 deletions python/destinations/MQTT/main.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,28 @@
from quixstreams import Application, context
import paho.mqtt.client as paho
from paho import mqtt
import json
from mqtt import MQTTSink
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Properly import MQTT Sink from Quix Streams once proper version is available

from quixstreams import Application
import os

# Load environment variables (useful when working locally)
from dotenv import load_dotenv
load_dotenv()
# from dotenv import load_dotenv
# load_dotenv()

def mqtt_protocol_version():
if os.environ["mqtt_version"] == "3.1":
print("Using MQTT version 3.1")
return paho.MQTTv31
if os.environ["mqtt_version"] == "3.1.1":
print("Using MQTT version 3.1.1")
return paho.MQTTv311
if os.environ["mqtt_version"] == "5":
print("Using MQTT version 5")
return paho.MQTTv5
print("Defaulting to MQTT version 3.1.1")
return paho.MQTTv311
app = Application(consumer_group="mqtt_consumer_group", auto_offset_reset="earliest")
input_topic = app.topic(os.environ["input"], value_deserializer="double")

def configure_authentication(mqtt_client):
mqtt_username = os.getenv("mqtt_username", "")
if mqtt_username != "":
mqtt_password = os.getenv("mqtt_password", "")
if mqtt_password == "":
raise ValueError('mqtt_password must set when mqtt_username is set')
print("Using username & password authentication")
mqtt_client.username_pw_set(os.environ["mqtt_username"], os.environ["mqtt_password"])
return
print("Using anonymous authentication")
sink = MQTTSink(
client_id=os.environ["MQTT_CLIENT_ID"],
server=os.environ["MQTT_SERVER"],
port=int(os.environ["MQTT_PORT"]),
topic_root=os.environ["MQTT_TOPIC_ROOT"],
username=os.environ["MQTT_USERNAME"],
password=os.environ["MQTT_PASSWORD"],
version=os.environ["MQTT_VERSION"],
tls_enabled=os.environ["MQTT_USE_TLS"].lower() == "true"
)

mqtt_port = os.environ["mqtt_port"]
# Validate the config
if not mqtt_port.isnumeric():
raise ValueError('mqtt_port must be a numeric value')
sdf = app.dataframe(topic=input_topic)
sdf.sink(sink)

client_id = os.getenv("Quix__Deployment__Id", "default")
mqtt_client = paho.Client(callback_api_version=paho.CallbackAPIVersion.VERSION2,
client_id = client_id, userdata = None, protocol = mqtt_protocol_version())
mqtt_client.tls_set(tls_version = mqtt.client.ssl.PROTOCOL_TLS) # we'll be using tls
mqtt_client.reconnect_delay_set(5, 60)
configure_authentication(mqtt_client)

# Create a Quix platform-specific application instead
app = Application(consumer_group="mqtt_consumer_group", auto_offset_reset='earliest')
# initialize the topic, this will combine the topic name with the environment details to produce a valid topic identifier
input_topic = app.topic(os.environ["input"])

# setting callbacks for different events to see if it works, print the message etc.
def on_connect_cb(client: paho.Client, userdata: any, connect_flags: paho.ConnectFlags,
reason_code: paho.ReasonCode, properties: paho.Properties):
if reason_code == 0:
print("CONNECTED!") # required for Quix to know this has connected
else:
print(f"ERROR! - ({reason_code.value}). {reason_code.getName()}")

def on_disconnect_cb(client: paho.Client, userdata: any, disconnect_flags: paho.DisconnectFlags,
reason_code: paho.ReasonCode, properties: paho.Properties):
print(f"DISCONNECTED! Reason code ({reason_code.value}) {reason_code.getName()}!")

mqtt_client.on_connect = on_connect_cb
mqtt_client.on_disconnect = on_disconnect_cb

mqtt_topic_root = os.environ["mqtt_topic_root"]

# connect to MQTT Cloud on port 8883 (default for MQTT)
mqtt_client.connect(os.environ["mqtt_server"], int(mqtt_port))

# Hook up to termination signal (for docker image) and CTRL-C
print("Listening to streams. Press CTRL-C to exit.")

sdf = app.dataframe(input_topic)

def publish_to_mqtt(data, key, timestamp, headers):
json_data = json.dumps(data)
message_key_string = key.decode('utf-8') # Convert to string using utf-8 encoding
# publish to MQTT
mqtt_client.publish(mqtt_topic_root + "/" + message_key_string, payload = json_data, qos = 1)

sdf = sdf.apply(publish_to_mqtt, metadata=True)


# start the background process to handle MQTT messages
mqtt_client.loop_start()

print("Starting application")
# run the data processing pipeline
app.run(sdf)

# stop handling MQTT messages
mqtt_client.loop_stop()
print("Exiting")
if __name__ == '__main__':
app.run()
Loading