forked from jamesp/sasa-membership
632e66e21d
- Add configurable profile questions with conditional visibility, admin-only fields, user answers, and seeded onboarding/volunteer questions
- Add admin UI for managing profile questions and member profile answers
- Add volunteer level/profile data support across backend schemas, models, API, and migration
- Update dashboard/profile UI, super admin menu, membership service types, and related styling
- Add privacy policy, terms of service, cookie notice, and footer links
- Add frontend Vitest coverage for profile question logic
- Add backend pytest coverage for profile answer normalization and validation
- Update restart.sh to build, run frontend/backend unit tests, and restart only after tests pass
- Refresh README, quickstart, project structure, instructions, and Square docs to match current app features
- Protect feature flag reload behind super-admin access
- Restrict admin-triggered password resets so admins can only reset member accounts
- Replace email template HTML preview rendering with escaped text preview
- Update docs for feature flag reload access, password reset scope, and email template preview safety
-- test user questions are also made by AI and not very useful. but i didn't know what to put there so its good enough for a test
745 lines
25 KiB
Python
745 lines
25 KiB
Python
import json
|
|
from datetime import date, datetime, timedelta
|
|
from typing import Any, List, Optional
|
|
import uuid
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from ...core.database import get_db
|
|
from ...models.models import ProfileQuestion, User, UserProfileAnswer, UserRole, PasswordResetToken
|
|
from ...schemas import (
|
|
MessageResponse,
|
|
ProfileAnswersUpdateRequest,
|
|
ProfileQuestionCreate,
|
|
ProfileQuestionForUser,
|
|
ProfileQuestionResponse,
|
|
ProfileQuestionUpdate,
|
|
UserResponse,
|
|
UserUpdate,
|
|
)
|
|
from ...api.dependencies import get_current_active_user, get_admin_user
|
|
from ...services.email_service import email_service
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def _parse_options(options_json: Optional[str]) -> list[dict[str, str]]:
|
|
if not options_json:
|
|
return []
|
|
try:
|
|
parsed = json.loads(options_json)
|
|
except (TypeError, json.JSONDecodeError):
|
|
return []
|
|
|
|
if not isinstance(parsed, list):
|
|
return []
|
|
|
|
normalized: list[dict[str, str]] = []
|
|
for item in parsed:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
label = str(item.get("label", "")).strip()
|
|
value = str(item.get("value", "")).strip()
|
|
if label and value:
|
|
normalized.append({"label": label, "value": value})
|
|
return normalized
|
|
|
|
|
|
def _serialize_options(options: Optional[list[Any]]) -> Optional[str]:
|
|
if not options:
|
|
return None
|
|
normalized = []
|
|
for item in options:
|
|
data = item.model_dump() if hasattr(item, "model_dump") else item
|
|
normalized.append({"label": str(data["label"]), "value": str(data["value"])})
|
|
return json.dumps(normalized)
|
|
|
|
|
|
def _normalize_answer_value(question: ProfileQuestion, value: Any) -> Optional[str]:
|
|
if value is None:
|
|
return None
|
|
|
|
if isinstance(value, str) and value.strip() == "":
|
|
return None
|
|
|
|
input_type = question.input_type
|
|
|
|
if input_type == "boolean":
|
|
if isinstance(value, bool):
|
|
return "true" if value else "false"
|
|
|
|
text = str(value).strip().lower()
|
|
if text in {"true", "1", "yes", "y"}:
|
|
return "true"
|
|
if text in {"false", "0", "no", "n"}:
|
|
return "false"
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid boolean answer for question '{question.key}'"
|
|
)
|
|
|
|
if input_type == "number":
|
|
try:
|
|
number = float(value)
|
|
return str(int(number)) if number.is_integer() else str(number)
|
|
except (TypeError, ValueError):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid number answer for question '{question.key}'"
|
|
)
|
|
|
|
if input_type == "date":
|
|
if isinstance(value, datetime):
|
|
return value.date().isoformat()
|
|
if isinstance(value, date):
|
|
return value.isoformat()
|
|
|
|
text = str(value).strip()
|
|
try:
|
|
parsed = datetime.strptime(text, "%Y-%m-%d")
|
|
return parsed.date().isoformat()
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid date answer for question '{question.key}'. Use YYYY-MM-DD"
|
|
)
|
|
|
|
if input_type == "select":
|
|
selected = str(value).strip()
|
|
option_values = {opt["value"] for opt in _parse_options(question.options_json)}
|
|
if selected not in option_values:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Invalid selection for question '{question.key}'"
|
|
)
|
|
return selected
|
|
|
|
return str(value).strip()
|
|
|
|
|
|
def _deserialize_answer_value(question: ProfileQuestion, value_text: Optional[str]) -> Any:
|
|
if value_text is None:
|
|
return None
|
|
|
|
if question.input_type == "boolean":
|
|
return value_text.lower() == "true"
|
|
|
|
if question.input_type == "number":
|
|
try:
|
|
number = float(value_text)
|
|
return int(number) if number.is_integer() else number
|
|
except ValueError:
|
|
return value_text
|
|
|
|
return value_text
|
|
|
|
|
|
def _validate_question_dependencies(
|
|
db: Session,
|
|
depends_on_question_id: Optional[int],
|
|
depends_on_value: Optional[str],
|
|
current_question_id: Optional[int] = None,
|
|
) -> None:
|
|
if depends_on_question_id is None:
|
|
if depends_on_value is not None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="depends_on_value requires depends_on_question_id"
|
|
)
|
|
return
|
|
|
|
dependent_question = db.query(ProfileQuestion).filter(ProfileQuestion.id == depends_on_question_id).first()
|
|
if not dependent_question:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="depends_on_question_id does not exist"
|
|
)
|
|
|
|
if current_question_id is not None and current_question_id == depends_on_question_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="A question cannot depend on itself"
|
|
)
|
|
|
|
|
|
def _normalize_volunteer_level(value: Optional[str]) -> Optional[str]:
|
|
if value is None:
|
|
return None
|
|
|
|
normalized = str(value).strip().lower()
|
|
if normalized == "":
|
|
return None
|
|
|
|
if normalized in {"yes", "true", "1"}:
|
|
return "yes"
|
|
if normalized in {"no", "false", "0"}:
|
|
return "no"
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Volunteer flag must be yes or no"
|
|
)
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
|
|
async def get_current_user_profile(
|
|
current_user: User = Depends(get_current_active_user)
|
|
):
|
|
"""Get current user's profile"""
|
|
return current_user
|
|
|
|
|
|
@router.put("/me", response_model=UserResponse)
|
|
async def update_current_user_profile(
|
|
user_update: UserUpdate,
|
|
current_user: User = Depends(get_current_active_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Update current user's profile"""
|
|
update_data = user_update.model_dump(exclude_unset=True)
|
|
|
|
# Prevent privilege and volunteer-level edits through self-service profile endpoint.
|
|
update_data.pop("role", None)
|
|
update_data.pop("volunteer_level", None)
|
|
|
|
if "email" in update_data and update_data["email"] != current_user.email:
|
|
existing_user = db.query(User).filter(User.email == update_data["email"]).first()
|
|
if existing_user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Email already registered"
|
|
)
|
|
|
|
for field, value in update_data.items():
|
|
setattr(current_user, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(current_user)
|
|
|
|
return current_user
|
|
|
|
|
|
@router.get("/me/profile-questions", response_model=List[ProfileQuestionForUser])
|
|
async def list_my_profile_questions(
|
|
current_user: User = Depends(get_current_active_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
questions = db.query(ProfileQuestion).filter(ProfileQuestion.is_active == True).order_by(ProfileQuestion.display_order.asc(), ProfileQuestion.id.asc()).all()
|
|
|
|
answers = db.query(UserProfileAnswer).filter(UserProfileAnswer.user_id == current_user.id).all()
|
|
answers_by_question = {answer.question_id: answer for answer in answers}
|
|
|
|
response: list[ProfileQuestionForUser] = []
|
|
for question in questions:
|
|
user_answer = answers_by_question.get(question.id)
|
|
can_edit = (not question.admin_only_edit) or (current_user.role in [UserRole.ADMIN, UserRole.SUPER_ADMIN])
|
|
|
|
response.append(
|
|
ProfileQuestionForUser(
|
|
id=question.id,
|
|
key=question.key,
|
|
label=question.label,
|
|
help_text=question.help_text,
|
|
input_type=question.input_type,
|
|
placeholder=question.placeholder,
|
|
options=_parse_options(question.options_json),
|
|
is_required=question.is_required,
|
|
is_active=question.is_active,
|
|
admin_only_edit=question.admin_only_edit,
|
|
display_order=question.display_order,
|
|
depends_on_question_id=question.depends_on_question_id,
|
|
depends_on_value=question.depends_on_value,
|
|
created_at=question.created_at,
|
|
updated_at=question.updated_at,
|
|
answer=_deserialize_answer_value(question, user_answer.value_text if user_answer else None),
|
|
can_edit=can_edit,
|
|
)
|
|
)
|
|
|
|
return response
|
|
|
|
|
|
@router.put("/me/profile-answers", response_model=MessageResponse)
|
|
async def update_my_profile_answers(
|
|
payload: ProfileAnswersUpdateRequest,
|
|
current_user: User = Depends(get_current_active_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
if not payload.answers:
|
|
return {"message": "No changes submitted"}
|
|
|
|
question_ids = {item.question_id for item in payload.answers}
|
|
questions = db.query(ProfileQuestion).filter(ProfileQuestion.id.in_(question_ids), ProfileQuestion.is_active == True).all()
|
|
questions_by_id = {question.id: question for question in questions}
|
|
|
|
missing_ids = [q_id for q_id in question_ids if q_id not in questions_by_id]
|
|
if missing_ids:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Questions not found: {missing_ids}"
|
|
)
|
|
|
|
for item in payload.answers:
|
|
question = questions_by_id[item.question_id]
|
|
if question.admin_only_edit:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Question '{question.label}' can only be changed by admins"
|
|
)
|
|
|
|
normalized_value = _normalize_answer_value(question, item.value)
|
|
|
|
answer = db.query(UserProfileAnswer).filter(
|
|
UserProfileAnswer.user_id == current_user.id,
|
|
UserProfileAnswer.question_id == question.id
|
|
).first()
|
|
|
|
if normalized_value is None:
|
|
if answer:
|
|
db.delete(answer)
|
|
continue
|
|
|
|
if answer:
|
|
answer.value_text = normalized_value
|
|
answer.updated_by_user_id = current_user.id
|
|
else:
|
|
db.add(UserProfileAnswer(
|
|
user_id=current_user.id,
|
|
question_id=question.id,
|
|
value_text=normalized_value,
|
|
updated_by_user_id=current_user.id,
|
|
))
|
|
|
|
db.commit()
|
|
return {"message": "Profile answers updated successfully"}
|
|
|
|
|
|
@router.get("/", response_model=List[UserResponse])
|
|
async def list_users(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""List all users (admin only)"""
|
|
users = db.query(User).offset(skip).limit(limit).all()
|
|
return users
|
|
|
|
|
|
@router.get("/admin/profile-questions", response_model=List[ProfileQuestionResponse])
|
|
async def list_profile_questions_admin(
|
|
include_inactive: bool = True,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
query = db.query(ProfileQuestion)
|
|
if not include_inactive:
|
|
query = query.filter(ProfileQuestion.is_active == True)
|
|
|
|
questions = query.order_by(ProfileQuestion.display_order.asc(), ProfileQuestion.id.asc()).all()
|
|
|
|
return [
|
|
ProfileQuestionResponse(
|
|
id=question.id,
|
|
key=question.key,
|
|
label=question.label,
|
|
help_text=question.help_text,
|
|
input_type=question.input_type,
|
|
placeholder=question.placeholder,
|
|
options=_parse_options(question.options_json),
|
|
is_required=question.is_required,
|
|
is_active=question.is_active,
|
|
admin_only_edit=question.admin_only_edit,
|
|
display_order=question.display_order,
|
|
depends_on_question_id=question.depends_on_question_id,
|
|
depends_on_value=question.depends_on_value,
|
|
created_at=question.created_at,
|
|
updated_at=question.updated_at,
|
|
)
|
|
for question in questions
|
|
]
|
|
|
|
|
|
@router.post("/admin/profile-questions", response_model=ProfileQuestionResponse)
|
|
async def create_profile_question_admin(
|
|
payload: ProfileQuestionCreate,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
if payload.input_type == "select" and not payload.options:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Select questions require options"
|
|
)
|
|
|
|
_validate_question_dependencies(db, payload.depends_on_question_id, payload.depends_on_value)
|
|
|
|
existing = db.query(ProfileQuestion).filter(ProfileQuestion.key == payload.key).first()
|
|
if existing:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Question key already exists"
|
|
)
|
|
|
|
question = ProfileQuestion(
|
|
key=payload.key,
|
|
label=payload.label,
|
|
help_text=payload.help_text,
|
|
input_type=payload.input_type,
|
|
placeholder=payload.placeholder,
|
|
options_json=_serialize_options(payload.options),
|
|
is_required=payload.is_required,
|
|
is_active=payload.is_active,
|
|
admin_only_edit=payload.admin_only_edit,
|
|
display_order=payload.display_order,
|
|
depends_on_question_id=payload.depends_on_question_id,
|
|
depends_on_value=payload.depends_on_value,
|
|
)
|
|
|
|
db.add(question)
|
|
db.commit()
|
|
db.refresh(question)
|
|
|
|
return ProfileQuestionResponse(
|
|
id=question.id,
|
|
key=question.key,
|
|
label=question.label,
|
|
help_text=question.help_text,
|
|
input_type=question.input_type,
|
|
placeholder=question.placeholder,
|
|
options=_parse_options(question.options_json),
|
|
is_required=question.is_required,
|
|
is_active=question.is_active,
|
|
admin_only_edit=question.admin_only_edit,
|
|
display_order=question.display_order,
|
|
depends_on_question_id=question.depends_on_question_id,
|
|
depends_on_value=question.depends_on_value,
|
|
created_at=question.created_at,
|
|
updated_at=question.updated_at,
|
|
)
|
|
|
|
|
|
@router.put("/admin/profile-questions/{question_id}", response_model=ProfileQuestionResponse)
|
|
async def update_profile_question_admin(
|
|
question_id: int,
|
|
payload: ProfileQuestionUpdate,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
question = db.query(ProfileQuestion).filter(ProfileQuestion.id == question_id).first()
|
|
if not question:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Question not found"
|
|
)
|
|
|
|
update_data = payload.model_dump(exclude_unset=True)
|
|
|
|
if "key" in update_data and update_data["key"] != question.key:
|
|
existing = db.query(ProfileQuestion).filter(ProfileQuestion.key == update_data["key"]).first()
|
|
if existing:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Question key already exists"
|
|
)
|
|
|
|
input_type = update_data.get("input_type", question.input_type)
|
|
options = update_data.get("options")
|
|
options_to_validate = options if options is not None else _parse_options(question.options_json)
|
|
if input_type == "select" and not options_to_validate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Select questions require options"
|
|
)
|
|
|
|
depends_on_question_id = update_data.get("depends_on_question_id", question.depends_on_question_id)
|
|
depends_on_value = update_data.get("depends_on_value", question.depends_on_value)
|
|
_validate_question_dependencies(db, depends_on_question_id, depends_on_value, current_question_id=question.id)
|
|
|
|
for field, value in update_data.items():
|
|
if field == "options":
|
|
question.options_json = _serialize_options(value)
|
|
else:
|
|
setattr(question, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(question)
|
|
|
|
return ProfileQuestionResponse(
|
|
id=question.id,
|
|
key=question.key,
|
|
label=question.label,
|
|
help_text=question.help_text,
|
|
input_type=question.input_type,
|
|
placeholder=question.placeholder,
|
|
options=_parse_options(question.options_json),
|
|
is_required=question.is_required,
|
|
is_active=question.is_active,
|
|
admin_only_edit=question.admin_only_edit,
|
|
display_order=question.display_order,
|
|
depends_on_question_id=question.depends_on_question_id,
|
|
depends_on_value=question.depends_on_value,
|
|
created_at=question.created_at,
|
|
updated_at=question.updated_at,
|
|
)
|
|
|
|
|
|
@router.delete("/admin/profile-questions/{question_id}", response_model=MessageResponse)
|
|
async def deactivate_profile_question_admin(
|
|
question_id: int,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
question = db.query(ProfileQuestion).filter(ProfileQuestion.id == question_id).first()
|
|
if not question:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Question not found"
|
|
)
|
|
|
|
question.is_active = False
|
|
db.commit()
|
|
|
|
return {"message": "Question deactivated successfully"}
|
|
|
|
|
|
@router.get("/admin/users/{user_id}/profile-answers", response_model=List[ProfileQuestionForUser])
|
|
async def get_user_profile_answers_admin(
|
|
user_id: int,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
questions = db.query(ProfileQuestion).order_by(ProfileQuestion.display_order.asc(), ProfileQuestion.id.asc()).all()
|
|
answers = db.query(UserProfileAnswer).filter(UserProfileAnswer.user_id == user_id).all()
|
|
answers_by_question = {answer.question_id: answer for answer in answers}
|
|
|
|
return [
|
|
ProfileQuestionForUser(
|
|
id=question.id,
|
|
key=question.key,
|
|
label=question.label,
|
|
help_text=question.help_text,
|
|
input_type=question.input_type,
|
|
placeholder=question.placeholder,
|
|
options=_parse_options(question.options_json),
|
|
is_required=question.is_required,
|
|
is_active=question.is_active,
|
|
admin_only_edit=question.admin_only_edit,
|
|
display_order=question.display_order,
|
|
depends_on_question_id=question.depends_on_question_id,
|
|
depends_on_value=question.depends_on_value,
|
|
created_at=question.created_at,
|
|
updated_at=question.updated_at,
|
|
answer=_deserialize_answer_value(question, answers_by_question.get(question.id).value_text if answers_by_question.get(question.id) else None),
|
|
can_edit=True,
|
|
)
|
|
for question in questions
|
|
]
|
|
|
|
|
|
@router.put("/admin/users/{user_id}/profile-answers", response_model=MessageResponse)
|
|
async def update_user_profile_answers_admin(
|
|
user_id: int,
|
|
payload: ProfileAnswersUpdateRequest,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
if not payload.answers:
|
|
return {"message": "No changes submitted"}
|
|
|
|
question_ids = {item.question_id for item in payload.answers}
|
|
questions = db.query(ProfileQuestion).filter(ProfileQuestion.id.in_(question_ids)).all()
|
|
questions_by_id = {question.id: question for question in questions}
|
|
|
|
missing_ids = [q_id for q_id in question_ids if q_id not in questions_by_id]
|
|
if missing_ids:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Questions not found: {missing_ids}"
|
|
)
|
|
|
|
for item in payload.answers:
|
|
question = questions_by_id[item.question_id]
|
|
normalized_value = _normalize_answer_value(question, item.value)
|
|
|
|
answer = db.query(UserProfileAnswer).filter(
|
|
UserProfileAnswer.user_id == user_id,
|
|
UserProfileAnswer.question_id == question.id
|
|
).first()
|
|
|
|
if normalized_value is None:
|
|
if answer:
|
|
db.delete(answer)
|
|
continue
|
|
|
|
if answer:
|
|
answer.value_text = normalized_value
|
|
answer.updated_by_user_id = current_user.id
|
|
else:
|
|
db.add(UserProfileAnswer(
|
|
user_id=user_id,
|
|
question_id=question.id,
|
|
value_text=normalized_value,
|
|
updated_by_user_id=current_user.id,
|
|
))
|
|
|
|
db.commit()
|
|
return {"message": "User profile answers updated successfully"}
|
|
|
|
|
|
@router.get("/{user_id}", response_model=UserResponse)
|
|
async def get_user(
|
|
user_id: int,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Get user by ID (admin only)"""
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
return user
|
|
|
|
|
|
@router.put("/{user_id}", response_model=UserResponse)
|
|
async def update_user(
|
|
user_id: int,
|
|
user_update: UserUpdate,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Update user by ID (admin only)"""
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
update_data = user_update.model_dump(exclude_unset=True)
|
|
|
|
if "email" in update_data and update_data["email"] != user.email:
|
|
existing_user = db.query(User).filter(User.email == update_data["email"]).first()
|
|
if existing_user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Email already registered"
|
|
)
|
|
|
|
if "role" in update_data and update_data["role"] == UserRole.SUPER_ADMIN and current_user.role != UserRole.SUPER_ADMIN:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only super admins can assign super admin role"
|
|
)
|
|
|
|
if "volunteer_level" in update_data:
|
|
update_data["volunteer_level"] = _normalize_volunteer_level(update_data["volunteer_level"])
|
|
|
|
for field, value in update_data.items():
|
|
setattr(user, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
return user
|
|
|
|
|
|
@router.post("/{user_id}/send-password-reset", response_model=MessageResponse)
|
|
async def send_user_password_reset(
|
|
user_id: int,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Send a one-time password reset link email for a user."""
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
if user.role in [UserRole.ADMIN, UserRole.SUPER_ADMIN] and current_user.role != UserRole.SUPER_ADMIN:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only super admins can send password reset emails for admin users"
|
|
)
|
|
|
|
if not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot reset password for inactive user"
|
|
)
|
|
|
|
db.query(PasswordResetToken).filter(
|
|
PasswordResetToken.user_id == user.id,
|
|
PasswordResetToken.used == False,
|
|
PasswordResetToken.expires_at > datetime.utcnow()
|
|
).update({"used": True})
|
|
|
|
reset_token = str(uuid.uuid4())
|
|
expires_at = datetime.utcnow() + timedelta(hours=1)
|
|
|
|
db_token = PasswordResetToken(
|
|
user_id=user.id,
|
|
token=reset_token,
|
|
expires_at=expires_at,
|
|
used=False
|
|
)
|
|
|
|
db.add(db_token)
|
|
db.commit()
|
|
|
|
try:
|
|
await email_service.send_password_reset_email(
|
|
to_email=user.email,
|
|
first_name=user.first_name,
|
|
reset_token=reset_token,
|
|
db=db
|
|
)
|
|
except Exception as exc:
|
|
print(f"Failed to send admin password reset email: {exc}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to send reset email"
|
|
)
|
|
|
|
return {"message": "One-time password reset email sent successfully"}
|
|
|
|
|
|
@router.delete("/{user_id}", response_model=MessageResponse)
|
|
async def delete_user(
|
|
user_id: int,
|
|
current_user: User = Depends(get_admin_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Delete user (admin only)"""
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
db.delete(user)
|
|
db.commit()
|
|
|
|
return {"message": "User deleted successfully"}
|