Files
nathanb d024bf7fa3 stuff changed:
- ui has been made 'kinda better' (after making it worse for a while lol)
- ESP RFID readers are now supported [I'll upload the code for them in another repo later]
- admin system has been secured a bit better and seems to be working well
2026-05-08 20:46:58 +01:00

746 lines
25 KiB
Python

import json
import logging
import math
import uuid
from datetime import date, datetime, timedelta
from typing import Any, List, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from ...core.database import get_db
from ...core.datetime import utc_now
from ...models.models import ProfileQuestion, User, UserProfileAnswer, UserRole, PasswordResetToken
from ...schemas import (
    MessageResponse,
    ProfileAnswersUpdateRequest,
    ProfileQuestionCreate,
    ProfileQuestionForUser,
    ProfileQuestionResponse,
    ProfileQuestionUpdate,
    UserResponse,
    UserUpdate,
)
from ...api.dependencies import get_current_active_user, get_admin_user
from ...services.email_service import email_service
# Router that every endpoint in this module registers itself on via the
# @router.<method>(...) decorators below.
router = APIRouter()
def _parse_options(options_json: Optional[str]) -> list[dict[str, str]]:
if not options_json:
return []
try:
parsed = json.loads(options_json)
except (TypeError, json.JSONDecodeError):
return []
if not isinstance(parsed, list):
return []
normalized: list[dict[str, str]] = []
for item in parsed:
if not isinstance(item, dict):
continue
label = str(item.get("label", "")).strip()
value = str(item.get("value", "")).strip()
if label and value:
normalized.append({"label": label, "value": value})
return normalized
def _serialize_options(options: Optional[list[Any]]) -> Optional[str]:
if not options:
return None
normalized = []
for item in options:
data = item.model_dump() if hasattr(item, "model_dump") else item
normalized.append({"label": str(data["label"]), "value": str(data["value"])})
return json.dumps(normalized)
def _normalize_answer_value(question: ProfileQuestion, value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, str) and value.strip() == "":
return None
input_type = question.input_type
if input_type == "boolean":
if isinstance(value, bool):
return "true" if value else "false"
text = str(value).strip().lower()
if text in {"true", "1", "yes", "y"}:
return "true"
if text in {"false", "0", "no", "n"}:
return "false"
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid boolean answer for question '{question.key}'"
)
if input_type == "number":
try:
number = float(value)
return str(int(number)) if number.is_integer() else str(number)
except (TypeError, ValueError):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid number answer for question '{question.key}'"
)
if input_type == "date":
if isinstance(value, datetime):
return value.date().isoformat()
if isinstance(value, date):
return value.isoformat()
text = str(value).strip()
try:
parsed = datetime.strptime(text, "%Y-%m-%d")
return parsed.date().isoformat()
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid date answer for question '{question.key}'. Use YYYY-MM-DD"
)
if input_type == "select":
selected = str(value).strip()
option_values = {opt["value"] for opt in _parse_options(question.options_json)}
if selected not in option_values:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid selection for question '{question.key}'"
)
return selected
return str(value).strip()
def _deserialize_answer_value(question: ProfileQuestion, value_text: Optional[str]) -> Any:
if value_text is None:
return None
if question.input_type == "boolean":
return value_text.lower() == "true"
if question.input_type == "number":
try:
number = float(value_text)
return int(number) if number.is_integer() else number
except ValueError:
return value_text
return value_text
def _validate_question_dependencies(
db: Session,
depends_on_question_id: Optional[int],
depends_on_value: Optional[str],
current_question_id: Optional[int] = None,
) -> None:
if depends_on_question_id is None:
if depends_on_value is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="depends_on_value requires depends_on_question_id"
)
return
dependent_question = db.query(ProfileQuestion).filter(ProfileQuestion.id == depends_on_question_id).first()
if not dependent_question:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="depends_on_question_id does not exist"
)
if current_question_id is not None and current_question_id == depends_on_question_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A question cannot depend on itself"
)
def _normalize_volunteer_level(value: Optional[str]) -> Optional[str]:
if value is None:
return None
normalized = str(value).strip().lower()
if normalized == "":
return None
if normalized in {"yes", "true", "1"}:
return "yes"
if normalized in {"no", "false", "0"}:
return "no"
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Volunteer flag must be yes or no"
)
@router.get("/me", response_model=UserResponse)
async def get_current_user_profile(
    current_user: User = Depends(get_current_active_user),
):
    """Return the profile of the authenticated caller.

    The user object is supplied by the ``get_current_active_user`` dependency
    and returned as-is; ``UserResponse`` shapes the payload.
    """
    return current_user
