Websockets MQTT support
This commit is contained in:
55
client.py
55
client.py
@@ -2,6 +2,7 @@ import os
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
import ssl
|
||||
import paho.mqtt.client as mqtt
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
from datetime import date
|
||||
@@ -13,6 +14,11 @@ from templates import TEMPLATES
|
||||
# Load configuration from environment
|
||||
MQTT_HOST = os.getenv('MQTT_HOST', 'localhost')
|
||||
MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
|
||||
MQTT_USE_WEBSOCKET = os.getenv('MQTT_USE_WEBSOCKET', 'False').lower() == 'true'
|
||||
MQTT_WEBSOCKET_PATH = os.getenv('MQTT_WEBSOCKET_PATH', '/mqtt')
|
||||
MQTT_TLS_INSECURE = os.getenv('MQTT_TLS_INSECURE', 'False').lower() == 'true'
|
||||
MQTT_USERNAME = os.getenv('MQTT_USERNAME', '')
|
||||
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD', '')
|
||||
MQTT_TOPIC_SUB = os.getenv('MQTT_TOPIC_SUB', 'vet/labels/print')
|
||||
MQTT_TOPIC_PUB_STATUS = os.getenv('MQTT_TOPIC_PUB_STATUS', 'vet/labels/status')
|
||||
MQTT_TOPIC_HEARTBEAT = os.getenv('MQTT_TOPIC_HEARTBEAT', 'vet/labels/heartbeat')
|
||||
@@ -142,12 +148,14 @@ def on_message(client, userdata, msg):
|
||||
except json.JSONDecodeError as e:
|
||||
error_msg = f"Invalid JSON in message: {e}. Raw payload: {raw_payload}"
|
||||
print(error_msg)
|
||||
client.publish(MQTT_TOPIC_PUB_STATUS, json.dumps({"status": "error", "job_id": job_id, "error": error_msg, "topic": msg.topic}))
|
||||
except Exception as e:
|
||||
error_details = {"status": "error", "job_id": job_id, "error": error_msg, "topic": msg.topic}
|
||||
client.publish(MQTT_TOPIC_PUB_STATUS, json.dumps(error_details))
|
||||
send_webhook("parse_error", error_details)
|
||||
error_msg = f"Error processing message: {str(e)}"
|
||||
print(error_msg)
|
||||
client.publish(MQTT_TOPIC_PUB_STATUS, json.dumps({"status": "error", "job_id": job_id, "error": error_msg, "original_payload": raw_payload}))
|
||||
|
||||
error_details = {"status": "error", "job_id": job_id, "error": error_msg, "original_payload": raw_payload}
|
||||
client.publish(MQTT_TOPIC_PUB_STATUS, json.dumps(error_details))
|
||||
send_webhook("processing_error", error_details)
|
||||
def heartbeat(client):
|
||||
while True:
|
||||
try:
|
||||
@@ -155,7 +163,8 @@ def heartbeat(client):
|
||||
"status": "alive",
|
||||
"timestamp": time.time(),
|
||||
"host": MQTT_HOST,
|
||||
"port": MQTT_PORT
|
||||
"port": MQTT_PORT,
|
||||
"websocket": MQTT_USE_WEBSOCKET
|
||||
}
|
||||
client.publish(MQTT_TOPIC_HEARTBEAT, json.dumps(heartbeat_msg))
|
||||
time.sleep(30) # Publish every 30 seconds
|
||||
@@ -164,11 +173,45 @@ def heartbeat(client):
|
||||
time.sleep(5) # Retry sooner on error
|
||||
|
||||
def main():
|
||||
client = mqtt.Client()
|
||||
client = mqtt.Client(client_id="printer_client", transport="websockets" if MQTT_USE_WEBSOCKET else "tcp")
|
||||
client.on_connect = on_connect
|
||||
client.on_message = on_message
|
||||
|
||||
# Configure TLS if using WebSocket
|
||||
if MQTT_USE_WEBSOCKET:
|
||||
if MQTT_TLS_INSECURE:
|
||||
# Disable TLS certificate verification (use with caution)
|
||||
print("WARNING: TLS certificate verification is disabled!")
|
||||
client.tls_set(
|
||||
ca_certs=None,
|
||||
certfile=None,
|
||||
keyfile=None,
|
||||
cert_reqs=ssl.CERT_NONE,
|
||||
tls_version=ssl.PROTOCOL_TLS,
|
||||
ciphers=None
|
||||
)
|
||||
client.tls_insecure_set(True)
|
||||
else:
|
||||
# Use default CA certificates
|
||||
client.tls_set(
|
||||
ca_certs=None,
|
||||
certfile=None,
|
||||
keyfile=None,
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
tls_version=ssl.PROTOCOL_TLS,
|
||||
ciphers=None
|
||||
)
|
||||
|
||||
# Set username and password if provided
|
||||
if MQTT_USERNAME or MQTT_PASSWORD:
|
||||
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
|
||||
|
||||
# Construct connection string with WebSocket path if needed
|
||||
print(f"Attempting to connect to MQTT broker at {MQTT_HOST}:{MQTT_PORT}")
|
||||
if MQTT_USE_WEBSOCKET:
|
||||
print(f"Using WebSocket transport with path: {MQTT_WEBSOCKET_PATH}")
|
||||
print(f"TLS Certificate Validation: {'Disabled' if MQTT_TLS_INSECURE else 'Enabled'}")
|
||||
|
||||
client.connect(MQTT_HOST, MQTT_PORT, 60)
|
||||
|
||||
# Start the network loop in a background thread
|
||||
|
||||
Reference in New Issue
Block a user