first upload

This commit is contained in:
2026-01-16 12:48:44 -05:00
parent b7a13e9c39
commit f83d672d28
12 changed files with 1941 additions and 0 deletions

0
backend/app/__init__.py Normal file
View File

30
backend/app/database.py Normal file
View File

@@ -0,0 +1,30 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
import os
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./drugs.db")
# For SQLite, ensure the directory exists
if "sqlite" in DATABASE_URL:
# Extract path from sqlite:/// URL
db_path = DATABASE_URL.replace("sqlite:///", "")
os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
engine = create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Drop and recreate all tables (for development only)
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

314
backend/app/main.py Normal file
View File

@@ -0,0 +1,314 @@
from fastapi import FastAPI, Depends, HTTPException, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List, Optional
from datetime import datetime
from .database import engine, get_db, Base
from .models import Drug, DrugVariant, Dispensing
from pydantic import BaseModel
# Create tables
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Drug Inventory API")
# CORS middleware for frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, restrict this
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Create a router with /api prefix
router = APIRouter(prefix="/api")
# Pydantic schemas
class DrugCreate(BaseModel):
name: str
description: str = None
class DrugUpdate(BaseModel):
name: str = None
description: str = None
class DrugResponse(BaseModel):
id: int
name: str
description: str = None
class Config:
from_attributes = True
class DrugVariantCreate(BaseModel):
strength: str
quantity: float
unit: str = "units"
low_stock_threshold: float = 10
class DrugVariantUpdate(BaseModel):
strength: str = None
quantity: float = None
unit: str = None
low_stock_threshold: float = None
class DrugVariantResponse(BaseModel):
id: int
drug_id: int
strength: str
quantity: float
unit: str
low_stock_threshold: float
class Config:
from_attributes = True
class DrugWithVariantsResponse(BaseModel):
id: int
name: str
description: str = None
variants: List[DrugVariantResponse] = []
class Config:
from_attributes = True
class DispensingCreate(BaseModel):
drug_variant_id: int
quantity: float
animal_name: str
user_name: str
notes: Optional[str] = None
class DispensingResponse(BaseModel):
id: int
drug_variant_id: int
quantity: float
animal_name: str
user_name: str
notes: Optional[str] = None
dispensed_at: datetime
class Config:
from_attributes = True
# Routes
@router.get("/")
def read_root():
return {"message": "Drug Inventory API"}
@router.get("/drugs", response_model=List[DrugWithVariantsResponse])
def list_drugs(db: Session = Depends(get_db)):
"""Get all drugs with their variants"""
drugs = db.query(Drug).all()
result = []
for drug in drugs:
variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()
drug_dict = drug.__dict__.copy()
drug_dict['variants'] = variants
result.append(drug_dict)
return result
@router.get("/drugs/low-stock", response_model=List[DrugWithVariantsResponse])
def low_stock_drugs(db: Session = Depends(get_db)):
"""Get drugs with low stock variants"""
# Get variants that are low on stock
low_stock_variants = db.query(DrugVariant).filter(
DrugVariant.quantity <= DrugVariant.low_stock_threshold
).all()
# Get unique drug IDs
drug_ids = list(set(v.drug_id for v in low_stock_variants))
drugs = db.query(Drug).filter(Drug.id.in_(drug_ids)).all()
result = []
for drug in drugs:
variants = [v for v in low_stock_variants if v.drug_id == drug.id]
drug_dict = drug.__dict__.copy()
drug_dict['variants'] = variants
result.append(drug_dict)
return result
@router.get("/drugs/{drug_id}", response_model=DrugWithVariantsResponse)
def get_drug(drug_id: int, db: Session = Depends(get_db)):
"""Get a specific drug with its variants"""
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()
drug_dict = drug.__dict__.copy()
drug_dict['variants'] = variants
return drug_dict
@router.post("/drugs", response_model=DrugWithVariantsResponse)
def create_drug(drug: DrugCreate, db: Session = Depends(get_db)):
"""Create a new drug"""
# Check if drug name already exists
existing = db.query(Drug).filter(Drug.name == drug.name).first()
if existing:
raise HTTPException(status_code=400, detail="Drug with this name already exists")
db_drug = Drug(name=drug.name, description=drug.description)
db.add(db_drug)
db.commit()
db.refresh(db_drug)
# Return drug with empty variants list
drug_dict = db_drug.__dict__.copy()
drug_dict['variants'] = []
return drug_dict
@router.put("/drugs/{drug_id}", response_model=DrugWithVariantsResponse)
def update_drug(drug_id: int, drug_update: DrugUpdate, db: Session = Depends(get_db)):
"""Update a drug"""
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
for field, value in drug_update.dict(exclude_unset=True).items():
setattr(drug, field, value)
db.commit()
db.refresh(drug)
variants = db.query(DrugVariant).filter(DrugVariant.drug_id == drug.id).all()
drug_dict = drug.__dict__.copy()
drug_dict['variants'] = variants
return drug_dict
@router.delete("/drugs/{drug_id}")
def delete_drug(drug_id: int, db: Session = Depends(get_db)):
"""Delete a drug and all its variants"""
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
# Delete all variants first
db.query(DrugVariant).filter(DrugVariant.drug_id == drug_id).delete()
# Delete the drug
db.delete(drug)
db.commit()
return {"message": "Drug and all variants deleted successfully"}
# Drug Variant endpoints
@router.post("/drugs/{drug_id}/variants", response_model=DrugVariantResponse)
def create_drug_variant(drug_id: int, variant: DrugVariantCreate, db: Session = Depends(get_db)):
"""Create a new variant for a drug"""
# Check if drug exists
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
# Check if variant with same strength already exists for this drug
existing = db.query(DrugVariant).filter(
DrugVariant.drug_id == drug_id,
DrugVariant.strength == variant.strength
).first()
if existing:
raise HTTPException(status_code=400, detail="Variant with this strength already exists for this drug")
db_variant = DrugVariant(
drug_id=drug_id,
strength=variant.strength,
quantity=variant.quantity,
unit=variant.unit,
low_stock_threshold=variant.low_stock_threshold
)
db.add(db_variant)
db.commit()
db.refresh(db_variant)
return db_variant
@router.get("/variants/{variant_id}", response_model=DrugVariantResponse)
def get_drug_variant(variant_id: int, db: Session = Depends(get_db)):
"""Get a specific drug variant"""
variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
return variant
@router.put("/variants/{variant_id}", response_model=DrugVariantResponse)
def update_drug_variant(variant_id: int, variant_update: DrugVariantUpdate, db: Session = Depends(get_db)):
"""Update a drug variant"""
variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
for field, value in variant_update.dict(exclude_unset=True).items():
setattr(variant, field, value)
db.commit()
db.refresh(variant)
return variant
@router.delete("/variants/{variant_id}")
def delete_drug_variant(variant_id: int, db: Session = Depends(get_db)):
"""Delete a drug variant"""
variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
db.delete(variant)
db.commit()
return {"message": "Drug variant deleted successfully"}
# Dispensing endpoints
@router.post("/dispense", response_model=DispensingResponse)
def dispense_drug(dispensing: DispensingCreate, db: Session = Depends(get_db)):
"""Record a drug dispensing and reduce inventory"""
# Check if drug variant exists
variant = db.query(DrugVariant).filter(DrugVariant.id == dispensing.drug_variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
# Check if enough quantity available
if variant.quantity < dispensing.quantity:
raise HTTPException(
status_code=400,
detail=f"Insufficient quantity. Available: {variant.quantity}, Requested: {dispensing.quantity}"
)
# Reduce variant quantity
variant.quantity -= dispensing.quantity
db.commit()
# Create dispensing record
db_dispensing = Dispensing(**dispensing.dict())
db.add(db_dispensing)
db.commit()
db.refresh(db_dispensing)
return db_dispensing
@router.get("/dispense/history", response_model=List[DispensingResponse])
def list_dispensings(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""Get dispensing records (audit log)"""
return db.query(Dispensing).order_by(Dispensing.dispensed_at.desc()).offset(skip).limit(limit).all()
@router.get("/drugs/{drug_id}/dispense/history", response_model=List[DispensingResponse])
def get_drug_dispensings(drug_id: int, db: Session = Depends(get_db)):
"""Get dispensing history for a specific drug (all variants)"""
# Verify drug exists
drug = db.query(Drug).filter(Drug.id == drug_id).first()
if not drug:
raise HTTPException(status_code=404, detail="Drug not found")
# Get all variant IDs for this drug
variant_ids = db.query(DrugVariant.id).filter(DrugVariant.drug_id == drug_id).subquery()
return db.query(Dispensing).filter(Dispensing.drug_variant_id.in_(variant_ids)).order_by(Dispensing.dispensed_at.desc()).all()
@router.get("/variants/{variant_id}/dispense/history", response_model=List[DispensingResponse])
def get_variant_dispensings(variant_id: int, db: Session = Depends(get_db)):
"""Get dispensing history for a specific drug variant"""
# Verify variant exists
variant = db.query(DrugVariant).filter(DrugVariant.id == variant_id).first()
if not variant:
raise HTTPException(status_code=404, detail="Drug variant not found")
return db.query(Dispensing).filter(Dispensing.drug_variant_id == variant_id).order_by(Dispensing.dispensed_at.desc()).all()
# Include router with /api prefix
app.include_router(router)

37
backend/app/models.py Normal file
View File

@@ -0,0 +1,37 @@
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.sql import func
from .database import Base
class Drug(Base):
__tablename__ = "drugs"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, index=True, nullable=False)
description = Column(String, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now(), server_default=func.now())
class DrugVariant(Base):
__tablename__ = "drug_variants"
id = Column(Integer, primary_key=True, index=True)
drug_id = Column(Integer, ForeignKey("drugs.id"), nullable=False)
strength = Column(String, nullable=False) # e.g., "5.4mg", "16mg", "100ml"
quantity = Column(Float, nullable=False, default=0)
unit = Column(String, nullable=False, default="units") # tablets, bottles, ml, etc
low_stock_threshold = Column(Float, nullable=False, default=10)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now(), server_default=func.now())
class Dispensing(Base):
__tablename__ = "dispensings"
id = Column(Integer, primary_key=True, index=True)
drug_variant_id = Column(Integer, ForeignKey("drug_variants.id"), nullable=False)
quantity = Column(Float, nullable=False)
animal_name = Column(String, nullable=False) # Name/ID of the animal
user_name = Column(String, nullable=False) # User who dispensed
dispensed_at = Column(DateTime(timezone=True), server_default=func.now(), index=True)
notes = Column(String, nullable=True)