221 lines
7.8 KiB
Python
221 lines
7.8 KiB
Python
import os
|
|
import json
|
|
import logging
|
|
import uuid
|
|
import time
|
|
from typing import Optional, Dict, Any
|
|
import paho.mqtt.client as mqtt
|
|
import threading
|
|
import atexit
|
|
|
|
logger = logging.getLogger(__name__)


def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Name of the environment variable.
        default: Value used when the variable is unset, blank, or not a
            valid integer.

    Returns:
        The parsed integer, or ``default``.
    """
    raw = os.getenv(name, "").strip()
    if not raw:
        # Unset and empty are treated alike; an empty value would otherwise
        # make int("") raise ValueError while the module is being imported.
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning(
            "Invalid integer %r for %s; falling back to %d", raw, name, default
        )
        return default


# MQTT configuration, read once from the environment at import time.
MQTT_BROKER_HOST = os.getenv("MQTT_BROKER_HOST", "localhost")
MQTT_BROKER_PORT = _env_int("MQTT_BROKER_PORT", 1883)
# Empty credentials mean "connect without authentication".
MQTT_USERNAME = os.getenv("MQTT_USERNAME", "")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "")
# Topic that print requests are published to.
MQTT_LABEL_TOPIC = os.getenv("MQTT_LABEL_TOPIC", "vet/labels/print")
# Topic that job status replies are received on.
MQTT_STATUS_TOPIC = os.getenv("MQTT_STATUS_TOPIC", "vet/labels/status")
|
|
|
|
# Status payloads received from the broker, keyed by their "job_id".
# Written by the status-message callback and read by the publisher that is
# waiting for a reply; every access goes through _response_lock.
_response_store: Dict[str, Any] = {}
_response_lock = threading.Lock()

# Process-wide MQTT client shared by all callers; None until one is created.
_mqtt_client: Optional[mqtt.Client] = None
# Set by the connect callback on success, cleared on failure or disconnect.
_client_connected: threading.Event = threading.Event()
# Serialises creation and replacement of the shared client.
_client_lock = threading.Lock()
# True once at least one attempt to create the shared client has been made.
_initialization_attempted: bool = False
|
|
|
|
def on_connect(client, userdata, flags, rc):
    """Handle the broker's answer to a connection attempt.

    A return code of 0 means the connection was accepted: the status topic is
    subscribed to and the shared "connected" event is set. Any other return
    code is reported and the event is cleared.
    """
    accepted = rc == 0
    if not accepted:
        print(f"MQTT: Failed to connect, return code: {rc}")
        _client_connected.clear()
        return

    print(f"MQTT: Connected to broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")

    # The subscription is issued from this callback, so it is made again
    # every time the callback reports a successful connection.
    subscribe_result = client.subscribe(MQTT_STATUS_TOPIC, qos=1)
    print(f"MQTT: Subscribed to {MQTT_STATUS_TOPIC}, result={subscribe_result}")

    _client_connected.set()
|
|
|
|
def on_disconnect(client, userdata, rc):
    """Report a disconnect and mark the shared client as not connected."""
    notice = f"MQTT: Disconnected, return code: {rc}"
    print(notice)
    # Clearing the event makes the next request for the shared client see it
    # as unavailable instead of reusing a dead connection.
    _client_connected.clear()
|
|
|
|
def on_status_message(client, userdata, message):
    """Store an incoming status payload under its ``job_id``.

    The payload is expected to be a UTF-8 encoded JSON object containing a
    ``job_id`` key. Anything else is reported and dropped.

    Args:
        client: The MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        message: Received message; its ``topic`` and ``payload`` are read.
    """
    # Everything, including the decode, runs inside the try block: this is a
    # library callback, and paho re-raises callback exceptions into its
    # network loop unless suppress_exceptions is enabled, which would stop
    # all further message delivery.
    try:
        text = message.payload.decode()
        print(f"MQTT: Received message on topic '{message.topic}': {text[:200]}")

        payload = json.loads(text)
        # Valid JSON that is not an object (list, number, string) cannot
        # carry a job_id, so treat it like a message without one.
        job_id = payload.get("job_id") if isinstance(payload, dict) else None
        if not job_id:
            print(f"MQTT: Message has no job_id: {payload}")
            return

        with _response_lock:
            _response_store[job_id] = payload
        print(f"MQTT: Stored response for job {job_id}: {payload.get('status')}")
    except Exception as e:
        print(f"MQTT: Error processing status message: {str(e)}")
|
|
|
|
# Minimum time to wait after a failed connection attempt before trying again.
_MQTT_RETRY_COOLDOWN_SECONDS = 10.0
# time.monotonic() reading of the most recent failed attempt; None if the
# last attempt succeeded or none has been made yet.
_last_init_failure: Optional[float] = None


def _discard_client(client) -> None:
    """Stop a client's network loop and close its connection, best effort."""
    for teardown in (client.loop_stop, client.disconnect):
        try:
            teardown()
        except Exception as exc:
            # Teardown problems must not mask the reason we are discarding.
            print(f"MQTT: Error while discarding client: {exc}")


