Files
mt-drugs/backend/app/main.py
2026-02-07 08:46:08 -05:00

568 lines
20 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, APIRouter, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from .database import engine, get_db, Base
from .models import Drug, DrugVariant, Dispensing, User
from .auth import hash_password, verify_password, create_access_token, get_current_user, get_current_admin_user, ACCESS_TOKEN_EXPIRE_MINUTES
from .mqtt_service import publish_label_print
from pydantic import BaseModel
# Create tables
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Drug Inventory API")
# CORS middleware for frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, restrict this
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Create a router with /api prefix
router = APIRouter(prefix="/api")
# Pydantic schemas
class UserCreate(BaseModel):
username: str
password: str
class PasswordChange(BaseModel):
current_password: str
new_password: str
class AdminPasswordChange(BaseModel):
new_password: str
class UserResponse(BaseModel):
id: int
username: str
is_admin: bool
class Config:
from_attributes = True
class TokenResponse(BaseModel):
access_token: str
token_type: str
user: UserResponse
class DrugCreate(BaseModel):
name: str
description: Optional[str] = None
class DrugUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
class DrugResponse(BaseModel):
id: int
name: str
description: Optional[str] = None
class Config:
from_attributes = True
class DrugVariantCreate(BaseModel):
strength: str
quantity: float
unit: str = "units"
low_stock_threshold: float = 10
class DrugVariantUpdate(BaseModel):
strength: str = None
quantity: float = None
unit: str = None
low_stock_threshold: float = None
class DrugVariantResponse(BaseModel):
id: int
drug_id: int
strength: str
quantity: float
unit: str
low_stock_threshold: float
class Config:
from_attributes = True
class DrugWithVariantsResponse(BaseModel):
id: int
name: str
description: Optional[str] = None
variants: List[DrugVariantResponse] = []
class Config:
from_attributes = True
class DispensingCreate(BaseModel):
drug_variant_id: int
quantity: float
animal_name: Optional[str] = None
user_name: str
notes: Optional[str] = None
class DispensingResponse(BaseModel):
id: int
drug_variant_id: int
quantity: float
animal_name: Optional[str] = None
user_name: str
notes: Optional[str] = None
dispensed_at: datetime
class Config:
from_attributes = True
class LabelVariables(BaseModel):
practice_name: str
animal_name: str
drug_name: str
dosage: str
quantity: str
expiry_date: str
class LabelPrintRequest(BaseModel):
variables: LabelVariables
class LabelPrintResponse(BaseModel):
success: bool
message: str
# Authentication Routes
@router.post("/auth/register", response_model=TokenResponse)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
"""Register the first admin user (only allowed if no users exist)"""
# Check if users already exist
user_count = db.query(User).count()
if user_count > 0:
raise HTTPException(
status_code=403,
detail="Registration is disabled. Contact an administrator to create an account."
)
# Check if user already exists
existing_user = db.query(User).filter(User.username == user_data.username).first()
if existing_user:
raise HTTPException(status_code=400, detail="Username already registered")
# First (and only allowed) user is admin
hashed_password = hash_password(user_data.password)
db_user = User(
username=user_data.username,
hashed_password=hashed_password,
is_admin=True
)
db.add(db_user)
db.commit()
db.refresh(db_user)
# Create access token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": db_user.username},
expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer",
"user": db_user
}
@router.post("/auth/login", response_model=TokenResponse)
def login(user_data: UserCreate, db: Session = Depends(get_db)):
"""Login with username and password"""
user = db.query(User).filter(User.username == user_data.username).first()
if not user or not verify_password(user_data.password, user.hashed_password):
raise HTTPException(status_code=401, detail="Invalid credentials")
# Create access token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer",
"user": user
}
@router.get("/auth/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_user)):
"""Get current user info"""
return current_user
# User Management Routes (Admin only)
@router.get("/users", response_model=List[UserResponse])
def list_users(db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
"""List all users (admin only)"""
return db.query(User).all()
@router.post("/users", response_model=UserResponse)
def create_user(user_data: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
"""Create a new user (admin only)"""
# Check if user already exists
existing_user = db.query(User).filter(User.username == user_data.username).first()
if existing_user:
raise HTTPException(status_code=400, detail="Username already exists")
hashed_password = hash_password(user_data.password)
db_user = User(
username=user_data.username,
hashed_password=hashed_password,
is_admin=False
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
"""Delete a user (admin only)"""
# Don't allow deleting yourself
if current_user.id == user_id:
raise HTTPException(status_code=400, detail="Cannot delete your own user account")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
db.delete(user)
db.commit()
return {"message": "User deleted successfully"}
@router.post("/auth/change-password")
def change_own_password(password_data: PasswordChange, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Change current user's password"""
user = db.query(User).filter(User.id == current_user.id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Verify current password
if not verify_password(password_data.current_password, user.hashed_password):
raise HTTPException(status_code=401, detail="Current password is incorrect")
# Update password
user.hashed_password = hash_password(password_data.new_password)
db.commit()
return {"message": "Password changed successfully"}
@router.post("/users/{user_id}/change-password")
def admin_change_password(user_id: int, password_data: AdminPasswordChange, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
"""Change a user's password (admin only)"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Don't allow changing yourself via this endpoint
if current_user.id == user_id:
raise HTTPException(status_code=400, detail="Use /auth/change-password to change your own password")
# Update password
user.hashed_password = hash_password(password_data.new_password)
db.commit()
return {"message": "Password changed successfully"}
# Routes
@router.get("/")
def read_root():
return {"message": "Drug Inventory API"}
@router.get("/drugs", response_model=List[DrugWithVariantsResponse])
def list_drugs(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Get all drugs with their variants"""
drugs = db.query(Drug).all()
result = []
for drug in drugs:
variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()
drug_dict = drug.__dict__.copy()
drug_dict['variants'] = variants
result.append(drug_dict)
return result
@router.get("/drugs/low-stock", response_model=List[DrugWithVariantsResponse])
def low_stock_drugs(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Get drugs with low stock variants"""
# Get variants that are low on stock
low_stock_variants = db.query(DrugVariant).filter(
DrugVariant.quantity <= DrugVariant.low_stock_threshold
).all()
# Get unique drug IDs
drug_ids = list(set(v.drug_id for v in low_stock_variants))
drugs = db.query(Drug).filter(Drug.id.in_(drug_ids)).all()
result = []
for drug in drugs:
variants = [v for v in low_stock_variants if v.drug_id == drug.id]
drug_dict = drug.__dict__.copy()
drug_dict['variants'] = variants
result.append(drug_dict)
return result
@router.get("/drugs/{drug_id}", response_model=DrugWithVariantsResponse)
def get_drug(drug_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Get a specific drug with its variants"""
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()
drug_dict = drug.__dict__.copy()
drug_dict['variants'] = variants
return drug_dict
@router.post("/drugs", response_model=DrugWithVariantsResponse)
def create_drug(drug: DrugCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Create a new drug"""
# Check if drug name already exists
existing = db.query(Drug).filter(Drug.name == drug.name).first()
if existing:
raise HTTPException(status_code=400, detail="Drug with this name already exists")
db_drug = Drug(name=drug.name, description=drug.description)
db.add(db_drug)
db.commit()
db.refresh(db_drug)
# Return drug with empty variants list
drug_dict = db_drug.__dict__.copy()
drug_dict['variants'] = []
return drug_dict
@router.put("/drugs/{drug_id}", response_model=DrugWithVariantsResponse)
def update_drug(drug_id: int, drug_update: DrugUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Update a drug"""
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
for field, value in drug_update.dict(exclude_unset=True).items():
setattr(drug, field, value)
db.commit()
db.refresh(drug)
variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()
drug_dict = drug.__dict__.copy()
drug_dict['variants'] = variants
return drug_dict
@router.delete("/drugs/{drug_id}")
def delete_drug(drug_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Delete a drug and all its variants"""
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
# Delete all variants first
db.query(DrugVariant).filter(DrugVariant.drug_id == drug_id).delete()
# Delete the drug
db.delete(drug)
db.commit()
return {"message": "Drug and all variants deleted successfully"}
# Drug Variant endpoints
@router.post("/drugs/{drug_id}/variants", response_model=DrugVariantResponse)
def create_drug_variant(drug_id: int, variant: DrugVariantCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Create a new variant for a drug"""
# Check if drug exists
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
# Check if variant with same strength already exists for this drug
existing = db.query(DrugVariant).filter(
DrugVariant.drug_id == drug_id,
DrugVariant.strength == variant.strength
).first()
if existing:
raise HTTPException(status_code=400, detail="Variant with this strength already exists for this drug")
db_variant = DrugVariant(
drug_id=drug_id,
strength=variant.strength,
quantity=variant.quantity,
unit=variant.unit,
low_stock_threshold=variant.low_stock_threshold
)
db.add(db_variant)
db.commit()
db.refresh(db_variant)
return db_variant
@router.get("/variants/{variant_id}", response_model=DrugVariantResponse)
def get_drug_variant(variant_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Get a specific drug variant"""
variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
return variant
@router.put("/variants/{variant_id}", response_model=DrugVariantResponse)
def update_drug_variant(variant_id: int, variant_update: DrugVariantUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Update a drug variant"""
variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
for field, value in variant_update.dict(exclude_unset=True).items():
setattr(variant, field, value)
db.commit()
db.refresh(variant)
return variant
@router.delete("/variants/{variant_id}")
def delete_drug_variant(variant_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Delete a drug variant"""
variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
db.delete(variant)
db.commit()
return {"message": "Drug variant deleted successfully"}
# Dispensing endpoints
@router.post("/dispense", response_model=DispensingResponse)
def dispense_drug(dispensing: DispensingCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Record a drug dispensing and reduce inventory"""
# Check if drug variant exists
variant = db.query(DrugVariant).filter(DrugVariant.id == dispensing.drug_variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
# Check if enough quantity available
if variant.quantity < dispensing.quantity:
raise HTTPException(
status_code=400,
detail=f"Insufficient quantity. Available: {variant.quantity}, Requested: {dispensing.quantity}"
)
# Reduce variant quantity
variant.quantity -= dispensing.quantity
db.commit()
# Create dispensing record
db_dispensing = Dispensing(**dispensing.dict())
db.add(db_dispensing)
db.commit()
db.refresh(db_dispensing)
return db_dispensing
@router.get("/dispense/history", response_model=List[DispensingResponse])
def list_dispensings(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Get dispensing records (audit log)"""
return db.query(Dispensing).order_by(Dispensing.dispensed_at.desc()).offset(skip).limit(limit).all()
@router.get("/drugs/{drug_id}/dispense/history", response_model=List[DispensingResponse])
def get_drug_dispensings(drug_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Get dispensing history for a specific drug (all variants)"""
# Verify drug exists
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
# Get all variant IDs for this drug
variant_ids = db.query(DrugVariant.id).filter(DrugVariant.drug_id == drug_id).subquery()
return db.query(Dispensing).filter(Dispensing.drug_variant_id.in_(variant_ids)).order_by(Dispensing.dispensed_at.desc()).all()
@router.get("/variants/{variant_id}/dispense/history", response_model=List[DispensingResponse])
def get_variant_dispensings(variant_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Get dispensing history for a specific drug variant"""
# Verify variant exists
variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
return db.query(Dispensing).filter(Dispensing.drug_variant_id == variant_id).order_by(Dispensing.dispensed_at.desc()).all()
# Helper function to capitalize text for labels
def capitalize_label_text(text: str) -> str:
"""Capitalize the first letter of each sentence in the text"""
if not text:
return text
# Capitalize first letter of the entire string
result = text[0].upper() + text[1:] if len(text) > 1 else text.upper()
# Also capitalize after periods and common sentence breaks
for delimiter in ['. ', '! ', '? ']:
parts = result.split(delimiter)
result = delimiter.join([
part[0].upper() + part[1:] if part else part
for part in parts
])
return result
# Label printing endpoint
@router.post("/labels/print", response_model=LabelPrintResponse)
def print_label(label_request: LabelPrintRequest, current_user: User = Depends(get_current_user)):
"""
Print a drug label by publishing an MQTT message
This endpoint publishes a label print request to the MQTT broker,
which will be picked up by the label printing service.
"""
try:
# Get label configuration from environment
import os
template_id = os.getenv("LABEL_TEMPLATE_ID", "vet_label")
label_size = os.getenv("LABEL_SIZE", "29x90")
test_mode = os.getenv("LABEL_TEST", "false").lower() == "true"
# Capitalize all text fields for better presentation
variables = label_request.variables.dict()
variables["practice_name"] = capitalize_label_text(variables["practice_name"])
variables["animal_name"] = capitalize_label_text(variables["animal_name"])
variables["drug_name"] = capitalize_label_text(variables["drug_name"])
variables["dosage"] = capitalize_label_text(variables["dosage"])
variables["quantity"] = capitalize_label_text(variables["quantity"])
# expiry_date doesn't need capitalization
# Convert the request to the MQTT message format
mqtt_message = {
"template_id": template_id,
"label_size": label_size,
"variables": variables,
"test": test_mode
}
# Publish to MQTT
success = publish_label_print(mqtt_message)
if success:
return LabelPrintResponse(
success=True,
message="Label print request sent successfully"
)
else:
raise HTTPException(
status_code=500,
detail="Failed to send label print request to MQTT broker"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error sending label print request: {str(e)}"
)
# Include router with /api prefix
app.include_router(router)