import os import json import logging import uuid import time from typing import Optional, Dict, Any import paho.mqtt.client as mqtt import threading import atexit logger = logging.getLogger(__name__) # MQTT Configuration from environment MQTT_BROKER_HOST = os.getenv("MQTT_BROKER_HOST", "localhost") MQTT_BROKER_PORT = int(os.getenv("MQTT_BROKER_PORT", "1883")) MQTT_USERNAME = os.getenv("MQTT_USERNAME", "") MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "") MQTT_LABEL_TOPIC = os.getenv("MQTT_LABEL_TOPIC", "vet/labels/print") MQTT_STATUS_TOPIC = os.getenv("MQTT_STATUS_TOPIC", "vet/labels/status") # Store responses by job_id _response_store: Dict[str, Any] = {} _response_lock = threading.Lock() # Persistent MQTT client _mqtt_client: Optional[mqtt.Client] = None _client_connected = threading.Event() _client_lock = threading.Lock() _initialization_attempted = False def on_connect(client, userdata, flags, rc): """Callback when connected to broker""" if rc == 0: print(f"MQTT: Connected to broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}") # Subscribe to status topic result = client.subscribe(MQTT_STATUS_TOPIC, qos=1) print(f"MQTT: Subscribed to {MQTT_STATUS_TOPIC}, result={result}") _client_connected.set() else: print(f"MQTT: Failed to connect, return code: {rc}") _client_connected.clear() def on_disconnect(client, userdata, rc): """Callback when disconnected from broker""" print(f"MQTT: Disconnected, return code: {rc}") _client_connected.clear() def on_status_message(client, userdata, message): """Callback for status messages""" print(f"MQTT: Received message on topic '{message.topic}': {message.payload.decode()[:200]}") try: payload = json.loads(message.payload.decode()) job_id = payload.get("job_id") if job_id: with _response_lock: _response_store[job_id] = payload print(f"MQTT: Stored response for job {job_id}: {payload.get('status')}") else: print(f"MQTT: Message has no job_id: {payload}") except Exception as e: print(f"MQTT: Error processing status message: {str(e)}") def get_mqtt_client(): """Get or initialize the persistent MQTT client""" global _mqtt_client, _initialization_attempted with _client_lock: # If client exists and is connected, return it if _mqtt_client is not None and _client_connected.is_set(): return _mqtt_client # If we already tried and failed recently, don't retry immediately if _initialization_attempted and _mqtt_client is None: return None _initialization_attempted = True try: print(f"MQTT: Initializing client connection to {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}") # Clean up old client if exists if _mqtt_client is not None: try: _mqtt_client.loop_stop() _mqtt_client.disconnect() except: pass # Create new MQTT client _mqtt_client = mqtt.Client(client_id=f"drug-inventory-main") # Set username and password if provided if MQTT_USERNAME and MQTT_PASSWORD: _mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) # Set up callbacks _mqtt_client.on_connect = on_connect _mqtt_client.on_disconnect = on_disconnect _mqtt_client.on_message = on_status_message # Connect to broker _mqtt_client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT, 60) # Start network loop in background _mqtt_client.loop_start() # Wait for connection (max 3 seconds) if _client_connected.wait(timeout=3.0): print("MQTT: Client initialized successfully") return _mqtt_client else: print("MQTT: Connection timeout") _mqtt_client.loop_stop() _mqtt_client = None return None except Exception as e: print(f"MQTT: Error initializing client: {str(e)}") _mqtt_client = None return None def publish_label_print_with_response(label_data: dict, timeout: float = 5.0) -> tuple[bool, Optional[dict]]: """ Publish a label print request to MQTT broker and wait for response Args: label_data: Dictionary containing label print information timeout: Maximum time to wait for response in seconds Returns: Tuple of (success, response_data) """ # Get or initialize MQTT client client = get_mqtt_client() if client is None: print("MQTT: Client not available") return False, {"status": "error", "message": "MQTT client not connected"} job_id = str(uuid.uuid4()) label_data["job_id"] = job_id try: # Publish message message = json.dumps(label_data) result = client.publish(MQTT_LABEL_TOPIC, message, qos=1) if result.rc != mqtt.MQTT_ERR_SUCCESS: print(f"MQTT: Failed to publish, rc={result.rc}") return False, {"status": "error", "message": "Failed to publish message"} print(f"MQTT: Published job {job_id}") # Wait for response start_time = time.time() while time.time() - start_time < timeout: with _response_lock: if job_id in _response_store: response = _response_store.pop(job_id) # Check if print was successful status = response.get("status", "").lower() success = status in ["success", "completed", "ok"] print(f"MQTT: Job {job_id} completed with status: {status}") return success, response time.sleep(0.05) # Check every 50ms # Timeout - no response received print(f"MQTT: Timeout waiting for job {job_id}") # Clean up in case response arrives late with _response_lock: _response_store.pop(job_id, None) return False, {"status": "timeout", "message": "No response from printer"} except Exception as e: print(f"MQTT: Error in publish_label_print_with_response: {str(e)}") return False, {"status": "error", "message": str(e)} def publish_label_print(label_data: dict) -> bool: """ Publish a label print request to MQTT broker (fire and forget) Args: label_data: Dictionary containing label print information Returns: True if successful, False otherwise """ try: # Create MQTT client client = mqtt.Client() # Set username and password if provided if MQTT_USERNAME and MQTT_PASSWORD: client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) # Connect to broker client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT, 60) # Start network loop to process connection client.loop_start() # Publish message with QoS 0 (fire and forget) message = json.dumps(label_data) result = client.publish(MQTT_LABEL_TOPIC, message, qos=0) # Stop loop and disconnect client.loop_stop() client.disconnect() if result.rc == mqtt.MQTT_ERR_SUCCESS: logger.info(f"Successfully published label print request to {MQTT_LABEL_TOPIC}") return True else: logger.error(f"Failed to publish label print request: {result.rc}") return False except Exception as e: logger.error(f"Error publishing MQTT message: {str(e)}") return False