84 lines
3.0 KiB
Python
84 lines
3.0 KiB
Python
"""
|
|
Compliance schema migration helpers.
|
|
|
|
This module applies additive migrations for SQLite databases used by this project.
|
|
It is intentionally lightweight and idempotent because the project does not yet
|
|
use Alembic-style versioned migrations.
|
|
"""
|
|
|
|
import os
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
|
|
# Fallback database URL, used by migrate_compliance_schema() when the
# DATABASE_URL environment variable is not set.
DEFAULT_DB_URL = "sqlite:///./data/drugs.db"
|
|
|
|
|
|
def _resolve_sqlite_path(db_url: str) -> Path | None:
|
|
if not db_url.startswith("sqlite:///"):
|
|
print(f"Unsupported database URL for compliance migration: {db_url}")
|
|
return None
|
|
|
|
raw_path = db_url.replace("sqlite:///", "")
|
|
if raw_path.startswith("/"):
|
|
return Path(raw_path)
|
|
return Path(raw_path)
|
|
|
|
|
|
def _column_exists(cursor: sqlite3.Cursor, table_name: str, column_name: str) -> bool:
    """Return whether ``table_name`` has a column named ``column_name``.

    Args:
        cursor: Open cursor on the database to inspect.
        table_name: Table whose columns are listed.
        column_name: Column to look for; compared as an exact string match.

    Returns:
        ``True`` if a column with that exact name is listed for the table,
        ``False`` otherwise (including when no columns are listed at all,
        as happens for a table that does not exist).
    """
    # The table-valued form of PRAGMA table_info accepts a bound parameter, so
    # the table name is never spliced into the SQL text and needs no quoting.
    cursor.execute("SELECT name FROM pragma_table_info(?)", (table_name,))
    return any(row[0] == column_name for row in cursor.fetchall())
|
|
|
|
|
|
def _table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
    """Report whether ``sqlite_master`` lists a table called ``table_name``.

    Only entries of type ``'table'`` are considered, so a view with the same
    name does not count.
    """
    lookup = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    match = cursor.execute(lookup, (table_name,)).fetchone()
    if match is None:
        return False
    return True
|
|
|
|
|
|
def migrate_compliance_schema() -> None:
    """Bring an existing SQLite database up to the compliance schema.

    The database URL is read from the ``DATABASE_URL`` environment variable,
    falling back to ``DEFAULT_DB_URL``. The function returns without touching
    anything when the URL is not a ``sqlite:///`` URL or when the database
    file does not exist.

    Otherwise it adds each compliance column that is missing from a table
    that exists, inserts the default locations if the ``locations`` table
    exists, and commits.

    Raises:
        RuntimeError: If any statement fails with ``sqlite3.Error``; the
            connection is rolled back first and the original error is chained.
    """
    target = _resolve_sqlite_path(os.getenv("DATABASE_URL", DEFAULT_DB_URL))
    if target is None:
        return
    if not target.exists():
        print(f"Database does not exist at {target}, skipping compliance migration")
        return

    print(f"Running compliance migration on {target}")

    # (table, column, statement) for every additive column, in the order applied.
    additive_columns = (
        (
            "drugs",
            "is_controlled",
            "ALTER TABLE drugs ADD COLUMN is_controlled BOOLEAN NOT NULL DEFAULT 0",
        ),
        (
            "dispensings",
            "batch_id",
            "ALTER TABLE dispensings ADD COLUMN batch_id INTEGER",
        ),
        (
            "dispensings",
            "actor_user_id",
            "ALTER TABLE dispensings ADD COLUMN actor_user_id INTEGER",
        ),
    )
    # NOTE(review): INSERT OR IGNORE only skips an existing row when
    # locations.name carries a uniqueness constraint - confirm against the model.
    default_location_inserts = (
        "INSERT OR IGNORE INTO locations(name, is_active) VALUES ('Cupboard', 1)",
        "INSERT OR IGNORE INTO locations(name, is_active) VALUES ('Fridge', 1)",
    )

    conn = sqlite3.connect(str(target))
    cursor = conn.cursor()

    try:
        for table, column, statement in additive_columns:
            if not _table_exists(cursor, table):
                continue
            if _column_exists(cursor, table, column):
                continue
            cursor.execute(statement)
            print(f"Added {table}.{column}")

        # Seed default locations once table exists (created via SQLAlchemy create_all).
        if _table_exists(cursor, "locations"):
            for statement in default_location_inserts:
                cursor.execute(statement)
            print("Ensured default locations exist")

        conn.commit()
        print("Compliance migration completed")
    except sqlite3.Error as exc:
        conn.rollback()
        raise RuntimeError(f"Compliance migration failed: {exc}") from exc
    finally:
        conn.close()
|