"""Authentication and user-administration endpoints."""

from datetime import timedelta
from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from app.api.deps import get_db, get_current_admin_user, get_current_read_user
from app.core.config import settings
from app.core.security import create_access_token
from app.crud.crud_user import user as crud_user
from app.schemas.ppr import Token, UserCreate, UserUpdate, User, ChangePassword

router = APIRouter()

# NOTE(review): every handler below is ``async def`` yet calls ``crud_user``
# with a synchronous SQLAlchemy ``Session``. Presumably those calls block, in
# which case they stall the event loop while the query runs. Switching to
# plain ``def`` handlers would move them to FastAPI's threadpool -- confirm
# nothing awaits these functions directly before changing it.
#
# Endpoint docstrings are intentionally short: FastAPI publishes them as the
# OpenAPI operation description, so implementation notes live in comments.


def _get_user_or_404(db: Session, user_id: int):
    """Return the user with ``user_id`` or raise a 404 ``HTTPException``."""
    user = crud_user.get(db, user_id=user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return user


@router.post("/login", response_model=Token)
async def login_for_access_token(
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends()
):
    """OAuth2 compatible token login, get an access token for future requests"""
    # Parameters: ``db`` is the request-scoped session from ``get_db``;
    # ``form_data`` carries the submitted ``username`` / ``password`` fields.
    # Returns a dict with ``access_token``, ``token_type`` and ``expires_in``.
    # Raises HTTPException(401) when ``crud_user.authenticate`` returns a
    # falsy value.
    user = crud_user.authenticate(
        db, username=form_data.username, password=form_data.password
    )
    if not user:
        # A single message for every failure, so the response does not say
        # whether the username or the password was the wrong part.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
    access_token = create_access_token(
        subject=user.username, expires_delta=access_token_expires
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        # The setting is expressed in minutes; the response reports seconds.
        "expires_in": settings.access_token_expire_minutes * 60
    }


@router.post("/test-token", response_model=User)
async def test_token(current_user = Depends(get_current_read_user)):
    """Test access token"""
    # Echoes back whatever ``get_current_read_user`` resolved for the caller;
    # any rejection of the request happens inside that dependency.
    return current_user


@router.get("/users", response_model=List[User])
async def list_users(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    current_user = Depends(get_current_admin_user)
):
    """List all users (admin only)"""
    # ``current_user`` is unused in the body; declaring it makes FastAPI run
    # the ``get_current_admin_user`` dependency for this route.
    # NOTE(review): ``skip`` and ``limit`` are forwarded unchecked, so negative
    # or very large values reach ``crud_user.get_multi`` -- confirm whether
    # that layer validates them.
    users = crud_user.get_multi(db, skip=skip, limit=limit)
    return users


@router.get("/users/{user_id}", response_model=User)
async def get_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_admin_user)
):
    """Get a specific user's details (admin only)"""
    # Raises HTTPException(404) when no user has this id.
    return _get_user_or_404(db, user_id)


@router.post("/users", response_model=User)
async def create_user(
    user_in: UserCreate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_admin_user)
):
    """Create a new user (admin only)"""
    # Raises HTTPException(400) when the username is already taken.
    # NOTE(review): the lookup and the insert are separate calls, so two
    # concurrent requests could both pass the check -- presumably a database
    # unique constraint is the real guard; verify.
    user = crud_user.get_by_username(db, username=user_in.username)
    if user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered"
        )

    user = crud_user.create(db, obj_in=user_in)
    return user


@router.put("/users/{user_id}", response_model=User)
async def update_user(
    user_id: int,
    user_in: UserUpdate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_admin_user)
):
    """Update a user (admin only)"""
    # Raises HTTPException(404) when no user has this id.
    user = _get_user_or_404(db, user_id)
    user = crud_user.update(db, db_obj=user, obj_in=user_in)
    return user


@router.post("/users/{user_id}/change-password", response_model=User)
async def change_user_password(
    user_id: int,
    password_data: ChangePassword,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_admin_user)
):
    """Change a user's password (admin only)"""
    # Raises HTTPException(404) when no user has this id. The new password is
    # taken from ``password_data.password``; the current one is not requested.
    user = _get_user_or_404(db, user_id)
    user = crud_user.change_password(db, db_obj=user, new_password=password_data.password)
    return user