Files
sasa-maillist/api/main.py
2025-10-13 19:30:19 +00:00

1123 lines
43 KiB
Python

"""
Mailing List Management API
FastAPI-based REST API for managing mailing lists and members
"""
from fastapi import FastAPI, HTTPException, Depends, Header, status, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, EmailStr
from typing import List, Optional, Annotated
import mysql.connector
from mysql.connector import Error
import os
import csv
import io
from contextlib import contextmanager
from datetime import datetime, timedelta
import secrets
import bcrypt
from jose import JWTError, jwt
from passlib.context import CryptContext
from enum import Enum
import json
import base64
from urllib.parse import urlparse
import httpx
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_public_key
# Configuration
# Every setting below is read from the process environment once, at import
# time; the second argument to os.getenv() is the fallback used when the
# variable is unset.
# NOTE(review): the API_TOKEN and JWT_SECRET_KEY fallbacks are placeholder
# strings visible in source. A deployment that leaves them unset accepts a
# guessable admin token / signs JWTs with a known key — confirm that both are
# always overridden.
API_TOKEN = os.getenv('API_TOKEN', 'change-this-token') # Keep for backward compatibility during transition
JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-this-in-production')
JWT_ALGORITHM = "HS256"
# Lifetime, in minutes, of the JWT issued by the login route.
JWT_ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Lifetime, in hours, of the user_sessions row written by the login route.
SESSION_EXPIRE_HOURS = 24
# MySQL connection parameters consumed by get_db().
MYSQL_HOST = os.getenv('MYSQL_HOST', 'mysql')
MYSQL_PORT = int(os.getenv('MYSQL_PORT', 3306))
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'maillist')
MYSQL_USER = os.getenv('MYSQL_USER', 'maillist')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', '')
# Password hashing
# passlib context used by verify_password() and get_password_hash().
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# FastAPI application object; every route below registers itself on it.
app = FastAPI(
    title="Mailing List Manager API",
    description="API for managing mailing lists and members",
    version="1.0.0",
)
# Add CORS middleware
# Allowed origins come from the comma-separated ALLOWED_ORIGINS variable, with
# local development origins as the fallback. Each entry is stripped and empty
# entries are dropped: the middleware compares origins as exact strings, so a
# value such as "https://a.example, https://b.example," would otherwise
# produce " https://b.example" and "" — neither of which can ever match.
ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.getenv(
        'ALLOWED_ORIGINS', 'http://localhost:3000,http://127.0.0.1:3000'
    ).split(',')
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,  # Specific origins only
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],  # Specific methods
    allow_headers=["Authorization", "Content-Type", "Accept", "Origin", "X-Requested-With"],  # Specific headers
)
# Bearer-token extractor shared by the authentication dependencies below.
security = HTTPBearer()
# Database connection
@contextmanager
def get_db():
    """Yield an open MySQL connection and close it when the block exits.

    The connection is opened with the module-level MYSQL_* settings. A
    ``mysql.connector.Error`` raised while connecting, or raised inside the
    ``with`` body that uses the connection, is re-raised as an HTTP 500
    ``HTTPException``. Nothing is committed here; callers commit explicitly.
    """
    db = None
    try:
        db = mysql.connector.connect(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            database=MYSQL_DATABASE,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
        )
        yield db
    except Error as exc:
        raise HTTPException(status_code=500, detail=f"Database error: {str(exc)}")
    finally:
        # ``db`` is still None when connect() itself failed.
        if db and db.is_connected():
            db.close()
# Role-based access control
class UserRole(str, Enum):
    # Role names as read from / written to the users.role column. Mixing in
    # ``str`` makes each member compare equal to its plain string value.
    ADMINISTRATOR = "administrator"
    OPERATOR = "operator"
    READ_ONLY = "read-only"
class CurrentUser(BaseModel):
    # Authenticated principal produced by get_current_user() and handed to
    # route handlers.
    user_id: int  # 0 is the synthetic administrator used for the legacy API token
    username: str
    role: UserRole
    active: bool
# Authentication and authorization
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True when *plain_password* matches the stored *hashed_password*."""
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def get_password_hash(password: str) -> str:
    """Return the hash of *password* produced by the module's passlib context."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    The token expires ``expires_delta`` from now, or after 15 minutes when no
    (or a zero) delta is given. *data* itself is left unmodified; the claims
    are built on a copy.
    """
    claims = data.copy()
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    claims.update({"exp": datetime.utcnow() + lifetime})
    return jwt.encode(claims, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
def create_session_id() -> str:
    """Return a fresh URL-safe random token built from 32 random bytes."""
    session_token = secrets.token_urlsafe(32)
    return session_token
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> CurrentUser:
    """Resolve the bearer credential of a request to a CurrentUser.

    Two credential forms are accepted:

    * the legacy static API token, which maps to a synthetic administrator
      (``user_id=0``, ``username="legacy_admin"``) without touching the
      database;
    * a JWT signed with JWT_SECRET_KEY whose ``sub`` claim names an active
      row in the ``users`` table.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names no active user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    token = credentials.credentials

    # Check if it's the old API token (for backward compatibility during
    # transition). compare_digest keeps the comparison constant-time so the
    # token cannot be recovered byte by byte from response timing; both sides
    # are encoded because compare_digest rejects non-ASCII str arguments.
    if secrets.compare_digest(token.encode("utf-8"), API_TOKEN.encode("utf-8")):
        return CurrentUser(
            user_id=0,
            username="legacy_admin",
            role=UserRole.ADMINISTRATOR,
            active=True,
        )

    try:
        payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
    except JWTError:
        raise credentials_exception
    username = payload.get("sub")
    if username is None:
        raise credentials_exception

    # Get user from database; deactivated accounts are treated as unknown.
    with get_db() as conn:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM users WHERE username = %s AND active = 1", (username,))
            user = cursor.fetchone()
        finally:
            # Release the cursor even when the query raises.
            cursor.close()

    if user is None:
        raise credentials_exception
    return CurrentUser(
        user_id=user["user_id"],
        username=user["username"],
        role=UserRole(user["role"]),
        active=user["active"],
    )