def get_mqtt_client():
    """Get or initialize the persistent MQTT client.

    Returns the shared client when it exists and is connected. Otherwise any
    previous client is discarded and a new one is created, connected, and
    given up to 3 seconds to be acknowledged by the broker.

    After a failed attempt, further attempts are skipped until
    ``_MQTT_RETRY_COOLDOWN_SECONDS`` have elapsed, so callers do not each
    block on a broker that is down, while still recovering once it returns.

    Returns:
        The connected client, or ``None`` if no connection is available.
    """
    global _mqtt_client, _initialization_attempted, _last_init_failure

    with _client_lock:
        # Fast path: a live, connected client already exists.
        if _mqtt_client is not None and _client_connected.is_set():
            return _mqtt_client

        # A recent attempt failed: do not retry until the cooldown expires.
        if _mqtt_client is None and _last_init_failure is not None:
            elapsed = time.monotonic() - _last_init_failure
            if elapsed < _MQTT_RETRY_COOLDOWN_SECONDS:
                return None

        _initialization_attempted = True

        # Drop a previous client that is no longer connected.
        stale_client = _mqtt_client
        _mqtt_client = None
        if stale_client is not None:
            _discard_client(stale_client)

        # A flag left set by an earlier client would make the wait below
        # return immediately, before the new client is actually connected.
        _client_connected.clear()

        new_client = None
        try:
            print(f"MQTT: Initializing client connection to {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")

            new_client = mqtt.Client(client_id="drug-inventory-main")

            # Credentials are only sent when both parts are configured.
            if MQTT_USERNAME and MQTT_PASSWORD:
                new_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

            new_client.on_connect = on_connect
            new_client.on_disconnect = on_disconnect
            new_client.on_message = on_status_message

            new_client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT, 60)

            # Run the network loop in a background thread.
            new_client.loop_start()

            # Wait for the connect callback to confirm (max 3 seconds).
            if _client_connected.wait(timeout=3.0):
                print("MQTT: Client initialized successfully")
                _mqtt_client = new_client
                _last_init_failure = None
                return new_client

            print("MQTT: Connection timeout")
        except Exception as e:
            print(f"MQTT: Error initializing client: {str(e)}")

        # Failure: release the loop thread and socket, then start the cooldown.
        if new_client is not None:
            _discard_client(new_client)
        _client_connected.clear()
        _last_init_failure = time.monotonic()
        return None
|
|
|
|
def publish_label_print_with_response(label_data: dict, timeout: float = 5.0) -> tuple[bool, Optional[dict]]:
    """
    Publish a label print request to MQTT broker and wait for response

    A fresh ``job_id`` is generated, written into ``label_data`` (the caller's
    dict is modified in place), and published with QoS 1. The function then
    polls the shared response store until a payload with that ``job_id``
    arrives or ``timeout`` expires.

    Args:
        label_data: Dictionary containing label print information
        timeout: Maximum time to wait for response in seconds

    Returns:
        Tuple of (success, response_data). ``success`` is True only when the
        response status is "success", "completed" or "ok" (case-insensitive).
        On failure ``response_data`` is either the printer's response or a
        dict with "status" ("error" / "timeout") and "message".
    """
    client = get_mqtt_client()
    if client is None:
        print("MQTT: Client not available")
        return False, {"status": "error", "message": "MQTT client not connected"}

    job_id = str(uuid.uuid4())
    label_data["job_id"] = job_id

    try:
        message = json.dumps(label_data)
        result = client.publish(MQTT_LABEL_TOPIC, message, qos=1)

        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            print(f"MQTT: Failed to publish, rc={result.rc}")
            return False, {"status": "error", "message": "Failed to publish message"}

        print(f"MQTT: Published job {job_id}")

        # Monotonic clock: a wall-clock adjustment must not stretch or cut
        # the wait.
        deadline = time.monotonic() + timeout
        while True:
            with _response_lock:
                response = _response_store.pop(job_id, None)
            if response is not None:
                break
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                # The store was checked after the final sleep, so a reply
                # that arrived just before the deadline is still used.
                break
            time.sleep(min(0.05, remaining))  # Check every 50ms

        if response is None:
            print(f"MQTT: Timeout waiting for job {job_id}")
            return False, {"status": "timeout", "message": "No response from printer"}

        # Status may be missing, null, or not a string; normalise it so the
        # printer's reply is returned instead of an unrelated error.
        status = str(response.get("status") or "").lower()
        success = status in ("success", "completed", "ok")
        print(f"MQTT: Job {job_id} completed with status: {status}")
        return success, response

    except Exception as e:
        print(f"MQTT: Error in publish_label_print_with_response: {str(e)}")
        return False, {"status": "error", "message": str(e)}
|
|
|
|
def publish_label_print(label_data: dict) -> bool:
    """
    Publish a label print request to MQTT broker (fire and forget)

    A temporary client is created for this one message, which is published
    with QoS 0; no reply is awaited.

    Args:
        label_data: Dictionary containing label print information

    Returns:
        True if the publish call reported success, False otherwise
        (including when any exception is raised).
    """
    try:
        # Serialise before touching the network so unserialisable data fails
        # without leaving a connection or loop thread behind.
        message = json.dumps(label_data)

        client = mqtt.Client()

        # Credentials are only sent when both parts are configured.
        if MQTT_USERNAME and MQTT_PASSWORD:
            client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

        client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT, 60)
        try:
            # Start network loop to process connection
            client.loop_start()

            # Publish message with QoS 0 (fire and forget)
            result = client.publish(MQTT_LABEL_TOPIC, message, qos=0)
        finally:
            # Always release the loop thread and the socket, even when
            # publishing raised.
            client.loop_stop()
            client.disconnect()

        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            logger.info("Successfully published label print request to %s", MQTT_LABEL_TOPIC)
            return True

        logger.error("Failed to publish label print request: %s", result.rc)
        return False

    except Exception as e:
        logger.error("Error publishing MQTT message: %s", e)
        return False
|