Ability to disable SNS bounce handling
This commit is contained in:
586
api/main.py
586
api/main.py
@@ -43,6 +43,10 @@ MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'maillist')
|
||||
MYSQL_USER = os.getenv('MYSQL_USER', 'maillist')
|
||||
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', '')
|
||||
|
||||
# Bounce handling configuration
|
||||
ENABLE_SNS_WEBHOOKS = os.getenv('ENABLE_SNS_WEBHOOKS', 'false').lower() == 'true'
|
||||
ENABLE_BOUNCE_HANDLING = os.getenv('ENABLE_BOUNCE_HANDLING', 'false').lower() == 'true'
|
||||
|
||||
# Password hashing
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
|
||||
@@ -463,6 +467,14 @@ async def health():
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=503, detail=f"Unhealthy: {str(e)}")
|
||||
|
||||
@app.get("/config")
|
||||
async def get_config():
|
||||
"""Get public configuration settings"""
|
||||
return {
|
||||
"bounce_handling_enabled": ENABLE_BOUNCE_HANDLING,
|
||||
"sns_webhooks_enabled": ENABLE_SNS_WEBHOOKS
|
||||
}
|
||||
|
||||
# Mailing Lists endpoints
|
||||
@app.get("/lists", response_model=List[MailingList])
|
||||
async def get_lists(current_user: CurrentUser = require_read_access()):
|
||||
@@ -824,298 +836,316 @@ async def bulk_import_members(bulk_request: BulkImportRequest, current_user: Cur
|
||||
cursor.close()
|
||||
raise HTTPException(status_code=500, detail=f"Bulk import failed: {str(e)}")
|
||||
|
||||
# SNS Webhook for Bounce Handling
|
||||
async def verify_sns_signature(request: Request) -> dict:
|
||||
"""Verify SNS message signature"""
|
||||
try:
|
||||
body = await request.body()
|
||||
print(f"SNS webhook received body: {body}")
|
||||
print(f"SNS webhook body length: {len(body)}")
|
||||
print(f"SNS webhook headers: {dict(request.headers)}")
|
||||
|
||||
if not body:
|
||||
print("ERROR: Empty body received")
|
||||
raise HTTPException(status_code=400, detail="Empty request body")
|
||||
|
||||
message = json.loads(body.decode('utf-8'))
|
||||
print(f"SNS webhook parsed message type: {message.get('Type')}")
|
||||
|
||||
# For SubscriptionConfirmation and UnsubscribeConfirmation, we don't validate signature
|
||||
# AWS will send a URL to confirm
|
||||
if message.get('Type') in ['SubscriptionConfirmation', 'UnsubscribeConfirmation']:
|
||||
return message
|
||||
|
||||
# Get certificate URL and download certificate
|
||||
cert_url = message.get('SigningCertURL')
|
||||
if not cert_url:
|
||||
raise HTTPException(status_code=400, detail="Missing SigningCertURL")
|
||||
|
||||
# Verify certificate URL is from AWS
|
||||
parsed_url = urlparse(cert_url)
|
||||
if not parsed_url.hostname.endswith('.amazonaws.com'):
|
||||
raise HTTPException(status_code=400, detail="Invalid certificate URL")
|
||||
|
||||
# Download certificate
|
||||
async with httpx.AsyncClient() as client:
|
||||
cert_response = await client.get(cert_url)
|
||||
cert_response.raise_for_status()
|
||||
cert_pem = cert_response.content
|
||||
|
||||
# Load certificate and extract public key
|
||||
cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
|
||||
public_key = cert.public_key()
|
||||
|
||||
# Build string to sign
|
||||
if message.get('Type') == 'Notification':
|
||||
string_to_sign = (
|
||||
f"Message\n{message['Message']}\n"
|
||||
f"MessageId\n{message['MessageId']}\n"
|
||||
)
|
||||
if 'Subject' in message:
|
||||
string_to_sign += f"Subject\n{message['Subject']}\n"
|
||||
string_to_sign += (
|
||||
f"Timestamp\n{message['Timestamp']}\n"
|
||||
f"TopicArn\n{message['TopicArn']}\n"
|
||||
f"Type\n{message['Type']}\n"
|
||||
)
|
||||
else:
|
||||
string_to_sign = (
|
||||
f"Message\n{message['Message']}\n"
|
||||
f"MessageId\n{message['MessageId']}\n"
|
||||
f"SubscribeURL\n{message['SubscribeURL']}\n"
|
||||
f"Timestamp\n{message['Timestamp']}\n"
|
||||
f"Token\n{message['Token']}\n"
|
||||
f"TopicArn\n{message['TopicArn']}\n"
|
||||
f"Type\n{message['Type']}\n"
|
||||
)
|
||||
|
||||
# Verify signature
|
||||
signature = base64.b64decode(message['Signature'])
|
||||
# SNS Webhook for Bounce Handling (conditionally enabled)
|
||||
if ENABLE_SNS_WEBHOOKS:
|
||||
async def verify_sns_signature(request: Request) -> dict:
|
||||
"""Verify SNS message signature"""
|
||||
try:
|
||||
public_key.verify(
|
||||
signature,
|
||||
string_to_sign.encode('utf-8'),
|
||||
padding.PKCS1v15(),
|
||||
hashes.SHA1()
|
||||
)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid signature: {str(e)}")
|
||||
|
||||
return message
|
||||
|
||||
except json.JSONDecodeError:
|
||||
raise HTTPException(status_code=400, detail="Invalid JSON")
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=f"Signature verification failed: {str(e)}")
|
||||
|
||||
async def process_bounce(bounce_data: dict):
|
||||
"""Process bounce notification and update database"""
|
||||
try:
|
||||
bounce_type = bounce_data.get('bounceType') # Permanent, Transient, Undetermined
|
||||
bounce_subtype = bounce_data.get('bounceSubType', '')
|
||||
timestamp_str = bounce_data.get('timestamp')
|
||||
feedback_id = bounce_data.get('feedbackId', '')
|
||||
|
||||
# Convert ISO 8601 timestamp to MySQL datetime format
|
||||
# SES sends: '2025-10-13T16:22:40.359Z'
|
||||
# MySQL needs: '2025-10-13 16:22:40'
|
||||
from datetime import datetime as dt
|
||||
timestamp = dt.fromisoformat(timestamp_str.replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
bounced_recipients = bounce_data.get('bouncedRecipients', [])
|
||||
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
body = await request.body()
|
||||
print(f"SNS webhook received body: {body}")
|
||||
print(f"SNS webhook body length: {len(body)}")
|
||||
print(f"SNS webhook headers: {dict(request.headers)}")
|
||||
|
||||
for recipient in bounced_recipients:
|
||||
email = recipient.get('emailAddress')
|
||||
diagnostic_code = recipient.get('diagnosticCode', '')
|
||||
if not body:
|
||||
print("ERROR: Empty body received")
|
||||
raise HTTPException(status_code=400, detail="Empty request body")
|
||||
|
||||
message = json.loads(body.decode('utf-8'))
|
||||
print(f"SNS webhook parsed message type: {message.get('Type')}")
|
||||
|
||||
# For SubscriptionConfirmation and UnsubscribeConfirmation, we don't validate signature
|
||||
# AWS will send a URL to confirm
|
||||
if message.get('Type') in ['SubscriptionConfirmation', 'UnsubscribeConfirmation']:
|
||||
return message
|
||||
|
||||
# Get certificate URL and download certificate
|
||||
cert_url = message.get('SigningCertURL')
|
||||
if not cert_url:
|
||||
raise HTTPException(status_code=400, detail="Missing SigningCertURL")
|
||||
|
||||
# Verify certificate URL is from AWS
|
||||
parsed_url = urlparse(cert_url)
|
||||
if not parsed_url.hostname.endswith('.amazonaws.com'):
|
||||
raise HTTPException(status_code=400, detail="Invalid certificate URL")
|
||||
|
||||
# Download certificate
|
||||
async with httpx.AsyncClient() as client:
|
||||
cert_response = await client.get(cert_url)
|
||||
cert_response.raise_for_status()
|
||||
cert_pem = cert_response.content
|
||||
|
||||
# Load certificate and extract public key
|
||||
cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
|
||||
public_key = cert.public_key()
|
||||
|
||||
# Build string to sign
|
||||
if message.get('Type') == 'Notification':
|
||||
string_to_sign = (
|
||||
f"Message\n{message['Message']}\n"
|
||||
f"MessageId\n{message['MessageId']}\n"
|
||||
)
|
||||
if 'Subject' in message:
|
||||
string_to_sign += f"Subject\n{message['Subject']}\n"
|
||||
string_to_sign += (
|
||||
f"Timestamp\n{message['Timestamp']}\n"
|
||||
f"TopicArn\n{message['TopicArn']}\n"
|
||||
f"Type\n{message['Type']}\n"
|
||||
)
|
||||
else:
|
||||
string_to_sign = (
|
||||
f"Message\n{message['Message']}\n"
|
||||
f"MessageId\n{message['MessageId']}\n"
|
||||
f"SubscribeURL\n{message['SubscribeURL']}\n"
|
||||
f"Timestamp\n{message['Timestamp']}\n"
|
||||
f"Token\n{message['Token']}\n"
|
||||
f"TopicArn\n{message['TopicArn']}\n"
|
||||
f"Type\n{message['Type']}\n"
|
||||
)
|
||||
|
||||
# Verify signature
|
||||
signature = base64.b64decode(message['Signature'])
|
||||
try:
|
||||
public_key.verify(
|
||||
signature,
|
||||
string_to_sign.encode('utf-8'),
|
||||
padding.PKCS1v15(),
|
||||
hashes.SHA1()
|
||||
)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid signature: {str(e)}")
|
||||
|
||||
return message
|
||||
|
||||
except json.JSONDecodeError:
|
||||
raise HTTPException(status_code=400, detail="Invalid JSON")
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=f"Signature verification failed: {str(e)}")
|
||||
|
||||
async def process_bounce(bounce_data: dict):
|
||||
"""Process bounce notification and update database"""
|
||||
try:
|
||||
bounce_type = bounce_data.get('bounceType') # Permanent, Transient, Undetermined
|
||||
bounce_subtype = bounce_data.get('bounceSubType', '')
|
||||
timestamp_str = bounce_data.get('timestamp')
|
||||
feedback_id = bounce_data.get('feedbackId', '')
|
||||
|
||||
# Convert ISO 8601 timestamp to MySQL datetime format
|
||||
# SES sends: '2025-10-13T16:22:40.359Z'
|
||||
# MySQL needs: '2025-10-13 16:22:40'
|
||||
from datetime import datetime as dt
|
||||
timestamp = dt.fromisoformat(timestamp_str.replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
bounced_recipients = bounce_data.get('bouncedRecipients', [])
|
||||
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
if not email:
|
||||
continue
|
||||
|
||||
# Find member by email
|
||||
cursor.execute("SELECT member_id FROM members WHERE email = %s", (email,))
|
||||
member = cursor.fetchone()
|
||||
member_id = member['member_id'] if member else None
|
||||
|
||||
# Log the bounce
|
||||
cursor.execute("""
|
||||
INSERT INTO bounce_logs
|
||||
(member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, feedback_id)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||
""", (member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, feedback_id))
|
||||
|
||||
# Update member bounce status
|
||||
if member_id:
|
||||
# Determine bounce status
|
||||
if bounce_type == 'Permanent':
|
||||
new_status = 'hard_bounce'
|
||||
# Deactivate member with hard bounce
|
||||
cursor.execute("""
|
||||
UPDATE members
|
||||
SET bounce_count = bounce_count + 1,
|
||||
last_bounce_at = %s,
|
||||
bounce_status = %s,
|
||||
active = 0
|
||||
WHERE member_id = %s
|
||||
""", (timestamp, new_status, member_id))
|
||||
elif bounce_type == 'Transient':
|
||||
# Check current bounce count
|
||||
cursor.execute("SELECT bounce_count, bounce_status FROM members WHERE member_id = %s", (member_id,))
|
||||
current = cursor.fetchone()
|
||||
|
||||
# If already hard bounced, don't change status
|
||||
if current and current['bounce_status'] != 'hard_bounce':
|
||||
new_count = current['bounce_count'] + 1
|
||||
# After 3 soft bounces, mark as soft_bounce status
|
||||
new_status = 'soft_bounce' if new_count >= 3 else 'clean'
|
||||
|
||||
for recipient in bounced_recipients:
|
||||
email = recipient.get('emailAddress')
|
||||
diagnostic_code = recipient.get('diagnosticCode', '')
|
||||
|
||||
if not email:
|
||||
continue
|
||||
|
||||
# Find member by email
|
||||
cursor.execute("SELECT member_id FROM members WHERE email = %s", (email,))
|
||||
member = cursor.fetchone()
|
||||
member_id = member['member_id'] if member else None
|
||||
|
||||
# Log the bounce
|
||||
cursor.execute("""
|
||||
INSERT INTO bounce_logs
|
||||
(member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, feedback_id)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||
""", (member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, feedback_id))
|
||||
|
||||
# Update member bounce status
|
||||
if member_id:
|
||||
# Determine bounce status
|
||||
if bounce_type == 'Permanent':
|
||||
new_status = 'hard_bounce'
|
||||
# Deactivate member with hard bounce
|
||||
cursor.execute("""
|
||||
UPDATE members
|
||||
SET bounce_count = %s,
|
||||
SET bounce_count = bounce_count + 1,
|
||||
last_bounce_at = %s,
|
||||
bounce_status = %s
|
||||
bounce_status = %s,
|
||||
active = 0
|
||||
WHERE member_id = %s
|
||||
""", (new_count, timestamp, new_status, member_id))
|
||||
else: # Undetermined
|
||||
cursor.execute("""
|
||||
UPDATE members
|
||||
SET bounce_count = bounce_count + 1,
|
||||
last_bounce_at = %s
|
||||
WHERE member_id = %s
|
||||
""", (timestamp, member_id))
|
||||
""", (timestamp, new_status, member_id))
|
||||
elif bounce_type == 'Transient':
|
||||
# Check current bounce count
|
||||
cursor.execute("SELECT bounce_count, bounce_status FROM members WHERE member_id = %s", (member_id,))
|
||||
current = cursor.fetchone()
|
||||
|
||||
# If already hard bounced, don't change status
|
||||
if current and current['bounce_status'] != 'hard_bounce':
|
||||
new_count = current['bounce_count'] + 1
|
||||
# After 3 soft bounces, mark as soft_bounce status
|
||||
new_status = 'soft_bounce' if new_count >= 3 else 'clean'
|
||||
|
||||
cursor.execute("""
|
||||
UPDATE members
|
||||
SET bounce_count = %s,
|
||||
last_bounce_at = %s,
|
||||
bounce_status = %s
|
||||
WHERE member_id = %s
|
||||
""", (new_count, timestamp, new_status, member_id))
|
||||
else: # Undetermined
|
||||
cursor.execute("""
|
||||
UPDATE members
|
||||
SET bounce_count = bounce_count + 1,
|
||||
last_bounce_at = %s
|
||||
WHERE member_id = %s
|
||||
""", (timestamp, member_id))
|
||||
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ Error processing bounce: {str(e)}")
|
||||
print(f"Error type: {type(e).__name__}")
|
||||
print(f"Bounce data: {bounce_data}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise
|
||||
|
||||
@app.post("/webhooks/sns", response_class=PlainTextResponse)
|
||||
async def sns_webhook(request: Request):
|
||||
"""Handle SNS notifications for bounces and complaints"""
|
||||
try:
|
||||
print(f"=== SNS Webhook Request ===")
|
||||
print(f"Headers: {dict(request.headers)}")
|
||||
print(f"Content-Type: {request.headers.get('content-type')}")
|
||||
print(f"User-Agent: {request.headers.get('user-agent')}")
|
||||
|
||||
conn.commit()
|
||||
# Verify SNS signature
|
||||
message = await verify_sns_signature(request)
|
||||
|
||||
print(f"Message Type: {message.get('Type')}")
|
||||
print(f"Message Keys: {list(message.keys())}")
|
||||
|
||||
message_type = message.get('Type')
|
||||
|
||||
# Handle subscription confirmation
|
||||
if message_type == 'SubscriptionConfirmation':
|
||||
subscribe_url = message.get('SubscribeURL')
|
||||
print(f"Subscription confirmation received, URL: {subscribe_url}")
|
||||
if subscribe_url:
|
||||
# Confirm subscription
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(subscribe_url)
|
||||
print(f"Subscription confirmation response: {response.status_code}")
|
||||
return "Subscription confirmed"
|
||||
|
||||
# Handle notification
|
||||
elif message_type == 'Notification':
|
||||
# Parse the message
|
||||
inner_message = message.get('Message', '{}')
|
||||
print(f"Inner message (first 500 chars): {inner_message[:500]}")
|
||||
notification = json.loads(inner_message)
|
||||
|
||||
# SES can send either 'notificationType' or 'eventType' depending on configuration
|
||||
notification_type = notification.get('notificationType') or notification.get('eventType')
|
||||
print(f"Notification type: {notification_type}")
|
||||
|
||||
if notification_type == 'Bounce':
|
||||
bounce = notification.get('bounce', {})
|
||||
print(f"\n✓ Processing Bounce")
|
||||
print(f" Bounce Type: {bounce.get('bounceType')}")
|
||||
print(f" Recipients: {[r.get('emailAddress') for r in bounce.get('bouncedRecipients', [])]}")
|
||||
await process_bounce(bounce)
|
||||
print(f" ✓ Bounce processed successfully")
|
||||
return "Bounce processed"
|
||||
|
||||
elif notification_type == 'Complaint':
|
||||
# We could also track complaints similarly to bounces
|
||||
print(f"\n✓ Complaint received")
|
||||
return "Complaint received"
|
||||
|
||||
print(f"=== End SNS Webhook Request ===")
|
||||
return "OK"
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"SNS webhook error: {str(e)}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
else:
|
||||
# Provide stub functions when SNS webhooks are disabled
|
||||
print("SNS webhooks disabled - bounce handling via email only")
|
||||
|
||||
# Bounce management endpoints (conditionally enabled)
|
||||
if ENABLE_BOUNCE_HANDLING:
|
||||
class BounceLog(BaseModel):
|
||||
bounce_id: int
|
||||
email: str
|
||||
bounce_type: str
|
||||
bounce_subtype: Optional[str] = None
|
||||
diagnostic_code: Optional[str] = None
|
||||
timestamp: datetime
|
||||
feedback_id: Optional[str] = None
|
||||
created_at: datetime
|
||||
|
||||
class MemberWithBounces(BaseModel):
|
||||
member_id: int
|
||||
name: str
|
||||
email: str
|
||||
active: bool
|
||||
bounce_count: int
|
||||
last_bounce_at: Optional[datetime] = None
|
||||
bounce_status: str
|
||||
|
||||
@app.get("/members/{member_id}/bounces", response_model=List[BounceLog])
|
||||
async def get_member_bounces(member_id: int, current_user: CurrentUser = require_read_access()):
|
||||
"""Get bounce history for a member"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("""
|
||||
SELECT bounce_id, email, bounce_type, bounce_subtype, diagnostic_code,
|
||||
timestamp, feedback_id, created_at
|
||||
FROM bounce_logs
|
||||
WHERE member_id = %s
|
||||
ORDER BY timestamp DESC
|
||||
""", (member_id,))
|
||||
bounces = cursor.fetchall()
|
||||
cursor.close()
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ Error processing bounce: {str(e)}")
|
||||
print(f"Error type: {type(e).__name__}")
|
||||
print(f"Bounce data: {bounce_data}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise
|
||||
return bounces
|
||||
|
||||
@app.post("/webhooks/sns", response_class=PlainTextResponse)
|
||||
async def sns_webhook(request: Request):
|
||||
"""Handle SNS notifications for bounces and complaints"""
|
||||
try:
|
||||
print(f"=== SNS Webhook Request ===")
|
||||
print(f"Headers: {dict(request.headers)}")
|
||||
print(f"Content-Type: {request.headers.get('content-type')}")
|
||||
print(f"User-Agent: {request.headers.get('user-agent')}")
|
||||
|
||||
# Verify SNS signature
|
||||
message = await verify_sns_signature(request)
|
||||
|
||||
print(f"Message Type: {message.get('Type')}")
|
||||
print(f"Message Keys: {list(message.keys())}")
|
||||
|
||||
message_type = message.get('Type')
|
||||
|
||||
# Handle subscription confirmation
|
||||
if message_type == 'SubscriptionConfirmation':
|
||||
subscribe_url = message.get('SubscribeURL')
|
||||
print(f"Subscription confirmation received, URL: {subscribe_url}")
|
||||
if subscribe_url:
|
||||
# Confirm subscription
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(subscribe_url)
|
||||
print(f"Subscription confirmation response: {response.status_code}")
|
||||
return "Subscription confirmed"
|
||||
|
||||
# Handle notification
|
||||
elif message_type == 'Notification':
|
||||
# Parse the message
|
||||
inner_message = message.get('Message', '{}')
|
||||
print(f"Inner message (first 500 chars): {inner_message[:500]}")
|
||||
notification = json.loads(inner_message)
|
||||
@app.patch("/members/{member_id}/bounce-status")
|
||||
async def reset_bounce_status(member_id: int, current_user: CurrentUser = require_write_access()):
|
||||
"""Reset bounce status for a member (e.g., after email address is corrected)"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
UPDATE members
|
||||
SET bounce_count = 0,
|
||||
last_bounce_at = NULL,
|
||||
bounce_status = 'clean'
|
||||
WHERE member_id = %s
|
||||
""", (member_id,))
|
||||
conn.commit()
|
||||
|
||||
# SES can send either 'notificationType' or 'eventType' depending on configuration
|
||||
notification_type = notification.get('notificationType') or notification.get('eventType')
|
||||
print(f"Notification type: {notification_type}")
|
||||
if cursor.rowcount == 0:
|
||||
raise HTTPException(status_code=404, detail="Member not found")
|
||||
|
||||
if notification_type == 'Bounce':
|
||||
bounce = notification.get('bounce', {})
|
||||
print(f"\n✓ Processing Bounce")
|
||||
print(f" Bounce Type: {bounce.get('bounceType')}")
|
||||
print(f" Recipients: {[r.get('emailAddress') for r in bounce.get('bouncedRecipients', [])]}")
|
||||
await process_bounce(bounce)
|
||||
print(f" ✓ Bounce processed successfully")
|
||||
return "Bounce processed"
|
||||
|
||||
elif notification_type == 'Complaint':
|
||||
# We could also track complaints similarly to bounces
|
||||
print(f"\n✓ Complaint received")
|
||||
return "Complaint received"
|
||||
|
||||
print(f"=== End SNS Webhook Request ===")
|
||||
return "OK"
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"SNS webhook error: {str(e)}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
cursor.close()
|
||||
return {"message": "Bounce status reset successfully"}
|
||||
|
||||
# Bounce management endpoints
|
||||
class BounceLog(BaseModel):
|
||||
bounce_id: int
|
||||
email: str
|
||||
bounce_type: str
|
||||
bounce_subtype: Optional[str] = None
|
||||
diagnostic_code: Optional[str] = None
|
||||
timestamp: datetime
|
||||
feedback_id: Optional[str] = None
|
||||
created_at: datetime
|
||||
else:
|
||||
# When bounce handling is disabled, provide stub endpoints that return appropriate responses
|
||||
@app.get("/members/{member_id}/bounces")
|
||||
async def get_member_bounces_disabled(member_id: int, current_user: CurrentUser = require_read_access()):
|
||||
"""Bounce history disabled - returns empty list"""
|
||||
return []
|
||||
|
||||
class MemberWithBounces(BaseModel):
|
||||
member_id: int
|
||||
name: str
|
||||
email: str
|
||||
active: bool
|
||||
bounce_count: int
|
||||
last_bounce_at: Optional[datetime] = None
|
||||
bounce_status: str
|
||||
|
||||
@app.get("/members/{member_id}/bounces", response_model=List[BounceLog])
|
||||
async def get_member_bounces(member_id: int, current_user: CurrentUser = require_read_access()):
|
||||
"""Get bounce history for a member"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("""
|
||||
SELECT bounce_id, email, bounce_type, bounce_subtype, diagnostic_code,
|
||||
timestamp, feedback_id, created_at
|
||||
FROM bounce_logs
|
||||
WHERE member_id = %s
|
||||
ORDER BY timestamp DESC
|
||||
""", (member_id,))
|
||||
bounces = cursor.fetchall()
|
||||
cursor.close()
|
||||
return bounces
|
||||
|
||||
@app.patch("/members/{member_id}/bounce-status")
|
||||
async def reset_bounce_status(member_id: int, current_user: CurrentUser = require_write_access()):
|
||||
"""Reset bounce status for a member (e.g., after email address is corrected)"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
UPDATE members
|
||||
SET bounce_count = 0,
|
||||
last_bounce_at = NULL,
|
||||
bounce_status = 'clean'
|
||||
WHERE member_id = %s
|
||||
""", (member_id,))
|
||||
conn.commit()
|
||||
|
||||
if cursor.rowcount == 0:
|
||||
raise HTTPException(status_code=404, detail="Member not found")
|
||||
|
||||
cursor.close()
|
||||
return {"message": "Bounce status reset successfully"}
|
||||
@app.patch("/members/{member_id}/bounce-status")
|
||||
async def reset_bounce_status_disabled(member_id: int, current_user: CurrentUser = require_write_access()):
|
||||
"""Bounce status reset disabled"""
|
||||
raise HTTPException(status_code=501, detail="Bounce handling is disabled")
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
Reference in New Issue
Block a user