def require_role(required_roles: List[UserRole]):
    """Build a FastAPI dependency that admits only users holding an allowed role.

    The returned callable depends on get_current_user(), returns that user
    when their role is in *required_roles*, and raises HTTP 403 otherwise.
    """
    def role_checker(current_user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
        if current_user.role in required_roles:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Insufficient permissions. Required roles: {[role.value for role in required_roles]}"
        )

    return role_checker
# Convenience functions for common role requirements
def require_admin() -> CurrentUser:
    """Return a ``Depends`` marker that admits administrators only.

    Intended as a route-parameter default; the annotation names the type the
    wrapped role checker returns, not the marker object returned here.
    """
    admin_only = require_role([UserRole.ADMINISTRATOR])
    return Depends(admin_only)
def require_write_access() -> CurrentUser:
    """Return a ``Depends`` marker that admits administrators and operators.

    Intended as a route-parameter default; the annotation names the type the
    wrapped role checker returns, not the marker object returned here.
    """
    writers = require_role([UserRole.ADMINISTRATOR, UserRole.OPERATOR])
    return Depends(writers)
def require_read_access() -> CurrentUser:
    """Return a ``Depends`` marker that admits every defined role.

    Intended as a route-parameter default; the annotation names the type the
    wrapped role checker returns, not the marker object returned here.
    """
    readers = require_role([UserRole.ADMINISTRATOR, UserRole.OPERATOR, UserRole.READ_ONLY])
    return Depends(readers)
