Files
mt-drugs/backend/app/main.py
T
2026-04-20 14:12:11 -04:00

2738 lines
97 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, APIRouter, status, Response
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from sqlalchemy import func
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta, date
import math
import json
import csv
import io
from .database import engine, get_db, Base
from .models import (
Drug,
DrugVariant,
VariantPack,
Dispensing,
DispensingAllocation,
Location,
Batch,
AuditLog,
User,
GtinMapping,
)
from .auth import hash_password, verify_password, create_access_token, get_current_user, get_current_admin_user, get_current_non_readonly_user, ACCESS_TOKEN_EXPIRE_MINUTES
from .mqtt_service import publish_label_print_with_response
from .migrate_to_roles import migrate_users_table
from .migrate_compliance import migrate_compliance_schema
from .migrate_gtin import migrate_gtin_schema
from pydantic import BaseModel
# Convert the legacy is_admin flag into the role column. Every migration step
# below is best-effort: a failure is printed and startup carries on.
try:
    migrate_users_table()
except Exception as e:
    print(f"Warning: Migration failed: {e}. Continuing anyway...")

# First compliance pass, executed before the tables are created.
try:
    migrate_compliance_schema()
except Exception as e:
    print(f"Warning: Compliance migration failed: {e}. Continuing anyway...")

# Create tables. This call is not guarded, so an error here propagates.
Base.metadata.create_all(bind=engine)

try:
    migrate_gtin_schema()
except Exception as e:
    print(f"Warning: GTIN migration failed: {e}. Continuing anyway...")

# Second compliance pass: seed default locations now that the tables exist.
try:
    migrate_compliance_schema()
except Exception as e:
    print(f"Warning: Compliance seed pass failed: {e}. Continuing anyway...")
app = FastAPI(title="Drug Inventory API")
# CORS middleware for frontend
# NOTE(review): the FastAPI/Starlette CORS documentation advises against
# combining a wildcard origin with allow_credentials=True; list the real
# frontend origins explicitly before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict this
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Create a router with /api prefix; the routes below are registered on it.
router = APIRouter(prefix="/api")
# Pydantic schemas
class UserCreate(BaseModel):
    """Credentials payload used by the register, login and create-user routes."""

    username: str
    password: str
    # Expected values: "admin", "user" or "readonly"; defaults to "user".
    role: Optional[str] = "user"
class PasswordChange(BaseModel):
    """Self-service password change: the current password plus its replacement."""

    current_password: str
    new_password: str
class AdminPasswordChange(BaseModel):
    """Admin-initiated password reset; only the new password is supplied."""

    new_password: str
class UserResponse(BaseModel):
    """Public view of a user account (no password hash)."""

    id: int
    username: str
    role: str

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class TokenResponse(BaseModel):
    """Response of the register and login routes: a token plus the user it belongs to."""

    access_token: str
    token_type: str
    user: UserResponse
class DrugCreate(BaseModel):
    """Payload for creating a drug."""

    name: str
    description: Optional[str] = None
    is_controlled: bool = False
class DrugUpdate(BaseModel):
    """Partial update of a drug; only fields present in the request are applied."""

    name: Optional[str] = None
    description: Optional[str] = None
    is_controlled: Optional[bool] = None
class LocationCreate(BaseModel):
    """Schema for a new location: just its name."""

    name: str
class LocationUpdate(BaseModel):
    """Schema for a location update; both fields are optional."""

    name: Optional[str] = None
    is_active: Optional[bool] = None
class LocationResponse(BaseModel):
    """Serialized location."""

    id: int
    name: str
    is_active: bool

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class BatchCreate(BaseModel):
    """Schema for a new batch.

    The amount may be given as ``quantity`` and/or as a pack reference
    (``received_pack_id`` with ``received_pack_count``); all three are optional
    at the schema level.
    """

    batch_number: str
    quantity: Optional[float] = None
    received_pack_id: Optional[int] = None
    received_pack_count: Optional[float] = None
    expiry_date: date
    location_id: int
    notes: Optional[str] = None
class BatchUpdate(BaseModel):
    """Schema for a batch update; every field is optional."""

    batch_number: Optional[str] = None
    quantity: Optional[float] = None
    received_pack_id: Optional[int] = None
    received_pack_count: Optional[float] = None
    expiry_date: Optional[date] = None
    location_id: Optional[int] = None
    notes: Optional[str] = None
class BatchDisposeRequest(BaseModel):
    """Schema for a batch disposal request; carries optional free-text notes."""

    notes: Optional[str] = None
class GtinMappingCreate(BaseModel):
    """Schema linking a GTIN string to a drug variant and one of its packs."""

    gtin: str
    drug_variant_id: int
    variant_pack_id: int
class GtinMappingResponse(BaseModel):
    """Serialized GTIN mapping with drug, variant and pack details alongside the ids."""

    id: int
    gtin: str
    drug_variant_id: int
    variant_pack_id: int
    # Descriptive fields carried alongside the ids.
    drug_id: int
    drug_name: str
    variant_strength: str
    variant_unit: str
    pack_label: str
    pack_size_in_base_units: float
    pack_unit_name: str

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class BatchResponse(BaseModel):
    """Serialized batch, matching the dict built by ``serialize_batch_response``.

    ``received_pack_count`` is declared here because the serializer emits it;
    without the field, response-model filtering silently dropped the value.
    """

    id: int
    drug_variant_id: int
    batch_number: str
    quantity: float
    # Pack the batch was received in, if any.
    received_pack_id: Optional[int] = None
    received_pack_unit_name: Optional[str] = None
    received_pack_count: Optional[float] = None
    received_pack_size_snapshot: Optional[float] = None
    # Current stock expressed as whole packs plus loose base units.
    current_full_pack_count: Optional[float] = None
    current_loose_base_units: Optional[float] = None
    expiry_date: date
    location_id: int
    location_name: Optional[str] = None
    notes: Optional[str] = None
    received_at: datetime
    # Disposal details; all unset by default.
    disposed_at: Optional[datetime] = None
    disposed_by_user_id: Optional[int] = None
    disposed_quantity: Optional[float] = None
    disposal_notes: Optional[str] = None

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class DrugResponse(BaseModel):
    """Serialized drug without its variants."""

    id: int
    name: str
    description: Optional[str] = None
    is_controlled: bool

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class DrugVariantCreate(BaseModel):
    """Schema for a new drug variant."""

    strength: str
    quantity: float
    unit: str = "units"
    base_unit: Optional[str] = None
    low_stock_threshold: float = 10
class DrugVariantUpdate(BaseModel):
    """Schema for a drug-variant update; every field is optional.

    The fields are annotated ``Optional[...]`` so the annotations agree with
    their ``None`` defaults, as in the other ``*Update`` schemas in this module.
    """

    strength: Optional[str] = None
    quantity: Optional[float] = None
    unit: Optional[str] = None
    base_unit: Optional[str] = None
    low_stock_threshold: Optional[float] = None
class VariantPackCreate(BaseModel):
    """Schema for a new pack definition: a unit name and its size in base units."""

    pack_unit_name: str
    pack_size_in_base_units: float
    is_active: bool = True
class VariantPackUpdate(BaseModel):
    """Schema for a pack update; every field is optional."""

    pack_unit_name: Optional[str] = None
    pack_size_in_base_units: Optional[float] = None
    is_active: Optional[bool] = None
class VariantPackResponse(BaseModel):
    """Serialized pack definition, matching ``serialize_variant_pack``."""

    id: int
    drug_variant_id: int
    pack_unit_name: str
    pack_size_in_base_units: float
    is_active: bool

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class DrugVariantResponse(BaseModel):
    """Serialized variant including its packs and, where provided, its batches."""

    id: int
    drug_id: int
    strength: str
    quantity: float
    unit: str
    base_unit: str
    low_stock_threshold: float
    # True once any batch or dispensing row references the variant.
    has_inventory_history: bool = False
    packs: List[VariantPackResponse] = []
    batches: List[BatchResponse] = []

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class DrugWithVariantsResponse(BaseModel):
    """Serialized drug together with its detailed variants."""

    id: int
    name: str
    description: Optional[str] = None
    is_controlled: bool = False
    variants: List[DrugVariantResponse] = []

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class VariantSummaryResponse(BaseModel):
    """Lightweight variant summary returned by GET /drugs list — no packs or batches."""

    id: int
    drug_id: int
    strength: str
    quantity: float
    unit: str
    low_stock_threshold: float
    has_inventory_history: bool = False
    # Total quantity sitting in batches whose expiry date has passed.
    expired_quantity: float = 0.0

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class DrugSummaryResponse(BaseModel):
    """Lightweight drug summary for the main list — variants without packs or batches."""

    id: int
    name: str
    description: Optional[str] = None
    is_controlled: bool = False
    # Names of locations that currently hold stock of this drug.
    locations: List[str] = []
    variants: List[VariantSummaryResponse] = []

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class DispensingAllocationCreate(BaseModel):
    """One requested allocation: how much to take from a given batch."""

    batch_id: int
    quantity: float
class DispensingCreate(BaseModel):
    """Schema for a dispense request."""

    drug_variant_id: int
    quantity: Optional[float] = None
    # "pack" selects whole-pack dispensing; the default is "subunit".
    dispense_mode: str = "subunit"
    # Either "batch" or "legacy" (see resolve_requested_allocations).
    dispense_source: str = "batch"
    requested_pack_id: Optional[int] = None
    requested_pack_count: Optional[float] = None
    animal_name: Optional[str] = None
    user_name: Optional[str] = None
    notes: Optional[str] = None
    allocations: List[DispensingAllocationCreate] = []
class DispensingAllocationResponse(BaseModel):
    """Serialized allocation of a dispense against one batch."""

    batch_id: int
    quantity: float

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class DispensingResponse(BaseModel):
    """Serialized dispensing record with its per-batch allocations."""

    id: int
    drug_variant_id: int
    batch_id: Optional[int] = None
    actor_user_id: Optional[int] = None
    quantity: float
    dispense_mode: str = "subunit"
    requested_pack_id: Optional[int] = None
    requested_pack_count: Optional[float] = None
    animal_name: Optional[str] = None
    user_name: str
    notes: Optional[str] = None
    dispensed_at: datetime
    allocations: List[DispensingAllocationResponse] = []

    class Config:
        # Allow population directly from ORM objects.
        from_attributes = True
class LabelVariables(BaseModel):
    """Text values for a label; every field is a plain string."""

    practice_name: str
    animal_name: str
    drug_name: str
    dosage: str
    quantity: str
    expiry_date: str
class LabelPrintRequest(BaseModel):
    """Label print request wrapping the label variables."""

    variables: LabelVariables
class LabelPrintResponse(BaseModel):
    """Outcome of a label print request: a success flag and a message."""

    success: bool
    message: str
class NotesVariables(BaseModel):
    """Text values for a notes printout: the animal name and the notes."""

    animal_name: str
    notes: str
class NotesPrintRequest(BaseModel):
    """Notes print request wrapping the notes variables."""

    variables: NotesVariables
class NotesPrintResponse(BaseModel):
    """Outcome of a notes print request: a success flag and a message."""

    success: bool
    message: str
def write_audit_log(
    db: Session,
    action: str,
    entity_type: str,
    entity_id: Optional[int],
    actor: Optional[User],
    details: Optional[Dict[str, Any]] = None,
) -> None:
    """Add an audit event to the session; the caller commits it.

    Args:
        db: Session the ``AuditLog`` row is added to (no flush or commit here).
        action: Action identifier, e.g. ``"drug.create"``.
        entity_type: Kind of entity the action applies to.
        entity_id: Primary key of that entity, or ``None`` when not known.
        actor: User responsible; a falsy value is recorded as ``"system"``.
        details: Optional extra data, stored as JSON. Values that are not
            JSON-serializable are converted with ``str``.
    """
    serialized_details = None if details is None else json.dumps(details, default=str)
    entry = AuditLog(
        action=action,
        entity_type=entity_type,
        entity_id=entity_id,
        actor_user_id=actor.id if actor else None,
        actor_username=actor.username if actor else "system",
        details=serialized_details,
    )
    db.add(entry)
def enrich_variant_with_batches(db: Session, variant: DrugVariant) -> Dict[str, Any]:
    """Return variant data with packs and active batch details for API responses.

    The variant fields, history flag and packs come from
    ``serialize_variant_with_packs``, so both serializers stay in step; this
    function only fills in the ``batches`` list.

    Args:
        db: Session used for the lookups.
        variant: Variant to serialize.

    Returns:
        The variant dict with ``batches`` holding every batch of the variant
        whose quantity is above zero, ordered by expiry date and then by
        receipt time.
    """
    variant_dict = serialize_variant_with_packs(db, variant)
    active_batches = (
        db.query(Batch)
        .filter(Batch.drug_variant_id == variant.id, Batch.quantity > 0)
        .order_by(Batch.expiry_date.asc(), Batch.received_at.asc())
        .all()
    )
    variant_dict["batches"] = [
        serialize_batch_response(db, batch) for batch in active_batches
    ]
    return variant_dict
def serialize_variant_with_packs(db: Session, variant: DrugVariant) -> Dict[str, Any]:
    """Serialize a variant with its packs, leaving ``batches`` empty (level-2 detail).

    ``has_inventory_history`` is true when at least one batch or one
    dispensing row references the variant. Packs are listed active-first,
    then by id.
    """
    variant_id = variant.id
    first_batch = db.query(Batch.id).filter(Batch.drug_variant_id == variant_id).first()
    first_dispense = (
        db.query(Dispensing.id)
        .filter(Dispensing.drug_variant_id == variant_id)
        .first()
    )
    pack_rows = (
        db.query(VariantPack)
        .filter(VariantPack.drug_variant_id == variant_id)
        .order_by(VariantPack.is_active.desc(), VariantPack.id.asc())
        .all()
    )
    has_history = first_batch is not None or first_dispense is not None
    return {
        "id": variant.id,
        "drug_id": variant.drug_id,
        "strength": variant.strength,
        "quantity": variant.quantity,
        "unit": variant.unit,
        # base_unit is reported with the same value as unit.
        "base_unit": variant.unit,
        "low_stock_threshold": variant.low_stock_threshold,
        "has_inventory_history": has_history,
        "packs": [serialize_variant_pack(row) for row in pack_rows],
        "batches": [],
    }
