248 lines
11 KiB
Python
248 lines
11 KiB
Python
"""
|
|
Compliance schema migration helpers.
|
|
|
|
This module applies additive migrations for SQLite databases used by this project.
|
|
It is intentionally lightweight and idempotent because the project does not yet
|
|
use Alembic-style versioned migrations.
|
|
"""
|
|
|
|
import os
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
|
|
# Fallback database URL, used when the DATABASE_URL environment variable is not set.
DEFAULT_DB_URL = "sqlite:///./data/drugs.db"
|
|
|
|
|
|
def _resolve_sqlite_path(db_url: str) -> Path | None:
|
|
if not db_url.startswith("sqlite:///"):
|
|
print(f"Unsupported database URL for compliance migration: {db_url}")
|
|
return None
|
|
|
|
raw_path = db_url.replace("sqlite:///", "")
|
|
if raw_path.startswith("/"):
|
|
return Path(raw_path)
|
|
return Path(raw_path)
|
|
|
|
|
|
def _column_exists(cursor: sqlite3.Cursor, table_name: str, column_name: str) -> bool:
    """Report whether ``table_name`` has a column named ``column_name``.

    Args:
        cursor: Open cursor on the database to inspect.
        table_name: Table whose columns are listed via ``PRAGMA table_info``.
        column_name: Column name to look for (exact string match).

    Returns:
        ``True`` if the column is listed for the table, ``False`` otherwise.
        A table that does not exist yields no rows and therefore ``False``.
    """
    # PRAGMA arguments cannot be bound as parameters, so quote the identifier
    # (doubling embedded quotes). This keeps table names containing spaces,
    # quote characters or reserved words from breaking the statement.
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    cursor.execute(f"PRAGMA table_info({quoted_table})")
    # Each row is (cid, name, type, notnull, dflt_value, pk); index 1 is the name.
    return any(row[1] == column_name for row in cursor.fetchall())
|
|
|
|
|
|
def _table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
    """Report whether a table called ``table_name`` is present in the database.

    The name is looked up in ``sqlite_master`` among entries whose type is
    ``'table'``; the lookup value is passed as a bound parameter.
    """
    lookup_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    found = cursor.execute(lookup_sql, (table_name,)).fetchone()
    return found is not None
|
|
|
|
|
|
# (table, column, column definition) triples for the additive ALTER TABLE steps.
# They are applied in this order, before any data backfill runs.
_COMPLIANCE_COLUMNS: tuple[tuple[str, str, str], ...] = (
    ("drugs", "is_controlled", "BOOLEAN NOT NULL DEFAULT 0"),
    ("dispensings", "batch_id", "INTEGER"),
    ("dispensings", "actor_user_id", "INTEGER"),
    ("batches", "received_pack_id", "INTEGER"),
    ("batches", "received_pack_count", "FLOAT"),
    ("batches", "received_pack_size_snapshot", "FLOAT"),
    ("batches", "current_full_pack_count", "FLOAT"),
    ("batches", "current_loose_base_units", "FLOAT"),
    ("batches", "disposed_at", "DATETIME"),
    ("batches", "disposed_by_user_id", "INTEGER"),
    ("batches", "disposed_quantity", "FLOAT"),
    ("batches", "disposal_notes", "VARCHAR"),
    ("dispensings", "requested_pack_id", "INTEGER"),
    ("dispensings", "requested_pack_count", "FLOAT"),
    ("dispensings", "dispense_mode", "VARCHAR NOT NULL DEFAULT 'subunit'"),
)


def _add_column_if_missing(
    cursor: sqlite3.Cursor, table_name: str, column_name: str, column_def: str
) -> None:
    """Add ``column_name`` to ``table_name`` unless the table is absent or already has it."""
    if not _table_exists(cursor, table_name):
        return
    if _column_exists(cursor, table_name, column_name):
        return
    # Identifiers and definitions come from constants in this module, never
    # from external input, so plain interpolation is acceptable here.
    cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
    print(f"Added {table_name}.{column_name}")