@router.put("/me", response_model=UserResponse)
async def update_current_user_profile(
    user_update: UserUpdate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Apply the caller's own profile changes and return the refreshed user.

    Only fields explicitly present in the request are applied. ``role`` and
    ``volunteer_level`` are dropped so they cannot be changed through this
    self-service endpoint.

    Raises:
        HTTPException: 400 when the requested email differs from the caller's
            current one and already belongs to a user.
    """
    blocked_fields = ("role", "volunteer_level")
    changes = {
        name: new_value
        for name, new_value in user_update.model_dump(exclude_unset=True).items()
        if name not in blocked_fields
    }

    # Uniqueness only needs checking when the email is actually changing.
    if "email" in changes and changes["email"] != current_user.email:
        email_owner = db.query(User).filter(User.email == changes["email"]).first()
        if email_owner:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

    for name, new_value in changes.items():
        setattr(current_user, name, new_value)
    db.commit()
    db.refresh(current_user)
    return current_user
@router.get("/me/profile-questions", response_model=List[ProfileQuestionForUser])
async def list_my_profile_questions(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """List active profile questions together with the caller's answers.

    Questions are ordered by ``display_order`` and then ``id``. Each entry
    carries the caller's stored answer converted to a typed value (``None``
    when unanswered) and a ``can_edit`` flag that is false only for admin-only
    questions viewed by a caller who is neither ADMIN nor SUPER_ADMIN.
    """
    active_questions = (
        db.query(ProfileQuestion)
        .filter(ProfileQuestion.is_active == True)  # noqa: E712 - builds a SQL expression
        .order_by(ProfileQuestion.display_order.asc(), ProfileQuestion.id.asc())
        .all()
    )
    stored_answers = (
        db.query(UserProfileAnswer)
        .filter(UserProfileAnswer.user_id == current_user.id)
        .all()
    )
    stored_by_question = {stored.question_id: stored for stored in stored_answers}
    caller_is_admin = current_user.role in [UserRole.ADMIN, UserRole.SUPER_ADMIN]

    def _describe(question):
        stored = stored_by_question.get(question.id)
        return ProfileQuestionForUser(
            id=question.id,
            key=question.key,
            label=question.label,
            help_text=question.help_text,
            input_type=question.input_type,
            placeholder=question.placeholder,
            options=_parse_options(question.options_json),
            is_required=question.is_required,
            is_active=question.is_active,
            admin_only_edit=question.admin_only_edit,
            display_order=question.display_order,
            depends_on_question_id=question.depends_on_question_id,
            depends_on_value=question.depends_on_value,
            created_at=question.created_at,
            updated_at=question.updated_at,
            answer=_deserialize_answer_value(question, stored.value_text if stored else None),
            can_edit=(not question.admin_only_edit) or caller_is_admin,
        )

    return [_describe(question) for question in active_questions]
@router.put("/me/profile-answers", response_model=MessageResponse)
async def update_my_profile_answers(
    payload: ProfileAnswersUpdateRequest,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Create, update or clear the caller's answers to active profile questions.

    Each submitted value is normalized with ``_normalize_answer_value``; a
    value that normalizes to ``None`` deletes the stored answer if one exists.
    The single commit happens after every submitted item has been processed.

    Admin-only questions may be answered here only by ADMIN or SUPER_ADMIN
    callers, matching the ``can_edit`` flag reported by
    ``list_my_profile_questions``.

    Returns:
        A message dict; "No changes submitted" when the payload has no answers.

    Raises:
        HTTPException: 404 when a referenced question is missing or inactive,
            403 when a non-admin submits an admin-only question, and 400 (from
            normalization) when a value does not fit the question's type.
    """
    if not payload.answers:
        return {"message": "No changes submitted"}

    question_ids = {item.question_id for item in payload.answers}
    questions = db.query(ProfileQuestion).filter(
        ProfileQuestion.id.in_(question_ids),
        ProfileQuestion.is_active == True,  # noqa: E712 - builds a SQL expression
    ).all()
    questions_by_id = {question.id: question for question in questions}

    missing_ids = [q_id for q_id in question_ids if q_id not in questions_by_id]
    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Questions not found: {missing_ids}"
        )

    # Same rule the listing endpoint uses to compute can_edit.
    caller_is_admin = current_user.role in [UserRole.ADMIN, UserRole.SUPER_ADMIN]

    for item in payload.answers:
        question = questions_by_id[item.question_id]
        if question.admin_only_edit and not caller_is_admin:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Question '{question.label}' can only be changed by admins"
            )

        normalized_value = _normalize_answer_value(question, item.value)
        answer = db.query(UserProfileAnswer).filter(
            UserProfileAnswer.user_id == current_user.id,
            UserProfileAnswer.question_id == question.id
        ).first()

        if normalized_value is None:
            # An empty submission clears the stored answer.
            if answer:
                db.delete(answer)
            continue

        if answer:
            answer.value_text = normalized_value
            answer.updated_by_user_id = current_user.id
        else:
            db.add(UserProfileAnswer(
                user_id=current_user.id,
                question_id=question.id,
                value_text=normalized_value,
                updated_by_user_id=current_user.id,
            ))

    db.commit()
    return {"message": "Profile answers updated successfully"}
