Fixed bounce handling
This commit is contained in:
52
api/main.py
52
api/main.py
@@ -826,7 +826,16 @@ async def verify_sns_signature(request: Request) -> dict:
|
||||
"""Verify SNS message signature"""
|
||||
try:
|
||||
body = await request.body()
|
||||
print(f"SNS webhook received body: {body}")
|
||||
print(f"SNS webhook body length: {len(body)}")
|
||||
print(f"SNS webhook headers: {dict(request.headers)}")
|
||||
|
||||
if not body:
|
||||
print("ERROR: Empty body received")
|
||||
raise HTTPException(status_code=400, detail="Empty request body")
|
||||
|
||||
message = json.loads(body.decode('utf-8'))
|
||||
print(f"SNS webhook parsed message type: {message.get('Type')}")
|
||||
|
||||
# For SubscriptionConfirmation and UnsubscribeConfirmation, we don't validate signature
|
||||
# AWS will send a URL to confirm
|
||||
@@ -901,9 +910,15 @@ async def process_bounce(bounce_data: dict):
|
||||
try:
|
||||
bounce_type = bounce_data.get('bounceType') # Permanent, Transient, Undetermined
|
||||
bounce_subtype = bounce_data.get('bounceSubType', '')
|
||||
timestamp = bounce_data.get('timestamp')
|
||||
timestamp_str = bounce_data.get('timestamp')
|
||||
feedback_id = bounce_data.get('feedbackId', '')
|
||||
|
||||
# Convert ISO 8601 timestamp to MySQL datetime format
|
||||
# SES sends: '2025-10-13T16:22:40.359Z'
|
||||
# MySQL needs: '2025-10-13 16:22:40'
|
||||
from datetime import datetime as dt
|
||||
timestamp = dt.fromisoformat(timestamp_str.replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
bounced_recipients = bounce_data.get('bouncedRecipients', [])
|
||||
|
||||
with get_db() as conn:
|
||||
@@ -972,48 +987,75 @@ async def process_bounce(bounce_data: dict):
|
||||
cursor.close()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing bounce: {str(e)}")
|
||||
print(f"✗ Error processing bounce: {str(e)}")
|
||||
print(f"Error type: {type(e).__name__}")
|
||||
print(f"Bounce data: {bounce_data}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise
|
||||
|
||||
@app.post("/webhooks/sns", response_class=PlainTextResponse)
|
||||
async def sns_webhook(request: Request):
|
||||
"""Handle SNS notifications for bounces and complaints"""
|
||||
try:
|
||||
print(f"=== SNS Webhook Request ===")
|
||||
print(f"Headers: {dict(request.headers)}")
|
||||
print(f"Content-Type: {request.headers.get('content-type')}")
|
||||
print(f"User-Agent: {request.headers.get('user-agent')}")
|
||||
|
||||
# Verify SNS signature
|
||||
message = await verify_sns_signature(request)
|
||||
|
||||
print(f"Message Type: {message.get('Type')}")
|
||||
print(f"Message Keys: {list(message.keys())}")
|
||||
|
||||
message_type = message.get('Type')
|
||||
|
||||
# Handle subscription confirmation
|
||||
if message_type == 'SubscriptionConfirmation':
|
||||
subscribe_url = message.get('SubscribeURL')
|
||||
print(f"Subscription confirmation received, URL: {subscribe_url}")
|
||||
if subscribe_url:
|
||||
# Confirm subscription
|
||||
async with httpx.AsyncClient() as client:
|
||||
await client.get(subscribe_url)
|
||||
response = await client.get(subscribe_url)
|
||||
print(f"Subscription confirmation response: {response.status_code}")
|
||||
return "Subscription confirmed"
|
||||
|
||||
# Handle notification
|
||||
elif message_type == 'Notification':
|
||||
# Parse the message
|
||||
notification = json.loads(message.get('Message', '{}'))
|
||||
notification_type = notification.get('notificationType')
|
||||
inner_message = message.get('Message', '{}')
|
||||
print(f"Inner message (first 500 chars): {inner_message[:500]}")
|
||||
notification = json.loads(inner_message)
|
||||
|
||||
# SES can send either 'notificationType' or 'eventType' depending on configuration
|
||||
notification_type = notification.get('notificationType') or notification.get('eventType')
|
||||
print(f"Notification type: {notification_type}")
|
||||
|
||||
if notification_type == 'Bounce':
|
||||
bounce = notification.get('bounce', {})
|
||||
print(f"\n✓ Processing Bounce")
|
||||
print(f" Bounce Type: {bounce.get('bounceType')}")
|
||||
print(f" Recipients: {[r.get('emailAddress') for r in bounce.get('bouncedRecipients', [])]}")
|
||||
await process_bounce(bounce)
|
||||
print(f" ✓ Bounce processed successfully")
|
||||
return "Bounce processed"
|
||||
|
||||
elif notification_type == 'Complaint':
|
||||
# We could also track complaints similarly to bounces
|
||||
print(f"\n✓ Complaint received")
|
||||
return "Complaint received"
|
||||
|
||||
print(f"=== End SNS Webhook Request ===")
|
||||
return "OK"
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"SNS webhook error: {str(e)}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
# Bounce management endpoints
|
||||
|
||||
Reference in New Issue
Block a user