def _create_variant_packs_table(cursor: sqlite3.Cursor) -> None:
    """Create ``variant_packs`` (and its index) when ``drug_variants`` exists and it does not."""
    if not _table_exists(cursor, "drug_variants") or _table_exists(cursor, "variant_packs"):
        return
    cursor.execute(
        """
        CREATE TABLE variant_packs (
            id INTEGER PRIMARY KEY,
            drug_variant_id INTEGER NOT NULL,
            pack_unit_name VARCHAR NOT NULL DEFAULT 'pack',
            pack_size_in_base_units FLOAT NOT NULL DEFAULT 1,
            is_active BOOLEAN NOT NULL DEFAULT 1,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(drug_variant_id) REFERENCES drug_variants(id)
        )
        """
    )
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS ix_variant_packs_drug_variant_id ON variant_packs(drug_variant_id)"
    )
    print("Created variant_packs table")


def _seed_default_variant_packs(cursor: sqlite3.Cursor) -> None:
    """Insert one size-1 pack row for every drug variant that has no pack row yet."""
    if not (_table_exists(cursor, "variant_packs") and _table_exists(cursor, "drug_variants")):
        return
    cursor.execute(
        """
        INSERT INTO variant_packs (drug_variant_id, pack_unit_name, pack_size_in_base_units, is_active)
        SELECT v.id,
               COALESCE(NULLIF(v.unit, ''), 'unit'),
               1,
               1
        FROM drug_variants v
        WHERE NOT EXISTS (
            SELECT 1
            FROM variant_packs p
            WHERE p.drug_variant_id = v.id
        )
        """
    )
    print("Ensured default pack rows for variants")


def _backfill_batch_pack_state(cursor: sqlite3.Cursor) -> None:
    """Populate the pack-related columns on ``batches`` from ``variant_packs`` and ``quantity``."""
    if not (_table_exists(cursor, "batches") and _table_exists(cursor, "variant_packs")):
        return

    # Link each batch lacking a pack to the lowest-id pack of its variant.
    cursor.execute(
        """
        UPDATE batches
        SET received_pack_id = (
                SELECT p.id
                FROM variant_packs p
                WHERE p.drug_variant_id = batches.drug_variant_id
                ORDER BY p.id ASC
                LIMIT 1
            ),
            received_pack_count = quantity
        WHERE received_pack_id IS NULL
        """
    )
    print("Backfilled batches pack context where missing")

    # Copy the linked pack's size where the snapshot is missing or non-positive.
    cursor.execute(
        """
        UPDATE batches
        SET received_pack_size_snapshot = (
                SELECT p.pack_size_in_base_units
                FROM variant_packs p
                WHERE p.id = batches.received_pack_id
                LIMIT 1
            )
        WHERE received_pack_id IS NOT NULL
          AND (received_pack_size_snapshot IS NULL OR received_pack_size_snapshot <= 0)
        """
    )
    print("Backfilled batches pack size snapshot where missing")

    # Split quantity into whole packs plus the remainder in base units.
    # NOTE(review): this UPDATE has no WHERE clause, so every run recomputes
    # both columns for all rows from quantity. Confirm that overwriting any
    # values the application has written since is intended.
    cursor.execute(
        """
        UPDATE batches
        SET current_full_pack_count = CASE
                WHEN COALESCE(received_pack_size_snapshot, 0) > 0 THEN CAST(quantity / received_pack_size_snapshot AS INTEGER)
                ELSE NULL
            END,
            current_loose_base_units = CASE
                WHEN COALESCE(received_pack_size_snapshot, 0) > 0 THEN quantity - (CAST(quantity / received_pack_size_snapshot AS INTEGER) * received_pack_size_snapshot)
                ELSE NULL
            END
        """
    )
    print("Backfilled batches live pack state")


