Files
mt-drugs/backend/app/migrate_compliance.py
T
2026-04-06 10:41:33 -04:00

246 lines
11 KiB
Python

"""
Compliance schema migration helpers.
This module applies additive migrations for SQLite databases used by this project.
It is intentionally lightweight and idempotent because the project does not yet
use Alembic-style versioned migrations.
"""
import os
import sqlite3
from pathlib import Path
# Fallback SQLAlchemy-style SQLite URL, used by migrate_compliance_schema()
# when the DATABASE_URL environment variable is not set.
DEFAULT_DB_URL = "sqlite:///./data/drugs.db"
def _resolve_sqlite_path(db_url: str) -> Path | None:
if not db_url.startswith("sqlite:///"):
print(f"Unsupported database URL for compliance migration: {db_url}")
return None
raw_path = db_url.replace("sqlite:///", "")
if raw_path.startswith("/"):
return Path(raw_path)
return Path(raw_path)
def _column_exists(cursor: sqlite3.Cursor, table_name: str, column_name: str) -> bool:
    """Report whether ``table_name`` has a column named ``column_name``.

    The table name is interpolated directly into the PRAGMA statement, so it
    must be a trusted identifier (callers in this module pass literals).
    A table that does not exist yields no rows and therefore ``False``.
    """
    table_info = cursor.execute(f"PRAGMA table_info({table_name})").fetchall()
    # Index 1 of each table_info row holds the column name.
    return any(info[1] == column_name for info in table_info)
def _table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
    """Report whether a table called ``table_name`` is listed in sqlite_master."""
    lookup_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    match = cursor.execute(lookup_sql, (table_name,)).fetchone()
    return match is not None
# (table, column, column definition) for every additive column, listed in the
# order in which the columns are applied.
_COMPLIANCE_COLUMNS = (
    ("drugs", "is_controlled", "BOOLEAN NOT NULL DEFAULT 0"),
    ("dispensings", "batch_id", "INTEGER"),
    ("dispensings", "actor_user_id", "INTEGER"),
    ("batches", "received_pack_id", "INTEGER"),
    ("batches", "received_pack_count", "FLOAT"),
    ("batches", "received_pack_size_snapshot", "FLOAT"),
    ("batches", "current_full_pack_count", "FLOAT"),
    ("batches", "current_loose_base_units", "FLOAT"),
    ("batches", "disposed_at", "DATETIME"),
    ("batches", "disposed_by_user_id", "INTEGER"),
    ("batches", "disposed_quantity", "FLOAT"),
    ("batches", "disposal_notes", "VARCHAR"),
    ("dispensings", "requested_pack_id", "INTEGER"),
    ("dispensings", "requested_pack_count", "FLOAT"),
    ("dispensings", "dispense_mode", "VARCHAR NOT NULL DEFAULT 'subunit'"),
)


def _create_variant_packs_table(cursor: sqlite3.Cursor) -> None:
    """Create variant_packs (and its lookup index) if drug_variants exists and it does not."""
    if not _table_exists(cursor, "drug_variants") or _table_exists(cursor, "variant_packs"):
        return
    cursor.execute(
        """
        CREATE TABLE variant_packs (
            id INTEGER PRIMARY KEY,
            drug_variant_id INTEGER NOT NULL,
            label VARCHAR NOT NULL,
            pack_unit_name VARCHAR NOT NULL DEFAULT 'pack',
            pack_size_in_base_units FLOAT NOT NULL DEFAULT 1,
            is_active BOOLEAN NOT NULL DEFAULT 1,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(drug_variant_id) REFERENCES drug_variants(id)
        )
        """
    )
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS ix_variant_packs_drug_variant_id ON variant_packs(drug_variant_id)"
    )
    print("Created variant_packs table")


def _add_missing_columns(cursor: sqlite3.Cursor) -> None:
    """Add each column in _COMPLIANCE_COLUMNS whose table exists but lacks it."""
    for table, column, definition in _COMPLIANCE_COLUMNS:
        if _table_exists(cursor, table) and not _column_exists(cursor, table, column):
            cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
            print(f"Added {table}.{column}")


def _ensure_default_packs(cursor: sqlite3.Cursor) -> None:
    """Insert a single-unit pack row for every drug variant that has no pack yet."""
    if not (_table_exists(cursor, "variant_packs") and _table_exists(cursor, "drug_variants")):
        return
    cursor.execute(
        """
        INSERT INTO variant_packs (drug_variant_id, label, pack_unit_name, pack_size_in_base_units, is_active)
        SELECT v.id,
               '1 ' || COALESCE(NULLIF(v.unit, ''), 'unit'),
               COALESCE(NULLIF(v.unit, ''), 'unit'),
               1,
               1
        FROM drug_variants v
        WHERE NOT EXISTS (
            SELECT 1
            FROM variant_packs p
            WHERE p.drug_variant_id = v.id
        )
        """
    )
    print("Ensured default pack rows for variants")