@router.get("/", response_model=List[UserResponse])
async def list_users(
    skip: int = 0,
    limit: int = 100,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """List all users (admin only), one page at a time.

    Args:
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.

    Returns:
        Users ordered by ascending ``id``.
    """
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any
    # order, so consecutive pages could repeat or skip users.
    return (
        db.query(User)
        .order_by(User.id.asc())
        .offset(skip)
        .limit(limit)
        .all()
    )
@router.get("/admin/profile-questions", response_model=List[ProfileQuestionResponse])
async def list_profile_questions_admin(
    include_inactive: bool = True,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """List profile question definitions for admins.

    Args:
        include_inactive: When false, only questions with ``is_active`` set
            are returned.

    Returns:
        Questions ordered by ``display_order`` then ``id``, with their options
        decoded from ``options_json``.
    """
    selection = db.query(ProfileQuestion)
    if not include_inactive:
        selection = selection.filter(ProfileQuestion.is_active == True)  # noqa: E712
    rows = selection.order_by(
        ProfileQuestion.display_order.asc(),
        ProfileQuestion.id.asc(),
    ).all()

    # Columns copied verbatim from the row onto the response model.
    copied_fields = (
        "id",
        "key",
        "label",
        "help_text",
        "input_type",
        "placeholder",
        "is_required",
        "is_active",
        "admin_only_edit",
        "display_order",
        "depends_on_question_id",
        "depends_on_value",
        "created_at",
        "updated_at",
    )

    listing = []
    for row in rows:
        column_values = {name: getattr(row, name) for name in copied_fields}
        listing.append(
            ProfileQuestionResponse(
                options=_parse_options(row.options_json),
                **column_values,
            )
        )
    return listing
@router.post("/admin/profile-questions", response_model=ProfileQuestionResponse)
async def create_profile_question_admin(
    payload: ProfileQuestionCreate,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """Create a profile question (admin only) and return the stored definition.

    Checks run in this order: select questions must come with options, the
    dependency settings must pass ``_validate_question_dependencies``, and the
    key must not already be in use.

    Raises:
        HTTPException: 400 for a select question without options, invalid
            dependency settings, or a duplicate key.
    """
    is_select = payload.input_type == "select"
    if is_select and not payload.options:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Select questions require options"
        )

    _validate_question_dependencies(db, payload.depends_on_question_id, payload.depends_on_value)

    key_owner = db.query(ProfileQuestion).filter(ProfileQuestion.key == payload.key).first()
    if key_owner:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Question key already exists"
        )

    created = ProfileQuestion(
        key=payload.key,
        label=payload.label,
        help_text=payload.help_text,
        input_type=payload.input_type,
        placeholder=payload.placeholder,
        options_json=_serialize_options(payload.options),
        is_required=payload.is_required,
        is_active=payload.is_active,
        admin_only_edit=payload.admin_only_edit,
        display_order=payload.display_order,
        depends_on_question_id=payload.depends_on_question_id,
        depends_on_value=payload.depends_on_value,
    )
    db.add(created)
    db.commit()
    # Refresh so database-populated attributes are loaded before they are read.
    db.refresh(created)

    copied_fields = (
        "id",
        "key",
        "label",
        "help_text",
        "input_type",
        "placeholder",
        "is_required",
        "is_active",
        "admin_only_edit",
        "display_order",
        "depends_on_question_id",
        "depends_on_value",
        "created_at",
        "updated_at",
    )
    column_values = {name: getattr(created, name) for name in copied_fields}
    return ProfileQuestionResponse(
        options=_parse_options(created.options_json),
        **column_values,
    )
@router.put("/admin/profile-questions/{question_id}", response_model=ProfileQuestionResponse)
async def update_profile_question_admin(
    question_id: int,
    payload: ProfileQuestionUpdate,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """Update a profile question (admin only) and return the stored definition.

    Only fields explicitly present in the request are applied. Validation runs
    against the state the question would have after the update: the effective
    input type, options and dependency settings each fall back to the stored
    value when the request does not mention them.

    Raises:
        HTTPException: 404 when the question does not exist; 400 for a
            duplicate key, a select question left without options, or invalid
            dependency settings.
    """
    question = db.query(ProfileQuestion).filter(ProfileQuestion.id == question_id).first()
    if not question:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Question not found"
        )

    update_data = payload.model_dump(exclude_unset=True)

    if "key" in update_data and update_data["key"] != question.key:
        existing = db.query(ProfileQuestion).filter(ProfileQuestion.key == update_data["key"]).first()
        if existing:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Question key already exists"
            )

    input_type = update_data.get("input_type", question.input_type)
    # Test for the key, not the value: an explicit "options": null is applied
    # below and clears the stored options, so it must be validated as "no
    # options" instead of silently falling back to the stored ones.
    if "options" in update_data:
        options_to_validate = update_data["options"]
    else:
        options_to_validate = _parse_options(question.options_json)
    if input_type == "select" and not options_to_validate:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Select questions require options"
        )

    depends_on_question_id = update_data.get("depends_on_question_id", question.depends_on_question_id)
    depends_on_value = update_data.get("depends_on_value", question.depends_on_value)
    _validate_question_dependencies(db, depends_on_question_id, depends_on_value, current_question_id=question.id)

    for field, value in update_data.items():
        if field == "options":
            # Options live on the row as JSON text rather than a native column.
            question.options_json = _serialize_options(value)
        else:
            setattr(question, field, value)

    db.commit()
    db.refresh(question)

    return ProfileQuestionResponse(
        id=question.id,
        key=question.key,
        label=question.label,
        help_text=question.help_text,
        input_type=question.input_type,
        placeholder=question.placeholder,
        options=_parse_options(question.options_json),
        is_required=question.is_required,
        is_active=question.is_active,
        admin_only_edit=question.admin_only_edit,
        display_order=question.display_order,
        depends_on_question_id=question.depends_on_question_id,
        depends_on_value=question.depends_on_value,
        created_at=question.created_at,
        updated_at=question.updated_at,
    )
