"""Password hashing, JWT creation, and FastAPI auth dependencies."""

from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status, Header
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from .database import get_db
from .models import User

# Configuration
# WARNING: this signing key is a hard-coded placeholder. Anyone who knows it
# can forge valid tokens; load a real secret from the environment or a secret
# store before deploying.
SECRET_KEY = "your-secret-key-change-in-production"  # Change this in production!
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 720

pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response exception carrying the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def hash_password(password: str) -> str:
    """Hash a password with the module's CryptContext (argon2 scheme).

    Args:
        password: The plain-text password.

    Returns:
        The encoded hash string produced by ``pwd_context.hash``.
    """
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a plain-text password against a stored hash.

    Args:
        plain_password: The candidate password.
        hashed_password: The hash previously produced by ``hash_password``.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Token lifetime. When omitted (or falsy), the token
            expires 15 minutes after creation.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    to_encode = data.copy()
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    # Timezone-aware "now": datetime.utcnow() is deprecated and yields a naive
    # value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def get_current_user(
    authorization: Optional[str] = Header(None),
    db: Session = Depends(get_db),
) -> User:
    """Get the current authenticated user from a JWT bearer token.

    Args:
        authorization: Value of the authorization header, expected in the
            form ``"Bearer <token>"`` (scheme is matched case-insensitively).
        db: Database session used to look the user up by username.

    Returns:
        The ``User`` whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the header is missing or malformed, the token
            fails to decode, the ``sub`` claim is absent, or no matching
            user exists.
    """
    if not authorization:
        raise _unauthorized("Missing authorization header")

    # The header must consist of exactly two whitespace-separated parts.
    try:
        scheme, credentials = authorization.split()
    except (ValueError, AttributeError):
        raise _unauthorized("Invalid authorization header format") from None
    if scheme.lower() != "bearer":
        raise _unauthorized("Invalid authorization header format")

    try:
        payload = jwt.decode(credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise _unauthorized("Invalid authentication credentials") from exc

    username: Optional[str] = payload.get("sub")
    if username is None:
        raise _unauthorized("Invalid authentication credentials")

    user = db.query(User).filter(User.username == username).first()
    if user is None:
        # Same message as a bad token so the response does not reveal
        # whether the username exists.
        raise _unauthorized("Invalid authentication credentials")
    return user


def get_current_admin_user(current_user: User = Depends(get_current_user)) -> User:
    """Get the current user and verify they are an admin.

    Args:
        current_user: The authenticated user resolved by ``get_current_user``.

    Returns:
        The same user, when ``current_user.is_admin`` is truthy.

    Raises:
        HTTPException: 403 if the user is not an admin.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions. Admin access required.",
        )
    return current_user