73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
from typing import Generator
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.orm import Session
|
|
from app.db.session import SessionLocal
|
|
from app.core.security import verify_token
|
|
from app.crud.crud_user import user as crud_user
|
|
from app.models.ppr import UserRole
|
|
|
|
# HTTP Bearer scheme instance; get_current_user receives the request's
# credentials from it through ``Depends(security)``.
security = HTTPBearer()
|
|
|
|
|
|
def get_db() -> Generator:
    """Yield a database session and close it once the consumer is done.

    Written as a generator so it can be used as a ``Depends`` dependency
    (``get_current_user`` consumes it that way); the ``finally`` clause runs
    when the generator is resumed or closed after the ``yield``.

    Yields:
        The session object returned by ``SessionLocal()``.
    """
    # Create the session *before* entering ``try``. If ``SessionLocal()``
    # itself raises there is nothing to close, and a ``finally`` that touched
    # the still-unbound ``db`` would replace the real error with an
    # ``UnboundLocalError``.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
|
|
|
|
|
|
async def get_current_user(
    db: Session = Depends(get_db),
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Resolve the request's bearer credentials to a stored user.

    Args:
        db: Database session supplied by ``get_db``.
        credentials: Authorization credentials supplied by ``security``.

    Returns:
        The object returned by ``crud_user.get_by_username`` for the username
        that ``verify_token`` extracted from the token.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``verify_token`` returns ``None`` or no matching user is found.
    """
    # NOTE(review): this coroutine calls crud_user.get_by_username directly;
    # if that helper performs blocking I/O it runs on the event loop --
    # confirm this is acceptable.
    unauthorized = HTTPException(
        headers={"WWW-Authenticate": "Bearer"},
        detail="Could not validate credentials",
        status_code=status.HTTP_401_UNAUTHORIZED,
    )

    token_username = verify_token(credentials.credentials)
    if token_username is not None:
        account = crud_user.get_by_username(db, username=token_username)
        if account is not None:
            return account

    # Both failure modes (bad token, unknown user) share the same response.
    raise unauthorized
|
|
|
|
|
|
def get_current_active_user(current_user=Depends(get_current_user)):
    """Return the authenticated user unchanged.

    No status check is performed here; the function is a placeholder for one
    should users gain a status field later.
    """
    return current_user
|
|
|
|
|
|
def get_current_admin_user(current_user=Depends(get_current_user)):
    """Return the authenticated user only if they are an administrator.

    Raises:
        HTTPException: 403 when the user's role is not
            ``UserRole.ADMINISTRATOR``.
    """
    role = current_user.role
    if role != UserRole.ADMINISTRATOR:
        raise HTTPException(
            detail="Not enough permissions",
            status_code=status.HTTP_403_FORBIDDEN,
        )
    return current_user
|
|
|
|
|
|
def get_current_operator_user(current_user=Depends(get_current_user)):
    """Return the authenticated user if they are an operator or administrator.

    Raises:
        HTTPException: 403 when the user's role is neither
            ``UserRole.OPERATOR`` nor ``UserRole.ADMINISTRATOR``.
    """
    permitted_roles = (UserRole.OPERATOR, UserRole.ADMINISTRATOR)
    if current_user.role in permitted_roles:
        return current_user

    raise HTTPException(
        detail="Not enough permissions",
        status_code=status.HTTP_403_FORBIDDEN,
    )
|
|
|
|
|
|
def get_current_read_user(current_user=Depends(get_current_user)):
    """Return the authenticated user for read-level access.

    No role check is applied: any user that ``get_current_user`` accepts is
    returned as-is.
    """
    return current_user