SES SNS Bounce Handling

This commit is contained in:
James Pattinson
2025-10-13 15:05:42 +00:00
parent ac23638125
commit 72f3297a80
12 changed files with 1276 additions and 3 deletions

View File

@@ -5,6 +5,7 @@ FastAPI-based REST API for managing mailing lists and members
from fastapi import FastAPI, HTTPException, Depends, Header, status, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, EmailStr
from typing import List, Optional, Annotated
import mysql.connector
@@ -19,6 +20,15 @@ import bcrypt
from jose import JWTError, jwt
from passlib.context import CryptContext
from enum import Enum
import json
import base64
from urllib.parse import urlparse
import httpx
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_public_key
# Configuration
API_TOKEN = os.getenv('API_TOKEN', 'change-this-token') # Keep for backward compatibility during transition
@@ -229,6 +239,9 @@ class Member(BaseModel):
name: str
email: EmailStr
active: bool = True
bounce_count: Optional[int] = 0
last_bounce_at: Optional[datetime] = None
bounce_status: Optional[str] = 'clean'
class MemberUpdate(BaseModel):
name: Optional[str] = None
@@ -808,6 +821,257 @@ async def bulk_import_members(bulk_request: BulkImportRequest, current_user: Cur
cursor.close()
raise HTTPException(status_code=500, detail=f"Bulk import failed: {str(e)}")
# SNS Webhook for Bounce Handling
async def verify_sns_signature(request: Request) -> dict:
"""Verify SNS message signature"""
try:
body = await request.body()
message = json.loads(body.decode('utf-8'))
# For SubscriptionConfirmation and UnsubscribeConfirmation, we don't validate signature
# AWS will send a URL to confirm
if message.get('Type') in ['SubscriptionConfirmation', 'UnsubscribeConfirmation']:
return message
# Get certificate URL and download certificate
cert_url = message.get('SigningCertURL')
if not cert_url:
raise HTTPException(status_code=400, detail="Missing SigningCertURL")
# Verify certificate URL is from AWS
parsed_url = urlparse(cert_url)
if not parsed_url.hostname.endswith('.amazonaws.com'):
raise HTTPException(status_code=400, detail="Invalid certificate URL")
# Download certificate
async with httpx.AsyncClient() as client:
cert_response = await client.get(cert_url)
cert_response.raise_for_status()
cert_pem = cert_response.content
# Load certificate and extract public key
cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
public_key = cert.public_key()
# Build string to sign
if message.get('Type') == 'Notification':
string_to_sign = (
f"Message\n{message['Message']}\n"
f"MessageId\n{message['MessageId']}\n"
)
if 'Subject' in message:
string_to_sign += f"Subject\n{message['Subject']}\n"
string_to_sign += (
f"Timestamp\n{message['Timestamp']}\n"
f"TopicArn\n{message['TopicArn']}\n"
f"Type\n{message['Type']}\n"
)
else:
string_to_sign = (
f"Message\n{message['Message']}\n"
f"MessageId\n{message['MessageId']}\n"
f"SubscribeURL\n{message['SubscribeURL']}\n"
f"Timestamp\n{message['Timestamp']}\n"
f"Token\n{message['Token']}\n"
f"TopicArn\n{message['TopicArn']}\n"
f"Type\n{message['Type']}\n"
)
# Verify signature
signature = base64.b64decode(message['Signature'])
try:
public_key.verify(
signature,
string_to_sign.encode('utf-8'),
padding.PKCS1v15(),
hashes.SHA1()
)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid signature: {str(e)}")
return message
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON")
except Exception as e:
raise HTTPException(status_code=400, detail=f"Signature verification failed: {str(e)}")
async def process_bounce(bounce_data: dict):
"""Process bounce notification and update database"""
try:
bounce_type = bounce_data.get('bounceType') # Permanent, Transient, Undetermined
bounce_subtype = bounce_data.get('bounceSubType', '')
timestamp = bounce_data.get('timestamp')
feedback_id = bounce_data.get('feedbackId', '')
bounced_recipients = bounce_data.get('bouncedRecipients', [])
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
for recipient in bounced_recipients:
email = recipient.get('emailAddress')
diagnostic_code = recipient.get('diagnosticCode', '')
if not email:
continue
# Find member by email
cursor.execute("SELECT member_id FROM members WHERE email = %s", (email,))
member = cursor.fetchone()
member_id = member['member_id'] if member else None
# Log the bounce
cursor.execute("""
INSERT INTO bounce_logs
(member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, feedback_id)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""", (member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, feedback_id))
# Update member bounce status
if member_id:
# Determine bounce status
if bounce_type == 'Permanent':
new_status = 'hard_bounce'
# Deactivate member with hard bounce
cursor.execute("""
UPDATE members
SET bounce_count = bounce_count + 1,
last_bounce_at = %s,
bounce_status = %s,
active = 0
WHERE member_id = %s
""", (timestamp, new_status, member_id))
elif bounce_type == 'Transient':
# Check current bounce count
cursor.execute("SELECT bounce_count, bounce_status FROM members WHERE member_id = %s", (member_id,))
current = cursor.fetchone()
# If already hard bounced, don't change status
if current and current['bounce_status'] != 'hard_bounce':
new_count = current['bounce_count'] + 1
# After 3 soft bounces, mark as soft_bounce status
new_status = 'soft_bounce' if new_count >= 3 else 'clean'
cursor.execute("""
UPDATE members
SET bounce_count = %s,
last_bounce_at = %s,
bounce_status = %s
WHERE member_id = %s
""", (new_count, timestamp, new_status, member_id))
else: # Undetermined
cursor.execute("""
UPDATE members
SET bounce_count = bounce_count + 1,
last_bounce_at = %s
WHERE member_id = %s
""", (timestamp, member_id))
conn.commit()
cursor.close()
except Exception as e:
print(f"Error processing bounce: {str(e)}")
raise
@app.post("/webhooks/sns", response_class=PlainTextResponse)
async def sns_webhook(request: Request):
"""Handle SNS notifications for bounces and complaints"""
try:
# Verify SNS signature
message = await verify_sns_signature(request)
message_type = message.get('Type')
# Handle subscription confirmation
if message_type == 'SubscriptionConfirmation':
subscribe_url = message.get('SubscribeURL')
if subscribe_url:
# Confirm subscription
async with httpx.AsyncClient() as client:
await client.get(subscribe_url)
return "Subscription confirmed"
# Handle notification
elif message_type == 'Notification':
# Parse the message
notification = json.loads(message.get('Message', '{}'))
notification_type = notification.get('notificationType')
if notification_type == 'Bounce':
bounce = notification.get('bounce', {})
await process_bounce(bounce)
return "Bounce processed"
elif notification_type == 'Complaint':
# We could also track complaints similarly to bounces
return "Complaint received"
return "OK"
except HTTPException:
raise
except Exception as e:
print(f"SNS webhook error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# Bounce management endpoints
class BounceLog(BaseModel):
bounce_id: int
email: str
bounce_type: str
bounce_subtype: Optional[str] = None
diagnostic_code: Optional[str] = None
timestamp: datetime
feedback_id: Optional[str] = None
created_at: datetime
class MemberWithBounces(BaseModel):
member_id: int
name: str
email: str
active: bool
bounce_count: int
last_bounce_at: Optional[datetime] = None
bounce_status: str
@app.get("/members/{member_id}/bounces", response_model=List[BounceLog])
async def get_member_bounces(member_id: int, current_user: CurrentUser = require_read_access()):
"""Get bounce history for a member"""
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT bounce_id, email, bounce_type, bounce_subtype, diagnostic_code,
timestamp, feedback_id, created_at
FROM bounce_logs
WHERE member_id = %s
ORDER BY timestamp DESC
""", (member_id,))
bounces = cursor.fetchall()
cursor.close()
return bounces
@app.patch("/members/{member_id}/bounce-status")
async def reset_bounce_status(member_id: int, current_user: CurrentUser = require_write_access()):
"""Reset bounce status for a member (e.g., after email address is corrected)"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE members
SET bounce_count = 0,
last_bounce_at = NULL,
bounce_status = 'clean'
WHERE member_id = %s
""", (member_id,))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Member not found")
cursor.close()
return {"message": "Bounce status reset successfully"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -8,3 +8,4 @@ email-validator==2.1.0
bcrypt==4.0.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
httpx==0.25.2