Files
ppr-ng/backend/app/api/deps.py
James Pattinson fb21329109 RBAC in the API
2025-10-23 20:02:54 +00:00

73 lines
2.2 KiB
Python

from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.db.session import SessionLocal
from app.core.security import verify_token
from app.crud.crud_user import user as crud_user
from app.models.ppr import UserRole
# HTTP Bearer security scheme instance. get_current_user depends on it
# (via Depends(security)) to receive the request's
# HTTPAuthorizationCredentials, whose .credentials is passed to verify_token.
security = HTTPBearer()
def get_db() -> Generator:
    """Yield a database session and close it once the caller is done.

    Written as a generator so it can be used as a dependency
    (``Depends(get_db)``): the code after ``yield`` runs as cleanup.

    Yields:
        The session object returned by ``SessionLocal()``.

    Raises:
        Whatever ``SessionLocal()`` raises; that error propagates unchanged.
    """
    # Create the session *before* entering ``try``: if construction fails
    # there is nothing to close, and a ``finally`` that touched an unbound
    # ``db`` would replace the real error with an UnboundLocalError.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
async def get_current_user(
    db: Session = Depends(get_db),
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Resolve the bearer token on the request to a stored user.

    The raw token (``credentials.credentials``) is handed to
    ``verify_token``; the username it returns is then looked up with
    ``crud_user.get_by_username``.

    Returns:
        The user object returned by ``crud_user.get_by_username``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``verify_token`` returns ``None`` or no matching user is found.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    token_username = verify_token(credentials.credentials)

    # Only hit the database when the token yielded a username; both failure
    # modes (bad token, unknown user) collapse into the same 401 response.
    account = (
        crud_user.get_by_username(db, username=token_username)
        if token_username is not None
        else None
    )
    if account is None:
        raise unauthorized
    return account
def get_current_active_user(current_user=Depends(get_current_user)):
    """Return the authenticated user unchanged.

    Currently a pass-through over ``get_current_user``; it exists as a hook
    for a future "user is active" check should a user status be introduced.
    """
    return current_user
def get_current_admin_user(current_user=Depends(get_current_user)):
    """Administrator-only gate over the authenticated user.

    Returns:
        ``current_user`` unchanged when its ``role`` is
        ``UserRole.ADMINISTRATOR``.

    Raises:
        HTTPException: 403 for any other role.
    """
    is_admin = current_user.role == UserRole.ADMINISTRATOR
    if is_admin:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
def get_current_operator_user(current_user=Depends(get_current_user)):
    """Operator-or-administrator gate over the authenticated user.

    Returns:
        ``current_user`` unchanged when its ``role`` is either
        ``UserRole.OPERATOR`` or ``UserRole.ADMINISTRATOR``.

    Raises:
        HTTPException: 403 for any other role.
    """
    permitted_roles = (UserRole.OPERATOR, UserRole.ADMINISTRATOR)
    if current_user.role in permitted_roles:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
def get_current_read_user(current_user=Depends(get_current_user)):
    """Return the authenticated user with no role restriction.

    Any user that ``get_current_user`` accepts is passed through, so this
    serves as the lowest (read-only or higher) access level.
    """
    return current_user