RBAC in the API

This commit is contained in:
James Pattinson
2025-10-23 20:02:54 +00:00
parent 91b734426c
commit fb21329109
7 changed files with 131 additions and 21 deletions

View File

@@ -5,6 +5,7 @@ from sqlalchemy.orm import Session
from app.db.session import SessionLocal
from app.core.security import verify_token
from app.crud.crud_user import user as crud_user
from app.models.ppr import UserRole
security = HTTPBearer()
@@ -44,4 +45,29 @@ def get_current_active_user(
current_user = Depends(get_current_user),
):
"""Get current active user (for future use if we add user status)"""
return current_user
def get_current_admin_user(current_user = Depends(get_current_user)):
"""Get current user and ensure they are an administrator"""
if current_user.role != UserRole.ADMINISTRATOR:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
def get_current_operator_user(current_user = Depends(get_current_user)):
"""Get current user and ensure they are an operator or administrator"""
if current_user.role not in [UserRole.OPERATOR, UserRole.ADMINISTRATOR]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
def get_current_read_user(current_user = Depends(get_current_user)):
"""Get current user (read-only or higher)"""
return current_user

View File

@@ -1,12 +1,13 @@
from datetime import timedelta
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.api.deps import get_db, get_current_admin_user, get_current_read_user
from app.core.config import settings
from app.core.security import create_access_token
from app.crud.crud_user import user as crud_user
from app.schemas.ppr import Token
from app.schemas.ppr import Token, UserCreate, UserUpdate, User
router = APIRouter()
@@ -26,16 +27,63 @@ async def login_for_access_token(
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
subject=user.username, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/test-token")
async def test_token(current_user = Depends(get_db)):
@router.post("/test-token", response_model=User)
async def test_token(current_user = Depends(get_current_read_user)):
"""Test access token"""
return current_user
return current_user
@router.get("/users", response_model=List[User])
async def list_users(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user = Depends(get_current_admin_user)
):
"""List all users (admin only)"""
users = crud_user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/users", response_model=User)
async def create_user(
user_in: UserCreate,
db: Session = Depends(get_db),
current_user = Depends(get_current_admin_user)
):
"""Create a new user (admin only)"""
user = crud_user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
user = crud_user.create(db, obj_in=user_in)
return user
@router.put("/users/{user_id}", response_model=User)
async def update_user(
user_id: int,
user_in: UserUpdate,
db: Session = Depends(get_db),
current_user = Depends(get_current_admin_user)
):
"""Update a user (admin only)"""
user = crud_user.get(db, user_id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
user = crud_user.update(db, db_obj=user, obj_in=user_in)
return user

View File

@@ -2,7 +2,7 @@ from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Request
from sqlalchemy.orm import Session
from datetime import date
from app.api.deps import get_db, get_current_active_user
from app.api.deps import get_db, get_current_read_user, get_current_operator_user
from app.crud.crud_ppr import ppr as crud_ppr
from app.crud.crud_journal import journal as crud_journal
from app.schemas.ppr import PPR, PPRCreate, PPRUpdate, PPRStatus, PPRStatusUpdate, Journal
@@ -21,7 +21,7 @@ async def get_pprs(
date_from: Optional[date] = None,
date_to: Optional[date] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
current_user: User = Depends(get_current_read_user)
):
"""Get PPR records with optional filtering"""
pprs = crud_ppr.get_multi(
@@ -36,7 +36,7 @@ async def create_ppr(
request: Request,
ppr_in: PPRCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
current_user: User = Depends(get_current_operator_user)
):
"""Create a new PPR record"""
client_ip = get_client_ip(request)
@@ -60,7 +60,7 @@ async def create_ppr(
async def get_ppr(
ppr_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
current_user: User = Depends(get_current_read_user)
):
"""Get a specific PPR record"""
ppr = crud_ppr.get(db, ppr_id=ppr_id)
@@ -78,7 +78,7 @@ async def update_ppr(
ppr_id: int,
ppr_in: PPRUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
current_user: User = Depends(get_current_operator_user)
):
"""Update a PPR record"""
db_ppr = crud_ppr.get(db, ppr_id=ppr_id)
@@ -111,7 +111,7 @@ async def patch_ppr(
ppr_id: int,
ppr_in: PPRUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
current_user: User = Depends(get_current_operator_user)
):
"""Partially update a PPR record (only provided fields will be updated)"""
db_ppr = crud_ppr.get(db, ppr_id=ppr_id)
@@ -144,7 +144,7 @@ async def update_ppr_status(
ppr_id: int,
status_update: PPRStatusUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
current_user: User = Depends(get_current_operator_user)
):
"""Update PPR status (LANDED, DEPARTED, etc.)"""
client_ip = get_client_ip(request)
@@ -182,7 +182,7 @@ async def delete_ppr(
request: Request,
ppr_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
current_user: User = Depends(get_current_operator_user)
):
"""Delete (soft delete) a PPR record"""
client_ip = get_client_ip(request)
@@ -210,7 +210,7 @@ async def delete_ppr(
async def get_ppr_journal(
ppr_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
current_user: User = Depends(get_current_read_user)
):
"""Get journal entries for a specific PPR"""
# Verify PPR exists