import hashlib
import hmac
from datetime import datetime, timedelta
from typing import Optional, Union, Any

from jose import JWTError, jwt
from passlib.context import CryptContext

from .config import settings
from .datetime import utc_now

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Prefix marking a stored machine-token hash as SHA-256 (as opposed to a
# legacy hash handled by ``pwd_context``).
MACHINE_TOKEN_PREFIX = "sha256$"


def create_access_token(
    subject: Union[str, Any], expires_delta: Optional[timedelta] = None
) -> str:
    """Create a signed JWT access token.

    Args:
        subject: Value stored, after ``str()`` conversion, in the ``sub`` claim.
        expires_delta: Lifetime of the token counted from ``utc_now()``. When
            ``None``, ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes is used.

    Returns:
        The JWT encoded with ``settings.SECRET_KEY`` and ``settings.ALGORITHM``.
    """
    # Test against None explicitly: timedelta(0) is falsy, and an explicitly
    # requested zero lifetime must not silently become the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = {"exp": utc_now() + expires_delta, "sub": str(subject)}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``.

    Verification is delegated to the module-level ``pwd_context``.
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context``."""
    return pwd_context.hash(password)


def get_machine_token_hash(token: str) -> str:
    """Hash a machine token for fast constant-time verification.

    Returns:
        ``MACHINE_TOKEN_PREFIX`` followed by the hex SHA-256 digest of the
        UTF-8 encoded token.
    """
    digest = hashlib.sha256(token.encode("utf-8")).hexdigest()
    return f"{MACHINE_TOKEN_PREFIX}{digest}"


def verify_machine_token(token: str, stored_hash: str) -> bool:
    """Verify a machine token, supporting legacy bcrypt hashes during migration.

    Args:
        token: The presented machine token.
        stored_hash: The stored hash. Values starting with
            ``MACHINE_TOKEN_PREFIX`` are checked as SHA-256 hashes; anything
            else is passed to ``verify_password``.

    Returns:
        ``True`` when the token matches the stored hash. ``False`` when it does
        not, when ``stored_hash`` is empty, or when the legacy verifier rejects
        the stored value as unusable.
    """
    if not stored_hash:
        return False

    if stored_hash.startswith(MACHINE_TOKEN_PREFIX):
        expected_hash = get_machine_token_hash(token)
        # Compare bytes rather than str: hmac.compare_digest() raises
        # TypeError for str arguments containing non-ASCII characters, which a
        # corrupted stored value could contain.
        return hmac.compare_digest(
            expected_hash.encode("utf-8"), stored_hash.encode("utf-8")
        )

    try:
        return verify_password(token, stored_hash)
    except ValueError:
        # NOTE(review): passlib signals unidentifiable/malformed hashes with
        # ValueError subclasses; an unusable stored hash is treated as a
        # failed verification instead of propagating the error.
        return False


def is_machine_token_hash(stored_hash: Optional[str]) -> bool:
    """Return whether ``stored_hash`` is non-empty and starts with ``MACHINE_TOKEN_PREFIX``."""
    return bool(stored_hash and stored_hash.startswith(MACHINE_TOKEN_PREFIX))


def decode_token(token: str) -> Optional[str]:
    """Decode a JWT and return its subject.

    Returns:
        The ``sub`` claim of the decoded payload (``None`` if the claim is
        absent), or ``None`` when decoding raises ``JWTError``.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError:
        return None
    return payload.get("sub")