104 lines
4.2 KiB
Python
104 lines
4.2 KiB
Python
"""
|
|
GTIN mapping table migration.
|
|
|
|
Creates the gtin_mappings table if it does not already exist.
|
|
Idempotent and safe to run on every startup.
|
|
"""
|
|
|
|
import os
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
# Fallback database URL used when the DATABASE_URL environment variable is unset.
DEFAULT_DB_URL = "sqlite:///./data/drugs.db"
|
|
|
|
|
|
def _resolve_sqlite_path(db_url: str) -> Path | None:
|
|
if not db_url.startswith("sqlite:///"):
|
|
print(f"Unsupported database URL for GTIN migration: {db_url}")
|
|
return None
|
|
raw_path = db_url.replace("sqlite:///", "")
|
|
if raw_path.startswith("/"):
|
|
return Path(raw_path)
|
|
return Path(raw_path)
|
|
|
|
|
|
def _table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
    """Report whether sqlite_master lists a table named ``table_name``.

    Args:
        cursor: Cursor on the database to inspect.
        table_name: Exact table name to look up.

    Returns:
        ``True`` when a matching ``type='table'`` row exists, else ``False``.
    """
    lookup_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    match = cursor.execute(lookup_sql, (table_name,)).fetchone()
    return match is not None
|
|
|
|
|
|
def _column_exists(cursor: sqlite3.Cursor, table_name: str, column_name: str) -> bool:
    """Report whether ``table_name`` has a column named ``column_name``.

    Args:
        cursor: Cursor on the database to inspect.
        table_name: Table whose columns are listed via ``PRAGMA table_info``.
        column_name: Column name to look for (compared with ``==``).

    Returns:
        ``True`` when one of the rows returned by the pragma has the given
        name in its second field; ``False`` otherwise, including when the
        pragma returns no rows.
    """
    # PRAGMA arguments cannot be bound as parameters, so quote the identifier
    # (doubling embedded quotes) instead of interpolating it raw. This keeps
    # names that are not bare identifiers from breaking the statement.
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    cursor.execute(f"PRAGMA table_info({quoted_table})")
    return any(row[1] == column_name for row in cursor.fetchall())
|
|
|
|
|
|
def _create_gtin_mappings_table(cursor: sqlite3.Cursor) -> None:
    """Create gtin_mappings and its three indexes unless the table already exists."""
    if _table_exists(cursor, "gtin_mappings"):
        print("gtin_mappings table already exists, skipping")
        return

    cursor.execute("""
        CREATE TABLE gtin_mappings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            gtin VARCHAR(14) NOT NULL,
            drug_variant_id INTEGER NOT NULL REFERENCES drug_variants(id),
            variant_pack_id INTEGER NOT NULL REFERENCES variant_packs(id),
            created_by_user_id INTEGER REFERENCES users(id),
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)
    cursor.execute("CREATE UNIQUE INDEX ix_gtin_mappings_gtin ON gtin_mappings (gtin)")
    cursor.execute("CREATE INDEX ix_gtin_mappings_drug_variant_id ON gtin_mappings (drug_variant_id)")
    cursor.execute("CREATE INDEX ix_gtin_mappings_variant_pack_id ON gtin_mappings (variant_pack_id)")
    print("Created gtin_mappings table")


def _drop_variant_packs_label(cursor: sqlite3.Cursor) -> None:
    """Rebuild variant_packs without its label column when that column is still present."""
    if not (_table_exists(cursor, "variant_packs") and _column_exists(cursor, "variant_packs", "label")):
        print("variant_packs.label already absent, skipping")
        return

    print("Dropping label column from variant_packs")

    # An earlier run that failed part-way outside a transaction may have left
    # the scratch table behind; remove it so the rebuild can start clean.
    cursor.execute("DROP TABLE IF EXISTS variant_packs_new")
    cursor.execute("""
        CREATE TABLE variant_packs_new (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            drug_variant_id INTEGER NOT NULL REFERENCES drug_variants(id),
            pack_unit_name VARCHAR NOT NULL DEFAULT 'pack',
            pack_size_in_base_units FLOAT NOT NULL DEFAULT 1,
            is_active BOOLEAN NOT NULL DEFAULT 1,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # Copy every column except label; ids are preserved so existing
    # references to variant_packs(id) keep pointing at the same rows.
    cursor.execute("""
        INSERT INTO variant_packs_new (id, drug_variant_id, pack_unit_name, pack_size_in_base_units, is_active, created_at, updated_at)
        SELECT id, drug_variant_id, pack_unit_name, pack_size_in_base_units, is_active, created_at, updated_at
        FROM variant_packs
    """)
    # Swap the rebuilt table into place and re-create its index.
    cursor.execute("DROP INDEX IF EXISTS ix_variant_packs_drug_variant_id")
    cursor.execute("DROP TABLE variant_packs")
    cursor.execute("ALTER TABLE variant_packs_new RENAME TO variant_packs")
    cursor.execute("CREATE INDEX ix_variant_packs_drug_variant_id ON variant_packs (drug_variant_id)")
    print("Dropped label column from variant_packs")


def migrate_gtin_schema() -> None:
    """Create gtin_mappings table if it does not exist, and drop label from variant_packs.

    The database is taken from the ``DATABASE_URL`` environment variable,
    falling back to ``DEFAULT_DB_URL``. Nothing is done when the URL is not a
    ``sqlite:///`` URL or when the database file does not exist.

    Both schema steps run inside one explicit transaction, so a failure
    part-way rolls everything back and the next run can retry from the
    original schema.

    Raises:
        sqlite3.Error: If any statement fails; the transaction is rolled back
            before the exception propagates.
    """
    db_url = os.getenv("DATABASE_URL", DEFAULT_DB_URL)
    db_path = _resolve_sqlite_path(db_url)
    if db_path is None:
        return

    if not db_path.exists():
        print(f"Database does not exist at {db_path}, skipping GTIN migration")
        return

    print(f"Running GTIN migration on {db_path}")

    # isolation_level=None stops the sqlite3 module from managing transactions
    # implicitly. By default it only opens one before DML, so DDL such as
    # CREATE TABLE would be committed immediately and could not be rolled back.
    conn = sqlite3.connect(str(db_path), isolation_level=None)
    try:
        cursor = conn.cursor()
        # Dropping and re-creating variant_packs must not trip foreign keys
        # that reference it. The pragma only takes effect outside a
        # transaction, and is scoped to this short-lived connection.
        cursor.execute("PRAGMA foreign_keys=OFF")
        cursor.execute("BEGIN")
        try:
            _create_gtin_mappings_table(cursor)
            _drop_variant_packs_label(cursor)
        except BaseException:
            # Guard the rollback so it cannot mask the original error when
            # SQLite has already ended the transaction itself.
            if conn.in_transaction:
                cursor.execute("ROLLBACK")
            raise
        cursor.execute("COMMIT")
    finally:
        conn.close()
|