""" Mailing List Management API FastAPI-based REST API for managing mailing lists and members """ from fastapi import FastAPI, HTTPException, Depends, Header, status, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel, EmailStr from typing import List, Optional, Annotated import mysql.connector from mysql.connector import Error import os import csv import io from contextlib import contextmanager from datetime import datetime, timedelta import secrets import bcrypt from jose import JWTError, jwt from passlib.context import CryptContext from enum import Enum # Configuration API_TOKEN = os.getenv('API_TOKEN', 'change-this-token') # Keep for backward compatibility during transition JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-this-in-production') JWT_ALGORITHM = "HS256" JWT_ACCESS_TOKEN_EXPIRE_MINUTES = 30 SESSION_EXPIRE_HOURS = 24 MYSQL_HOST = os.getenv('MYSQL_HOST', 'mysql') MYSQL_PORT = int(os.getenv('MYSQL_PORT', 3306)) MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'maillist') MYSQL_USER = os.getenv('MYSQL_USER', 'maillist') MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', '') # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # FastAPI app app = FastAPI( title="Mailing List Manager API", description="API for managing mailing lists and members", version="1.0.0" ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, specify your frontend domain allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) security = HTTPBearer() # Database connection @contextmanager def get_db(): """Database connection context manager""" connection = None try: connection = mysql.connector.connect( host=MYSQL_HOST, port=MYSQL_PORT, database=MYSQL_DATABASE, user=MYSQL_USER, password=MYSQL_PASSWORD ) yield connection except Error as e: raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") finally: if connection and connection.is_connected(): connection.close() # Role-based access control class UserRole(str, Enum): ADMINISTRATOR = "administrator" OPERATOR = "operator" READ_ONLY = "read-only" class CurrentUser(BaseModel): user_id: int username: str role: UserRole active: bool # Authentication and authorization def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hash a password""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """Create a JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM) return encoded_jwt def create_session_id() -> str: """Create a secure session ID""" return secrets.token_urlsafe(32) async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> CurrentUser: """Get current authenticated user from JWT token""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # Check if it's the old API token (for backward compatibility during transition) if credentials.credentials == API_TOKEN: # Return a fake admin user for legacy API token return CurrentUser( user_id=0, username="legacy_admin", role=UserRole.ADMINISTRATOR, active=True ) # Try to decode JWT token payload = jwt.decode(credentials.credentials, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception # Get user from database with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM users WHERE username = %s AND active = 1", (username,)) user = cursor.fetchone() cursor.close() if user is None: raise credentials_exception return CurrentUser( user_id=user["user_id"], username=user["username"], role=UserRole(user["role"]), active=user["active"] ) def require_role(required_roles: List[UserRole]): """Decorator factory for role-based access control""" def role_checker(current_user: CurrentUser = Depends(get_current_user)) -> CurrentUser: if current_user.role not in required_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions. Required roles: {[role.value for role in required_roles]}" ) return current_user return role_checker # Convenience functions for common role requirements def require_admin() -> CurrentUser: return Depends(require_role([UserRole.ADMINISTRATOR])) def require_write_access() -> CurrentUser: return Depends(require_role([UserRole.ADMINISTRATOR, UserRole.OPERATOR])) def require_read_access() -> CurrentUser: return Depends(require_role([UserRole.ADMINISTRATOR, UserRole.OPERATOR, UserRole.READ_ONLY])) # Legacy authentication (for backward compatibility) def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): """Legacy API token verification - deprecated, use get_current_user instead""" if credentials.credentials != API_TOKEN: raise HTTPException(status_code=401, detail="Invalid authentication token") return credentials.credentials # Pydantic models for authentication class LoginRequest(BaseModel): username: str password: str class TokenResponse(BaseModel): access_token: str token_type: str = "bearer" expires_in: int user: dict class CreateUserRequest(BaseModel): username: str password: str role: UserRole class UpdateUserRequest(BaseModel): password: Optional[str] = None role: Optional[UserRole] = None active: Optional[bool] = None class UserResponse(BaseModel): user_id: int username: str role: UserRole created_at: datetime last_login: Optional[datetime] = None active: bool # Pydantic models for mailing list functionality class MailingList(BaseModel): list_id: Optional[int] = None list_name: str list_email: EmailStr description: Optional[str] = None active: bool = True class MailingListUpdate(BaseModel): list_name: Optional[str] = None description: Optional[str] = None active: Optional[bool] = None class Member(BaseModel): member_id: Optional[int] = None name: str email: EmailStr active: bool = True class MemberUpdate(BaseModel): name: Optional[str] = None active: Optional[bool] = None class Subscription(BaseModel): list_email: EmailStr member_email: EmailStr active: bool = True class BulkImportRequest(BaseModel): csv_data: str list_ids: List[int] class BulkImportResult(BaseModel): total_rows: int processed_rows: int created_members: int updated_members: int subscriptions_added: int errors: List[str] # Authentication routes @app.post("/auth/login", response_model=TokenResponse) async def login(login_request: LoginRequest, request: Request): """Authenticate user and return JWT token""" with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM users WHERE username = %s AND active = 1", (login_request.username,)) user = cursor.fetchone() if not user or not verify_password(login_request.password, user["password_hash"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password" ) # Update last login cursor.execute("UPDATE users SET last_login = %s WHERE user_id = %s", (datetime.utcnow(), user["user_id"])) conn.commit() # Create session record session_id = create_session_id() expires_at = datetime.utcnow() + timedelta(hours=SESSION_EXPIRE_HOURS) client_ip = request.client.host if request.client else None user_agent = request.headers.get("user-agent", "") cursor.execute(""" INSERT INTO user_sessions (session_id, user_id, expires_at, ip_address, user_agent) VALUES (%s, %s, %s, %s, %s) """, (session_id, user["user_id"], expires_at, client_ip, user_agent)) conn.commit() cursor.close() # Create JWT token access_token_expires = timedelta(minutes=JWT_ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user["username"]}, expires_delta=access_token_expires ) return TokenResponse( access_token=access_token, expires_in=JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60, user={ "user_id": user["user_id"], "username": user["username"], "role": user["role"], "active": user["active"] } ) @app.post("/auth/logout") async def logout(current_user: CurrentUser = Depends(get_current_user)): """Logout current user (invalidate sessions)""" if current_user.user_id == 0: # Legacy admin user return {"message": "Logged out successfully"} with get_db() as conn: cursor = conn.cursor() cursor.execute("UPDATE user_sessions SET active = 0 WHERE user_id = %s", (current_user.user_id,)) conn.commit() cursor.close() return {"message": "Logged out successfully"} @app.get("/users", response_model=List[UserResponse]) async def get_users(current_user: CurrentUser = require_admin()): """Get all users (admin only)""" with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT user_id, username, role, created_at, last_login, active FROM users ORDER BY username") users = cursor.fetchall() cursor.close() return users @app.post("/users", response_model=UserResponse, status_code=201) async def create_user(user_request: CreateUserRequest, current_user: CurrentUser = require_admin()): """Create a new user (admin only)""" # Check if username already exists with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT user_id FROM users WHERE username = %s", (user_request.username,)) existing_user = cursor.fetchone() if existing_user: raise HTTPException(status_code=400, detail="Username already exists") # Hash password and create user password_hash = get_password_hash(user_request.password) cursor.execute(""" INSERT INTO users (username, password_hash, role) VALUES (%s, %s, %s) """, (user_request.username, password_hash, user_request.role.value)) conn.commit() user_id = cursor.lastrowid # Return created user cursor.execute(""" SELECT user_id, username, role, created_at, last_login, active FROM users WHERE user_id = %s """, (user_id,)) new_user = cursor.fetchone() cursor.close() return new_user @app.patch("/users/{user_id}", response_model=UserResponse) async def update_user(user_id: int, updates: UpdateUserRequest, current_user: CurrentUser = require_admin()): """Update a user (admin only)""" with get_db() as conn: cursor = conn.cursor(dictionary=True) # Build update query dynamically update_fields = [] values = [] if updates.password is not None: update_fields.append("password_hash = %s") values.append(get_password_hash(updates.password)) if updates.role is not None: update_fields.append("role = %s") values.append(updates.role.value) if updates.active is not None: update_fields.append("active = %s") values.append(updates.active) if not update_fields: raise HTTPException(status_code=400, detail="No fields to update") values.append(user_id) query = f"UPDATE users SET {', '.join(update_fields)} WHERE user_id = %s" cursor.execute(query, values) conn.commit() # Return updated user cursor.execute(""" SELECT user_id, username, role, created_at, last_login, active FROM users WHERE user_id = %s """, (user_id,)) updated_user = cursor.fetchone() cursor.close() if not updated_user: raise HTTPException(status_code=404, detail="User not found") return updated_user @app.delete("/users/{user_id}", status_code=204) async def delete_user(user_id: int, current_user: CurrentUser = require_admin()): """Delete a user (admin only)""" if user_id == current_user.user_id: raise HTTPException(status_code=400, detail="Cannot delete your own account") with get_db() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM users WHERE user_id = %s", (user_id,)) conn.commit() if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="User not found") cursor.close() @app.get("/auth/me", response_model=dict) async def get_current_user_info(current_user: CurrentUser = Depends(get_current_user)): """Get current user information""" return { "user_id": current_user.user_id, "username": current_user.username, "role": current_user.role.value, "active": current_user.active } # Routes @app.get("/") async def root(): """API information""" return { "name": "Mailing List Manager API", "version": "1.0.0", "status": "running" } @app.get("/health") async def health(): """Health check endpoint""" try: with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() cursor.close() return {"status": "healthy", "database": "connected"} except Exception as e: raise HTTPException(status_code=503, detail=f"Unhealthy: {str(e)}") # Mailing Lists endpoints @app.get("/lists", response_model=List[MailingList]) async def get_lists(current_user: CurrentUser = require_read_access()): """Get all mailing lists""" with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM lists ORDER BY list_name") lists = cursor.fetchall() cursor.close() return lists @app.get("/lists/{list_id}", response_model=MailingList) async def get_list(list_id: int, current_user: CurrentUser = require_read_access()): """Get a specific mailing list""" with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,)) mailing_list = cursor.fetchone() cursor.close() if not mailing_list: raise HTTPException(status_code=404, detail="List not found") return mailing_list @app.post("/lists", response_model=MailingList, status_code=201) async def create_list(mailing_list: MailingList, current_user: CurrentUser = require_write_access()): """Create a new mailing list""" with get_db() as conn: cursor = conn.cursor() try: cursor.execute( "INSERT INTO lists (list_name, list_email, description, active) VALUES (%s, %s, %s, %s)", (mailing_list.list_name, mailing_list.list_email, mailing_list.description, mailing_list.active) ) conn.commit() mailing_list.list_id = cursor.lastrowid cursor.close() return mailing_list except Error as e: raise HTTPException(status_code=400, detail=f"Failed to create list: {str(e)}") @app.patch("/lists/{list_id}", response_model=MailingList) async def update_list(list_id: int, updates: MailingListUpdate, current_user: CurrentUser = require_write_access()): """Update a mailing list""" with get_db() as conn: cursor = conn.cursor(dictionary=True) # Build update query dynamically update_fields = [] values = [] if updates.list_name is not None: update_fields.append("list_name = %s") values.append(updates.list_name) if updates.description is not None: update_fields.append("description = %s") values.append(updates.description) if updates.active is not None: update_fields.append("active = %s") values.append(updates.active) if not update_fields: raise HTTPException(status_code=400, detail="No fields to update") values.append(list_id) query = f"UPDATE lists SET {', '.join(update_fields)} WHERE list_id = %s" cursor.execute(query, values) conn.commit() # Return updated list cursor.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,)) updated_list = cursor.fetchone() cursor.close() if not updated_list: raise HTTPException(status_code=404, detail="List not found") return updated_list @app.delete("/lists/{list_id}", status_code=204) async def delete_list(list_id: int, current_user: CurrentUser = require_write_access()): """Delete a mailing list""" with get_db() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM lists WHERE list_id = %s", (list_id,)) conn.commit() if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="List not found") cursor.close() # Members endpoints @app.get("/members", response_model=List[Member]) async def get_members(current_user: CurrentUser = require_read_access()): """Get all members""" with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM members ORDER BY name") members = cursor.fetchall() cursor.close() return members @app.get("/members/{member_id}", response_model=Member) async def get_member(member_id: int, current_user: CurrentUser = require_read_access()): """Get a specific member""" with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM members WHERE member_id = %s", (member_id,)) member = cursor.fetchone() cursor.close() if not member: raise HTTPException(status_code=404, detail="Member not found") return member @app.post("/members", response_model=Member, status_code=201) async def create_member(member: Member, current_user: CurrentUser = require_write_access()): """Create a new member""" with get_db() as conn: cursor = conn.cursor() try: cursor.execute( "INSERT INTO members (name, email, active) VALUES (%s, %s, %s)", (member.name, member.email, member.active) ) conn.commit() member.member_id = cursor.lastrowid cursor.close() return member except Error as e: raise HTTPException(status_code=400, detail=f"Failed to create member: {str(e)}") @app.patch("/members/{member_id}", response_model=Member) async def update_member(member_id: int, updates: MemberUpdate, current_user: CurrentUser = require_write_access()): """Update a member""" with get_db() as conn: cursor = conn.cursor(dictionary=True) update_fields = [] values = [] if updates.name is not None: update_fields.append("name = %s") values.append(updates.name) if updates.active is not None: update_fields.append("active = %s") values.append(updates.active) if not update_fields: raise HTTPException(status_code=400, detail="No fields to update") values.append(member_id) query = f"UPDATE members SET {', '.join(update_fields)} WHERE member_id = %s" cursor.execute(query, values) conn.commit() cursor.execute("SELECT * FROM members WHERE member_id = %s", (member_id,)) updated_member = cursor.fetchone() cursor.close() if not updated_member: raise HTTPException(status_code=404, detail="Member not found") return updated_member @app.delete("/members/{member_id}", status_code=204) async def delete_member(member_id: int, current_user: CurrentUser = require_write_access()): """Delete a member""" with get_db() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM members WHERE member_id = %s", (member_id,)) conn.commit() if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="Member not found") cursor.close() # Subscription endpoints @app.get("/lists/{list_id}/members", response_model=List[Member]) async def get_list_members(list_id: int, current_user: CurrentUser = require_read_access()): """Get all members of a specific list""" with get_db() as conn: cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT m.* FROM members m JOIN list_members lm ON m.member_id = lm.member_id WHERE lm.list_id = %s AND lm.active = 1 ORDER BY m.name """, (list_id,)) members = cursor.fetchall() cursor.close() return members @app.post("/subscriptions", status_code=201) async def subscribe_member(subscription: Subscription, current_user: CurrentUser = require_write_access()): """Subscribe a member to a list""" with get_db() as conn: cursor = conn.cursor() try: # Get list_id and member_id cursor.execute("SELECT list_id FROM lists WHERE list_email = %s", (subscription.list_email,)) list_result = cursor.fetchone() if not list_result: raise HTTPException(status_code=404, detail="List not found") cursor.execute("SELECT member_id FROM members WHERE email = %s", (subscription.member_email,)) member_result = cursor.fetchone() if not member_result: raise HTTPException(status_code=404, detail="Member not found") list_id = list_result[0] member_id = member_result[0] # Insert subscription cursor.execute( "INSERT INTO list_members (list_id, member_id, active) VALUES (%s, %s, %s)", (list_id, member_id, subscription.active) ) conn.commit() cursor.close() return {"message": "Subscription created", "list_email": subscription.list_email, "member_email": subscription.member_email} except Error as e: if "Duplicate entry" in str(e): raise HTTPException(status_code=400, detail="Member already subscribed to this list") raise HTTPException(status_code=400, detail=f"Failed to create subscription: {str(e)}") @app.delete("/subscriptions") async def unsubscribe_member(list_email: EmailStr, member_email: EmailStr, current_user: CurrentUser = require_write_access()): """Unsubscribe a member from a list""" with get_db() as conn: cursor = conn.cursor() cursor.execute(""" DELETE lm FROM list_members lm JOIN lists l ON lm.list_id = l.list_id JOIN members m ON lm.member_id = m.member_id WHERE l.list_email = %s AND m.email = %s """, (list_email, member_email)) conn.commit() if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="Subscription not found") cursor.close() return {"message": "Unsubscribed successfully"} @app.post("/bulk-import", response_model=BulkImportResult) async def bulk_import_members(bulk_request: BulkImportRequest, current_user: CurrentUser = require_write_access()): """Bulk import members from CSV data and subscribe them to specified lists""" result = BulkImportResult( total_rows=0, processed_rows=0, created_members=0, updated_members=0, subscriptions_added=0, errors=[] ) with get_db() as conn: cursor = conn.cursor(dictionary=True) try: # Verify all list_ids exist if bulk_request.list_ids: placeholders = ','.join(['%s'] * len(bulk_request.list_ids)) cursor.execute(f"SELECT list_id FROM lists WHERE list_id IN ({placeholders})", bulk_request.list_ids) existing_lists = [row['list_id'] for row in cursor.fetchall()] invalid_lists = set(bulk_request.list_ids) - set(existing_lists) if invalid_lists: raise HTTPException(status_code=400, detail=f"Invalid list IDs: {list(invalid_lists)}") # Parse CSV data csv_reader = csv.DictReader(io.StringIO(bulk_request.csv_data)) # Validate CSV headers - we need at least Name and Email if not csv_reader.fieldnames or 'Name' not in csv_reader.fieldnames or 'Email' not in csv_reader.fieldnames: raise HTTPException(status_code=400, detail="CSV must contain 'Name' and 'Email' columns") for row_num, row in enumerate(csv_reader, start=1): result.total_rows += 1 try: name = row.get('Name', '').strip() email = row.get('Email', '').strip() # Skip rows without email (name is optional) if not email: result.errors.append(f"Row {row_num}: Missing email address") continue # Basic email validation if '@' not in email or '.' not in email.split('@')[1]: result.errors.append(f"Row {row_num}: Invalid email format: {email}") continue # Use email as name if no name provided if not name: name = email.split('@')[0] # Use the part before @ as a default name # Check if member exists cursor.execute("SELECT member_id FROM members WHERE email = %s", (email,)) existing_member = cursor.fetchone() if existing_member: # Update existing member's name if we have a better name (not auto-generated from email) should_update_name = ( row.get('Name', '').strip() and # We have a name in the CSV name != email.split('@')[0] # It's not the auto-generated name ) if should_update_name: cursor.execute("UPDATE members SET name = %s WHERE member_id = %s", (name, existing_member['member_id'])) if cursor.rowcount > 0: result.updated_members += 1 member_id = existing_member['member_id'] else: # Create new member cursor.execute( "INSERT INTO members (name, email, active) VALUES (%s, %s, %s)", (name, email, True) ) member_id = cursor.lastrowid result.created_members += 1 # Subscribe to selected lists for list_id in bulk_request.list_ids: try: cursor.execute( "INSERT INTO list_members (list_id, member_id, active) VALUES (%s, %s, %s)", (list_id, member_id, True) ) result.subscriptions_added += 1 except Error as e: if "Duplicate entry" in str(e): # Member already subscribed to this list - not an error pass else: result.errors.append(f"Row {row_num}: Failed to subscribe to list {list_id}: {str(e)}") result.processed_rows += 1 except Exception as e: result.errors.append(f"Row {row_num}: {str(e)}") continue # Commit all changes conn.commit() cursor.close() return result except HTTPException: raise except Exception as e: cursor.close() raise HTTPException(status_code=500, detail=f"Bulk import failed: {str(e)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)