Files
sasa-membership/backend/app/api/v1/auth.py
2025-11-10 16:07:22 +00:00

247 lines
7.6 KiB
Python

import logging
import uuid
from datetime import datetime, timedelta
from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from ...core.database import get_db
from ...core.security import verify_password, get_password_hash, create_access_token
from ...models.models import User, UserRole, PasswordResetToken
from ...schemas import (
    UserCreate, UserResponse, Token, LoginRequest, MessageResponse,
    ForgotPasswordRequest, ResetPasswordRequest, ChangePasswordRequest
)
from ...services.email_service import email_service
from ...api.dependencies import get_current_active_user
# Router on which the authentication endpoints in this module are registered.
router = APIRouter()
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(
    user_data: UserCreate,
    db: Session = Depends(get_db)
):
    """Register a new user with the MEMBER role and send a welcome email.

    Args:
        user_data: Registration payload; its email, password, first_name,
            last_name, phone and address fields are read.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The newly persisted ``User`` row (serialized through ``UserResponse``).

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    # Reject duplicate registrations up front.
    existing_user = db.query(User).filter(User.email == user_data.email).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Only the password hash is stored, never the plain-text password.
    db_user = User(
        email=user_data.email,
        hashed_password=get_password_hash(user_data.password),
        first_name=user_data.first_name,
        last_name=user_data.last_name,
        phone=user_data.phone,
        address=user_data.address,
        role=UserRole.MEMBER
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)

    # Best-effort welcome email. The send is awaited (so it does delay the
    # response), but a delivery failure must not fail the registration that
    # has already been committed.
    try:
        await email_service.send_welcome_email(
            to_email=db_user.email,
            first_name=db_user.first_name,
            db=db
        )
    except Exception:
        # Record the failure with its traceback through the logging system
        # instead of printing to stdout.
        logging.getLogger(__name__).exception("Failed to send welcome email")

    return db_user
@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Authenticate with the OAuth2 password form and issue a bearer token.

    The form's ``username`` field is looked up as the account email. On
    success the account's ``last_login`` is updated and committed before the
    access token is created.

    Raises:
        HTTPException: 401 when no account matches or the password does not
            verify; 403 when the account is not active.
    """
    account = db.query(User).filter(User.email == form_data.username).first()

    # A missing account and a wrong password produce the same 401 response.
    if not (account and verify_password(form_data.password, account.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user account",
        )

    # Record the successful login before handing out the token.
    account.last_login = datetime.utcnow()
    db.commit()

    token = create_access_token(subject=account.id)
    return {"access_token": token, "token_type": "bearer"}
@router.post("/login-json", response_model=Token)
async def login_json(
    login_data: LoginRequest,
    db: Session = Depends(get_db),
):
    """Authenticate with a JSON body (email and password) and issue a bearer token.

    On success the account's ``last_login`` is updated and committed before
    the access token is created.

    Raises:
        HTTPException: 401 when no account matches or the password does not
            verify; 403 when the account is not active.
    """
    account = db.query(User).filter(User.email == login_data.email).first()

    # A missing account and a wrong password produce the same 401 response.
    if not (account and verify_password(login_data.password, account.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user account",
        )

    # Record the successful login before handing out the token.
    account.last_login = datetime.utcnow()
    db.commit()

    token = create_access_token(subject=account.id)
    return {"access_token": token, "token_type": "bearer"}
@router.post("/forgot-password", response_model=MessageResponse)
async def forgot_password(
    request: ForgotPasswordRequest,
    db: Session = Depends(get_db)
):
    """Start a password reset for the account matching the given email.

    For an active account, any still-valid unused reset tokens are marked as
    used, a new token valid for one hour is stored, and a reset email is
    sent. The same message is returned in every case so the response does not
    reveal whether the email belongs to an account.

    Args:
        request: Payload carrying the ``email`` to reset.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a single ``message`` key.
    """
    # One shared response object keeps the "found" and "not found" paths
    # indistinguishable to the caller.
    generic_response = {
        "message": "If an account with this email exists, a password reset link has been sent."
    }

    user = db.query(User).filter(User.email == request.email).first()
    if not user or not user.is_active:
        return generic_response

    # Use one timestamp for both invalidation and expiry so they agree.
    now = datetime.utcnow()

    # Invalidate any reset tokens for this user that are still usable.
    db.query(PasswordResetToken).filter(
        PasswordResetToken.user_id == user.id,
        PasswordResetToken.used == False,  # noqa: E712 - SQL expression, not a Python truth test
        PasswordResetToken.expires_at > now
    ).update({"used": True})

    # NOTE(review): uuid4 is kept to preserve the token format; consider
    # secrets.token_urlsafe() if the token column length allows it.
    reset_token = str(uuid.uuid4())
    db_token = PasswordResetToken(
        user_id=user.id,
        token=reset_token,
        expires_at=now + timedelta(hours=1),  # token is valid for one hour
        used=False
    )
    db.add(db_token)
    db.commit()

    # Best-effort reset email. The send is awaited, but a delivery failure
    # must not change the response.
    try:
        await email_service.send_password_reset_email(
            to_email=user.email,
            first_name=user.first_name,
            reset_token=reset_token,
            db=db
        )
    except Exception:
        # Record the failure with its traceback through the logging system
        # instead of printing to stdout.
        logging.getLogger(__name__).exception("Failed to send password reset email")

    return generic_response
@router.post("/reset-password", response_model=MessageResponse)
async def reset_password(
    request: ResetPasswordRequest,
    db: Session = Depends(get_db),
):
    """Set a new password using a previously issued reset token.

    The token must exist, be unused and not yet expired, and must belong to
    an active user. On success the user's password hash and ``updated_at``
    are updated and the token is marked as used, all in one commit.

    Raises:
        HTTPException: 400 when the token is missing, used or expired, or
            when its user is missing or inactive.
    """
    # A token is usable only if it matches, is unused, and has not expired.
    token_criteria = (
        PasswordResetToken.token == request.token,
        PasswordResetToken.used == False,  # noqa: E712 - SQL expression, not a Python truth test
        PasswordResetToken.expires_at > datetime.utcnow(),
    )
    token_row = db.query(PasswordResetToken).filter(*token_criteria).first()
    if not token_row:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired reset token",
        )

    account = db.query(User).filter(User.id == token_row.user_id).first()
    if not (account and account.is_active):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid reset token",
        )

    # Store the new hash and consume the token in the same transaction.
    account.hashed_password = get_password_hash(request.new_password)
    account.updated_at = datetime.utcnow()
    token_row.used = True
    db.commit()

    return {"message": "Password has been reset successfully. You can now log in with your new password."}
@router.post("/change-password", response_model=MessageResponse)
async def change_password(
    request: ChangePasswordRequest,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Change the password of the authenticated user.

    The supplied current password must verify against the stored hash before
    the new password hash and ``updated_at`` are written and committed.

    Raises:
        HTTPException: 400 when the current password does not verify.
    """
    password_matches = verify_password(
        request.current_password, current_user.hashed_password
    )
    if not password_matches:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect",
        )

    # Replace the stored hash and stamp the modification time.
    current_user.hashed_password = get_password_hash(request.new_password)
    current_user.updated_at = datetime.utcnow()
    db.commit()

    return {"message": "Password has been changed successfully."}