@router.delete("/admin/profile-questions/{question_id}", response_model=MessageResponse)
async def deactivate_profile_question_admin(
    question_id: int,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """Deactivate a profile question (admin only).

    The row is kept; only its ``is_active`` flag is set to false.

    Raises:
        HTTPException: 404 when the question does not exist.
    """
    target = db.query(ProfileQuestion).filter(ProfileQuestion.id == question_id).first()
    if target:
        target.is_active = False
        db.commit()
        return {"message": "Question deactivated successfully"}

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Question not found"
    )
@router.get("/admin/users/{user_id}/profile-answers", response_model=List[ProfileQuestionForUser])
async def get_user_profile_answers_admin(
    user_id: int,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """List every profile question with the given user's answers (admin only).

    Unlike the self-service listing, inactive questions are included and
    ``can_edit`` is always true. Questions are ordered by ``display_order``
    then ``id``.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    target_user = db.query(User).filter(User.id == user_id).first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    all_questions = (
        db.query(ProfileQuestion)
        .order_by(ProfileQuestion.display_order.asc(), ProfileQuestion.id.asc())
        .all()
    )
    stored_answers = db.query(UserProfileAnswer).filter(UserProfileAnswer.user_id == user_id).all()
    stored_by_question = {stored.question_id: stored for stored in stored_answers}

    listing = []
    for question in all_questions:
        stored = stored_by_question.get(question.id)
        stored_text = stored.value_text if stored else None
        listing.append(
            ProfileQuestionForUser(
                id=question.id,
                key=question.key,
                label=question.label,
                help_text=question.help_text,
                input_type=question.input_type,
                placeholder=question.placeholder,
                options=_parse_options(question.options_json),
                is_required=question.is_required,
                is_active=question.is_active,
                admin_only_edit=question.admin_only_edit,
                display_order=question.display_order,
                depends_on_question_id=question.depends_on_question_id,
                depends_on_value=question.depends_on_value,
                created_at=question.created_at,
                updated_at=question.updated_at,
                answer=_deserialize_answer_value(question, stored_text),
                can_edit=True,
            )
        )
    return listing
@router.put("/admin/users/{user_id}/profile-answers", response_model=MessageResponse)
async def update_user_profile_answers_admin(
    user_id: int,
    payload: ProfileAnswersUpdateRequest,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """Create, update or clear another user's profile answers (admin only).

    Questions are looked up without an ``is_active`` filter and admin-only
    questions are accepted. A value that normalizes to ``None`` deletes the
    stored answer if there is one. The acting admin is recorded as
    ``updated_by_user_id`` on every written answer.

    Returns:
        A message dict; "No changes submitted" when the payload has no answers.

    Raises:
        HTTPException: 404 when the user or any referenced question does not
            exist; 400 (from normalization) when a value does not fit the
            question's type.
    """
    target_user = db.query(User).filter(User.id == user_id).first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    if not payload.answers:
        return {"message": "No changes submitted"}

    requested_ids = {submitted.question_id for submitted in payload.answers}
    found_questions = db.query(ProfileQuestion).filter(ProfileQuestion.id.in_(requested_ids)).all()
    questions_by_id = {found.id: found for found in found_questions}

    unknown_ids = [wanted for wanted in requested_ids if wanted not in questions_by_id]
    if unknown_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Questions not found: {unknown_ids}"
        )

    for submitted in payload.answers:
        question = questions_by_id[submitted.question_id]
        new_text = _normalize_answer_value(question, submitted.value)
        stored = db.query(UserProfileAnswer).filter(
            UserProfileAnswer.user_id == user_id,
            UserProfileAnswer.question_id == question.id
        ).first()

        if new_text is not None and stored:
            stored.value_text = new_text
            stored.updated_by_user_id = current_user.id
        elif new_text is not None:
            db.add(UserProfileAnswer(
                user_id=user_id,
                question_id=question.id,
                value_text=new_text,
                updated_by_user_id=current_user.id,
            ))
        elif stored:
            # An empty submission clears the stored answer.
            db.delete(stored)

    db.commit()
    return {"message": "User profile answers updated successfully"}
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """Fetch a single user by id (admin only).

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    found = db.query(User).filter(User.id == user_id).first()
    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_update: UserUpdate,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """Update user by ID (admin only).

    Only fields explicitly present in the request are applied. A supplied
    ``volunteer_level`` is normalized with ``_normalize_volunteer_level``
    before being stored.

    Raises:
        HTTPException: 404 when the user does not exist; 403 when a caller who
            is not a super admin tries to modify a super admin account or to
            assign the super admin role; 400 when the new email already
            belongs to a user or the volunteer flag is invalid.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    caller_is_super_admin = current_user.role == UserRole.SUPER_ADMIN

    # Granting super admin is reserved to super admins below; without this
    # guard a plain admin could still demote a super admin or change their
    # email through this endpoint.
    if user.role == UserRole.SUPER_ADMIN and not caller_is_super_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only super admins can modify super admin users"
        )

    update_data = user_update.model_dump(exclude_unset=True)

    if "email" in update_data and update_data["email"] != user.email:
        existing_user = db.query(User).filter(User.email == update_data["email"]).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

    if "role" in update_data and update_data["role"] == UserRole.SUPER_ADMIN and not caller_is_super_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only super admins can assign super admin role"
        )

    if "volunteer_level" in update_data:
        update_data["volunteer_level"] = _normalize_volunteer_level(update_data["volunteer_level"])

    for field, value in update_data.items():
        setattr(user, field, value)

    db.commit()
    db.refresh(user)
    return user
@router.post("/{user_id}/send-password-reset", response_model=MessageResponse)
async def send_user_password_reset(
    user_id: int,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """Send a one-time password reset link email for a user.

    Any unused, unexpired reset tokens for the user are marked used, a new
    token valid for one hour is stored and committed, and the reset email is
    then sent.

    Raises:
        HTTPException: 404 when the user does not exist; 403 when the target
            is an admin or super admin and the caller is not a super admin;
            400 when the target user is inactive; 500 when sending the email
            fails.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    if user.role in [UserRole.ADMIN, UserRole.SUPER_ADMIN] and current_user.role != UserRole.SUPER_ADMIN:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only super admins can send password reset emails for admin users"
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot reset password for inactive user"
        )

    now = utc_now()

    # Retire outstanding tokens so only the link sent now can be used.
    db.query(PasswordResetToken).filter(
        PasswordResetToken.user_id == user.id,
        PasswordResetToken.used == False,  # noqa: E712 - builds a SQL expression
        PasswordResetToken.expires_at > now
    ).update({"used": True})

    reset_token = str(uuid.uuid4())
    db_token = PasswordResetToken(
        user_id=user.id,
        token=reset_token,
        expires_at=now + timedelta(hours=1),
        used=False
    )
    db.add(db_token)
    db.commit()

    try:
        await email_service.send_password_reset_email(
            to_email=user.email,
            first_name=user.first_name,
            reset_token=reset_token,
            db=db
        )
    except Exception as exc:
        # Boundary handler: record the traceback through logging (not stdout)
        # and surface a generic error without leaking details to the client.
        logging.getLogger(__name__).exception("Failed to send admin password reset email")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to send reset email"
        ) from exc

    return {"message": "One-time password reset email sent successfully"}
@router.delete("/{user_id}", response_model=MessageResponse)
async def delete_user(
    user_id: int,
    current_user: User = Depends(get_admin_user),
    db: Session = Depends(get_db),
):
    """Delete user (admin only).

    Raises:
        HTTPException: 404 when the user does not exist; 403 when a caller who
            is not a super admin tries to delete a super admin account.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # A lower-privileged admin must not be able to remove a super admin.
    if user.role == UserRole.SUPER_ADMIN and current_user.role != UserRole.SUPER_ADMIN:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only super admins can delete super admin users"
        )

    db.delete(user)
    db.commit()
    return {"message": "User deleted successfully"}