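"""Drug Inventory API.

FastAPI application exposing authentication, user management, drug and
drug-variant inventory, dispensing history, and MQTT-backed label/notes
printing endpoints under the ``/api`` prefix.
"""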
import os
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any

from fastapi import FastAPI, Depends, HTTPException, APIRouter, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sqlalchemy.orm import Session

from .auth import hash_password, verify_password, create_access_token, get_current_user, get_current_admin_user, ACCESS_TOKEN_EXPIRE_MINUTES
from .database import engine, get_db, Base
from .models import Drug, DrugVariant, Dispensing, User
from .mqtt_service import publish_label_print_with_response
|
|
|
|
# Create tables
|
|
# Runs at import time: asks SQLAlchemy to create the tables registered on
# Base.metadata using the shared engine.
Base.metadata.create_all(bind=engine)
|
|
|
|
# The ASGI application object; middleware and the /api router are attached to it below.
app = FastAPI(title="Drug Inventory API")
|
|
|
|
# CORS middleware for frontend
|
|
app.add_middleware(
    CORSMiddleware,
    # NOTE(review): the FastAPI CORS docs state that origins, methods and
    # headers should not be wildcards when credentials are allowed; confirm
    # whether the frontend really needs credentialed requests.
    allow_credentials=True,
    allow_origins=["*"],  # In production, restrict this
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
# Create a router with /api prefix
|
|
# Every endpoint in this module is registered on this router, so all paths are served under /api.
router = APIRouter(prefix="/api")
|
|
|
|
# Pydantic schemas
|
|
class UserCreate(BaseModel):
    """Username/password payload accepted by the register, login and create-user endpoints."""

    # Login name; the endpoints look users up by this value.
    username: str
    # Password exactly as submitted by the client (hashed or verified by the endpoints).
    password: str
|
|
|
|
class PasswordChange(BaseModel):
    """Payload for a user changing their own password."""

    # Must verify against the stored hash before the change is applied.
    current_password: str
    # Replacement password; hashed before it is stored.
    new_password: str
|
|
|
|
class AdminPasswordChange(BaseModel):
    """Payload for an admin setting another user's password (no current password required)."""

    # Replacement password; hashed before it is stored.
    new_password: str
|
|
|
|
class UserResponse(BaseModel):
    """Public view of a user account; the password hash is not part of this schema."""

    id: int
    username: str
    # True for accounts with administrator rights.
    is_admin: bool

    class Config:
        # Lets the schema be populated from ORM object attributes.
        from_attributes = True
|
|
|
|
class TokenResponse(BaseModel):
    """Response returned by the register and login endpoints."""

    # Token string produced by create_access_token.
    access_token: str
    # The endpoints in this module always send "bearer".
    token_type: str
    # The account the token was issued for.
    user: UserResponse
|
|
|
|
class DrugCreate(BaseModel):
    """Payload for creating a drug."""

    # The create endpoint rejects a name that is already in use.
    name: str
    # Optional free-text description.
    description: Optional[str] = None
|
|
|
|
class DrugUpdate(BaseModel):
    """Partial-update payload for a drug; only the fields the client sends are applied."""

    name: Optional[str] = None
    description: Optional[str] = None
|
|
|
|
class DrugResponse(BaseModel):
    """Drug representation without its variants."""

    id: int
    name: str
    description: Optional[str] = None

    class Config:
        # Lets the schema be populated from ORM object attributes.
        from_attributes = True
|
|
|
|
class DrugVariantCreate(BaseModel):
    """Payload for adding a variant (one strength) to an existing drug."""

    # The create endpoint rejects a strength that already exists for the drug.
    strength: str
    # Initial stock level.
    quantity: float
    # Label for the stock unit; defaults to "units".
    unit: str = "units"
    # The low-stock listing includes variants whose quantity is at or below this value.
    low_stock_threshold: float = 10
|
|
|
|
class DrugVariantUpdate(BaseModel):
    """Partial-update payload for a drug variant; only the fields the client sends are applied.

    NOTE(review): the fields are annotated with non-Optional types but default
    to None. This presumably means "may be omitted, but may not be null";
    confirm that this is intended before switching to Optional[...].
    """

    strength: str = None
    quantity: float = None
    unit: str = None
    low_stock_threshold: float = None
|
|
|
|
class DrugVariantResponse(BaseModel):
    """Drug variant as returned by the API."""

    id: int
    # Identifier of the drug this variant belongs to.
    drug_id: int
    strength: str
    # Current stock level.
    quantity: float
    unit: str
    low_stock_threshold: float

    class Config:
        # Lets the schema be populated from ORM object attributes.
        from_attributes = True
|
|
|
|
class DrugWithVariantsResponse(BaseModel):
    """Drug together with a list of its variants."""

    id: int
    name: str
    description: Optional[str] = None
    # Filled in by the endpoints; empty when the drug has no variants.
    variants: List[DrugVariantResponse] = []

    class Config:
        # Lets the schema be populated from ORM object attributes.
        from_attributes = True
|
|
|
|
class DispensingCreate(BaseModel):
    """Payload for recording a dispensing against a drug variant."""

    # Variant whose stock is reduced.
    drug_variant_id: int
    # Amount to take out of stock.
    quantity: float
    animal_name: Optional[str] = None
    # User name stored on the dispensing record, as supplied by the client.
    user_name: str
    notes: Optional[str] = None
|
|
|
|
class DispensingResponse(BaseModel):
    """Dispensing record as returned by the API."""

    id: int
    drug_variant_id: int
    quantity: float
    animal_name: Optional[str] = None
    user_name: str
    notes: Optional[str] = None
    # Timestamp of the record; the history endpoints sort on it, newest first.
    dispensed_at: datetime

    class Config:
        # Lets the schema be populated from ORM object attributes.
        from_attributes = True
|
|
|
|
class LabelVariables(BaseModel):
    """Text values substituted into the drug label; all are plain strings."""

    practice_name: str
    animal_name: str
    drug_name: str
    dosage: str
    quantity: str
    # Passed through to the printer unchanged (not capitalised).
    expiry_date: str
|
|
|
|
class LabelPrintRequest(BaseModel):
    """Request body of the label print endpoint."""

    # Values to place on the label.
    variables: LabelVariables
|
|
|
|
class LabelPrintResponse(BaseModel):
    """Outcome of a label print request."""

    # Whether the print call reported success.
    success: bool
    # Message describing the outcome, taken from the printer reply when one is available.
    message: str
|
|
|
|
class NotesVariables(BaseModel):
    """Text values substituted into the notes template."""

    animal_name: str
    notes: str
|
|
|
|
class NotesPrintRequest(BaseModel):
    """Request body of the notes print endpoint."""

    # Values to place on the printed notes.
    variables: NotesVariables
|
|
|
|
class NotesPrintResponse(BaseModel):
    """Outcome of a notes print request."""

    # Whether the print call reported success.
    success: bool
    # Message describing the outcome, taken from the printer reply when one is available.
    message: str
|
|
|
|
# Authentication Routes
|
|
@router.post("/auth/register", response_model=TokenResponse)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """Create the very first account, which is always an admin.

    Self-registration is only available while no users exist. On success
    the new account is stored and a bearer token for it is returned.

    Raises:
        HTTPException: 403 if any user already exists; 400 if the username
            is already registered.
    """
    # Registration closes as soon as a single user exists.
    if db.query(User).count() > 0:
        raise HTTPException(
            status_code=403,
            detail="Registration is disabled. Contact an administrator to create an account."
        )

    # Reject a username that is already taken.
    username_taken = db.query(User).filter(User.username == user_data.username).first()
    if username_taken:
        raise HTTPException(status_code=400, detail="Username already registered")

    # The first (and only self-registered) account gets admin rights.
    new_admin = User(
        username=user_data.username,
        hashed_password=hash_password(user_data.password),
        is_admin=True
    )
    db.add(new_admin)
    db.commit()
    db.refresh(new_admin)

    # Issue a token for the freshly created account.
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(data={"sub": new_admin.username}, expires_delta=lifetime)

    return {"access_token": token, "token_type": "bearer", "user": new_admin}
|
|
|
|
@router.post("/auth/login", response_model=TokenResponse)
def login(user_data: UserCreate, db: Session = Depends(get_db)):
    """Check a username/password pair and return a bearer token.

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not verify; both cases produce the same message.
    """
    account = db.query(User).filter(User.username == user_data.username).first()

    # The password is only verified when the account was found.
    if not (account and verify_password(user_data.password, account.hashed_password)):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Issue a token carrying the username as its subject.
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(data={"sub": account.username}, expires_delta=lifetime)

    return {"access_token": token, "token_type": "bearer", "user": account}
|
|
|
|
@router.get("/auth/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Return the account resolved by the get_current_user dependency."""
    authenticated_user = current_user
    return authenticated_user
|
|
|
|
# User Management Routes (Admin only)
|
|
@router.get("/users", response_model=List[UserResponse])
def list_users(db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
    """Return every user account (admin only)."""
    all_users = db.query(User).all()
    return all_users
|
|
|
|
@router.post("/users", response_model=UserResponse)
def create_user(user_data: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
    """Create a non-admin user account (admin only).

    Raises:
        HTTPException: 400 if the username is already in use.
    """
    # Reject a username that is already taken.
    username_taken = db.query(User).filter(User.username == user_data.username).first()
    if username_taken:
        raise HTTPException(status_code=400, detail="Username already exists")

    # Accounts created here never receive admin rights.
    new_user = User(
        username=user_data.username,
        hashed_password=hash_password(user_data.password),
        is_admin=False
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user
|
|
|
|
@router.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
    """Remove a user account (admin only).

    Raises:
        HTTPException: 400 when the admin targets their own account;
            404 when no user has the given id.
    """
    # An admin may not delete the account they are acting as.
    if user_id == current_user.id:
        raise HTTPException(status_code=400, detail="Cannot delete your own user account")

    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    db.delete(target)
    db.commit()
    return {"message": "User deleted successfully"}
|
|
|
|
@router.post("/auth/change-password")
def change_own_password(password_data: PasswordChange, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Replace the authenticated user's password after verifying the current one.

    Raises:
        HTTPException: 404 if the account cannot be loaded from this
            session; 401 if the current password does not verify.
    """
    # Re-load the account through this request's session before modifying it.
    account = db.query(User).filter(User.id == current_user.id).first()
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    # The caller has to prove knowledge of the existing password.
    current_ok = verify_password(password_data.current_password, account.hashed_password)
    if not current_ok:
        raise HTTPException(status_code=401, detail="Current password is incorrect")

    # Store only the hash of the new password.
    account.hashed_password = hash_password(password_data.new_password)
    db.commit()

    return {"message": "Password changed successfully"}
|
|
|
|
@router.post("/users/{user_id}/change-password")
def admin_change_password(user_id: int, password_data: AdminPasswordChange, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
    """Set another user's password without knowing the old one (admin only).

    Raises:
        HTTPException: 404 when no user has the given id; 400 when the admin
            targets their own account (that goes through /auth/change-password).
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # Admins change their own password through the self-service endpoint.
    if user_id == current_user.id:
        raise HTTPException(status_code=400, detail="Use /auth/change-password to change your own password")

    # Store only the hash of the new password.
    target.hashed_password = hash_password(password_data.new_password)
    db.commit()

    return {"message": "Password changed successfully"}
|
|
|
|
# Routes
|
|
@router.get("/")
def read_root():
    """Return a static message identifying the API; no authentication dependency."""
    banner = {"message": "Drug Inventory API"}
    return banner
|
|
|
|
@router.get("/drugs", response_model=List[DrugWithVariantsResponse])
def list_drugs(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get all drugs with their variants.

    Variants are fetched with a single query and grouped in memory instead of
    issuing one query per drug. The response is built from explicit
    attributes rather than a copy of the ORM instance ``__dict__``, which
    carries SQLAlchemy's internal state and omits expired attributes.

    Returns:
        A list of dicts with ``id``, ``name``, ``description`` and
        ``variants`` for every drug.
    """
    drugs = db.query(Drug).all()

    # Group every variant by the drug it belongs to.
    variants_by_drug = defaultdict(list)
    for variant in db.query(DrugVariant).all():
        variants_by_drug[variant.drug_id].append(variant)

    return [
        {
            "id": drug.id,
            "name": drug.name,
            "description": drug.description,
            # .get avoids inserting empty entries for drugs without variants.
            "variants": variants_by_drug.get(drug.id, []),
        }
        for drug in drugs
    ]
|
|
|
|
@router.get("/drugs/low-stock", response_model=List[DrugWithVariantsResponse])
def low_stock_drugs(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get drugs that have at least one low-stock variant.

    A variant counts as low on stock when its quantity is at or below its own
    ``low_stock_threshold``. Each returned drug lists only its low-stock
    variants.

    Returns:
        A list of dicts with ``id``, ``name``, ``description`` and
        ``variants``; empty when nothing is low on stock.
    """
    low_stock_variants = db.query(DrugVariant).filter(
        DrugVariant.quantity <= DrugVariant.low_stock_threshold
    ).all()

    # Nothing is low: skip the drug lookup instead of querying with an empty IN list.
    if not low_stock_variants:
        return []

    # Group once, rather than re-scanning the variant list for every drug.
    variants_by_drug = defaultdict(list)
    for variant in low_stock_variants:
        variants_by_drug[variant.drug_id].append(variant)

    drugs = db.query(Drug).filter(Drug.id.in_(list(variants_by_drug))).all()

    # Explicit fields instead of drug.__dict__, which includes ORM internals.
    return [
        {
            "id": drug.id,
            "name": drug.name,
            "description": drug.description,
            "variants": variants_by_drug[drug.id],
        }
        for drug in drugs
    ]
|
|
|
|
@router.get("/drugs/{drug_id}", response_model=DrugWithVariantsResponse)
def get_drug(drug_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get a specific drug with its variants.

    Raises:
        HTTPException: 404 when no drug has the given id.
    """
    drug = db.query(Drug).filter(Drug.id == drug_id).first()
    if not drug:
        raise HTTPException(status_code=404, detail="Drug not found")

    variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()

    # Explicit fields instead of drug.__dict__, which includes ORM internals
    # and omits attributes that are expired.
    return {
        "id": drug.id,
        "name": drug.name,
        "description": drug.description,
        "variants": variants,
    }
|
|
|
|
@router.post("/drugs", response_model=DrugWithVariantsResponse)
def create_drug(drug: DrugCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Create a new drug (without variants).

    Raises:
        HTTPException: 400 when a drug with the same name already exists.
    """
    # Drug names are kept unique at the application level.
    existing = db.query(Drug).filter(Drug.name == drug.name).first()
    if existing:
        raise HTTPException(status_code=400, detail="Drug with this name already exists")

    db_drug = Drug(name=drug.name, description=drug.description)
    db.add(db_drug)
    db.commit()
    db.refresh(db_drug)

    # A new drug has no variants yet. Fields are listed explicitly rather
    # than copied from db_drug.__dict__, which includes ORM internals.
    return {
        "id": db_drug.id,
        "name": db_drug.name,
        "description": db_drug.description,
        "variants": [],
    }
|
|
|
|
@router.put("/drugs/{drug_id}", response_model=DrugWithVariantsResponse)
def update_drug(drug_id: int, drug_update: DrugUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Update a drug with the fields present in the request body.

    Fields the client did not send are left untouched.

    Raises:
        HTTPException: 404 when no drug has the given id; 400 when ``name``
            is sent as null or clashes with another drug's name.
    """
    drug = db.query(Drug).filter(Drug.id == drug_id).first()
    if not drug:
        raise HTTPException(status_code=404, detail="Drug not found")

    changes = drug_update.model_dump(exclude_unset=True)

    if "name" in changes:
        new_name = changes["name"]
        # Every response schema declares name as a required str, so a null
        # name would make this drug (and the drug listing) unserialisable.
        if new_name is None:
            raise HTTPException(status_code=400, detail="Drug name cannot be empty")
        # Apply the same uniqueness rule that create_drug enforces.
        if new_name != drug.name:
            clash = db.query(Drug).filter(Drug.name == new_name, Drug.id != drug_id).first()
            if clash:
                raise HTTPException(status_code=400, detail="Drug with this name already exists")

    for field, value in changes.items():
        setattr(drug, field, value)

    db.commit()
    db.refresh(drug)

    variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()

    # Explicit fields instead of drug.__dict__, which includes ORM internals.
    return {
        "id": drug.id,
        "name": drug.name,
        "description": drug.description,
        "variants": variants,
    }
|
|
|
|
@router.delete("/drugs/{drug_id}")
def delete_drug(drug_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Remove a drug together with every variant that belongs to it.

    NOTE(review): dispensing records that reference the removed variants are
    not touched here; confirm how the schema handles them.

    Raises:
        HTTPException: 404 when no drug has the given id.
    """
    doomed_drug = db.query(Drug).filter(Drug.id == drug_id).first()
    if not doomed_drug:
        raise HTTPException(status_code=404, detail="Drug not found")

    # Variants are removed first, then the drug row; one commit covers both.
    owned_variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug_id)
    owned_variants.delete()
    db.delete(doomed_drug)
    db.commit()

    return {"message": "Drug and all variants deleted successfully"}
|
|
|
|
# Drug Variant endpoints
|
|
@router.post("/drugs/{drug_id}/variants", response_model=DrugVariantResponse)
def create_drug_variant(drug_id: int, variant: DrugVariantCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Add a variant (one strength) to an existing drug.

    Raises:
        HTTPException: 404 when the drug does not exist; 400 when the drug
            already has a variant with the same strength.
    """
    # The parent drug has to exist.
    parent = db.query(Drug).filter(Drug.id == drug_id).first()
    if not parent:
        raise HTTPException(status_code=404, detail="Drug not found")

    # Strengths are unique within a drug.
    duplicate = db.query(DrugVariant).filter(
        DrugVariant.drug_id == drug_id,
        DrugVariant.strength == variant.strength
    ).first()
    if duplicate:
        raise HTTPException(status_code=400, detail="Variant with this strength already exists for this drug")

    new_variant = DrugVariant(
        drug_id=drug_id,
        strength=variant.strength,
        quantity=variant.quantity,
        unit=variant.unit,
        low_stock_threshold=variant.low_stock_threshold
    )
    db.add(new_variant)
    db.commit()
    db.refresh(new_variant)
    return new_variant
|
|
|
|
@router.get("/variants/{variant_id}", response_model=DrugVariantResponse)
def get_drug_variant(variant_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Look up a single drug variant by id.

    Raises:
        HTTPException: 404 when no variant has the given id.
    """
    found = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Drug variant not found")
|
|
|
|
@router.put("/variants/{variant_id}", response_model=DrugVariantResponse)
def update_drug_variant(variant_id: int, variant_update: DrugVariantUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Update a drug variant with the fields present in the request body.

    Fields the client did not send are left untouched.

    Raises:
        HTTPException: 404 when no variant has the given id; 400 when the new
            strength is already used by another variant of the same drug.
    """
    variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    changes = variant_update.model_dump(exclude_unset=True)

    # Apply the same per-drug strength uniqueness that variant creation enforces.
    new_strength = changes.get("strength")
    if new_strength is not None and new_strength != variant.strength:
        clash = db.query(DrugVariant).filter(
            DrugVariant.drug_id == variant.drug_id,
            DrugVariant.strength == new_strength,
            DrugVariant.id != variant_id,
        ).first()
        if clash:
            raise HTTPException(status_code=400, detail="Variant with this strength already exists for this drug")

    for field, value in changes.items():
        setattr(variant, field, value)

    db.commit()
    db.refresh(variant)
    return variant
|
|
|
|
@router.delete("/variants/{variant_id}")
def delete_drug_variant(variant_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Remove a single drug variant.

    Raises:
        HTTPException: 404 when no variant has the given id.
    """
    doomed_variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not doomed_variant:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    db.delete(doomed_variant)
    db.commit()

    return {"message": "Drug variant deleted successfully"}
|
|
|
|
# Dispensing endpoints
|
|
@router.post("/dispense", response_model=DispensingResponse)
def dispense_drug(dispensing: DispensingCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Record a drug dispensing and reduce the variant's stock.

    The stock reduction and the dispensing record are written in a single
    commit, so stock is never reduced without a matching record.

    Raises:
        HTTPException: 404 when the variant does not exist; 400 when the
            requested quantity is not a positive number or exceeds the
            available stock.
    """
    variant = db.query(DrugVariant).filter(DrugVariant.id == dispensing.drug_variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    # Written as "not > 0" so that NaN is rejected along with zero and
    # negatives; a negative quantity would otherwise increase the stock.
    if not dispensing.quantity > 0:
        raise HTTPException(status_code=400, detail="Dispensed quantity must be greater than zero")

    if variant.quantity < dispensing.quantity:
        raise HTTPException(
            status_code=400,
            detail=f"Insufficient quantity. Available: {variant.quantity}, Requested: {dispensing.quantity}"
        )

    # Stage both changes and commit them together.
    variant.quantity -= dispensing.quantity
    db_dispensing = Dispensing(**dispensing.model_dump())
    db.add(db_dispensing)
    db.commit()
    db.refresh(db_dispensing)
    return db_dispensing
|
|
|
|
@router.get("/dispense/history", response_model=List[DispensingResponse])
def list_dispensings(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Return a page of dispensing records, newest first.

    ``skip`` is applied as the query offset and ``limit`` as the maximum
    number of rows returned.
    """
    newest_first = db.query(Dispensing).order_by(Dispensing.dispensed_at.desc())
    page = newest_first.offset(skip).limit(limit)
    return page.all()
|
|
|
|
@router.get("/drugs/{drug_id}/dispense/history", response_model=List[DispensingResponse])
def get_drug_dispensings(drug_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Return the dispensing records of every variant of one drug, newest first.

    Raises:
        HTTPException: 404 when no drug has the given id.
    """
    # The drug has to exist before its history is looked up.
    if not db.query(Drug).filter(Drug.id == drug_id).first():
        raise HTTPException(status_code=404, detail="Drug not found")

    # Ids of the drug's variants, used as a subquery in the IN filter below.
    variant_ids = db.query(DrugVariant.id).filter(DrugVariant.drug_id == drug_id).subquery()

    history = db.query(Dispensing).filter(Dispensing.drug_variant_id.in_(variant_ids))
    return history.order_by(Dispensing.dispensed_at.desc()).all()
|
|
|
|
@router.get("/variants/{variant_id}/dispense/history", response_model=List[DispensingResponse])
def get_variant_dispensings(variant_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Return the dispensing records of one drug variant, newest first.

    Raises:
        HTTPException: 404 when no variant has the given id.
    """
    # The variant has to exist before its history is looked up.
    if not db.query(DrugVariant).filter(DrugVariant.id == variant_id).first():
        raise HTTPException(status_code=404, detail="Drug variant not found")

    history = db.query(Dispensing).filter(Dispensing.drug_variant_id == variant_id)
    return history.order_by(Dispensing.dispensed_at.desc()).all()
|
|
|
|
# Helper function to capitalize text for labels
|
|
def capitalize_label_text(text: str) -> str:
    """Upper-case the first character of the text and of each sentence in it.

    A new sentence is recognised only after ". ", "! " or "? " (punctuation
    followed by exactly one space). Falsy input (empty string, None) is
    returned unchanged.
    """
    if not text:
        return text

    def _upper_first(fragment):
        # Slicing keeps this safe for empty fragments.
        return fragment[:1].upper() + fragment[1:]

    capitalized = _upper_first(text)

    # Re-join on each sentence break, capitalising the start of every piece.
    for sentence_break in ('. ', '! ', '? '):
        pieces = capitalized.split(sentence_break)
        capitalized = sentence_break.join(_upper_first(piece) for piece in pieces)

    return capitalized
|
|
|
|
# Label printing endpoint
|
|
@router.post("/labels/print", response_model=LabelPrintResponse)
def print_label(label_request: LabelPrintRequest, current_user: User = Depends(get_current_user)):
    """Print a drug label by publishing an MQTT message.

    The request is published through ``publish_label_print_with_response``
    (10 second timeout) and the reply is translated into a
    ``LabelPrintResponse``. Template id, label size and test mode come from
    the ``LABEL_TEMPLATE_ID``, ``LABEL_SIZE`` and ``LABEL_TEST`` environment
    variables.

    Returns:
        LabelPrintResponse with ``success`` and a message; a failed print is
        reported with ``success=False`` rather than an HTTP error.

    Raises:
        HTTPException: 500 when building or publishing the request raises.
    """
    try:
        # Label configuration from the environment.
        template_id = os.getenv("LABEL_TEMPLATE_ID", "vet_label")
        label_size = os.getenv("LABEL_SIZE", "29x90")
        test_mode = os.getenv("LABEL_TEST", "false").lower() == "true"

        # Capitalize the text fields for presentation; expiry_date is left as sent.
        variables = label_request.variables.model_dump()
        for field in ("practice_name", "animal_name", "drug_name", "dosage", "quantity"):
            variables[field] = capitalize_label_text(variables[field])

        mqtt_message = {
            "template_id": template_id,
            "label_size": label_size,
            "variables": variables,
            "test": test_mode
        }

        # Publish to MQTT and wait for the reply.
        success, response = publish_label_print_with_response(mqtt_message, timeout=10.0)

        print(f"Label print result: success={success}, response={response}")

        if success:
            # The reply may be missing or carry a null message; fall back to
            # the default text instead of failing on the success path.
            message = (response or {}).get("message") or "Label printed successfully"
            result = LabelPrintResponse(success=True, message=message)
            print(f"Returning success response: {result}")
            return result

        # Failure: prefer the reply's 'message', then 'error'.
        if response:
            error_msg = response.get("message") or response.get("error") or "Unknown error"
        else:
            error_msg = "No response from printer"
        result = LabelPrintResponse(
            success=False,
            message=f"Print failed: {error_msg}"
        )
        print(f"Returning error response: {result}")
        return result

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error sending label print request: {str(e)}"
        ) from e
|
|
|
|
# Notes printing endpoint
|
|
@router.post("/notes/print", response_model=NotesPrintResponse)
def print_notes(notes_request: NotesPrintRequest, current_user: User = Depends(get_current_user)):
    """Print notes by publishing an MQTT message.

    The request is published through ``publish_label_print_with_response``
    (10 second timeout) and the reply is translated into a
    ``NotesPrintResponse``. Template id, label size and test mode come from
    the ``NOTES_TEMPLATE_ID``, ``LABEL_SIZE`` and ``LABEL_TEST`` environment
    variables.

    Returns:
        NotesPrintResponse with ``success`` and a message; a failed print is
        reported with ``success=False`` rather than an HTTP error.

    Raises:
        HTTPException: 500 when building or publishing the request raises.
    """
    try:
        # Notes template configuration from the environment.
        template_id = os.getenv("NOTES_TEMPLATE_ID", "notes_template")
        label_size = os.getenv("LABEL_SIZE", "29x90")
        test_mode = os.getenv("LABEL_TEST", "false").lower() == "true"

        # Capitalize the text fields for presentation.
        variables = notes_request.variables.model_dump()
        for field in ("animal_name", "notes"):
            variables[field] = capitalize_label_text(variables[field])

        mqtt_message = {
            "template_id": template_id,
            "label_size": label_size,
            "variables": variables,
            "test": test_mode
        }

        # Publish to MQTT and wait for the reply.
        success, response = publish_label_print_with_response(mqtt_message, timeout=10.0)

        print(f"Notes print result: success={success}, response={response}")

        if success:
            # The reply may be missing or carry a null message; fall back to
            # the default text instead of failing on the success path.
            message = (response or {}).get("message") or "Notes printed successfully"
            result = NotesPrintResponse(success=True, message=message)
            print(f"Returning success response: {result}")
            return result

        # Failure: prefer the reply's 'message', then 'error'.
        if response:
            error_msg = response.get("message") or response.get("error") or "Unknown error"
        else:
            error_msg = "No response from printer"
        result = NotesPrintResponse(
            success=False,
            message=f"Print failed: {error_msg}"
        )
        print(f"Returning error response: {result}")
        return result

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error sending notes print request: {str(e)}"
        ) from e
|
|
|
|
# Include router with /api prefix
|
|
# Must stay after all route definitions: only routes already registered on
# `router` at this point are added to the app.
app.include_router(router)
|