# Legacy authentication (for backward compatibility)
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Legacy API token verification - deprecated, use get_current_user instead.

    Returns:
        The presented token string when it equals API_TOKEN.

    Raises:
        HTTPException: 401 when the token does not match.
    """
    presented = credentials.credentials
    # Constant-time comparison: a plain ``!=`` returns at the first differing
    # character and so leaks how much of the token was guessed correctly.
    # Both sides are encoded because compare_digest rejects non-ASCII str.
    if not secrets.compare_digest(presented.encode("utf-8"), API_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid authentication token")
    return presented
# Pydantic models for authentication
class LoginRequest(BaseModel):
    # Request body of POST /auth/login.
    username: str
    password: str  # plain text; verified against users.password_hash
class TokenResponse(BaseModel):
    # Response body of POST /auth/login.
    access_token: str
    token_type: str = "bearer"
    expires_in: int  # token lifetime in seconds
    user: dict  # user_id, username, role and active of the signed-in user
class CreateUserRequest(BaseModel):
    # Request body of POST /users.
    username: str
    password: str  # plain text; hashed before it is stored
    role: UserRole
class UpdateUserRequest(BaseModel):
    # Request body of PATCH /users/{user_id}; fields left as None are not changed.
    password: Optional[str] = None
    role: Optional[UserRole] = None
    active: Optional[bool] = None
class UserResponse(BaseModel):
    # Public view of a users row as returned by the /users routes; the
    # password hash is deliberately not part of it.
    user_id: int
    username: str
    role: UserRole
    created_at: datetime
    last_login: Optional[datetime] = None
    active: bool
# Pydantic models for mailing list functionality
class MailingList(BaseModel):
    # A row of the lists table; also the request body of POST /lists.
    list_id: Optional[int] = None  # filled in from the insert id on creation
    list_name: str
    list_email: EmailStr
    description: Optional[str] = None
    active: bool = True
class MailingListUpdate(BaseModel):
    # Request body of PATCH /lists/{list_id}; fields left as None are not changed.
    list_name: Optional[str] = None
    description: Optional[str] = None
    active: Optional[bool] = None
class Member(BaseModel):
    # A row of the members table; also the request body of POST /members.
    member_id: Optional[int] = None  # filled in from the insert id on creation
    name: str
    email: EmailStr
    active: bool = True
    bounce_count: Optional[int] = 0
    last_bounce_at: Optional[datetime] = None
    # Values written by this module: 'clean', 'soft_bounce', 'hard_bounce'.
    bounce_status: Optional[str] = 'clean'
class MemberUpdate(BaseModel):
    # Request body of PATCH /members/{member_id}; fields left as None are not changed.
    name: Optional[str] = None
    active: Optional[bool] = None
class Subscription(BaseModel):
    # Request body of POST /subscriptions; list and member are identified by
    # their e-mail addresses rather than by id.
    list_email: EmailStr
    member_email: EmailStr
    active: bool = True
class BulkImportRequest(BaseModel):
    # Request body of POST /bulk-import.
    csv_data: str  # CSV text whose header row must contain 'Name' and 'Email'
    list_ids: List[int]  # lists every imported member is subscribed to
class BulkImportResult(BaseModel):
    # Summary returned by POST /bulk-import.
    total_rows: int  # data rows read from the CSV
    processed_rows: int  # rows that reached the end of processing
    created_members: int
    updated_members: int
    subscriptions_added: int
    errors: List[str]  # one human-readable message per rejected row or failed step
# Authentication routes
@app.post("/auth/login", response_model=TokenResponse)
async def login(login_request: LoginRequest, request: Request):
    """Authenticate user and return JWT token"""
    # Flow: look up the active user, verify the password, then record the
    # login (last_login + a user_sessions row) and issue a JWT.
    # Raises HTTP 401 when the user is unknown, inactive, or the password is wrong.
    with get_db() as conn:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM users WHERE username = %s AND active = 1", (login_request.username,))
            user = cursor.fetchone()
            if not user or not verify_password(login_request.password, user["password_hash"]):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Incorrect username or password"
                )

            now = datetime.utcnow()
            # Update last login
            cursor.execute("UPDATE users SET last_login = %s WHERE user_id = %s",
                           (now, user["user_id"]))

            # Create session record
            session_id = create_session_id()
            expires_at = now + timedelta(hours=SESSION_EXPIRE_HOURS)
            client_ip = request.client.host if request.client else None
            user_agent = request.headers.get("user-agent", "")
            cursor.execute("""
                INSERT INTO user_sessions (session_id, user_id, expires_at, ip_address, user_agent)
                VALUES (%s, %s, %s, %s, %s)
            """, (session_id, user["user_id"], expires_at, client_ip, user_agent))

            # Single commit: last_login and the session row are persisted
            # together, so a failed insert cannot leave a half-recorded login.
            conn.commit()
        finally:
            # Runs on the 401 path too, where the cursor used to be left open.
            cursor.close()

    # Create JWT token
    access_token_expires = timedelta(minutes=JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["username"]}, expires_delta=access_token_expires
    )
    return TokenResponse(
        access_token=access_token,
        expires_in=JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        user={
            "user_id": user["user_id"],
            "username": user["username"],
            "role": user["role"],
            "active": user["active"]
        }
    )
@app.post("/auth/logout")
async def logout(current_user: CurrentUser = Depends(get_current_user)):
    """Logout current user (invalidate sessions)"""
    # user_id 0 is the synthetic legacy-token administrator; there is nothing
    # to invalidate for it, so the database is only touched for real users.
    if current_user.user_id != 0:
        with get_db() as db:
            cur = db.cursor()
            cur.execute("UPDATE user_sessions SET active = 0 WHERE user_id = %s", (current_user.user_id,))
            db.commit()
            cur.close()
    return {"message": "Logged out successfully"}
@app.get("/users", response_model=List[UserResponse])
async def get_users(current_user: CurrentUser = require_admin()):
    """Get all users (admin only)"""
    # Only the public columns are selected; password hashes never leave the DB.
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT user_id, username, role, created_at, last_login, active FROM users ORDER BY username")
        rows = cur.fetchall()
        cur.close()
    return rows
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user_request: CreateUserRequest, current_user: CurrentUser = require_admin()):
    """Create a new user (admin only)"""
    with get_db() as db:
        cur = db.cursor(dictionary=True)

        # Reject the request up front when the username is already taken.
        cur.execute("SELECT user_id FROM users WHERE username = %s", (user_request.username,))
        if cur.fetchone():
            raise HTTPException(status_code=400, detail="Username already exists")

        # Store only the hash of the password, never the plain text.
        hashed = get_password_hash(user_request.password)
        cur.execute("""
            INSERT INTO users (username, password_hash, role)
            VALUES (%s, %s, %s)
        """, (user_request.username, hashed, user_request.role.value))
        db.commit()
        new_id = cur.lastrowid

        # Read the row back so database-generated columns are included.
        cur.execute("""
            SELECT user_id, username, role, created_at, last_login, active
            FROM users WHERE user_id = %s
        """, (new_id,))
        created = cur.fetchone()
        cur.close()
    return created
@app.patch("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, updates: UpdateUserRequest, current_user: CurrentUser = require_admin()):
    """Update a user (admin only)"""
    with get_db() as db:
        cur = db.cursor(dictionary=True)

        # Collect one (SET clause, value) pair per field the caller supplied.
        changes = []
        if updates.password is not None:
            changes.append(("password_hash = %s", get_password_hash(updates.password)))
        if updates.role is not None:
            changes.append(("role = %s", updates.role.value))
        if updates.active is not None:
            changes.append(("active = %s", updates.active))
        if not changes:
            raise HTTPException(status_code=400, detail="No fields to update")

        update_fields = [clause for clause, _ in changes]
        params = [value for _, value in changes]
        params.append(user_id)
        query = f"UPDATE users SET {', '.join(update_fields)} WHERE user_id = %s"
        cur.execute(query, params)
        db.commit()

        # Read the row back; an absent row means the id never existed.
        cur.execute("""
            SELECT user_id, username, role, created_at, last_login, active
            FROM users WHERE user_id = %s
        """, (user_id,))
        refreshed = cur.fetchone()
        cur.close()

    if not refreshed:
        raise HTTPException(status_code=404, detail="User not found")
    return refreshed
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int, current_user: CurrentUser = require_admin()):
    """Delete a user (admin only)"""
    # An administrator may not remove the account they are signed in with.
    if current_user.user_id == user_id:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")

    with get_db() as db:
        cur = db.cursor()
        cur.execute("DELETE FROM users WHERE user_id = %s", (user_id,))
        db.commit()
        # No affected row means the id did not exist.
        nothing_deleted = cur.rowcount == 0
        if nothing_deleted:
            raise HTTPException(status_code=404, detail="User not found")
        cur.close()
@app.get("/auth/me", response_model=dict)
async def get_current_user_info(current_user: CurrentUser = Depends(get_current_user)):
    """Get current user information"""
    # The role enum is flattened to its string value for the response.
    profile = {
        "user_id": current_user.user_id,
        "username": current_user.username,
        "role": current_user.role.value,
        "active": current_user.active
    }
    return profile
# Routes
@app.get("/")
async def root():
    """API information"""
    # Static service banner; no authentication and no database access.
    banner = {
        "name": "Mailing List Manager API",
        "version": "1.0.0",
        "status": "running"
    }
    return banner
@app.get("/health")
async def health():
    """Health check endpoint"""
    # Healthy means a trivial query round-trips to MySQL; any failure,
    # including the HTTP 500 raised by get_db(), is reported as HTTP 503.
    try:
        with get_db() as db:
            probe = db.cursor()
            probe.execute("SELECT 1")
            probe.fetchone()
            probe.close()
    except Exception as exc:
        raise HTTPException(status_code=503, detail=f"Unhealthy: {str(exc)}")
    return {"status": "healthy", "database": "connected"}
# Mailing Lists endpoints
@app.get("/lists", response_model=List[MailingList])
async def get_lists(current_user: CurrentUser = require_read_access()):
    """Get all mailing lists"""
    # Returned in alphabetical order of list_name.
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM lists ORDER BY list_name")
        rows = cur.fetchall()
        cur.close()
    return rows
@app.get("/lists/{list_id}", response_model=MailingList)
async def get_list(list_id: int, current_user: CurrentUser = require_read_access()):
    """Get a specific mailing list"""
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,))
        row = cur.fetchone()
        cur.close()

    # fetchone() yields None when no row carries this id.
    if row:
        return row
    raise HTTPException(status_code=404, detail="List not found")
@app.post("/lists", response_model=MailingList, status_code=201)
async def create_list(mailing_list: MailingList, current_user: CurrentUser = require_write_access()):
    """Create a new mailing list"""
    # Any MySQL error raised by the insert is reported to the client as HTTP 400.
    new_row = (
        mailing_list.list_name,
        mailing_list.list_email,
        mailing_list.description,
        mailing_list.active,
    )
    with get_db() as db:
        cur = db.cursor()
        try:
            cur.execute(
                "INSERT INTO lists (list_name, list_email, description, active) VALUES (%s, %s, %s, %s)",
                new_row
            )
            db.commit()
            # Echo the request body back with the generated primary key.
            mailing_list.list_id = cur.lastrowid
            cur.close()
        except Error as exc:
            raise HTTPException(status_code=400, detail=f"Failed to create list: {str(exc)}")
        return mailing_list
@app.patch("/lists/{list_id}", response_model=MailingList)
async def update_list(list_id: int, updates: MailingListUpdate, current_user: CurrentUser = require_write_access()):
    """Update a mailing list"""
    with get_db() as db:
        cur = db.cursor(dictionary=True)

        # One SET clause per field the caller actually supplied.
        candidates = (
            ("list_name", updates.list_name),
            ("description", updates.description),
            ("active", updates.active),
        )
        update_fields = []
        params = []
        for column, value in candidates:
            if value is not None:
                update_fields.append(f"{column} = %s")
                params.append(value)
        if not update_fields:
            raise HTTPException(status_code=400, detail="No fields to update")

        params.append(list_id)
        query = f"UPDATE lists SET {', '.join(update_fields)} WHERE list_id = %s"
        cur.execute(query, params)
        db.commit()

        # Read the row back; an absent row means the id never existed.
        cur.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,))
        refreshed = cur.fetchone()
        cur.close()

    if not refreshed:
        raise HTTPException(status_code=404, detail="List not found")
    return refreshed
@app.delete("/lists/{list_id}", status_code=204)
async def delete_list(list_id: int, current_user: CurrentUser = require_write_access()):
    """Delete a mailing list"""
    with get_db() as db:
        cur = db.cursor()
        cur.execute("DELETE FROM lists WHERE list_id = %s", (list_id,))
        db.commit()
        # No affected row means the id did not exist.
        nothing_deleted = cur.rowcount == 0
        if nothing_deleted:
            raise HTTPException(status_code=404, detail="List not found")
        cur.close()
# Members endpoints
@app.get("/members", response_model=List[Member])
async def get_members(current_user: CurrentUser = require_read_access()):
    """Get all members"""
    # Returned in alphabetical order of name.
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM members ORDER BY name")
        rows = cur.fetchall()
        cur.close()
    return rows
@app.get("/members/{member_id}", response_model=Member)
async def get_member(member_id: int, current_user: CurrentUser = require_read_access()):
    """Get a specific member"""
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM members WHERE member_id = %s", (member_id,))
        row = cur.fetchone()
        cur.close()

    # fetchone() yields None when no row carries this id.
    if row:
        return row
    raise HTTPException(status_code=404, detail="Member not found")
@app.post("/members", response_model=Member, status_code=201)
async def create_member(member: Member, current_user: CurrentUser = require_write_access()):
    """Create a new member"""
    # Only name, email and active are stored from the request body.
    # Any MySQL error raised by the insert is reported as HTTP 400.
    with get_db() as db:
        cur = db.cursor()
        try:
            cur.execute(
                "INSERT INTO members (name, email, active) VALUES (%s, %s, %s)",
                (member.name, member.email, member.active)
            )
            db.commit()
            # Echo the request body back with the generated primary key.
            member.member_id = cur.lastrowid
            cur.close()
        except Error as exc:
            raise HTTPException(status_code=400, detail=f"Failed to create member: {str(exc)}")
        return member
@app.patch("/members/{member_id}", response_model=Member)
async def update_member(member_id: int, updates: MemberUpdate, current_user: CurrentUser = require_write_access()):
    """Update a member"""
    with get_db() as db:
        cur = db.cursor(dictionary=True)

        # One SET clause per field the caller actually supplied.
        update_fields = []
        params = []
        for column, value in (("name", updates.name), ("active", updates.active)):
            if value is not None:
                update_fields.append(f"{column} = %s")
                params.append(value)
        if not update_fields:
            raise HTTPException(status_code=400, detail="No fields to update")

        params.append(member_id)
        query = f"UPDATE members SET {', '.join(update_fields)} WHERE member_id = %s"
        cur.execute(query, params)
        db.commit()

        # Read the row back; an absent row means the id never existed.
        cur.execute("SELECT * FROM members WHERE member_id = %s", (member_id,))
        refreshed = cur.fetchone()
        cur.close()

    if not refreshed:
        raise HTTPException(status_code=404, detail="Member not found")
    return refreshed
@app.delete("/members/{member_id}", status_code=204)
async def delete_member(member_id: int, current_user: CurrentUser = require_write_access()):
    """Delete a member"""
    with get_db() as db:
        cur = db.cursor()
        cur.execute("DELETE FROM members WHERE member_id = %s", (member_id,))
        db.commit()
        # No affected row means the id did not exist.
        nothing_deleted = cur.rowcount == 0
        if nothing_deleted:
            raise HTTPException(status_code=404, detail="Member not found")
        cur.close()
# Subscription endpoints
@app.get("/lists/{list_id}/members", response_model=List[Member])
async def get_list_members(list_id: int, current_user: CurrentUser = require_read_access()):
    """Get all members of a specific list"""
    # Only active subscriptions (list_members.active = 1) are included; an
    # unknown list id simply yields an empty result.
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("""
            SELECT m.*
            FROM members m
            JOIN list_members lm ON m.member_id = lm.member_id
            WHERE lm.list_id = %s AND lm.active = 1
            ORDER BY m.name
        """, (list_id,))
        subscribers = cur.fetchall()
        cur.close()
    return subscribers
@app.post("/subscriptions", status_code=201)
async def subscribe_member(subscription: Subscription, current_user: CurrentUser = require_write_access()):
    """Subscribe a member to a list"""
    # Raises HTTP 404 when the list or the member e-mail is unknown, and
    # HTTP 400 when the subscription already exists or the insert fails.
    with get_db() as conn:
        cursor = conn.cursor()
        try:
            # Get list_id and member_id
            cursor.execute("SELECT list_id FROM lists WHERE list_email = %s", (subscription.list_email,))
            list_result = cursor.fetchone()
            if not list_result:
                raise HTTPException(status_code=404, detail="List not found")

            cursor.execute("SELECT member_id FROM members WHERE email = %s", (subscription.member_email,))
            member_result = cursor.fetchone()
            if not member_result:
                raise HTTPException(status_code=404, detail="Member not found")

            list_id = list_result[0]
            member_id = member_result[0]

            # Insert subscription
            cursor.execute(
                "INSERT INTO list_members (list_id, member_id, active) VALUES (%s, %s, %s)",
                (list_id, member_id, subscription.active)
            )
            conn.commit()
            return {"message": "Subscription created", "list_email": subscription.list_email, "member_email": subscription.member_email}
        except Error as e:
            # 1062 is MySQL's ER_DUP_ENTRY. The error number is checked first
            # because the message text depends on the server's language
            # setting; the text match is kept as a fallback.
            if getattr(e, "errno", None) == 1062 or "Duplicate entry" in str(e):
                raise HTTPException(status_code=400, detail="Member already subscribed to this list")
            raise HTTPException(status_code=400, detail=f"Failed to create subscription: {str(e)}")
        finally:
            # Close the cursor on the 404 and error paths as well.
            cursor.close()
@app.delete("/subscriptions")
async def unsubscribe_member(list_email: EmailStr, member_email: EmailStr, current_user: CurrentUser = require_write_access()):
    """Unsubscribe a member from a list"""
    # Both addresses arrive as query parameters; the join resolves them to the
    # single list_members row to delete.
    with get_db() as db:
        cur = db.cursor()
        cur.execute("""
            DELETE lm FROM list_members lm
            JOIN lists l ON lm.list_id = l.list_id
            JOIN members m ON lm.member_id = m.member_id
            WHERE l.list_email = %s AND m.email = %s
        """, (list_email, member_email))
        db.commit()
        # No affected row means no such subscription existed.
        nothing_deleted = cur.rowcount == 0
        if nothing_deleted:
            raise HTTPException(status_code=404, detail="Subscription not found")
        cur.close()
    return {"message": "Unsubscribed successfully"}
@app.post("/bulk-import", response_model=BulkImportResult)
async def bulk_import_members(bulk_request: BulkImportRequest, current_user: CurrentUser = require_write_access()):
    """Bulk import members from CSV data and subscribe them to specified lists"""
    # Row-level problems are collected in result.errors and do not abort the
    # import; everything is committed once at the end.
    # Raises HTTP 400 for unknown list ids or a CSV without 'Name'/'Email'
    # headers, and HTTP 500 for any other failure.
    result = BulkImportResult(
        total_rows=0,
        processed_rows=0,
        created_members=0,
        updated_members=0,
        subscriptions_added=0,
        errors=[]
    )
    with get_db() as conn:
        cursor = conn.cursor(dictionary=True)
        try:
            # Verify all list_ids exist
            if bulk_request.list_ids:
                placeholders = ','.join(['%s'] * len(bulk_request.list_ids))
                cursor.execute(f"SELECT list_id FROM lists WHERE list_id IN ({placeholders})", bulk_request.list_ids)
                existing_lists = [row['list_id'] for row in cursor.fetchall()]
                invalid_lists = set(bulk_request.list_ids) - set(existing_lists)
                if invalid_lists:
                    raise HTTPException(status_code=400, detail=f"Invalid list IDs: {list(invalid_lists)}")

            # Parse CSV data. A leading byte-order mark (common in spreadsheet
            # exports) would otherwise become part of the first header name.
            csv_reader = csv.DictReader(io.StringIO(bulk_request.csv_data.lstrip('\ufeff')))

            # Validate CSV headers - we need at least Name and Email
            headers = csv_reader.fieldnames or []
            if 'Name' not in headers or 'Email' not in headers:
                raise HTTPException(status_code=400, detail="CSV must contain 'Name' and 'Email' columns")

            for row_num, row in enumerate(csv_reader, start=1):
                result.total_rows += 1
                try:
                    # DictReader fills cells missing from a short row with
                    # None, so fall back to '' before stripping.
                    csv_name = (row.get('Name') or '').strip()
                    email = (row.get('Email') or '').strip()

                    # Skip rows without email (name is optional)
                    if not email:
                        result.errors.append(f"Row {row_num}: Missing email address")
                        continue

                    # Basic email validation
                    if '@' not in email or '.' not in email.split('@')[1]:
                        result.errors.append(f"Row {row_num}: Invalid email format: {email}")
                        continue

                    # Use the part before @ as a default name if none provided
                    default_name = email.split('@')[0]
                    name = csv_name or default_name

                    # Check if member exists
                    cursor.execute("SELECT member_id FROM members WHERE email = %s", (email,))
                    existing_member = cursor.fetchone()
                    if existing_member:
                        member_id = existing_member['member_id']
                        # Update the stored name only when the CSV supplies a
                        # real one (not just the auto-generated default).
                        if csv_name and csv_name != default_name:
                            cursor.execute("UPDATE members SET name = %s WHERE member_id = %s", (name, member_id))
                            if cursor.rowcount > 0:
                                result.updated_members += 1
                    else:
                        # Create new member
                        cursor.execute(
                            "INSERT INTO members (name, email, active) VALUES (%s, %s, %s)",
                            (name, email, True)
                        )
                        member_id = cursor.lastrowid
                        result.created_members += 1

                    # Subscribe to selected lists
                    for list_id in bulk_request.list_ids:
                        try:
                            cursor.execute(
                                "INSERT INTO list_members (list_id, member_id, active) VALUES (%s, %s, %s)",
                                (list_id, member_id, True)
                            )
                            result.subscriptions_added += 1
                        except Error as e:
                            # 1062 is MySQL's ER_DUP_ENTRY: the member is
                            # already subscribed, which is not an error. The
                            # message-text match is a fallback only, since the
                            # text depends on the server's language setting.
                            if getattr(e, "errno", None) == 1062 or "Duplicate entry" in str(e):
                                continue
                            result.errors.append(f"Row {row_num}: Failed to subscribe to list {list_id}: {str(e)}")

                    result.processed_rows += 1
                except Exception as e:
                    result.errors.append(f"Row {row_num}: {str(e)}")
                    continue

            # Commit all changes
            conn.commit()
            return result
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Bulk import failed: {str(e)}")
        finally:
            # Close the cursor on every exit path, including HTTPException.
            cursor.close()
# SNS Webhook for Bounce Handling
async def verify_sns_signature(request: Request) -> dict:
    """Parse an incoming SNS HTTP request and verify its signature.

    Every message type is verified, including SubscriptionConfirmation and
    UnsubscribeConfirmation: callers act on fields such as ``SubscribeURL``,
    so accepting those messages unsigned would let any client make this
    server fetch a URL of the client's choosing.

    The signing certificate is only downloaded from an HTTPS URL whose host
    is an SNS endpoint (``sns.<region>.amazonaws.com``, optionally ``.cn``).
    Any other ``*.amazonaws.com`` host is rejected, because hosts such as S3
    buckets can serve attacker-controlled certificates.

    Returns:
        The decoded SNS message as a dict.

    Raises:
        HTTPException: 400 when the body is empty or not valid JSON, when the
            certificate URL is missing or not trusted, or when the signature
            does not verify.
    """
    import re  # function-local: ``re`` is not among the module-level imports

    try:
        body = await request.body()
        print(f"SNS webhook received body: {body}")
        print(f"SNS webhook body length: {len(body)}")
        print(f"SNS webhook headers: {dict(request.headers)}")
        if not body:
            print("ERROR: Empty body received")
            raise HTTPException(status_code=400, detail="Empty request body")

        message = json.loads(body.decode('utf-8'))
        message_type = message.get('Type')
        print(f"SNS webhook parsed message type: {message_type}")

        # Get certificate URL
        cert_url = message.get('SigningCertURL')
        if not cert_url:
            raise HTTPException(status_code=400, detail="Missing SigningCertURL")

        # Verify certificate URL is an HTTPS SNS endpoint. ``hostname`` is
        # None for malformed URLs, hence the ``or ''``.
        parsed_url = urlparse(cert_url)
        cert_host = parsed_url.hostname or ''
        if parsed_url.scheme != 'https' or not re.fullmatch(
            r'sns\.[a-z0-9-]{3,}\.amazonaws\.com(\.cn)?', cert_host
        ):
            raise HTTPException(status_code=400, detail="Invalid certificate URL")

        # Download certificate
        async with httpx.AsyncClient() as client:
            cert_response = await client.get(cert_url)
            cert_response.raise_for_status()
            cert_pem = cert_response.content

        # Load certificate and extract public key
        cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
        public_key = cert.public_key()

        # Build string to sign: "<key>\n<value>\n" for a fixed, ordered set
        # of keys that depends on the message type.
        if message_type == 'Notification':
            string_to_sign = (
                f"Message\n{message['Message']}\n"
                f"MessageId\n{message['MessageId']}\n"
            )
            # Subject is optional and only signed when present.
            if 'Subject' in message:
                string_to_sign += f"Subject\n{message['Subject']}\n"
            string_to_sign += (
                f"Timestamp\n{message['Timestamp']}\n"
                f"TopicArn\n{message['TopicArn']}\n"
                f"Type\n{message['Type']}\n"
            )
        else:
            string_to_sign = (
                f"Message\n{message['Message']}\n"
                f"MessageId\n{message['MessageId']}\n"
                f"SubscribeURL\n{message['SubscribeURL']}\n"
                f"Timestamp\n{message['Timestamp']}\n"
                f"Token\n{message['Token']}\n"
                f"TopicArn\n{message['TopicArn']}\n"
                f"Type\n{message['Type']}\n"
            )

        # SignatureVersion "1" is SHA1withRSA, "2" is SHA256withRSA.
        if str(message.get('SignatureVersion')) == '2':
            digest = hashes.SHA256()
        else:
            digest = hashes.SHA1()

        # Verify signature
        signature = base64.b64decode(message['Signature'])
        try:
            public_key.verify(
                signature,
                string_to_sign.encode('utf-8'),
                padding.PKCS1v15(),
                digest
            )
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Invalid signature: {str(e)}")
        return message
    except HTTPException:
        # Keep the specific 400 raised above instead of rewrapping it below.
        raise
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Signature verification failed: {str(e)}")
async def process_bounce(bounce_data: dict):
    """Record an SES bounce and update the affected members.

    For every entry in ``bouncedRecipients`` a row is written to
    ``bounce_logs``. When the address belongs to a known member:

    * ``Permanent`` bounces mark the member ``hard_bounce`` and deactivate it;
    * ``Transient`` bounces increment the counter and switch the status to
      ``soft_bounce`` from the third bounce on, unless the member is already
      ``hard_bounce``;
    * any other bounce type only increments the counter.

    All changes are committed together after the last recipient. Any error is
    logged to stdout and re-raised.
    """
    try:
        bounce_type = bounce_data.get('bounceType')  # Permanent, Transient, Undetermined
        bounce_subtype = bounce_data.get('bounceSubType', '')
        timestamp_str = bounce_data.get('timestamp')
        feedback_id = bounce_data.get('feedbackId', '')

        # Convert ISO 8601 timestamp to MySQL datetime format
        # SES sends: '2025-10-13T16:22:40.359Z'
        # MySQL needs: '2025-10-13 16:22:40'
        # A payload without a timestamp falls back to the current UTC time
        # instead of failing the whole notification.
        if timestamp_str:
            bounced_at = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
        else:
            bounced_at = datetime.utcnow()
        timestamp = bounced_at.strftime('%Y-%m-%d %H:%M:%S')

        bounced_recipients = bounce_data.get('bouncedRecipients', [])
        with get_db() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                for recipient in bounced_recipients:
                    email = recipient.get('emailAddress')
                    diagnostic_code = recipient.get('diagnosticCode', '')
                    if not email:
                        continue

                    # Find member by email; bounces for unknown addresses are
                    # still logged, with a NULL member_id.
                    cursor.execute("SELECT member_id FROM members WHERE email = %s", (email,))
                    member = cursor.fetchone()
                    member_id = member['member_id'] if member else None

                    # Log the bounce
                    cursor.execute("""
                        INSERT INTO bounce_logs
                        (member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, feedback_id)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                    """, (member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, feedback_id))

                    # Update member bounce status
                    if not member_id:
                        continue

                    if bounce_type == 'Permanent':
                        # Deactivate member with hard bounce
                        cursor.execute("""
                            UPDATE members
                            SET bounce_count = bounce_count + 1,
                                last_bounce_at = %s,
                                bounce_status = %s,
                                active = 0
                            WHERE member_id = %s
                        """, (timestamp, 'hard_bounce', member_id))
                    elif bounce_type == 'Transient':
                        # Check current bounce count
                        cursor.execute("SELECT bounce_count, bounce_status FROM members WHERE member_id = %s", (member_id,))
                        current = cursor.fetchone()
                        # If already hard bounced, don't change status
                        if current and current['bounce_status'] != 'hard_bounce':
                            # A NULL counter is treated as 0.
                            new_count = (current['bounce_count'] or 0) + 1
                            # After 3 soft bounces, mark as soft_bounce status
                            new_status = 'soft_bounce' if new_count >= 3 else 'clean'
                            cursor.execute("""
                                UPDATE members
                                SET bounce_count = %s,
                                    last_bounce_at = %s,
                                    bounce_status = %s
                                WHERE member_id = %s
                            """, (new_count, timestamp, new_status, member_id))
                    else:  # Undetermined
                        cursor.execute("""
                            UPDATE members
                            SET bounce_count = bounce_count + 1,
                                last_bounce_at = %s
                            WHERE member_id = %s
                        """, (timestamp, member_id))

                conn.commit()
            finally:
                # Release the cursor on the error path as well.
                cursor.close()
    except Exception as e:
        print(f"✗ Error processing bounce: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        print(f"Bounce data: {bounce_data}")
        import traceback
        traceback.print_exc()
        raise
@app.post("/webhooks/sns", response_class=PlainTextResponse)
async def sns_webhook(request: Request):
    """Handle SNS notifications for bounces and complaints"""
    # Returns a short plain-text acknowledgement. HTTPExceptions propagate
    # unchanged; any other failure is logged and reported as HTTP 500.
    import re  # function-local: ``re`` is not among the module-level imports

    try:
        print(f"=== SNS Webhook Request ===")
        print(f"Headers: {dict(request.headers)}")
        print(f"Content-Type: {request.headers.get('content-type')}")
        print(f"User-Agent: {request.headers.get('user-agent')}")

        # Verify SNS signature
        message = await verify_sns_signature(request)
        print(f"Message Type: {message.get('Type')}")
        print(f"Message Keys: {list(message.keys())}")
        message_type = message.get('Type')

        # Handle subscription confirmation
        if message_type == 'SubscriptionConfirmation':
            subscribe_url = message.get('SubscribeURL')
            print(f"Subscription confirmation received, URL: {subscribe_url}")
            if subscribe_url:
                # Only ever fetch an HTTPS SNS endpoint. Without this check
                # the webhook could be used to make the server request an
                # arbitrary URL supplied in the message body.
                target = urlparse(subscribe_url)
                if target.scheme != 'https' or not re.fullmatch(
                    r'sns\.[a-z0-9-]{3,}\.amazonaws\.com(\.cn)?', target.hostname or ''
                ):
                    raise HTTPException(status_code=400, detail="Invalid SubscribeURL")

                # Confirm subscription
                async with httpx.AsyncClient() as client:
                    response = await client.get(subscribe_url)
                    print(f"Subscription confirmation response: {response.status_code}")
                return "Subscription confirmed"

        # Handle notification
        elif message_type == 'Notification':
            # The SES payload is itself JSON, carried as a string in 'Message'.
            inner_message = message.get('Message', '{}')
            print(f"Inner message (first 500 chars): {inner_message[:500]}")
            notification = json.loads(inner_message)

            # SES can send either 'notificationType' or 'eventType' depending on configuration
            notification_type = notification.get('notificationType') or notification.get('eventType')
            print(f"Notification type: {notification_type}")

            if notification_type == 'Bounce':
                bounce = notification.get('bounce', {})
                print(f"\n✓ Processing Bounce")
                print(f" Bounce Type: {bounce.get('bounceType')}")
                print(f" Recipients: {[r.get('emailAddress') for r in bounce.get('bouncedRecipients', [])]}")
                await process_bounce(bounce)
                print(f" ✓ Bounce processed successfully")
                return "Bounce processed"
            elif notification_type == 'Complaint':
                # We could also track complaints similarly to bounces
                print(f"\n✓ Complaint received")
                return "Complaint received"

        print(f"=== End SNS Webhook Request ===")
        return "OK"
    except HTTPException:
        raise
    except Exception as e:
        print(f"SNS webhook error: {str(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
# Bounce management endpoints
class BounceLog(BaseModel):
    # One row of bounce_logs as returned by GET /members/{member_id}/bounces.
    bounce_id: int
    email: str
    bounce_type: str
    bounce_subtype: Optional[str] = None
    diagnostic_code: Optional[str] = None
    timestamp: datetime  # time of the bounce as reported in the notification
    feedback_id: Optional[str] = None
    created_at: datetime
class MemberWithBounces(BaseModel):
    # Member row with its bounce bookkeeping fields made mandatory.
    # NOTE(review): no route in the visible part of this module references
    # this model — confirm whether it is still needed.
    member_id: int
    name: str
    email: str
    active: bool
    bounce_count: int
    last_bounce_at: Optional[datetime] = None
    bounce_status: str
@app.get("/members/{member_id}/bounces", response_model=List[BounceLog])
async def get_member_bounces(member_id: int, current_user: CurrentUser = require_read_access()):
    """Get bounce history for a member"""
    # Newest bounce first; an unknown member id simply yields an empty list.
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("""
            SELECT bounce_id, email, bounce_type, bounce_subtype, diagnostic_code,
                   timestamp, feedback_id, created_at
            FROM bounce_logs
            WHERE member_id = %s
            ORDER BY timestamp DESC
        """, (member_id,))
        history = cur.fetchall()
        cur.close()
    return history
@app.patch("/members/{member_id}/bounce-status")
async def reset_bounce_status(member_id: int, current_user: CurrentUser = require_write_access()):
    """Reset bounce status for a member (e.g., after email address is corrected)"""
    # Clears the counter, the last-bounce time and the status; the member's
    # ``active`` flag is not touched here.
    with get_db() as db:
        cur = db.cursor()
        cur.execute("""
            UPDATE members
            SET bounce_count = 0,
                last_bounce_at = NULL,
                bounce_status = 'clean'
            WHERE member_id = %s
        """, (member_id,))
        db.commit()
        nothing_updated = cur.rowcount == 0
        if nothing_updated:
            raise HTTPException(status_code=404, detail="Member not found")
        cur.close()
    return {"message": "Bounce status reset successfully"}
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)