def serialize_batch_response(db: Session, batch: Batch) -> Dict[str, Any]:
    """Serialize a batch, adding its location name and received-pack unit name.

    Both names are looked up by id and are ``None`` when the referenced row is
    missing (or, for the pack, when the batch has no ``received_pack_id``).
    """
    location_row = db.query(Location).filter(Location.id == batch.location_id).first()

    pack_row = None
    if batch.received_pack_id is not None:
        pack_row = (
            db.query(VariantPack)
            .filter(VariantPack.id == batch.received_pack_id)
            .first()
        )

    pack_unit_name = pack_row.pack_unit_name if pack_row else None
    location_name = location_row.name if location_row else None

    return {
        "id": batch.id,
        "drug_variant_id": batch.drug_variant_id,
        "batch_number": batch.batch_number,
        "quantity": batch.quantity,
        "received_pack_id": batch.received_pack_id,
        "received_pack_unit_name": pack_unit_name,
        "received_pack_count": batch.received_pack_count,
        "received_pack_size_snapshot": batch.received_pack_size_snapshot,
        "current_full_pack_count": batch.current_full_pack_count,
        "current_loose_base_units": batch.current_loose_base_units,
        "expiry_date": batch.expiry_date,
        "location_id": batch.location_id,
        "location_name": location_name,
        "notes": batch.notes,
        "received_at": batch.received_at,
        "disposed_at": batch.disposed_at,
        "disposed_by_user_id": batch.disposed_by_user_id,
        "disposed_quantity": batch.disposed_quantity,
        "disposal_notes": batch.disposal_notes,
    }
def resolve_pack_size_snapshot(db: Session, pack_id: Optional[int]) -> Optional[float]:
    """Return the pack's size in base units, or ``None`` if there is no such pack.

    ``None`` is returned both when ``pack_id`` is ``None`` and when no
    ``VariantPack`` row has that id.
    """
    if pack_id is None:
        return None
    matched_pack = db.query(VariantPack).filter(VariantPack.id == pack_id).first()
    return matched_pack.pack_size_in_base_units if matched_pack else None
def recompute_batch_pack_state(batch: Batch) -> None:
    """Refresh the batch's whole-pack / loose-unit split from its quantity.

    Sets ``current_full_pack_count`` and ``current_loose_base_units`` in place.
    Both are set to ``None`` when the split cannot be computed: no positive,
    finite pack-size snapshot, or a quantity that is missing, negative or not
    finite. The original raised on a ``None`` or NaN quantity.

    Args:
        batch: Object exposing ``quantity`` and ``received_pack_size_snapshot``.
    """
    pack_size = batch.received_pack_size_snapshot
    quantity = batch.quantity

    pack_size_usable = (
        pack_size is not None and math.isfinite(pack_size) and pack_size > 0
    )
    quantity_usable = (
        quantity is not None and math.isfinite(quantity) and quantity >= 0
    )
    if not (pack_size_usable and quantity_usable):
        batch.current_full_pack_count = None
        batch.current_loose_base_units = None
        return

    # The small epsilon keeps float noise from dropping a pack that is
    # complete to within rounding error.
    full_packs = math.floor((quantity + 1e-9) / pack_size)
    loose_units = quantity - (full_packs * pack_size)
    # Clamp rounding residue (including tiny negatives) to exactly zero.
    if loose_units < 1e-9:
        loose_units = 0.0

    batch.current_full_pack_count = float(full_packs)
    batch.current_loose_base_units = loose_units
def serialize_variant_pack(pack: VariantPack) -> Dict[str, Any]:
    """Return the pack's API fields as a plain dict."""
    exported_fields = (
        "id",
        "drug_variant_id",
        "pack_unit_name",
        "pack_size_in_base_units",
        "is_active",
    )
    return {field: getattr(pack, field) for field in exported_fields}
def resolve_pack_quantity(
    db: Session,
    variant_id: int,
    quantity: Optional[float],
    pack_id: Optional[int],
    pack_count: Optional[float],
) -> Dict[str, Any]:
    """Resolve canonical base-unit quantity from either direct quantity or pack input.

    Args:
        db: Session used to look up the pack.
        variant_id: Variant the pack must belong to.
        quantity: Quantity in base units, if given directly.
        pack_id: Pack to convert from; requires ``pack_count``.
        pack_count: Number of packs; requires ``pack_id``.

    Returns:
        Dict with ``quantity`` (base units), ``pack_id`` (the resolved pack's
        id or ``None``) and ``pack_count`` (as supplied).

    Raises:
        HTTPException: 400 when no input is given, pack input is incomplete,
            the pack is missing or inactive for the variant, a value is not a
            positive finite number, or ``quantity`` disagrees with the pack
            conversion by more than 1e-6.
    """
    if quantity is None and pack_id is None and pack_count is None:
        raise HTTPException(status_code=400, detail="Either quantity or pack fields must be provided")

    resolved_quantity = quantity
    resolved_pack: Optional[VariantPack] = None

    if pack_id is not None or pack_count is not None:
        if pack_id is None or pack_count is None:
            raise HTTPException(status_code=400, detail="Both pack_id and pack_count are required when using pack input")
        # NaN compares False against everything, so "<= 0" alone lets it through.
        if not math.isfinite(pack_count) or pack_count <= 0:
            raise HTTPException(status_code=400, detail="Pack count must be greater than zero")
        resolved_pack = (
            db.query(VariantPack)
            .filter(
                VariantPack.id == pack_id,
                VariantPack.drug_variant_id == variant_id,
                VariantPack.is_active.is_(True),
            )
            .first()
        )
        if resolved_pack is None:
            raise HTTPException(status_code=400, detail="Pack not found for variant or is inactive")
        derived_quantity = pack_count * resolved_pack.pack_size_in_base_units
        if derived_quantity <= 0:
            raise HTTPException(status_code=400, detail="Derived quantity from pack must be greater than zero")
        if resolved_quantity is None:
            resolved_quantity = derived_quantity
        elif abs(resolved_quantity - derived_quantity) > 1e-6:
            raise HTTPException(
                status_code=400,
                detail="Quantity does not match pack conversion for selected pack",
            )

    # Reject NaN and infinity explicitly; neither is caught by "<= 0".
    if (
        resolved_quantity is None
        or not math.isfinite(resolved_quantity)
        or resolved_quantity <= 0
    ):
        raise HTTPException(status_code=400, detail="Quantity must be greater than zero")

    return {
        "quantity": resolved_quantity,
        "pack_id": resolved_pack.id if resolved_pack else None,
        "pack_count": pack_count,
    }
def _validate_unbatched_dispense(
    dispense_mode: str,
    requested_allocations: List[DispensingAllocationCreate],
    requested_quantity: float,
    unbatched_quantity: float,
    no_stock_detail: str,
) -> None:
    """Check that a dispense can be served entirely from unbatched (legacy) stock."""
    if dispense_mode == "pack":
        raise HTTPException(status_code=400, detail="Whole-pack dispensing requires batched stock with pack information")
    if requested_allocations:
        raise HTTPException(status_code=400, detail="Batch allocations cannot be supplied when dispensing legacy stock")
    if unbatched_quantity <= 0:
        raise HTTPException(status_code=400, detail=no_stock_detail)
    if requested_quantity - unbatched_quantity > 1e-6:
        raise HTTPException(
            status_code=400,
            detail=f"Insufficient unbatched stock. Available: {unbatched_quantity}, Requested: {requested_quantity}",
        )


def resolve_requested_allocations(
    db: Session,
    variant_id: int,
    variant_quantity: float,
    requested_quantity: float,
    requested_allocations: List[DispensingAllocationCreate],
    dispense_mode: str,
    dispense_source: str,
    requested_pack_id: Optional[int],
) -> List[Dict[str, Any]]:
    """Validate explicit batch allocations against in-date stock for the variant.

    Unbatched ("legacy") stock is the variant quantity minus the sum of all
    its batch quantities, floored at zero. A dispense is served from it when
    ``dispense_source`` is ``"legacy"`` or when the variant has no in-date
    batch with stock; in both cases an empty list is returned.

    Args:
        db: Session used for batch and pack lookups.
        variant_id: Variant being dispensed.
        variant_quantity: Variant's total recorded quantity.
        requested_quantity: Total quantity to dispense, in base units.
        requested_allocations: Per-batch quantities chosen by the caller.
        dispense_mode: ``"pack"`` enforces whole-pack rules; any other value
            skips them.
        dispense_source: ``"batch"`` or ``"legacy"``; a falsy value means
            ``"batch"``.
        requested_pack_id: Pack being dispensed; required in pack mode.

    Returns:
        A list of ``{"batch": Batch, "quantity": float}`` dicts, one per
        non-zero allocation, or ``[]`` for a legacy dispense.

    Raises:
        HTTPException: 400 for any validation failure.
    """
    selected_source = (dispense_source or "batch").strip().lower()
    if selected_source not in {"batch", "legacy"}:
        raise HTTPException(status_code=400, detail="dispense_source must be either 'batch' or 'legacy'")

    # Sum in the database instead of loading every batch row.
    total_batched_quantity = float(
        db.query(func.sum(Batch.quantity))
        .filter(Batch.drug_variant_id == variant_id)
        .scalar()
        or 0
    )
    legacy_unbatched_quantity = max(0.0, float(variant_quantity or 0) - total_batched_quantity)

    if selected_source == "legacy":
        _validate_unbatched_dispense(
            dispense_mode,
            requested_allocations,
            requested_quantity,
            legacy_unbatched_quantity,
            "No legacy loose stock is available for this variant",
        )
        return []

    today = date.today()
    eligible_batches = (
        db.query(Batch)
        .filter(
            Batch.drug_variant_id == variant_id,
            Batch.quantity > 0,
            Batch.expiry_date >= today,
        )
        .order_by(Batch.expiry_date.asc(), Batch.received_at.asc())
        .all()
    )

    if not eligible_batches:
        # No usable batches: fall back to unbatched stock if there is any.
        _validate_unbatched_dispense(
            dispense_mode,
            requested_allocations,
            requested_quantity,
            legacy_unbatched_quantity,
            "No in-date stock batches available for this variant",
        )
        return []

    if not requested_allocations:
        raise HTTPException(status_code=400, detail="At least one batch allocation is required")

    eligible_by_id = {batch.id: batch for batch in eligible_batches}
    seen_batch_ids = set()
    allocations: List[Dict[str, Any]] = []
    total_allocated = 0.0
    selected_pack_size = None

    if dispense_mode == "pack":
        if requested_pack_id is None:
            raise HTTPException(status_code=400, detail="Pack dispense requires a requested pack")
        selected_pack = (
            db.query(VariantPack)
            .filter(
                VariantPack.id == requested_pack_id,
                VariantPack.drug_variant_id == variant_id,
                VariantPack.is_active.is_(True),
            )
            .first()
        )
        if selected_pack is None:
            raise HTTPException(status_code=400, detail="Selected pack is unavailable for this variant")
        selected_pack_size = selected_pack.pack_size_in_base_units
        # Only batches received in the requested pack type count towards full packs.
        total_full_packs_available = sum(
            int(batch.current_full_pack_count or 0)
            for batch in eligible_batches
            if batch.received_pack_id == requested_pack_id
        )
        if total_full_packs_available <= 0:
            raise HTTPException(status_code=400, detail="No full packs are available for the selected pack")
        if requested_quantity - (total_full_packs_available * selected_pack_size) > 1e-6:
            raise HTTPException(
                status_code=400,
                detail=f"Only {total_full_packs_available} full packs are available for the selected pack",
            )

    for entry in requested_allocations:
        batch = eligible_by_id.get(entry.batch_id)
        if batch is None:
            raise HTTPException(status_code=400, detail=f"Batch {entry.batch_id} is unavailable, expired, or not valid for this variant")
        if entry.batch_id in seen_batch_ids:
            raise HTTPException(status_code=400, detail="Each batch may only be allocated once")
        # NaN/inf would slip through every comparison below, so reject them first.
        if not math.isfinite(entry.quantity):
            raise HTTPException(status_code=400, detail="Batch allocation quantity must be a finite number")
        if entry.quantity < 0:
            raise HTTPException(status_code=400, detail="Batch allocation quantity cannot be negative")
        if entry.quantity == 0:
            # Zero-quantity entries are ignored rather than rejected.
            continue
        if entry.quantity - batch.quantity > 1e-6:
            raise HTTPException(status_code=400, detail=f"Batch {batch.batch_number} does not have enough stock for requested allocation")
        if dispense_mode == "pack":
            if batch.received_pack_id != requested_pack_id:
                raise HTTPException(
                    status_code=400,
                    detail=f"Batch {batch.batch_number} does not contain full packs of the selected pack type",
                )
            available_full_packs = int(batch.current_full_pack_count or 0)
            available_batch_quantity = available_full_packs * selected_pack_size
            if available_full_packs <= 0 or available_batch_quantity <= 0:
                raise HTTPException(
                    status_code=400,
                    detail=f"Batch {batch.batch_number} has no full packs available",
                )
            pack_multiple = entry.quantity / selected_pack_size
            if abs(pack_multiple - round(pack_multiple)) > 1e-6:
                raise HTTPException(
                    status_code=400,
                    detail=f"Batch {batch.batch_number} allocation must be a whole number of packs",
                )
            if entry.quantity - available_batch_quantity > 1e-6:
                raise HTTPException(
                    status_code=400,
                    detail=f"Batch {batch.batch_number} only has {available_full_packs} full packs available",
                )
        seen_batch_ids.add(entry.batch_id)
        allocations.append({"batch": batch, "quantity": entry.quantity})
        total_allocated += entry.quantity

    if not allocations:
        raise HTTPException(status_code=400, detail="At least one batch allocation must be greater than zero")
    if abs(total_allocated - requested_quantity) > 1e-6:
        raise HTTPException(
            status_code=400,
            detail=f"Allocated quantity ({total_allocated}) must match requested quantity ({requested_quantity})",
        )
    return allocations
