Backup script

This commit is contained in:
2026-06-19 17:42:28 +01:00
parent b093e1a90c
commit 0026ac2274
4 changed files with 154 additions and 8 deletions
+103
View File
@@ -0,0 +1,103 @@
#!/usr/bin/env python3
import gzip
import os
import shutil
import sqlite3
import time
from datetime import datetime, timezone
from pathlib import Path
DB_PATH = Path(os.getenv("SQLITE_DB_PATH", "/data/drugs.db"))
BACKUP_DIR = Path(os.getenv("BACKUP_DIR", "/data/backups"))
INTERVAL_SECONDS = int(os.getenv("BACKUP_INTERVAL_SECONDS", "3600"))
RETENTION_DAYS = int(os.getenv("BACKUP_RETENTION_DAYS", "7"))
LATEST_BACKUP = BACKUP_DIR / "latest.db.gz"
def utc_stamp() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%SZ")
def log(message: str) -> None:
print(f"[{utc_stamp()}] {message}", flush=True)
def run_integrity_check(db_path: Path) -> None:
with sqlite3.connect(str(db_path)) as conn:
result = conn.execute("PRAGMA integrity_check").fetchone()
if not result or result[0] != "ok":
detail = result[0] if result else "no result"
raise RuntimeError(f"SQLite integrity check failed: {detail}")
def create_backup() -> None:
if not DB_PATH.exists():
log(f"database not found at {DB_PATH}; skipping backup")
return
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
stamp = utc_stamp()
temp_db = BACKUP_DIR / f".{stamp}.db.tmp"
temp_gz = BACKUP_DIR / f".{stamp}.db.gz.tmp"
final_gz = BACKUP_DIR / f"drugs-{stamp}.db.gz"
for path in (temp_db, temp_gz):
if path.exists():
path.unlink()
source_uri = f"file:{DB_PATH}?mode=ro"
with sqlite3.connect(source_uri, uri=True) as source:
with sqlite3.connect(str(temp_db)) as target:
source.backup(target)
run_integrity_check(temp_db)
with temp_db.open("rb") as raw, gzip.open(temp_gz, "wb", compresslevel=6) as compressed:
shutil.copyfileobj(raw, compressed)
temp_gz.replace(final_gz)
shutil.copy2(final_gz, LATEST_BACKUP)
temp_db.unlink(missing_ok=True)
size_kb = final_gz.stat().st_size / 1024
log(f"created {final_gz.name} ({size_kb:.1f} KiB)")
def prune_old_backups() -> None:
if RETENTION_DAYS <= 0 or not BACKUP_DIR.exists():
return
cutoff = time.time() - (RETENTION_DAYS * 24 * 60 * 60)
deleted = 0
for backup in BACKUP_DIR.glob("drugs-*.db.gz"):
if backup.stat().st_mtime < cutoff:
backup.unlink()
deleted += 1
if deleted:
log(f"deleted {deleted} backup(s) older than {RETENTION_DAYS} day(s)")
def main() -> None:
log(
"starting SQLite backup loop: "
f"db={DB_PATH}, dir={BACKUP_DIR}, interval={INTERVAL_SECONDS}s, "
f"retention={RETENTION_DAYS}d"
)
while True:
try:
create_backup()
prune_old_backups()
except Exception as exc:
log(f"backup failed: {exc}")
time.sleep(INTERVAL_SECONDS)
if __name__ == "__main__":
main()