def _backfill_batches(cursor: sqlite3.Cursor) -> None:
    """Fill missing pack context, pack size snapshot and live pack state on batches."""
    if not (_table_exists(cursor, "batches") and _table_exists(cursor, "variant_packs")):
        return

    # Batches without a pack are pointed at the variant's lowest-id pack.
    cursor.execute(
        """
        UPDATE batches
        SET received_pack_id = (
                SELECT p.id
                FROM variant_packs p
                WHERE p.drug_variant_id = batches.drug_variant_id
                ORDER BY p.id ASC
                LIMIT 1
            ),
            received_pack_count = quantity
        WHERE received_pack_id IS NULL
        """
    )
    print("Backfilled batches pack context where missing")

    cursor.execute(
        """
        UPDATE batches
        SET received_pack_size_snapshot = (
                SELECT p.pack_size_in_base_units
                FROM variant_packs p
                WHERE p.id = batches.received_pack_id
                LIMIT 1
            )
        WHERE received_pack_id IS NOT NULL
          AND (received_pack_size_snapshot IS NULL OR received_pack_size_snapshot <= 0)
        """
    )
    print("Backfilled batches pack size snapshot where missing")

    # Only rows that still lack live pack state are derived from quantity.
    # Without this guard every run would overwrite values already stored in
    # these columns, unlike the other backfills, which only touch missing data.
    # Rows without a usable snapshot stay NULL and are retried on the next run.
    cursor.execute(
        """
        UPDATE batches
        SET current_full_pack_count = CASE
                WHEN COALESCE(received_pack_size_snapshot, 0) > 0
                    THEN CAST(quantity / received_pack_size_snapshot AS INTEGER)
                ELSE NULL
            END,
            current_loose_base_units = CASE
                WHEN COALESCE(received_pack_size_snapshot, 0) > 0
                    THEN quantity - (CAST(quantity / received_pack_size_snapshot AS INTEGER) * received_pack_size_snapshot)
                ELSE NULL
            END
        WHERE current_full_pack_count IS NULL
           OR current_loose_base_units IS NULL
        """
    )
    print("Backfilled batches live pack state")


def _backfill_dispensings(cursor: sqlite3.Cursor) -> None:
    """Fill missing pack context and dispense mode on dispensings."""
    if not (_table_exists(cursor, "dispensings") and _table_exists(cursor, "variant_packs")):
        return

    cursor.execute(
        """
        UPDATE dispensings
        SET requested_pack_id = (
                SELECT p.id
                FROM variant_packs p
                WHERE p.drug_variant_id = dispensings.drug_variant_id
                ORDER BY p.id ASC
                LIMIT 1
            ),
            requested_pack_count = quantity
        WHERE requested_pack_id IS NULL
        """
    )
    print("Backfilled dispensing pack context where missing")

    cursor.execute(
        """
        UPDATE dispensings
        SET dispense_mode = CASE
                WHEN requested_pack_id IS NOT NULL AND requested_pack_count IS NOT NULL THEN 'pack'
                ELSE 'subunit'
            END
        WHERE dispense_mode IS NULL OR TRIM(dispense_mode) = ''
        """
    )
    print("Backfilled dispensing mode where missing")


def _seed_default_locations(cursor: sqlite3.Cursor) -> None:
    """Insert the 'Cupboard' and 'Fridge' locations unless a row with that name exists."""
    # The locations table is created elsewhere (via SQLAlchemy create_all), so
    # this step only runs once the table is present.
    if not _table_exists(cursor, "locations"):
        return
    for location_name in ("Cupboard", "Fridge"):
        # The explicit NOT EXISTS check keeps seeding idempotent even if the
        # table has no UNIQUE constraint on name; OR IGNORE is kept so that a
        # constraint conflict is still skipped rather than raised.
        cursor.execute(
            """
            INSERT OR IGNORE INTO locations(name, is_active)
            SELECT ?, 1
            WHERE NOT EXISTS (SELECT 1 FROM locations WHERE name = ?)
            """,
            (location_name, location_name),
        )
    print("Ensured default locations exist")


def migrate_compliance_schema() -> None:
    """Apply additive schema changes needed for compliance features.

    The database location is read from the ``DATABASE_URL`` environment
    variable, falling back to ``DEFAULT_DB_URL``. The function returns without
    doing anything when the URL is not a ``sqlite:///`` URL or when the
    database file does not exist.

    Steps, in order: create ``variant_packs``, add missing columns, ensure a
    default pack per variant, backfill ``batches`` and ``dispensings``, and
    seed the default locations. Each step is skipped when the tables it needs
    are absent.

    Raises:
        RuntimeError: If any statement raises ``sqlite3.Error``. The
            connection is rolled back first and the original error is chained.
    """
    db_url = os.getenv("DATABASE_URL", DEFAULT_DB_URL)
    db_path = _resolve_sqlite_path(db_url)
    if db_path is None:
        return
    if not db_path.exists():
        print(f"Database does not exist at {db_path}, skipping compliance migration")
        return

    print(f"Running compliance migration on {db_path}")
    conn = sqlite3.connect(str(db_path))
    cursor = conn.cursor()
    try:
        _create_variant_packs_table(cursor)
        _add_missing_columns(cursor)
        _ensure_default_packs(cursor)
        _backfill_batches(cursor)
        _backfill_dispensings(cursor)
        _seed_default_locations(cursor)
        conn.commit()
        print("Compliance migration completed")
    except sqlite3.Error as exc:
        conn.rollback()
        raise RuntimeError(f"Compliance migration failed: {exc}") from exc
    finally:
        conn.close()