def _backfill_dispensing_pack_state(cursor: sqlite3.Cursor) -> None:
    """Populate the pack-related columns and ``dispense_mode`` on ``dispensings``."""
    if not (_table_exists(cursor, "dispensings") and _table_exists(cursor, "variant_packs")):
        return

    # Link each dispensing lacking a pack to the lowest-id pack of its variant.
    cursor.execute(
        """
        UPDATE dispensings
        SET requested_pack_id = (
                SELECT p.id
                FROM variant_packs p
                WHERE p.drug_variant_id = dispensings.drug_variant_id
                ORDER BY p.id ASC
                LIMIT 1
            ),
            requested_pack_count = quantity
        WHERE requested_pack_id IS NULL
        """
    )
    print("Backfilled dispensing pack context where missing")

    # Only rows whose mode is NULL or blank are touched.
    cursor.execute(
        """
        UPDATE dispensings
        SET dispense_mode = CASE
                WHEN requested_pack_id IS NOT NULL AND requested_pack_count IS NOT NULL THEN 'pack'
                ELSE 'subunit'
            END
        WHERE dispense_mode IS NULL OR TRIM(dispense_mode) = ''
        """
    )
    print("Backfilled dispensing mode where missing")


def _seed_default_locations(cursor: sqlite3.Cursor) -> None:
    """Insert the 'Cupboard' and 'Fridge' rows into ``locations`` when that table exists."""
    # Seed default locations once table exists (created via SQLAlchemy create_all).
    if not _table_exists(cursor, "locations"):
        return
    # NOTE(review): INSERT OR IGNORE only suppresses duplicates if a uniqueness
    # constraint covers locations.name; confirm against the table definition.
    for location_name in ("Cupboard", "Fridge"):
        cursor.execute(
            "INSERT OR IGNORE INTO locations(name, is_active) VALUES (?, 1)",
            (location_name,),
        )
    print("Ensured default locations exist")


def migrate_compliance_schema() -> None:
    """Apply additive schema changes needed for compliance features.

    The database URL is read from the ``DATABASE_URL`` environment variable,
    falling back to ``DEFAULT_DB_URL``. The migration is skipped (with a
    printed notice) when the URL is not a ``sqlite:///`` URL or the database
    file does not exist. Every step first checks whether its table or column
    is already present, so the function can be run repeatedly.

    All steps run inside one explicit transaction: either every change is
    committed, or on a SQLite error everything is rolled back.

    Raises:
        RuntimeError: If any step fails with a ``sqlite3.Error``; the original
            error is chained as the cause.
    """
    db_url = os.getenv("DATABASE_URL", DEFAULT_DB_URL)
    db_path = _resolve_sqlite_path(db_url)
    if db_path is None:
        return

    if not db_path.exists():
        print(f"Database does not exist at {db_path}, skipping compliance migration")
        return

    print(f"Running compliance migration on {db_path}")

    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.cursor()

        # sqlite3's legacy transaction handling only opens an implicit
        # transaction before INSERT/UPDATE/DELETE/REPLACE, so CREATE/ALTER
        # statements would otherwise be applied immediately and survive the
        # rollback below. Opening the transaction explicitly covers the DDL too.
        if not conn.in_transaction:
            cursor.execute("BEGIN")

        _create_variant_packs_table(cursor)

        for table_name, column_name, column_def in _COMPLIANCE_COLUMNS:
            _add_column_if_missing(cursor, table_name, column_name, column_def)

        _seed_default_variant_packs(cursor)
        _backfill_batch_pack_state(cursor)
        _backfill_dispensing_pack_state(cursor)

        _add_column_if_missing(cursor, "dispensings", "prescribing_vet", "VARCHAR")

        _seed_default_locations(cursor)

        conn.commit()
        print("Compliance migration completed")
    except sqlite3.Error as exc:
        conn.rollback()
        raise RuntimeError(f"Compliance migration failed: {exc}") from exc
    finally:
        conn.close()
|