# Authentication Routes
@router.post("/auth/register", response_model=TokenResponse)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """Register the first admin user (only allowed if no users exist).

    The requested ``role`` is ignored: the first account is always an admin.
    No username-uniqueness check is needed, because the route only proceeds
    when the users table is empty.

    Returns:
        A bearer access token together with the created user.

    Raises:
        HTTPException: 403 when at least one user already exists.
    """
    if db.query(User).count() > 0:
        raise HTTPException(
            status_code=403,
            detail="Registration is disabled. Contact an administrator to create an account."
        )

    db_user = User(
        username=user_data.username,
        hashed_password=hash_password(user_data.password),
        role="admin"
    )
    db.add(db_user)
    # Flush so the primary key is assigned and the audit row can reference it.
    db.flush()
    write_audit_log(
        db,
        action="auth.register",
        entity_type="user",
        entity_id=db_user.id,
        actor=None,
        details={"username": user_data.username, "assigned_role": "admin"},
    )
    db.commit()
    db.refresh(db_user)

    # Create access token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": db_user.username},
        expires_delta=access_token_expires
    )
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user": db_user
    }
@router.post("/auth/login", response_model=TokenResponse)
def login(user_data: UserCreate, db: Session = Depends(get_db)):
    """Authenticate by username and password and issue a bearer token.

    A successful login is written to the audit log and committed before the
    token is created. An unknown username and a wrong password both yield the
    same 401 response.
    """
    account = db.query(User).filter(User.username == user_data.username).first()
    if not (account and verify_password(user_data.password, account.hashed_password)):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    write_audit_log(
        db,
        action="auth.login",
        entity_type="user",
        entity_id=account.id,
        actor=account,
        details={"username": account.username},
    )
    db.commit()

    token = create_access_token(
        data={"sub": account.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "user": account
    }
@router.get("/auth/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Return the authenticated user resolved by the ``get_current_user`` dependency."""
    return current_user
# User Management Routes (Admin only)
@router.get("/users", response_model=List[UserResponse])
def list_users(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Return every user account (admin only)."""
    all_users = db.query(User).all()
    return all_users
@router.post("/users", response_model=UserResponse)
def create_user(user_data: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
    """Create a new user (admin only).

    Raises:
        HTTPException: 400 when the username is taken or the role is not one
            of ``admin``, ``user`` or ``readonly``.
    """
    # Check if user already exists
    existing_user = db.query(User).filter(User.username == user_data.username).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Username already exists")

    # Validate role; a missing or empty role falls back to "user".
    valid_roles = ["admin", "user", "readonly"]
    role = user_data.role or "user"
    if role not in valid_roles:
        raise HTTPException(status_code=400, detail=f"Invalid role. Must be one of: {', '.join(valid_roles)}")

    db_user = User(
        username=user_data.username,
        hashed_password=hash_password(user_data.password),
        role=role
    )
    db.add(db_user)
    # Flush so the primary key is assigned and the audit row can reference it.
    db.flush()
    write_audit_log(
        db,
        action="user.create",
        entity_type="user",
        entity_id=db_user.id,
        actor=current_user,
        details={"username": user_data.username, "role": role},
    )
    db.commit()
    db.refresh(db_user)
    return db_user
@router.delete("/users/{user_id}")
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Delete a user account (admin only); admins cannot delete themselves.

    Raises:
        HTTPException: 400 when ``user_id`` is the caller's own id, 404 when
            no such user exists.
    """
    if user_id == current_user.id:
        raise HTTPException(status_code=400, detail="Cannot delete your own user account")

    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # Record the deletion in the same transaction as the delete itself.
    write_audit_log(
        db,
        action="user.delete",
        entity_type="user",
        entity_id=user_id,
        actor=current_user,
        details={"username": target.username},
    )
    db.delete(target)
    db.commit()
    return {"message": "User deleted successfully"}
@router.post("/auth/change-password")
def change_own_password(
    password_data: PasswordChange,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Change the caller's password after verifying the current one.

    Raises:
        HTTPException: 404 when the caller's row cannot be found, 401 when the
            current password does not match.
    """
    # Re-load the account through this request's session before modifying it.
    account = db.query(User).filter(User.id == current_user.id).first()
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    if not verify_password(password_data.current_password, account.hashed_password):
        raise HTTPException(status_code=401, detail="Current password is incorrect")

    account.hashed_password = hash_password(password_data.new_password)
    write_audit_log(
        db,
        action="auth.password.change.self",
        entity_type="user",
        entity_id=account.id,
        actor=current_user,
        details={"username": account.username},
    )
    db.commit()
    return {"message": "Password changed successfully"}
@router.post("/users/{user_id}/change-password")
def admin_change_password(
    user_id: int,
    password_data: AdminPasswordChange,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Set another user's password (admin only).

    Raises:
        HTTPException: 404 when the user does not exist, 400 when an admin
            targets their own account through this route.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # Admins must go through the self-service route for their own password.
    if user_id == current_user.id:
        raise HTTPException(status_code=400, detail="Use /auth/change-password to change your own password")

    target.hashed_password = hash_password(password_data.new_password)
    write_audit_log(
        db,
        action="auth.password.change.admin",
        entity_type="user",
        entity_id=target.id,
        actor=current_user,
        details={"username": target.username},
    )
    db.commit()
    return {"message": "Password changed successfully"}
# Routes
@router.get("/")
def read_root():
    """Return a static message identifying the API."""
    banner = {"message": "Drug Inventory API"}
    return banner
@router.get("/drugs", response_model=List[DrugSummaryResponse])
def list_drugs(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get all drugs with lightweight variant summaries (no packs or batches).

    Variants, history flags, expired totals and location names are each
    fetched with one query for all drugs and then grouped in memory.
    """
    drugs = db.query(Drug).all()
    if not drugs:
        return []

    drug_ids = [drug.id for drug in drugs]
    all_variants = (
        db.query(DrugVariant)
        .filter(DrugVariant.drug_id.in_(drug_ids))
        .all()
    )

    variants_by_drug: Dict[int, list] = {}
    for variant in all_variants:
        variants_by_drug.setdefault(variant.drug_id, []).append(variant)

    variant_ids = [variant.id for variant in all_variants]
    if not variant_ids:
        # No variants anywhere: skip the per-variant queries altogether.
        return [
            {
                "id": drug.id,
                "name": drug.name,
                "description": drug.description,
                "is_controlled": bool(drug.is_controlled),
                "locations": [],
                "variants": [],
            }
            for drug in drugs
        ]

    # Variant IDs that have ever had a batch received
    batch_history_rows = (
        db.query(Batch.drug_variant_id)
        .filter(Batch.drug_variant_id.in_(variant_ids))
        .distinct()
        .all()
    )
    batch_history_ids = {row[0] for row in batch_history_rows}

    # Variant IDs that have ever had a dispense
    dispense_history_rows = (
        db.query(Dispensing.drug_variant_id)
        .filter(Dispensing.drug_variant_id.in_(variant_ids))
        .distinct()
        .all()
    )
    dispense_history_ids = {row[0] for row in dispense_history_rows}

    # Sum of expired batch stock still on hand, per variant
    today = date.today()
    expired_rows = (
        db.query(Batch.drug_variant_id, func.sum(Batch.quantity))
        .filter(
            Batch.drug_variant_id.in_(variant_ids),
            Batch.quantity > 0,
            Batch.expiry_date < today,
        )
        .group_by(Batch.drug_variant_id)
        .all()
    )
    expired_by_variant: Dict[int, float] = {}
    for row in expired_rows:
        expired_by_variant[row[0]] = float(row[1])

    # Distinct location names per drug (from batches that still hold stock)
    location_rows = (
        db.query(DrugVariant.drug_id, Location.name)
        .join(Batch, Batch.drug_variant_id == DrugVariant.id)
        .join(Location, Location.id == Batch.location_id)
        .filter(
            DrugVariant.drug_id.in_(drug_ids),
            Batch.quantity > 0,
        )
        .distinct()
        .all()
    )
    locations_by_drug: Dict[int, list] = {}
    for owner_drug_id, location_name in location_rows:
        locations_by_drug.setdefault(owner_drug_id, []).append(location_name)

    summaries = []
    for drug in drugs:
        variant_summaries = []
        for variant in variants_by_drug.get(drug.id, []):
            has_history = (
                variant.id in batch_history_ids or variant.id in dispense_history_ids
            )
            variant_summaries.append(
                {
                    "id": variant.id,
                    "drug_id": variant.drug_id,
                    "strength": variant.strength,
                    "quantity": variant.quantity,
                    "unit": variant.unit,
                    "low_stock_threshold": variant.low_stock_threshold,
                    "has_inventory_history": has_history,
                    "expired_quantity": expired_by_variant.get(variant.id, 0.0),
                }
            )
        summaries.append(
            {
                "id": drug.id,
                "name": drug.name,
                "description": drug.description,
                "is_controlled": bool(drug.is_controlled),
                "locations": locations_by_drug.get(drug.id, []),
                "variants": variant_summaries,
            }
        )
    return summaries
@router.get("/drugs/low-stock", response_model=List[DrugWithVariantsResponse])
def low_stock_drugs(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Return drugs that have at least one variant at or below its low-stock threshold.

    Only the low-stock variants of each drug are included, each with its
    packs and active batches.
    """
    flagged_variants = (
        db.query(DrugVariant)
        .filter(DrugVariant.quantity <= DrugVariant.low_stock_threshold)
        .all()
    )

    # Group the flagged variants under their parent drug, keeping query order.
    flagged_by_drug: Dict[int, list] = {}
    for variant in flagged_variants:
        flagged_by_drug.setdefault(variant.drug_id, []).append(variant)

    drug_ids = list({variant.drug_id for variant in flagged_variants})
    drugs = db.query(Drug).filter(Drug.id.in_(drug_ids)).all()

    return [
        {
            "id": drug.id,
            "name": drug.name,
            "description": drug.description,
            "is_controlled": bool(drug.is_controlled),
            "variants": [
                enrich_variant_with_batches(db, variant)
                for variant in flagged_by_drug.get(drug.id, [])
            ],
        }
        for drug in drugs
    ]
@router.get("/drugs/{drug_id}", response_model=DrugWithVariantsResponse)
def get_drug(
    drug_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one drug with its variants and their packs (no batch detail).

    Raises:
        HTTPException: 404 when the drug does not exist.
    """
    drug = db.query(Drug).filter(Drug.id == drug_id).first()
    if not drug:
        raise HTTPException(status_code=404, detail="Drug not found")

    variant_rows = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()
    serialized_variants = [
        serialize_variant_with_packs(db, variant) for variant in variant_rows
    ]
    return {
        "id": drug.id,
        "name": drug.name,
        "description": drug.description,
        "is_controlled": bool(drug.is_controlled),
        "variants": serialized_variants,
    }
@router.post("/drugs", response_model=DrugWithVariantsResponse)
def create_drug(drug: DrugCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_non_readonly_user)):
    """Create a new drug with no variants.

    Raises:
        HTTPException: 400 when a drug with the same name already exists.
    """
    # Check if drug name already exists
    existing = db.query(Drug).filter(Drug.name == drug.name).first()
    if existing:
        raise HTTPException(status_code=400, detail="Drug with this name already exists")

    db_drug = Drug(name=drug.name, description=drug.description, is_controlled=drug.is_controlled)
    db.add(db_drug)
    # Flush so the primary key is assigned and the audit row can reference it.
    db.flush()
    write_audit_log(
        db,
        action="drug.create",
        entity_type="drug",
        entity_id=db_drug.id,
        actor=current_user,
        details={"name": drug.name, "is_controlled": drug.is_controlled},
    )
    db.commit()
    db.refresh(db_drug)

    # A freshly created drug has no variants yet.
    return {
        "id": db_drug.id,
        "name": db_drug.name,
        "description": db_drug.description,
        "is_controlled": bool(db_drug.is_controlled),
        "variants": [],
    }
@router.put("/drugs/{drug_id}", response_model=DrugWithVariantsResponse)
def update_drug(drug_id: int, drug_update: DrugUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_non_readonly_user)):
    """Update a drug, applying only the fields present in the request.

    Renaming to a name already used by another drug is rejected, matching the
    uniqueness check in ``create_drug``.

    Raises:
        HTTPException: 404 when the drug does not exist, 400 when the new
            name belongs to a different drug.
    """
    drug = db.query(Drug).filter(Drug.id == drug_id).first()
    if not drug:
        raise HTTPException(status_code=404, detail="Drug not found")

    # Compute the change set once; it drives both the update and the audit row.
    changes = drug_update.dict(exclude_unset=True)

    new_name = changes.get("name")
    if new_name is not None and new_name != drug.name:
        name_clash = (
            db.query(Drug.id)
            .filter(Drug.name == new_name, Drug.id != drug.id)
            .first()
        )
        if name_clash is not None:
            raise HTTPException(status_code=400, detail="Drug with this name already exists")

    before = {
        "name": drug.name,
        "description": drug.description,
        "is_controlled": bool(drug.is_controlled),
    }
    for field, value in changes.items():
        setattr(drug, field, value)

    write_audit_log(
        db,
        action="drug.update",
        entity_type="drug",
        entity_id=drug.id,
        actor=current_user,
        details={"before": before, "after": changes},
    )
    db.commit()
    db.refresh(drug)

    variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()
    return {
        "id": drug.id,
        "name": drug.name,
        "description": drug.description,
        "is_controlled": bool(drug.is_controlled),
        "variants": [enrich_variant_with_batches(db, v) for v in variants],
    }
@router.delete("/drugs/{drug_id}")
def delete_drug(
    drug_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_non_readonly_user),
):
    """Delete a drug together with its variants and their pack definitions.

    Deletion is refused with a 400 when any variant of the drug has batch or
    dispensing records. A 404 is raised when the drug does not exist.
    """
    target = db.query(Drug).filter(Drug.id == drug_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Drug not found")

    variant_ids = [
        vid for (vid,) in db.query(DrugVariant.id).filter(DrugVariant.drug_id == drug_id).all()
    ]

    if variant_ids:
        batch_exists = (
            db.query(Batch.id).filter(Batch.drug_variant_id.in_(variant_ids)).first() is not None
        )
        dispensing_exists = (
            db.query(Dispensing.id).filter(Dispensing.drug_variant_id.in_(variant_ids)).first() is not None
        )
        if batch_exists or dispensing_exists:
            raise HTTPException(
                status_code=400,
                detail="Cannot delete drug with variants that have batch or dispensing history. Archive or manage records first.",
            )

        # The guard above has already rejected variants with batches or
        # dispensings, so the batch/dispensing deletes below are a defensive
        # sweep; pack rows and the variants themselves are the real targets.
        batch_ids = [
            bid for (bid,) in db.query(Batch.id).filter(Batch.drug_variant_id.in_(variant_ids)).all()
        ]
        if batch_ids:
            db.query(DispensingAllocation).filter(
                DispensingAllocation.batch_id.in_(batch_ids)
            ).delete(synchronize_session=False)
            db.query(Batch).filter(Batch.id.in_(batch_ids)).delete(synchronize_session=False)
        db.query(Dispensing).filter(
            Dispensing.drug_variant_id.in_(variant_ids)
        ).delete(synchronize_session=False)
        db.query(VariantPack).filter(
            VariantPack.drug_variant_id.in_(variant_ids)
        ).delete(synchronize_session=False)
        db.query(DrugVariant).filter(DrugVariant.id.in_(variant_ids)).delete(synchronize_session=False)

    write_audit_log(
        db,
        action="drug.delete",
        entity_type="drug",
        entity_id=drug_id,
        actor=current_user,
        details={"name": target.name},
    )

    # Finally remove the drug row itself.
    db.delete(target)
    db.commit()
    return {"message": "Drug and all variants deleted successfully"}
# Drug Variant endpoints
@router.post("/drugs/{drug_id}/variants", response_model=DrugVariantResponse)
def create_drug_variant(drug_id: int, variant: DrugVariantCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_non_readonly_user)):
    """Create a new variant for a drug, plus its default 1:1 pack.

    Args:
        drug_id: Primary key of the parent drug.
        variant: Strength, initial quantity, unit/base_unit and low-stock
            threshold of the new variant.
        db: Request-scoped database session.
        current_user: Authenticated user with write access; recorded as the
            audit actor.

    Returns:
        The new variant as produced by ``enrich_variant_with_batches``.

    Raises:
        HTTPException: 404 if the drug does not exist; 400 if a variant with
            the same strength already exists for the drug, or if neither
            ``base_unit`` nor ``unit`` yields a non-empty unit name.
    """
    # Check if drug exists
    drug = db.query(Drug).filter(Drug.id == drug_id).first()
    if not drug:
        raise HTTPException(status_code=404, detail="Drug not found")

    # Check if variant with same strength already exists for this drug
    existing = db.query(DrugVariant).filter(
        DrugVariant.drug_id == drug_id,
        DrugVariant.strength == variant.strength,
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="Variant with this strength already exists for this drug")

    # Fall back to "" so a payload with neither field set gets a clean 400
    # instead of an AttributeError on None.strip().
    base_unit = (variant.base_unit or variant.unit or "").strip()
    if not base_unit:
        raise HTTPException(status_code=400, detail="Variant unit/base_unit cannot be empty")

    db_variant = DrugVariant(
        drug_id=drug_id,
        strength=variant.strength,
        quantity=variant.quantity,
        unit=base_unit,
        low_stock_threshold=variant.low_stock_threshold,
    )
    db.add(db_variant)
    # Flush to obtain the variant id, needed by the pack row and the audit entry.
    db.flush()

    # Ensure each variant has at least one active default 1:1 pack representation.
    db.add(VariantPack(
        drug_variant_id=db_variant.id,
        pack_unit_name=base_unit,
        pack_size_in_base_units=1,
        is_active=True,
    ))
    write_audit_log(
        db,
        action="variant.create",
        entity_type="drug_variant",
        # The id is available after the flush above, so record it.
        entity_id=db_variant.id,
        actor=current_user,
        details={
            "drug_id": drug_id,
            "strength": variant.strength,
            "quantity": variant.quantity,
            "unit": base_unit,
        },
    )
    db.commit()
    db.refresh(db_variant)
    return enrich_variant_with_batches(db, db_variant)
@router.get("/variants/{variant_id}", response_model=DrugVariantResponse)
def get_drug_variant(
    variant_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one drug variant, enriched via ``enrich_variant_with_batches``.

    Raises a 404 when no variant has the given id.
    """
    found = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if found:
        return enrich_variant_with_batches(db, found)
    raise HTTPException(status_code=404, detail="Drug variant not found")
@router.put("/variants/{variant_id}", response_model=DrugVariantResponse)
def update_drug_variant(
    variant_id: int,
    variant_update: DrugVariantUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_non_readonly_user),
):
    """Apply a partial update to a drug variant.

    A supplied ``base_unit`` is stripped and stored in the variant's ``unit``
    column. Once the variant has any batch or dispensing record, changes to
    strength, unit or quantity are rejected with a 400. A 404 is raised when
    the variant does not exist.
    """
    target = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    snapshot = {
        "strength": target.strength,
        "quantity": target.quantity,
        "unit": target.unit,
        "low_stock_threshold": target.low_stock_threshold,
    }

    changes = variant_update.dict(exclude_unset=True)

    # ``base_unit`` is an alias for the ``unit`` column: take it out of the
    # change set and, when a value was given, store the cleaned form as ``unit``.
    requested_base_unit = changes.pop("base_unit", None)
    if requested_base_unit is not None:
        normalized_unit = requested_base_unit.strip()
        if not normalized_unit:
            raise HTTPException(status_code=400, detail="base_unit cannot be empty")
        changes["unit"] = normalized_unit

    batch_exists = (
        db.query(Batch.id).filter(Batch.drug_variant_id == variant_id).first() is not None
    )
    dispensing_exists = (
        db.query(Dispensing.id).filter(Dispensing.drug_variant_id == variant_id).first() is not None
    )

    if batch_exists or dispensing_exists:
        # (payload key, label shown to the client) in reporting order.
        guarded_fields = (
            ("strength", "strength"),
            ("unit", "base_unit"),
            ("quantity", "quantity"),
        )
        locked_field_changes = [
            label
            for key, label in guarded_fields
            if key in changes and changes[key] != getattr(target, key)
        ]
        if locked_field_changes:
            raise HTTPException(
                status_code=400,
                detail=(
                    "Cannot change "
                    + ", ".join(locked_field_changes)
                    + " after batches or dispensing history exist for this variant"
                ),
            )

    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    write_audit_log(
        db,
        action="variant.update",
        entity_type="drug_variant",
        entity_id=target.id,
        actor=current_user,
        details={"before": snapshot, "after": changes},
    )
    db.commit()
    db.refresh(target)
    return enrich_variant_with_batches(db, target)
@router.delete("/variants/{variant_id}")
def delete_drug_variant(
    variant_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_non_readonly_user),
):
    """Delete a drug variant and its pack definitions.

    Deletion is refused with a 400 when the variant has batch or dispensing
    records. A 404 is raised when the variant does not exist.
    """
    target = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    batch_exists = (
        db.query(Batch.id).filter(Batch.drug_variant_id == variant_id).first() is not None
    )
    dispensing_exists = (
        db.query(Dispensing.id).filter(Dispensing.drug_variant_id == variant_id).first() is not None
    )
    if batch_exists or dispensing_exists:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete variant with batch or dispensing history. Archive or manage records first.",
        )

    # The guard above has already rejected variants with batches or
    # dispensings, so these two deletes are a defensive sweep.
    batch_ids = [
        bid for (bid,) in db.query(Batch.id).filter(Batch.drug_variant_id == variant_id).all()
    ]
    if batch_ids:
        db.query(DispensingAllocation).filter(
            DispensingAllocation.batch_id.in_(batch_ids)
        ).delete(synchronize_session=False)
        db.query(Batch).filter(Batch.id.in_(batch_ids)).delete(synchronize_session=False)
    db.query(Dispensing).filter(
        Dispensing.drug_variant_id == variant_id
    ).delete(synchronize_session=False)

    db.query(VariantPack).filter(
        VariantPack.drug_variant_id == variant_id
    ).delete(synchronize_session=False)

    write_audit_log(
        db,
        action="variant.delete",
        entity_type="drug_variant",
        entity_id=variant_id,
        actor=current_user,
        details={"drug_id": target.drug_id, "strength": target.strength},
    )
    db.delete(target)
    db.commit()
    return {"message": "Drug variant deleted successfully"}
@router.get("/variants/{variant_id}/packs", response_model=List[VariantPackResponse])
def list_variant_packs(
    variant_id: int,
    active_only: bool = False,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the pack definitions of a variant, active packs first, then by id.

    With ``active_only`` set, inactive packs are left out. A 404 is raised
    when the variant does not exist.
    """
    parent = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not parent:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    pack_query = db.query(VariantPack).filter(VariantPack.drug_variant_id == variant_id)
    if active_only:
        pack_query = pack_query.filter(VariantPack.is_active.is_(True))
    ordered = pack_query.order_by(VariantPack.is_active.desc(), VariantPack.id.asc())

    serialized = []
    for pack in ordered.all():
        serialized.append(serialize_variant_pack(pack))
    return serialized
@router.post("/variants/{variant_id}/packs", response_model=VariantPackResponse)
def create_variant_pack(
    variant_id: int,
    payload: VariantPackCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_non_readonly_user),
):
    """Create a pack definition for a drug variant.

    Args:
        variant_id: Primary key of the parent variant.
        payload: Pack unit name, size in base units and active flag.
        db: Request-scoped database session.
        current_user: Authenticated user with write access; recorded as the
            audit actor.

    Returns:
        The new pack as produced by ``serialize_variant_pack``.

    Raises:
        HTTPException: 404 if the variant does not exist; 400 if the pack
            unit name is blank or the pack size is not greater than zero.
    """
    variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    pack_unit_name = payload.pack_unit_name.strip()
    if not pack_unit_name:
        raise HTTPException(status_code=400, detail="Pack unit name cannot be empty")
    if payload.pack_size_in_base_units <= 0:
        raise HTTPException(status_code=400, detail="Pack size in base units must be greater than zero")

    row = VariantPack(
        drug_variant_id=variant_id,
        pack_unit_name=pack_unit_name,
        pack_size_in_base_units=payload.pack_size_in_base_units,
        is_active=payload.is_active,
    )
    db.add(row)
    # Flush so the primary key is assigned before the audit entry is written;
    # without it the creation event cannot be linked to the new row.
    db.flush()
    write_audit_log(
        db,
        action="variant_pack.create",
        entity_type="variant_pack",
        entity_id=row.id,
        actor=current_user,
        details={
            "variant_id": variant_id,
            "pack_unit_name": pack_unit_name,
            "pack_size_in_base_units": payload.pack_size_in_base_units,
            "is_active": payload.is_active,
        },
    )
    db.commit()
    db.refresh(row)
    return serialize_variant_pack(row)
@router.put("/variant-packs/{pack_id}", response_model=VariantPackResponse)
def update_variant_pack(
    pack_id: int,
    payload: VariantPackUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_non_readonly_user),
):
    """Update a pack definition's name, size and/or active flag.

    Fields left as ``None`` in the payload are not touched. Deactivating a
    pack is refused with a 400 when no other active pack would remain for the
    variant. A 404 is raised when the pack does not exist.
    """
    pack = db.query(VariantPack).filter(VariantPack.id == pack_id).first()
    if not pack:
        raise HTTPException(status_code=404, detail="Variant pack not found")

    snapshot = serialize_variant_pack(pack)

    new_name = payload.pack_unit_name
    if new_name is not None:
        new_name = new_name.strip()
        if not new_name:
            raise HTTPException(status_code=400, detail="Pack unit name cannot be empty")
        pack.pack_unit_name = new_name

    new_size = payload.pack_size_in_base_units
    if new_size is not None:
        if new_size <= 0:
            raise HTTPException(status_code=400, detail="Pack size in base units must be greater than zero")
        pack.pack_size_in_base_units = new_size

    if payload.is_active is False:
        # Keep at least one active pack per variant to preserve usable receive/dispense UX.
        other_active_packs = db.query(VariantPack).filter(
            VariantPack.drug_variant_id == pack.drug_variant_id,
            VariantPack.is_active.is_(True),
            VariantPack.id != pack.id,
        ).count()
        if other_active_packs == 0:
            raise HTTPException(status_code=400, detail="At least one active pack must remain for this variant")
        pack.is_active = False
    elif payload.is_active is True:
        pack.is_active = True

    write_audit_log(
        db,
        action="variant_pack.update",
        entity_type="variant_pack",
        entity_id=pack_id,
        actor=current_user,
        details={"before": snapshot, "after": payload.dict(exclude_unset=True)},
    )
    db.commit()
    db.refresh(pack)
    return serialize_variant_pack(pack)
# Dispensing endpoints
@router.post("/dispense", response_model=DispensingResponse)
def dispense_drug(dispensing: DispensingCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_non_readonly_user)):
    """Record a drug dispensing and reduce inventory.

    Supports two modes, selected by ``dispensing.dispense_mode`` (defaults to
    ``"subunit"``):

    * ``"pack"``: requires ``requested_pack_id`` and a positive, whole
      ``requested_pack_count``.
    * ``"subunit"``: requires ``quantity`` greater than zero.

    The resolved quantity is deducted from every allocated batch and from the
    variant's own ``quantity`` field, and one ``DispensingAllocation`` row is
    written per allocated batch.

    Returns:
        A dict shaped for ``DispensingResponse``, including the allocations.

    Raises:
        HTTPException: 404 if the variant does not exist; 400 for an unknown
            mode, missing or invalid mode-specific fields, or insufficient
            variant quantity. The helpers ``resolve_pack_quantity`` and
            ``resolve_requested_allocations`` are defined elsewhere and may
            raise as well.
    """
    # Check if drug variant exists
    variant = db.query(DrugVariant).filter(DrugVariant.id == dispensing.drug_variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Drug variant not found")
    # Normalise the mode; a missing/empty value means a subunit dispense.
    dispense_mode = (dispensing.dispense_mode or "subunit").strip().lower()
    if dispense_mode not in {"subunit", "pack"}:
        raise HTTPException(status_code=400, detail="dispense_mode must be either 'subunit' or 'pack'")
    if dispense_mode == "pack":
        if dispensing.requested_pack_id is None or dispensing.requested_pack_count is None:
            raise HTTPException(status_code=400, detail="Pack dispense requires requested_pack_id and requested_pack_count")
        if dispensing.requested_pack_count <= 0:
            raise HTTPException(status_code=400, detail="Pack count must be greater than zero")
        # Tolerance-based integrality test: the count may arrive as a float.
        if abs(dispensing.requested_pack_count - round(dispensing.requested_pack_count)) > 1e-6:
            raise HTTPException(status_code=400, detail="Whole-pack dispense requires an integer pack count")
        resolved = resolve_pack_quantity(
            db,
            variant_id=variant.id,
            quantity=None,
            pack_id=dispensing.requested_pack_id,
            pack_count=dispensing.requested_pack_count,
        )
    else:
        if dispensing.quantity is None or dispensing.quantity <= 0:
            raise HTTPException(status_code=400, detail="Subunit dispense requires quantity > 0")
        resolved = resolve_pack_quantity(
            db,
            variant_id=variant.id,
            quantity=dispensing.quantity,
            pack_id=None,
            pack_count=None,
        )
    # ``resolved`` is read below through its "quantity", "pack_id" and
    # "pack_count" keys.
    dispense_qty = resolved["quantity"]
    # Check if enough total quantity available from active stock (legacy and batch-based remain in sync).
    if variant.quantity < dispense_qty:
        raise HTTPException(
            status_code=400,
            detail=f"Insufficient quantity. Available: {variant.quantity}, Requested: {dispense_qty}",
        )
    # Each allocation is used below as a mapping with a "batch" (ORM row) and
    # a "quantity" entry; an empty result means no batch is involved.
    allocations = resolve_requested_allocations(
        db,
        variant_id=variant.id,
        variant_quantity=variant.quantity,
        requested_quantity=dispense_qty,
        requested_allocations=dispensing.allocations,
        dispense_mode=dispense_mode,
        dispense_source=dispensing.dispense_source,
        requested_pack_id=resolved["pack_id"],
    )
    # When the client gave no source, infer it from whether batches were allocated.
    selected_source = (dispensing.dispense_source or ("legacy" if not allocations else "batch")).strip().lower()
    # Fall back to the authenticated username when the client gave none.
    user_name = dispensing.user_name or current_user.username
    # The dispensing row itself references only the first allocated batch;
    # the full split is stored in DispensingAllocation rows.
    primary_batch_id = allocations[0]["batch"].id if allocations else None
    db_dispensing = Dispensing(
        drug_variant_id=dispensing.drug_variant_id,
        batch_id=primary_batch_id,
        actor_user_id=current_user.id,
        quantity=dispense_qty,
        dispense_mode=dispense_mode,
        requested_pack_id=resolved["pack_id"],
        requested_pack_count=resolved["pack_count"],
        animal_name=dispensing.animal_name,
        user_name=user_name,
        notes=dispensing.notes,
    )
    db.add(db_dispensing)
    # Flush to obtain db_dispensing.id for the allocation rows and audit entry.
    db.flush()
    allocation_payload = []
    for allocation in allocations:
        batch = allocation["batch"]
        qty = allocation["quantity"]
        batch.quantity -= qty
        recompute_batch_pack_state(batch)
        allocation_payload.append({"batch_id": batch.id, "quantity": qty})
        db.add(DispensingAllocation(dispensing_id=db_dispensing.id, batch_id=batch.id, quantity=qty))
    # Keep legacy variant quantity field in sync for existing frontend flows.
    variant.quantity -= dispense_qty
    write_audit_log(
        db,
        action="dispense.create",
        entity_type="dispensing",
        entity_id=db_dispensing.id,
        actor=current_user,
        details={
            "drug_variant_id": dispensing.drug_variant_id,
            "requested_quantity": dispense_qty,
            "dispense_mode": dispense_mode,
            "dispense_source": selected_source,
            "requested_pack_id": resolved["pack_id"],
            "requested_pack_count": resolved["pack_count"],
            "allocations": allocation_payload,
            "animal_name": dispensing.animal_name,
            "notes": dispensing.notes,
        },
    )
    db.commit()
    db.refresh(db_dispensing)
    return {
        "id": db_dispensing.id,
        "drug_variant_id": db_dispensing.drug_variant_id,
        "batch_id": db_dispensing.batch_id,
        "actor_user_id": db_dispensing.actor_user_id,
        "quantity": db_dispensing.quantity,
        "dispense_mode": db_dispensing.dispense_mode,
        "requested_pack_id": db_dispensing.requested_pack_id,
        "requested_pack_count": db_dispensing.requested_pack_count,
        "animal_name": db_dispensing.animal_name,
        "user_name": db_dispensing.user_name,
        "notes": db_dispensing.notes,
        "dispensed_at": db_dispensing.dispensed_at,
        "allocations": allocation_payload,
    }
@router.get("/dispense/history", response_model=List[DispensingResponse])
def list_dispensings(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get dispensing records (audit log), newest first.

    Args:
        skip: Number of records to skip.
        limit: Maximum number of records to return.
        db: Request-scoped database session.
        current_user: Any authenticated user.

    Returns:
        A list of dicts shaped for ``DispensingResponse``, each carrying its
        per-batch ``allocations``.
    """
    entries = db.query(Dispensing).order_by(Dispensing.dispensed_at.desc()).offset(skip).limit(limit).all()

    # Load allocations for the whole page in a few batched queries instead of
    # one query per dispensing. IDs are chunked so a large ``limit`` cannot
    # exceed the database's bound-parameter limit.
    allocations_by_dispensing: Dict[int, List[Dict[str, Any]]] = {}
    entry_ids = [item.id for item in entries]
    chunk_size = 500
    for start in range(0, len(entry_ids), chunk_size):
        id_chunk = entry_ids[start:start + chunk_size]
        allocation_rows = (
            db.query(DispensingAllocation)
            .filter(DispensingAllocation.dispensing_id.in_(id_chunk))
            .all()
        )
        for row in allocation_rows:
            allocations_by_dispensing.setdefault(row.dispensing_id, []).append(
                {"batch_id": row.batch_id, "quantity": row.quantity}
            )

    return [
        {
            "id": item.id,
            "drug_variant_id": item.drug_variant_id,
            "batch_id": item.batch_id,
            "actor_user_id": item.actor_user_id,
            "quantity": item.quantity,
            "dispense_mode": item.dispense_mode,
            "requested_pack_id": item.requested_pack_id,
            "requested_pack_count": item.requested_pack_count,
            "animal_name": item.animal_name,
            "user_name": item.user_name,
            "notes": item.notes,
            "dispensed_at": item.dispensed_at,
            "allocations": allocations_by_dispensing.get(item.id, []),
        }
        for item in entries
    ]
@router.get("/drugs/{drug_id}/dispense/history", response_model=List[DispensingResponse])
def get_drug_dispensings(drug_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get dispensing history for a specific drug (all variants), newest first.

    Args:
        drug_id: Primary key of the drug.
        db: Request-scoped database session.
        current_user: Any authenticated user.

    Returns:
        A list of dicts shaped for ``DispensingResponse``, each carrying its
        per-batch ``allocations``.

    Raises:
        HTTPException: 404 if the drug does not exist.
    """
    # Verify drug exists
    drug = db.query(Drug).filter(Drug.id == drug_id).first()
    if not drug:
        raise HTTPException(status_code=404, detail="Drug not found")

    # Get all variant IDs for this drug
    variant_ids = db.query(DrugVariant.id).filter(DrugVariant.drug_id == drug_id).subquery()
    entries = (
        db.query(Dispensing)
        .filter(Dispensing.drug_variant_id.in_(variant_ids))
        .order_by(Dispensing.dispensed_at.desc())
        .all()
    )

    # Fetch every allocation for these dispensings with one joined query
    # instead of one query per dispensing; the result set here is unbounded.
    allocations_by_dispensing: Dict[int, List[Dict[str, Any]]] = {}
    if entries:
        allocation_rows = (
            db.query(DispensingAllocation)
            .join(Dispensing, DispensingAllocation.dispensing_id == Dispensing.id)
            .filter(Dispensing.drug_variant_id.in_(variant_ids))
            .all()
        )
        for row in allocation_rows:
            allocations_by_dispensing.setdefault(row.dispensing_id, []).append(
                {"batch_id": row.batch_id, "quantity": row.quantity}
            )

    return [
        {
            "id": item.id,
            "drug_variant_id": item.drug_variant_id,
            "batch_id": item.batch_id,
            "actor_user_id": item.actor_user_id,
            "quantity": item.quantity,
            "dispense_mode": item.dispense_mode,
            "requested_pack_id": item.requested_pack_id,
            "requested_pack_count": item.requested_pack_count,
            "animal_name": item.animal_name,
            "user_name": item.user_name,
            "notes": item.notes,
            "dispensed_at": item.dispensed_at,
            "allocations": allocations_by_dispensing.get(item.id, []),
        }
        for item in entries
    ]
@router.get("/variants/{variant_id}/dispense/history", response_model=List[DispensingResponse])
def get_variant_dispensings(variant_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get dispensing history for a specific drug variant, newest first.

    Args:
        variant_id: Primary key of the variant.
        db: Request-scoped database session.
        current_user: Any authenticated user.

    Returns:
        A list of dicts shaped for ``DispensingResponse``, each carrying its
        per-batch ``allocations``.

    Raises:
        HTTPException: 404 if the variant does not exist.
    """
    # Verify variant exists
    variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    entries = (
        db.query(Dispensing)
        .filter(Dispensing.drug_variant_id == variant_id)
        .order_by(Dispensing.dispensed_at.desc())
        .all()
    )

    # Fetch every allocation for these dispensings with one joined query
    # instead of one query per dispensing; the result set here is unbounded.
    allocations_by_dispensing: Dict[int, List[Dict[str, Any]]] = {}
    if entries:
        allocation_rows = (
            db.query(DispensingAllocation)
            .join(Dispensing, DispensingAllocation.dispensing_id == Dispensing.id)
            .filter(Dispensing.drug_variant_id == variant_id)
            .all()
        )
        for row in allocation_rows:
            allocations_by_dispensing.setdefault(row.dispensing_id, []).append(
                {"batch_id": row.batch_id, "quantity": row.quantity}
            )

    return [
        {
            "id": item.id,
            "drug_variant_id": item.drug_variant_id,
            "batch_id": item.batch_id,
            "actor_user_id": item.actor_user_id,
            "quantity": item.quantity,
            "dispense_mode": item.dispense_mode,
            "requested_pack_id": item.requested_pack_id,
            "requested_pack_count": item.requested_pack_count,
            "animal_name": item.animal_name,
            "user_name": item.user_name,
            "notes": item.notes,
            "dispensed_at": item.dispensed_at,
            "allocations": allocations_by_dispensing.get(item.id, []),
        }
        for item in entries
    ]
@router.get("/locations", response_model=List[LocationResponse])
def list_locations(
    active_only: bool = True,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List storage locations ordered by name.

    By default only active locations are returned; pass ``active_only=false``
    to include inactive ones.
    """
    locations = db.query(Location)
    if active_only:
        locations = locations.filter(Location.is_active.is_(True))
    ordered = locations.order_by(Location.name.asc())
    return ordered.all()
@router.post("/locations", response_model=LocationResponse)
def create_location(location: LocationCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user)):
    """Create a new active storage location (admin only).

    Args:
        location: Payload carrying the location name; surrounding whitespace
            is stripped before validation and storage.
        db: Request-scoped database session.
        current_user: Authenticated admin; recorded as the audit actor.

    Returns:
        The newly created ``Location`` row.

    Raises:
        HTTPException: 400 if the name is blank or already in use.
    """
    cleaned_name = location.name.strip()
    if not cleaned_name:
        raise HTTPException(status_code=400, detail="Location name cannot be empty")

    existing = db.query(Location).filter(Location.name == cleaned_name).first()
    if existing:
        raise HTTPException(status_code=400, detail="Location with this name already exists")

    row = Location(name=cleaned_name, is_active=True)
    db.add(row)
    # Flush so the primary key is assigned before the audit entry is written;
    # without it the creation event cannot be linked to the new row.
    db.flush()
    write_audit_log(
        db,
        action="location.create",
        entity_type="location",
        entity_id=row.id,
        actor=current_user,
        details={"name": row.name},
    )
    db.commit()
    db.refresh(row)
    return row
@router.put("/locations/{location_id}", response_model=LocationResponse)
def update_location(
    location_id: int,
    payload: LocationUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Rename and/or activate or archive a storage location (admin only).

    A rename is refused when the name is blank or used by another location.
    Archiving is refused while any batch at the location has quantity above
    zero. A 404 is raised when the location does not exist.
    """
    target = db.query(Location).filter(Location.id == location_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Location not found")

    snapshot = {"name": target.name, "is_active": target.is_active}

    if payload.name is not None:
        new_name = payload.name.strip()
        if not new_name:
            raise HTTPException(status_code=400, detail="Location name cannot be empty")
        name_clash = (
            db.query(Location)
            .filter(Location.name == new_name, Location.id != location_id)
            .first()
        )
        if name_clash:
            raise HTTPException(status_code=400, detail="Another location already uses that name")
        target.name = new_name

    if payload.is_active is False:
        stocked_batches = (
            db.query(Batch)
            .filter(Batch.location_id == location_id, Batch.quantity > 0)
            .count()
        )
        if stocked_batches > 0:
            raise HTTPException(status_code=400, detail="Cannot archive location while active stock remains")
        target.is_active = False
    elif payload.is_active is True:
        target.is_active = True

    write_audit_log(
        db,
        action="location.update",
        entity_type="location",
        entity_id=location_id,
        actor=current_user,
        details={"before": snapshot, "after": payload.dict(exclude_unset=True)},
    )
    db.commit()
    db.refresh(target)
    return target
@router.get("/variants/{variant_id}/batches", response_model=List[BatchResponse])
def list_variant_batches(
    variant_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List a variant's batches ordered by expiry date, then by receipt time.

    A 404 is raised when the variant does not exist.
    """
    parent = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not parent:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    batch_query = db.query(Batch).filter(Batch.drug_variant_id == variant_id)
    batch_query = batch_query.order_by(Batch.expiry_date.asc(), Batch.received_at.asc())

    serialized = []
    for record in batch_query.all():
        serialized.append(serialize_batch_response(db, record))
    return serialized
@router.post("/variants/{variant_id}/batches", response_model=BatchResponse)
def create_variant_batch(variant_id: int, payload: BatchCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_non_readonly_user)):
    """Receive stock for a variant, creating a batch or restocking an existing one.

    If a batch with the same number already exists for the variant and its
    expiry date matches, the received quantity is added to that batch.
    Otherwise a new batch row is created. In both cases the variant's own
    ``quantity`` is increased by the received amount.

    Args:
        variant_id: Primary key of the variant receiving stock.
        payload: Batch number, quantity and/or pack id + count, expiry date,
            location and optional notes.
        db: Request-scoped database session.
        current_user: Authenticated user with write access; recorded as the
            audit actor.

    Returns:
        The affected batch as produced by ``serialize_batch_response``.

    Raises:
        HTTPException: 404 if the variant does not exist; 400 if the location
            is missing or inactive, the batch number is blank, or the batch
            number already exists with a different expiry date. The helper
            ``resolve_pack_quantity`` is defined elsewhere and may raise too.
    """
    variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    resolved = resolve_pack_quantity(
        db,
        variant_id=variant_id,
        quantity=payload.quantity,
        pack_id=payload.received_pack_id,
        pack_count=payload.received_pack_count,
    )
    batch_quantity = resolved["quantity"]

    location = db.query(Location).filter(Location.id == payload.location_id, Location.is_active.is_(True)).first()
    if not location:
        raise HTTPException(status_code=400, detail="Location not found or inactive")

    batch_number = payload.batch_number.strip()
    if not batch_number:
        raise HTTPException(status_code=400, detail="Batch number cannot be empty")

    existing = (
        db.query(Batch)
        .filter(Batch.drug_variant_id == variant_id, Batch.batch_number == batch_number)
        .first()
    )
    if existing:
        if existing.expiry_date != payload.expiry_date:
            raise HTTPException(
                status_code=400,
                detail="Batch number already exists for this variant with a different expiry date",
            )
        # Same batch number and expiry — restock the existing batch
        existing.quantity += batch_quantity
        # Only accumulate the pack count when the same pack type is being
        # received AND a count was actually resolved; a quantity-only receipt
        # may carry no pack count, and ``0 + None`` would raise TypeError.
        if existing.received_pack_id == resolved["pack_id"] and resolved["pack_count"] is not None:
            existing.received_pack_count = (existing.received_pack_count or 0) + resolved["pack_count"]
        if payload.notes:
            existing.notes = (existing.notes + "\n" + payload.notes) if existing.notes else payload.notes
        recompute_batch_pack_state(existing)
        variant.quantity += batch_quantity
        write_audit_log(
            db,
            action="batch.restock",
            entity_type="batch",
            entity_id=existing.id,
            actor=current_user,
            details={
                "variant_id": variant_id,
                "batch_number": batch_number,
                "quantity_added": batch_quantity,
                "new_total_quantity": existing.quantity,
                "received_pack_id": resolved["pack_id"],
                "received_pack_count": resolved["pack_count"],
                "expiry_date": str(payload.expiry_date),
                "location_id": existing.location_id,
            },
        )
        db.commit()
        db.refresh(existing)
        return serialize_batch_response(db, existing)

    row = Batch(
        drug_variant_id=variant_id,
        batch_number=batch_number,
        quantity=batch_quantity,
        received_pack_id=resolved["pack_id"],
        received_pack_count=resolved["pack_count"],
        received_pack_size_snapshot=resolve_pack_size_snapshot(db, resolved["pack_id"]),
        expiry_date=payload.expiry_date,
        location_id=payload.location_id,
        notes=payload.notes,
    )
    recompute_batch_pack_state(row)
    db.add(row)
    variant.quantity += batch_quantity
    # Flush so the primary key is assigned before the audit entry is written;
    # without it the creation event cannot be linked to the new batch.
    db.flush()
    write_audit_log(
        db,
        action="batch.create",
        entity_type="batch",
        entity_id=row.id,
        actor=current_user,
        details={
            "variant_id": variant_id,
            "batch_number": batch_number,
            "quantity": batch_quantity,
            "received_pack_id": resolved["pack_id"],
            "received_pack_count": resolved["pack_count"],
            "received_pack_size_snapshot": row.received_pack_size_snapshot,
            # Stringified to match the restock branch above.
            "expiry_date": str(payload.expiry_date),
            "location_id": payload.location_id,
        },
    )
    db.commit()
    db.refresh(row)
    return serialize_batch_response(db, row)
@router.put("/batches/{batch_id}", response_model=BatchResponse)
def update_batch(batch_id: int, payload: BatchUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_non_readonly_user)):
    """Update a batch's metadata and/or its quantity.

    Fields left as ``None`` in the payload are not touched. When quantity or
    pack information is supplied, the batch quantity is recalculated and the
    difference is applied to the parent variant's ``quantity`` so the two
    stay in step.

    Returns:
        The updated batch as produced by ``serialize_batch_response``.

    Raises:
        HTTPException: 404 if the batch or its parent variant does not exist;
            400 for a blank or duplicate batch number, a missing or inactive
            location, a negative quantity, incomplete pack information, or a
            change that would make the variant quantity negative.
    """
    batch = db.query(Batch).filter(Batch.id == batch_id).first()
    if not batch:
        raise HTTPException(status_code=404, detail="Batch not found")
    variant = db.query(DrugVariant).filter(DrugVariant.id == batch.drug_variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Parent variant not found")
    # Snapshot of the pre-update state, stored in the audit entry.
    before = {
        "batch_number": batch.batch_number,
        "quantity": batch.quantity,
        "received_pack_id": batch.received_pack_id,
        "received_pack_count": batch.received_pack_count,
        "received_pack_size_snapshot": batch.received_pack_size_snapshot,
        "current_full_pack_count": batch.current_full_pack_count,
        "current_loose_base_units": batch.current_loose_base_units,
        "expiry_date": batch.expiry_date,
        "location_id": batch.location_id,
        "notes": batch.notes,
        "disposed_at": batch.disposed_at,
        "disposed_by_user_id": batch.disposed_by_user_id,
        "disposed_quantity": batch.disposed_quantity,
        "disposal_notes": batch.disposal_notes,
    }
    if payload.batch_number is not None:
        cleaned_batch_number = payload.batch_number.strip()
        if not cleaned_batch_number:
            raise HTTPException(status_code=400, detail="Batch number cannot be empty")
        # Batch numbers must stay unique within a variant (excluding this batch).
        duplicate = (
            db.query(Batch)
            .filter(
                Batch.drug_variant_id == batch.drug_variant_id,
                Batch.batch_number == cleaned_batch_number,
                Batch.id != batch.id,
            )
            .first()
        )
        if duplicate:
            raise HTTPException(status_code=400, detail="Batch number already exists for this variant")
        batch.batch_number = cleaned_batch_number
    if payload.location_id is not None:
        # Only active locations may receive a batch.
        location = db.query(Location).filter(Location.id == payload.location_id, Location.is_active.is_(True)).first()
        if not location:
            raise HTTPException(status_code=400, detail="Location not found or inactive")
        batch.location_id = payload.location_id
    if payload.expiry_date is not None:
        batch.expiry_date = payload.expiry_date
    if payload.notes is not None:
        batch.notes = payload.notes
    # Quantity section: entered when any of pack id, pack count or quantity is supplied.
    if payload.received_pack_id is not None or payload.received_pack_count is not None or payload.quantity is not None:
        if payload.quantity is not None and payload.quantity < 0:
            raise HTTPException(status_code=400, detail="Batch quantity cannot be negative")
        if payload.received_pack_id is None and payload.received_pack_count is None:
            # Quantity-only update: keep the batch's existing pack metadata.
            # NOTE(review): the check below cannot trigger, because the outer
            # condition guarantees quantity is set when both pack fields are None.
            if payload.quantity is None:
                raise HTTPException(status_code=400, detail="Batch quantity cannot be empty")
            resolved_quantity = payload.quantity
            resolved_pack_id = batch.received_pack_id
            resolved_pack_count = batch.received_pack_count
        else:
            # Pack-based update: fill whichever pack field is missing from the
            # batch's stored values, then resolve the quantity via the helper.
            target_pack_id = payload.received_pack_id if payload.received_pack_id is not None else batch.received_pack_id
            target_pack_count = payload.received_pack_count if payload.received_pack_count is not None else batch.received_pack_count
            if target_pack_id is None or target_pack_count is None:
                raise HTTPException(status_code=400, detail="Both pack_id and pack_count are required for pack-based updates")
            resolved = resolve_pack_quantity(
                db,
                variant_id=batch.drug_variant_id,
                quantity=payload.quantity,
                pack_id=target_pack_id,
                pack_count=target_pack_count,
            )
            resolved_quantity = resolved["quantity"]
            resolved_pack_id = resolved["pack_id"]
            resolved_pack_count = resolved["pack_count"]
        # Apply the same delta to the variant so its quantity tracks the batch.
        delta = resolved_quantity - batch.quantity
        projected_variant_qty = variant.quantity + delta
        if projected_variant_qty < 0:
            raise HTTPException(status_code=400, detail="Variant quantity cannot become negative")
        batch.quantity = resolved_quantity
        batch.received_pack_id = resolved_pack_id
        batch.received_pack_count = resolved_pack_count
        batch.received_pack_size_snapshot = resolve_pack_size_snapshot(db, resolved_pack_id)
        recompute_batch_pack_state(batch)
        variant.quantity = projected_variant_qty
    write_audit_log(
        db,
        action="batch.update",
        entity_type="batch",
        entity_id=batch_id,
        actor=current_user,
        details={"before": before, "after": payload.dict(exclude_unset=True)},
    )
    db.commit()
    db.refresh(batch)
    return serialize_batch_response(db, batch)
@router.post("/batches/{batch_id}/dispose", response_model=BatchResponse)
def dispose_batch(batch_id: int, payload: BatchDisposeRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_non_readonly_user)):
    """Mark an expired batch as disposed and remove its stock from inventory.

    The batch's remaining quantity is recorded as ``disposed_quantity``, the
    batch quantity is set to zero, and the same amount is subtracted from the
    parent variant's ``quantity`` (floored at zero).

    Args:
        batch_id: Primary key of the batch to dispose.
        payload: Optional disposal notes; blank notes are stored as ``None``.
        db: Request-scoped database session.
        current_user: Authenticated user with write access; recorded as the
            disposing user and audit actor.

    Returns:
        The updated batch as produced by ``serialize_batch_response``.

    Raises:
        HTTPException: 404 if the batch or its parent variant does not exist;
            400 if the batch is already disposed, has no remaining stock, is
            not expired (or has no expiry date), or disposal would make the
            variant quantity negative.
    """
    batch = db.query(Batch).filter(Batch.id == batch_id).first()
    if not batch:
        raise HTTPException(status_code=404, detail="Batch not found")
    if batch.disposed_at is not None:
        raise HTTPException(status_code=400, detail="Batch has already been disposed")
    if batch.quantity <= 0:
        raise HTTPException(status_code=400, detail="Batch has no remaining stock to dispose")
    # A batch without an expiry date cannot be shown to be expired; reject it
    # explicitly rather than letting ``None >= date`` raise a TypeError.
    if batch.expiry_date is None or batch.expiry_date >= date.today():
        raise HTTPException(status_code=400, detail="Only expired batches can be marked as disposed")

    variant = db.query(DrugVariant).filter(DrugVariant.id == batch.drug_variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Parent variant not found")

    disposed_quantity = batch.quantity
    # Small tolerance so float rounding noise does not block a valid disposal.
    if variant.quantity - disposed_quantity < -1e-6:
        raise HTTPException(status_code=400, detail="Variant quantity cannot become negative during disposal")

    batch.quantity = 0
    batch.disposed_at = datetime.utcnow()
    batch.disposed_by_user_id = current_user.id
    batch.disposed_quantity = disposed_quantity
    batch.disposal_notes = (payload.notes or '').strip() or None
    recompute_batch_pack_state(batch)
    # Clamp at zero to absorb the rounding noise tolerated above.
    variant.quantity = max(0, variant.quantity - disposed_quantity)

    write_audit_log(
        db,
        action="batch.dispose",
        entity_type="batch",
        entity_id=batch.id,
        actor=current_user,
        details={
            "batch_number": batch.batch_number,
            "variant_id": batch.drug_variant_id,
            "disposed_quantity": disposed_quantity,
            "expiry_date": batch.expiry_date.isoformat() if batch.expiry_date else None,
            "location_id": batch.location_id,
            "disposal_notes": batch.disposal_notes,
        },
    )
    db.commit()
    db.refresh(batch)
    return serialize_batch_response(db, batch)
@router.get("/audit", response_model=List[Dict[str, Any]])
def list_audit_events(
    skip: int = 0,
    limit: int = 200,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Return audit log entries, newest first (admin only).

    The stored ``details`` text is decoded from JSON; text that is not valid
    JSON is returned wrapped as ``{"raw": <text>}``, and empty details come
    back as ``None``.
    """
    def _decode_details(raw_details):
        # Empty/missing details are reported as None.
        if not raw_details:
            return None
        try:
            return json.loads(raw_details)
        except json.JSONDecodeError:
            return {"raw": raw_details}

    page = (
        db.query(AuditLog)
        .order_by(AuditLog.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
    return [
        {
            "id": entry.id,
            "action": entry.action,
            "entity_type": entry.entity_type,
            "entity_id": entry.entity_id,
            "actor_user_id": entry.actor_user_id,
            "actor_username": entry.actor_username,
            "details": _decode_details(entry.details),
            "created_at": entry.created_at,
        }
        for entry in page
    ]
def _csv_response(filename: str, headers: List[str], rows: List[List[Any]]) -> Response:
    """Build a ``text/csv`` attachment response from a header row and data rows.

    Args:
        filename: Name placed in the ``Content-Disposition`` header.
        headers: Column names, written as the first CSV row.
        rows: Data rows, written in order after the header.

    Returns:
        A ``Response`` whose body is the rendered CSV text.
    """
    buffer = io.StringIO()
    # Header first, then every data row, all through the same writer.
    csv.writer(buffer).writerows([headers, *rows])
    disposition = f"attachment; filename={filename}"
    return Response(
        content=buffer.getvalue(),
        media_type="text/csv",
        headers={"Content-Disposition": disposition},
    )
@router.get("/reports/controlled-movement")
def report_controlled_movement(
    from_date: Optional[date] = None,
    to_date: Optional[date] = None,
    format: str = "json",
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Report dispensing records for drugs flagged as controlled, newest first.

    Args:
        from_date: If given, only records dispensed at or after the start of
            this day are included.
        to_date: If given, only records dispensed before the start of the day
            after this date are included (i.e. the whole of ``to_date`` counts).
        format: ``"csv"`` (case-insensitive) returns a CSV attachment; any
            other value returns the list of dicts.

    Returns:
        A list of dicts, or a CSV response containing a subset of those fields.
    """
    controlled = (
        db.query(Dispensing, Drug, DrugVariant)
        .join(DrugVariant, Dispensing.drug_variant_id == DrugVariant.id)
        .join(Drug, DrugVariant.drug_id == Drug.id)
        .filter(Drug.is_controlled.is_(True))
    )
    if from_date is not None:
        window_start = datetime.combine(from_date, datetime.min.time())
        controlled = controlled.filter(Dispensing.dispensed_at >= window_start)
    if to_date is not None:
        # Exclusive upper bound at midnight of the following day.
        window_end = datetime.combine(to_date + timedelta(days=1), datetime.min.time())
        controlled = controlled.filter(Dispensing.dispensed_at < window_end)

    result = []
    for dispensing, drug, variant in controlled.order_by(Dispensing.dispensed_at.desc()).all():
        result.append(
            {
                "dispensing_id": dispensing.id,
                "dispensed_at": dispensing.dispensed_at,
                "drug_name": drug.name,
                "strength": variant.strength,
                "quantity": dispensing.quantity,
                "dispense_mode": dispensing.dispense_mode,
                "requested_pack_id": dispensing.requested_pack_id,
                "requested_pack_count": dispensing.requested_pack_count,
                "user_name": dispensing.user_name,
                "animal_name": dispensing.animal_name,
                "batch_id": dispensing.batch_id,
            }
        )

    if format.lower() != "csv":
        return result

    # The CSV export carries only these columns, in this order.
    csv_columns = [
        "dispensing_id",
        "dispensed_at",
        "drug_name",
        "strength",
        "quantity",
        "user_name",
        "animal_name",
        "batch_id",
    ]
    return _csv_response(
        "controlled_movement.csv",
        csv_columns,
        [[entry[column] for column in csv_columns] for entry in result],
    )
@router.get("/reports/batch-expiry")
def report_batch_expiry(
    days: int = 30,
    format: str = "json",
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Report in-stock batches that have expired or expire within ``days`` days.

    Args:
        days: Look-ahead window in days from today; must not be negative.
        format: ``"csv"`` (case-insensitive) returns a CSV attachment; any
            other value returns the list of dicts.

    Returns:
        Batches with quantity above zero and an expiry date on or before
        today + ``days``, ordered by expiry date ascending. Each entry has a
        ``status`` of ``"expired"`` (expiry before today) or ``"expiring"``.

    Raises:
        HTTPException: 400 when ``days`` is negative.
    """
    if days < 0:
        raise HTTPException(status_code=400, detail="Days must be non-negative")

    today = date.today()
    cutoff = today + timedelta(days=days)
    expiring = (
        db.query(Batch, DrugVariant, Drug, Location)
        .join(DrugVariant, Batch.drug_variant_id == DrugVariant.id)
        .join(Drug, DrugVariant.drug_id == Drug.id)
        .join(Location, Batch.location_id == Location.id)
        .filter(Batch.quantity > 0, Batch.expiry_date <= cutoff)
        .order_by(Batch.expiry_date.asc())
        .all()
    )

    result = [
        {
            "batch_id": batch.id,
            "batch_number": batch.batch_number,
            "drug_name": drug.name,
            "strength": variant.strength,
            "quantity": batch.quantity,
            "location": location.name,
            "expiry_date": batch.expiry_date,
            "status": "expired" if batch.expiry_date < today else "expiring",
            "is_controlled": bool(drug.is_controlled),
        }
        for batch, variant, drug, location in expiring
    ]

    if format.lower() != "csv":
        return result

    csv_columns = [
        "batch_id",
        "batch_number",
        "drug_name",
        "strength",
        "quantity",
        "location",
        "expiry_date",
        "status",
        "is_controlled",
    ]
    return _csv_response(
        "batch_expiry.csv",
        csv_columns,
        [[entry[column] for column in csv_columns] for entry in result],
    )
@router.get("/reports/stock-by-location")
def report_stock_by_location(
    format: str = "json",
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Report every in-stock batch together with the location that holds it.

    Args:
        format: ``"csv"`` (case-insensitive) returns a CSV attachment; any
            other value returns the list of dicts.

    Returns:
        One entry per batch with quantity above zero, ordered by location
        name, then drug name, then variant strength.
    """
    stocked = (
        db.query(Location, Batch, DrugVariant, Drug)
        .join(Batch, Batch.location_id == Location.id)
        .join(DrugVariant, Batch.drug_variant_id == DrugVariant.id)
        .join(Drug, DrugVariant.drug_id == Drug.id)
        .filter(Batch.quantity > 0)
        .order_by(Location.name.asc(), Drug.name.asc(), DrugVariant.strength.asc())
        .all()
    )

    result = []
    for location, batch, variant, drug in stocked:
        result.append(
            {
                "location_id": location.id,
                "location_name": location.name,
                "batch_id": batch.id,
                "batch_number": batch.batch_number,
                "drug_name": drug.name,
                "strength": variant.strength,
                "quantity": batch.quantity,
                "unit": variant.unit,
                "expiry_date": batch.expiry_date,
                "is_controlled": bool(drug.is_controlled),
            }
        )

    if format.lower() != "csv":
        return result

    csv_columns = [
        "location_id",
        "location_name",
        "batch_id",
        "batch_number",
        "drug_name",
        "strength",
        "quantity",
        "unit",
        "expiry_date",
        "is_controlled",
    ]
    return _csv_response(
        "stock_by_location.csv",
        csv_columns,
        [[entry[column] for column in csv_columns] for entry in result],
    )
@router.get("/reports/global-inventory")
def report_global_inventory(
    format: str = "json",
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Report all stock per drug variant: batch-tracked lines plus legacy stock.

    For every variant (ordered by drug name, then strength) the report lists
    its batches with quantity above zero, ordered by expiry date, location
    name and batch number. If the variant's own ``quantity`` exceeds the sum
    of those batch quantities by more than 1e-6, the difference is emitted as
    an extra "Legacy stock" line with no batch id, location or expiry date.

    Args:
        format: ``"csv"`` (case-insensitive) returns a CSV attachment; any
            other value returns the list of dicts.

    Returns:
        A list of dicts, or a CSV response (without the ``batch_id`` column).
    """
    variant_rows = (
        db.query(DrugVariant, Drug)
        .join(Drug, DrugVariant.drug_id == Drug.id)
        .order_by(Drug.name.asc(), DrugVariant.strength.asc())
        .all()
    )

    # Fetch all in-stock batches in a single ordered query and bucket them by
    # variant, instead of issuing one query per variant. The global ordering
    # is preserved inside each bucket, so per-variant ordering is unchanged.
    stocked_batches = (
        db.query(Batch, Location)
        .join(Location, Batch.location_id == Location.id)
        .filter(Batch.quantity > 0)
        .order_by(Batch.expiry_date.asc(), Location.name.asc(), Batch.batch_number.asc())
        .all()
    )
    batches_by_variant: Dict[Any, List[Any]] = {}
    for batch, location in stocked_batches:
        batches_by_variant.setdefault(batch.drug_variant_id, []).append((batch, location))

    result: List[Dict[str, Any]] = []
    for variant, drug in variant_rows:
        total_batch_quantity = 0.0
        for batch, location in batches_by_variant.get(variant.id, ()):
            total_batch_quantity += float(batch.quantity or 0)
            result.append(
                {
                    "batch_id": batch.id,
                    "batch_number": batch.batch_number,
                    "drug_name": drug.name,
                    "strength": variant.strength,
                    "quantity": batch.quantity,
                    "unit": variant.unit,
                    "location_name": location.name,
                    "expiry_date": batch.expiry_date,
                    "inventory_source": "batch",
                    "is_controlled": bool(drug.is_controlled),
                }
            )

        # Whatever the variant total holds beyond its batches is reported as
        # untracked legacy stock; the epsilon ignores float rounding residue.
        legacy_quantity = max(0.0, float(variant.quantity or 0) - total_batch_quantity)
        if legacy_quantity > 1e-6:
            result.append(
                {
                    "batch_id": None,
                    "batch_number": "Legacy stock",
                    "drug_name": drug.name,
                    "strength": variant.strength,
                    "quantity": legacy_quantity,
                    "unit": variant.unit,
                    "location_name": None,
                    "expiry_date": None,
                    "inventory_source": "legacy",
                    "is_controlled": bool(drug.is_controlled),
                }
            )

    if format.lower() != "csv":
        return result

    csv_columns = [
        "drug_name",
        "strength",
        "batch_number",
        "quantity",
        "unit",
        "location_name",
        "expiry_date",
        "inventory_source",
        "is_controlled",
    ]
    return _csv_response(
        "global_inventory.csv",
        csv_columns,
        [[entry[column] for column in csv_columns] for entry in result],
    )
@router.get("/reports/batch-attention")
def report_batch_attention(
    format: str = "json",
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Report batches that still hold stock but whose expiry date has passed.

    Args:
        format: ``"csv"`` (case-insensitive) returns a CSV attachment; any
            other value returns the list of dicts.

    Returns:
        One entry per batch with quantity above zero and an expiry date before
        today, ordered by expiry date, drug name, strength and batch number.
        Every entry carries ``status == "expired"``; ``received_pack_unit_name``
        is always ``None`` in the JSON form and is not part of the CSV form.
    """
    today = date.today()
    overdue = (
        db.query(Batch, DrugVariant, Drug, Location)
        .join(DrugVariant, Batch.drug_variant_id == DrugVariant.id)
        .join(Drug, DrugVariant.drug_id == Drug.id)
        .join(Location, Batch.location_id == Location.id)
        .filter(Batch.quantity > 0, Batch.expiry_date < today)
        .order_by(
            Batch.expiry_date.asc(),
            Drug.name.asc(),
            DrugVariant.strength.asc(),
            Batch.batch_number.asc(),
        )
        .all()
    )

    result = [
        {
            "batch_id": batch.id,
            "batch_number": batch.batch_number,
            "drug_name": drug.name,
            "strength": variant.strength,
            "quantity": batch.quantity,
            "unit": variant.unit,
            "location": location.name,
            "expiry_date": batch.expiry_date,
            "status": "expired",
            "received_pack_unit_name": None,
            "current_full_pack_count": batch.current_full_pack_count,
            "current_loose_base_units": batch.current_loose_base_units,
            "is_controlled": bool(drug.is_controlled),
        }
        for batch, variant, drug, location in overdue
    ]

    if format.lower() != "csv":
        return result

    csv_columns = [
        "batch_id",
        "batch_number",
        "drug_name",
        "strength",
        "quantity",
        "unit",
        "location",
        "expiry_date",
        "status",
        "current_full_pack_count",
        "current_loose_base_units",
        "is_controlled",
    ]
    return _csv_response(
        "batch_attention.csv",
        csv_columns,
        [[entry[column] for column in csv_columns] for entry in result],
    )
@router.get("/reports/audit-trail")
def report_audit_trail(
    from_date: Optional[date] = None,
    to_date: Optional[date] = None,
    format: str = "json",
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Report audit log entries in an optional date window, newest first.

    Access is gated by the ``get_current_admin_user`` dependency.

    Args:
        from_date: If given, only entries created at or after the start of
            this day are included.
        to_date: If given, only entries created before the start of the day
            after this date are included.
        format: ``"csv"`` (case-insensitive) returns a CSV attachment; any
            other value returns the list of dicts.

    Returns:
        A list of dicts with ``details`` decoded from JSON (or wrapped as
        ``{"raw": <text>}`` when it is not valid JSON, or ``None`` when empty).
        In the CSV form, ``details`` is re-serialised to a JSON string, or an
        empty string when it is ``None``.
    """

    def _decode_details(raw_details):
        if not raw_details:
            return None
        try:
            return json.loads(raw_details)
        except json.JSONDecodeError:
            return {"raw": raw_details}

    trail = db.query(AuditLog)
    if from_date is not None:
        window_start = datetime.combine(from_date, datetime.min.time())
        trail = trail.filter(AuditLog.created_at >= window_start)
    if to_date is not None:
        # Exclusive upper bound at midnight of the following day.
        window_end = datetime.combine(to_date + timedelta(days=1), datetime.min.time())
        trail = trail.filter(AuditLog.created_at < window_end)

    result = [
        {
            "id": entry.id,
            "created_at": entry.created_at,
            "action": entry.action,
            "entity_type": entry.entity_type,
            "entity_id": entry.entity_id,
            "actor_user_id": entry.actor_user_id,
            "actor_username": entry.actor_username,
            "details": _decode_details(entry.details),
        }
        for entry in trail.order_by(AuditLog.created_at.desc()).all()
    ]

    if format.lower() != "csv":
        return result

    csv_rows = []
    for item in result:
        if item["details"] is not None:
            details_cell = json.dumps(item["details"], default=str)
        else:
            details_cell = ""
        csv_rows.append(
            [
                item["id"],
                item["created_at"],
                item["action"],
                item["entity_type"],
                item["entity_id"],
                item["actor_user_id"],
                item["actor_username"],
                details_cell,
            ]
        )
    return _csv_response(
        "audit_trail.csv",
        ["id", "created_at", "action", "entity_type", "entity_id", "actor_user_id", "actor_username", "details"],
        csv_rows,
    )
# Helper used by the print endpoints to tidy free-text label fields
def capitalize_label_text(text: str) -> str:
    """Upper-case the first character of the text and of each following sentence.

    A new sentence is recognised only after the exact two-character sequences
    ". ", "! " and "? ". Falsy input (empty string or None) is returned as-is.
    """
    if not text:
        return text

    def _upper_first(chunk: str) -> str:
        # Slicing keeps this safe for empty chunks (e.g. between back-to-back
        # delimiters), which are passed through unchanged.
        return chunk[:1].upper() + chunk[1:]

    capitalized = _upper_first(text)
    for separator in ('. ', '! ', '? '):
        capitalized = separator.join(
            _upper_first(piece) for piece in capitalized.split(separator)
        )
    return capitalized
# Label printing endpoint
@router.post("/labels/print", response_model=LabelPrintResponse)
def print_label(label_request: LabelPrintRequest, current_user: User = Depends(get_current_non_readonly_user)):
    """
    Print a drug label by publishing an MQTT message.

    The request variables are capitalized for presentation, wrapped together
    with the template id, label size and test flag read from the environment
    (LABEL_TEMPLATE_ID, LABEL_SIZE, LABEL_TEST), and published via
    publish_label_print_with_response with a 10 second timeout.

    Returns:
        LabelPrintResponse with success=True and the printer's message, or
        success=False and a "Print failed: ..." message built from the
        printer's "message"/"error" field (or a no-response notice).

    Raises:
        HTTPException: 500 if anything raises while building or sending the
            request.
    """
    try:
        # Get label configuration from environment
        import os
        template_id = os.getenv("LABEL_TEMPLATE_ID", "vet_label")
        label_size = os.getenv("LABEL_SIZE", "29x90")
        test_mode = os.getenv("LABEL_TEST", "false").lower() == "true"

        # Capitalize all text fields for better presentation
        # (expiry_date doesn't need capitalization)
        variables = label_request.variables.dict()
        for field_name in ("practice_name", "animal_name", "drug_name", "dosage", "quantity"):
            variables[field_name] = capitalize_label_text(variables[field_name])

        # Convert the request to the MQTT message format
        mqtt_message = {
            "template_id": template_id,
            "label_size": label_size,
            "variables": variables,
            "test": test_mode
        }

        # Publish to MQTT and wait for response
        success, response = publish_label_print_with_response(mqtt_message, timeout=10.0)
        print(f"Label print result: success={success}, response={response}")

        if success:
            # Tolerate a missing response payload here just like the failure
            # branch does; otherwise a label that did print would be reported
            # as a 500 because of an AttributeError on None.
            result = LabelPrintResponse(
                success=True,
                message=(response or {}).get("message", "Label printed successfully")
            )
            print(f"Returning success response: {result}")
            return result

        # Return error details from printer
        # Check both 'message' and 'error' fields for error details
        if response:
            error_msg = response.get("message") or response.get("error", "Unknown error")
        else:
            error_msg = "No response from printer"
        result = LabelPrintResponse(
            success=False,
            message=f"Print failed: {error_msg}"
        )
        print(f"Returning error response: {result}")
        return result
    except Exception as e:
        # Chain the original exception so the traceback keeps the root cause.
        raise HTTPException(
            status_code=500,
            detail=f"Error sending label print request: {str(e)}"
        ) from e
# Notes printing endpoint
@router.post("/notes/print", response_model=NotesPrintResponse)
def print_notes(notes_request: NotesPrintRequest, current_user: User = Depends(get_current_non_readonly_user)):
    """
    Print notes by publishing an MQTT message.

    The request variables are capitalized for presentation, wrapped together
    with the template id, label size and test flag read from the environment
    (NOTES_TEMPLATE_ID, LABEL_SIZE, LABEL_TEST), and published via
    publish_label_print_with_response with a 10 second timeout.

    Returns:
        NotesPrintResponse with success=True and the printer's message, or
        success=False and a "Print failed: ..." message built from the
        printer's "message"/"error" field (or a no-response notice).

    Raises:
        HTTPException: 500 if anything raises while building or sending the
            request.
    """
    try:
        # Get notes template configuration from environment
        import os
        template_id = os.getenv("NOTES_TEMPLATE_ID", "notes_template")
        label_size = os.getenv("LABEL_SIZE", "29x90")
        test_mode = os.getenv("LABEL_TEST", "false").lower() == "true"

        # Capitalize text fields for better presentation
        variables = notes_request.variables.dict()
        for field_name in ("animal_name", "notes"):
            variables[field_name] = capitalize_label_text(variables[field_name])

        # Convert the request to the MQTT message format
        mqtt_message = {
            "template_id": template_id,
            "label_size": label_size,
            "variables": variables,
            "test": test_mode
        }

        # Publish to MQTT and wait for response
        success, response = publish_label_print_with_response(mqtt_message, timeout=10.0)
        print(f"Notes print result: success={success}, response={response}")

        if success:
            # Tolerate a missing response payload here just like the failure
            # branch does; otherwise notes that did print would be reported
            # as a 500 because of an AttributeError on None.
            result = NotesPrintResponse(
                success=True,
                message=(response or {}).get("message", "Notes printed successfully")
            )
            print(f"Returning success response: {result}")
            return result

        # Return error details from printer
        # Check both 'message' and 'error' fields for error details
        if response:
            error_msg = response.get("message") or response.get("error", "Unknown error")
        else:
            error_msg = "No response from printer"
        result = NotesPrintResponse(
            success=False,
            message=f"Print failed: {error_msg}"
        )
        print(f"Returning error response: {result}")
        return result
    except Exception as e:
        # Chain the original exception so the traceback keeps the root cause.
        raise HTTPException(
            status_code=500,
            detail=f"Error sending notes print request: {str(e)}"
        ) from e
@router.get("/gtin/{gtin}", response_model=GtinMappingResponse)
def get_gtin_mapping(
    gtin: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Look up the drug variant and pack that a scanned GTIN is mapped to.

    If the mapping exists but its variant, drug or pack can no longer be
    found, the mapping row is deleted and the GTIN is reported as unknown.

    Raises:
        HTTPException: 404 when no mapping exists or the mapping was stale.
    """
    mapping = db.query(GtinMapping).filter(GtinMapping.gtin == gtin).first()
    if not mapping:
        raise HTTPException(status_code=404, detail="GTIN not found")

    variant = db.query(DrugVariant).filter(DrugVariant.id == mapping.drug_variant_id).first()
    drug = None
    if variant:
        drug = db.query(Drug).filter(Drug.id == variant.drug_id).first()
    pack = db.query(VariantPack).filter(VariantPack.id == mapping.variant_pack_id).first()

    if not (variant and drug and pack):
        # Referenced records no longer exist — delete the stale mapping and treat as unknown
        db.delete(mapping)
        db.commit()
        raise HTTPException(status_code=404, detail="GTIN not found")

    # Show whole-number pack sizes without a trailing ".0".
    pack_size = pack.pack_size_in_base_units
    whole_size = int(pack_size)
    shown_size = whole_size if pack_size == whole_size else pack_size

    return GtinMappingResponse(
        id=mapping.id,
        gtin=mapping.gtin,
        drug_variant_id=mapping.drug_variant_id,
        variant_pack_id=mapping.variant_pack_id,
        drug_id=drug.id,
        drug_name=drug.name,
        variant_strength=variant.strength,
        variant_unit=variant.unit,
        pack_label=f"{pack.pack_unit_name} of {shown_size}",
        pack_size_in_base_units=pack.pack_size_in_base_units,
        pack_unit_name=pack.pack_unit_name,
    )
@router.post("/gtin", response_model=GtinMappingResponse, status_code=201)
def create_gtin_mapping(
    body: GtinMappingCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_non_readonly_user),
):
    """Create a mapping from a GTIN to a drug variant and one of its packs.

    All referenced records (variant, pack belonging to that variant, and the
    variant's parent drug) are validated before anything is written, so a
    failed request never leaves a mapping row behind.

    Returns:
        The stored mapping enriched with drug, variant and pack display data.

    Raises:
        HTTPException: 409 when a mapping for this GTIN already exists;
            404 when the variant, the pack for that variant, or the variant's
            parent drug cannot be found.
    """
    existing = db.query(GtinMapping).filter(GtinMapping.gtin == body.gtin).first()
    if existing:
        raise HTTPException(status_code=409, detail="GTIN mapping already exists")

    variant = db.query(DrugVariant).filter(DrugVariant.id == body.drug_variant_id).first()
    if not variant:
        raise HTTPException(status_code=404, detail="Drug variant not found")

    pack = db.query(VariantPack).filter(
        VariantPack.id == body.variant_pack_id,
        VariantPack.drug_variant_id == body.drug_variant_id,
    ).first()
    if not pack:
        raise HTTPException(status_code=404, detail="Pack not found for this variant")

    drug = db.query(Drug).filter(Drug.id == variant.drug_id).first()
    if not drug:
        # Without this check a missing drug would only surface as an
        # AttributeError while building the response, i.e. after the mapping
        # had already been committed.
        raise HTTPException(status_code=404, detail="Drug not found for this variant")

    mapping = GtinMapping(
        gtin=body.gtin,
        drug_variant_id=body.drug_variant_id,
        variant_pack_id=body.variant_pack_id,
        created_by_user_id=current_user.id,
    )
    db.add(mapping)
    db.commit()
    db.refresh(mapping)

    # Show whole-number pack sizes without a trailing ".0".
    pack_size = pack.pack_size_in_base_units
    whole_size = int(pack_size)
    shown_size = whole_size if pack_size == whole_size else pack_size

    return GtinMappingResponse(
        id=mapping.id,
        gtin=mapping.gtin,
        drug_variant_id=mapping.drug_variant_id,
        variant_pack_id=mapping.variant_pack_id,
        drug_id=drug.id,
        drug_name=drug.name,
        variant_strength=variant.strength,
        variant_unit=variant.unit,
        pack_label=f"{pack.pack_unit_name} of {shown_size}",
        pack_size_in_base_units=pack.pack_size_in_base_units,
        pack_unit_name=pack.pack_unit_name,
    )
# Register all routes declared on `router` with the application. The router
# was created with prefix="/api", so every endpoint is served under /api/...
app.include_router(router)