From b28da4739f5360b9ac648e08cf0a03be97f69419 Mon Sep 17 00:00:00 2001 From: localhorst Date: Sun, 1 Jun 2025 12:46:16 +0200 Subject: [PATCH] new mqtt connection --- ttn-vegapulsair-exporter.py | 60 +++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/ttn-vegapulsair-exporter.py b/ttn-vegapulsair-exporter.py index 6794c17..cce64a5 100644 --- a/ttn-vegapulsair-exporter.py +++ b/ttn-vegapulsair-exporter.py @@ -11,6 +11,9 @@ import time import json import sys import config +import logging +import ssl + scrape_healthy = True startTime = datetime.now() @@ -214,8 +217,27 @@ def on_message(mqttc, obj, msg): scrape_healthy = False print(f"Unable to parse uplink: {e}") -def poll_mqtt(mqttc): - mqttc.loop_forever() +def poll_mqtt(mqtt_client): + # Start the network loop + mqtt_client.loop_forever() + +def configure_mqtt_client(): + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + client.on_connect = on_connect + client.on_message = on_message + client.on_disconnect = on_disconnect + + # Set credentials + client.username_pw_set(config.ttn_user, config.ttn_key) + + # Set up TLS/SSL + client.tls_set( + cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLSv1_2, # Enforce TLS 1.2 + ) + client.tls_insecure_set(False) # Enforce strict certificate validation + + return client def main(): global mqtt_client @@ -229,29 +251,29 @@ def main(): reconnect_thread.start() while True: + mqtt_client = configure_mqtt_client() try: - print("starting ...") - mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) - mqtt_client.on_connect = on_connect - mqtt_client.on_message = on_message - mqtt_client.on_disconnect = on_disconnect - mqtt_client.username_pw_set(config.ttn_user, config.ttn_key) - mqtt_client.tls_set() - mqtt_client.connect( - config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60 - ) - mqtt_client.subscribe("#", 0) # all device uplinks + # Connect to TTN broker + broker_url = f"{config.ttn_region.lower()}.cloud.thethings.network" + mqtt_client.connect(broker_url, 8883, 60) + + # Subscribe to all topics + mqtt_client.subscribe("#", 1) + logging.info(f"Subscribed to all topics.") poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqtt_client,))) poll_mqtt_thread.start() + except Exception as e: + logging.error(f"Error occurred: {e}") + mqtt_client.loop_stop() - webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler) - print("Server started http://%s:%s" % (config.hostName, config.serverPort)) + webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler) + print("Server started http://%s:%s" % (config.hostName, config.serverPort)) - try: - webServer.serve_forever() - except KeyboardInterrupt: - sys.exit(-1) + try: + webServer.serve_forever() + except KeyboardInterrupt: + sys.exit(-1) webServer.server_close() print("Server stopped.")