""" GTIN mapping table migration. Creates the gtin_mappings table if it does not already exist. Idempotent and safe to run on every startup. """ import os import sqlite3 from pathlib import Path DEFAULT_DB_URL = "sqlite:///./data/drugs.db" def _resolve_sqlite_path(db_url: str) -> Path | None: if not db_url.startswith("sqlite:///"): print(f"Unsupported database URL for GTIN migration: {db_url}") return None raw_path = db_url.replace("sqlite:///", "") if raw_path.startswith("/"): return Path(raw_path) return Path(raw_path) def _table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool: cursor.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,), ) return cursor.fetchone() is not None def _column_exists(cursor: sqlite3.Cursor, table_name: str, column_name: str) -> bool: cursor.execute(f"PRAGMA table_info({table_name})") return any(row[1] == column_name for row in cursor.fetchall()) def migrate_gtin_schema() -> None: """Create gtin_mappings table if it does not exist, and drop label from variant_packs.""" db_url = os.getenv("DATABASE_URL", DEFAULT_DB_URL) db_path = _resolve_sqlite_path(db_url) if db_path is None: return if not db_path.exists(): print(f"Database does not exist at {db_path}, skipping GTIN migration") return print(f"Running GTIN migration on {db_path}") conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() try: if not _table_exists(cursor, "gtin_mappings"): cursor.execute(""" CREATE TABLE gtin_mappings ( id INTEGER PRIMARY KEY AUTOINCREMENT, gtin VARCHAR(14) NOT NULL, drug_variant_id INTEGER NOT NULL REFERENCES drug_variants(id), variant_pack_id INTEGER NOT NULL REFERENCES variant_packs(id), created_by_user_id INTEGER REFERENCES users(id), created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute("CREATE UNIQUE INDEX ix_gtin_mappings_gtin ON gtin_mappings (gtin)") cursor.execute("CREATE INDEX ix_gtin_mappings_drug_variant_id ON gtin_mappings (drug_variant_id)") cursor.execute("CREATE INDEX ix_gtin_mappings_variant_pack_id ON gtin_mappings (variant_pack_id)") print("Created gtin_mappings table") else: print("gtin_mappings table already exists, skipping") # Drop label column from variant_packs if it still exists if _table_exists(cursor, "variant_packs") and _column_exists(cursor, "variant_packs", "label"): print("Dropping label column from variant_packs") cursor.execute(""" CREATE TABLE variant_packs_new ( id INTEGER PRIMARY KEY AUTOINCREMENT, drug_variant_id INTEGER NOT NULL REFERENCES drug_variants(id), pack_unit_name VARCHAR NOT NULL DEFAULT 'pack', pack_size_in_base_units FLOAT NOT NULL DEFAULT 1, is_active BOOLEAN NOT NULL DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute(""" INSERT INTO variant_packs_new (id, drug_variant_id, pack_unit_name, pack_size_in_base_units, is_active, created_at, updated_at) SELECT id, drug_variant_id, pack_unit_name, pack_size_in_base_units, is_active, created_at, updated_at FROM variant_packs """) # Re-create indexes cursor.execute("DROP INDEX IF EXISTS ix_variant_packs_drug_variant_id") cursor.execute("DROP TABLE variant_packs") cursor.execute("ALTER TABLE variant_packs_new RENAME TO variant_packs") cursor.execute("CREATE INDEX ix_variant_packs_drug_variant_id ON variant_packs (drug_variant_id)") print("Dropped label column from variant_packs") else: print("variant_packs.label already absent, skipping") conn.commit() finally: conn.close()