""" Compliance schema migration helpers. This module applies additive migrations for SQLite databases used by this project. It is intentionally lightweight and idempotent because the project does not yet use Alembic-style versioned migrations. """ import os import sqlite3 from pathlib import Path DEFAULT_DB_URL = "sqlite:///./data/drugs.db" def _resolve_sqlite_path(db_url: str) -> Path | None: if not db_url.startswith("sqlite:///"): print(f"Unsupported database URL for compliance migration: {db_url}") return None raw_path = db_url.replace("sqlite:///", "") if raw_path.startswith("/"): return Path(raw_path) return Path(raw_path) def _column_exists(cursor: sqlite3.Cursor, table_name: str, column_name: str) -> bool: cursor.execute(f"PRAGMA table_info({table_name})") columns = [row[1] for row in cursor.fetchall()] return column_name in columns def _table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool: cursor.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,), ) return cursor.fetchone() is not None def migrate_compliance_schema() -> None: """Apply additive schema changes needed for compliance features.""" db_url = os.getenv("DATABASE_URL", DEFAULT_DB_URL) db_path = _resolve_sqlite_path(db_url) if db_path is None: return if not db_path.exists(): print(f"Database does not exist at {db_path}, skipping compliance migration") return print(f"Running compliance migration on {db_path}") conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() try: if _table_exists(cursor, "drug_variants") and not _table_exists(cursor, "variant_packs"): cursor.execute( """ CREATE TABLE variant_packs ( id INTEGER PRIMARY KEY, drug_variant_id INTEGER NOT NULL, label VARCHAR NOT NULL, pack_unit_name VARCHAR NOT NULL DEFAULT 'pack', pack_size_in_base_units FLOAT NOT NULL DEFAULT 1, is_active BOOLEAN NOT NULL DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(drug_variant_id) REFERENCES drug_variants(id) ) """ ) cursor.execute("CREATE INDEX IF NOT EXISTS ix_variant_packs_drug_variant_id ON variant_packs(drug_variant_id)") print("Created variant_packs table") if _table_exists(cursor, "drugs") and not _column_exists(cursor, "drugs", "is_controlled"): cursor.execute("ALTER TABLE drugs ADD COLUMN is_controlled BOOLEAN NOT NULL DEFAULT 0") print("Added drugs.is_controlled") if _table_exists(cursor, "dispensings") and not _column_exists(cursor, "dispensings", "batch_id"): cursor.execute("ALTER TABLE dispensings ADD COLUMN batch_id INTEGER") print("Added dispensings.batch_id") if _table_exists(cursor, "dispensings") and not _column_exists(cursor, "dispensings", "actor_user_id"): cursor.execute("ALTER TABLE dispensings ADD COLUMN actor_user_id INTEGER") print("Added dispensings.actor_user_id") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "received_pack_id"): cursor.execute("ALTER TABLE batches ADD COLUMN received_pack_id INTEGER") print("Added batches.received_pack_id") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "received_pack_count"): cursor.execute("ALTER TABLE batches ADD COLUMN received_pack_count FLOAT") print("Added batches.received_pack_count") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "received_pack_size_snapshot"): cursor.execute("ALTER TABLE batches ADD COLUMN received_pack_size_snapshot FLOAT") print("Added batches.received_pack_size_snapshot") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "current_full_pack_count"): cursor.execute("ALTER TABLE batches ADD COLUMN current_full_pack_count FLOAT") print("Added batches.current_full_pack_count") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "current_loose_base_units"): cursor.execute("ALTER TABLE batches ADD COLUMN current_loose_base_units FLOAT") print("Added batches.current_loose_base_units") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "disposed_at"): cursor.execute("ALTER TABLE batches ADD COLUMN disposed_at DATETIME") print("Added batches.disposed_at") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "disposed_by_user_id"): cursor.execute("ALTER TABLE batches ADD COLUMN disposed_by_user_id INTEGER") print("Added batches.disposed_by_user_id") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "disposed_quantity"): cursor.execute("ALTER TABLE batches ADD COLUMN disposed_quantity FLOAT") print("Added batches.disposed_quantity") if _table_exists(cursor, "batches") and not _column_exists(cursor, "batches", "disposal_notes"): cursor.execute("ALTER TABLE batches ADD COLUMN disposal_notes VARCHAR") print("Added batches.disposal_notes") if _table_exists(cursor, "dispensings") and not _column_exists(cursor, "dispensings", "requested_pack_id"): cursor.execute("ALTER TABLE dispensings ADD COLUMN requested_pack_id INTEGER") print("Added dispensings.requested_pack_id") if _table_exists(cursor, "dispensings") and not _column_exists(cursor, "dispensings", "requested_pack_count"): cursor.execute("ALTER TABLE dispensings ADD COLUMN requested_pack_count FLOAT") print("Added dispensings.requested_pack_count") if _table_exists(cursor, "dispensings") and not _column_exists(cursor, "dispensings", "dispense_mode"): cursor.execute("ALTER TABLE dispensings ADD COLUMN dispense_mode VARCHAR NOT NULL DEFAULT 'subunit'") print("Added dispensings.dispense_mode") if _table_exists(cursor, "variant_packs") and _table_exists(cursor, "drug_variants"): cursor.execute( """ INSERT INTO variant_packs (drug_variant_id, label, pack_unit_name, pack_size_in_base_units, is_active) SELECT v.id, '1 ' || COALESCE(NULLIF(v.unit, ''), 'unit'), COALESCE(NULLIF(v.unit, ''), 'unit'), 1, 1 FROM drug_variants v WHERE NOT EXISTS ( SELECT 1 FROM variant_packs p WHERE p.drug_variant_id = v.id ) """ ) print("Ensured default pack rows for variants") if _table_exists(cursor, "batches") and _table_exists(cursor, "variant_packs"): cursor.execute( """ UPDATE batches SET received_pack_id = ( SELECT p.id FROM variant_packs p WHERE p.drug_variant_id = batches.drug_variant_id ORDER BY p.id ASC LIMIT 1 ), received_pack_count = quantity WHERE received_pack_id IS NULL """ ) print("Backfilled batches pack context where missing") cursor.execute( """ UPDATE batches SET received_pack_size_snapshot = ( SELECT p.pack_size_in_base_units FROM variant_packs p WHERE p.id = batches.received_pack_id LIMIT 1 ) WHERE received_pack_id IS NOT NULL AND (received_pack_size_snapshot IS NULL OR received_pack_size_snapshot <= 0) """ ) print("Backfilled batches pack size snapshot where missing") cursor.execute( """ UPDATE batches SET current_full_pack_count = CASE WHEN COALESCE(received_pack_size_snapshot, 0) > 0 THEN CAST(quantity / received_pack_size_snapshot AS INTEGER) ELSE NULL END, current_loose_base_units = CASE WHEN COALESCE(received_pack_size_snapshot, 0) > 0 THEN quantity - (CAST(quantity / received_pack_size_snapshot AS INTEGER) * received_pack_size_snapshot) ELSE NULL END """ ) print("Backfilled batches live pack state") if _table_exists(cursor, "dispensings") and _table_exists(cursor, "variant_packs"): cursor.execute( """ UPDATE dispensings SET requested_pack_id = ( SELECT p.id FROM variant_packs p WHERE p.drug_variant_id = dispensings.drug_variant_id ORDER BY p.id ASC LIMIT 1 ), requested_pack_count = quantity WHERE requested_pack_id IS NULL """ ) print("Backfilled dispensing pack context where missing") cursor.execute( """ UPDATE dispensings SET dispense_mode = CASE WHEN requested_pack_id IS NOT NULL AND requested_pack_count IS NOT NULL THEN 'pack' ELSE 'subunit' END WHERE dispense_mode IS NULL OR TRIM(dispense_mode) = '' """ ) print("Backfilled dispensing mode where missing") # Seed default locations once table exists (created via SQLAlchemy create_all). if _table_exists(cursor, "locations"): cursor.execute("INSERT OR IGNORE INTO locations(name, is_active) VALUES ('Cupboard', 1)") cursor.execute("INSERT OR IGNORE INTO locations(name, is_active) VALUES ('Fridge', 1)") print("Ensured default locations exist") conn.commit() print("Compliance migration completed") except sqlite3.Error as exc: conn.rollback() raise RuntimeError(f"Compliance migration failed: {exc}") from exc finally: conn.close()