348 lines
14 KiB
Python
348 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Email-based bounce processing script for Postfix
|
|
Parses bounce emails and updates the database with bounce information
|
|
|
|
This script is called by Postfix when bounce emails are received.
|
|
It reads the email from stdin, parses it for bounce information,
|
|
and updates the database accordingly.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import re
|
|
import email
|
|
from email.message import Message
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import List, Tuple, Optional, Dict
|
|
import pymysql
|
|
|
|
# Configure logging
|
|
log_file = '/var/log/bounce-processor.log'
|
|
log_handlers = [logging.StreamHandler()]
|
|
|
|
# Add file handler if we can write to the log directory
|
|
try:
|
|
log_handlers.append(logging.FileHandler(log_file))
|
|
except (PermissionError, FileNotFoundError):
|
|
pass # Just use stdout if we can't write to log file
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=log_handlers
|
|
)
|
|
logger = logging.getLogger('bounce-processor')
|
|
|
|
# Database configuration from environment.
# MYSQL_PORT uses ``or`` instead of a getenv default so that a variable that
# is set but empty (e.g. ``MYSQL_PORT=`` in an env file) falls back to 3306
# instead of crashing on int('') when the script starts.
MYSQL_HOST = os.getenv('MYSQL_HOST', 'mysql')
MYSQL_PORT = int(os.getenv('MYSQL_PORT') or 3306)
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'maillist')
MYSQL_USER = os.getenv('MYSQL_USER', 'maillist')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', '')
|
|
|
|
class BounceProcessor:
    """Parse bounce notifications and record them in the database.

    ``parse_bounce_email`` turns a raw bounce message into a list of bounce
    dicts (keys ``email``, ``bounce_type``, ``bounce_subtype``,
    ``diagnostic_code``, ``timestamp``); ``update_database`` writes each one
    to ``bounce_logs`` and updates the matching ``members`` row.

    Limitation: bounce type and diagnostic are derived from the whole message,
    so a multi-recipient bounce gives every extracted address the same
    classification.
    """

    # local-part@domain syntax shared by all extraction patterns (one capture group).
    _EMAIL = r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'

    # Anchored form used to validate a single extracted address (compiled once).
    _VALID_EMAIL_RE = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    # Addresses ending with this suffix are our own and never count as bounced.
    OWN_ADDRESS_SUFFIX = '@lists.sasalliance.org'

    # Transient-bounce count at which a member's status becomes 'soft_bounce'.
    SOFT_BOUNCE_THRESHOLD = 3

    # Parts carrying the returned original message (or its headers). Addresses
    # there belong to the original sender/recipients, not to the failed
    # recipient, so they are not scanned.
    _SKIPPED_CONTENT_TYPES = ('message/rfc822', 'text/rfc822-headers')

    # DSN "Status:" field (RFC 3464) with an RFC 3463 enhanced status code of
    # class 4 (transient) or 5 (permanent).
    _DSN_STATUS_RE = re.compile(
        r'^\s*Status:\s*([45])\.(\d{1,3})\.(\d{1,3})', re.IGNORECASE | re.MULTILINE)

    # DSN "Diagnostic-Code:" field; a leading "type;" such as "smtp;" is dropped.
    _DSN_DIAGNOSTIC_RE = re.compile(
        r'^\s*Diagnostic-Code:\s*(?:[\w-]+\s*;\s*)?(.+)$', re.IGNORECASE | re.MULTILINE)

    # A standalone 4xx/5xx SMTP reply code: not embedded in a longer number,
    # word or dotted sequence, so e.g. "1500 bytes" does not count as 500.
    _SMTP_CODE_RE = re.compile(r'(?<![\w.])([45]\d{2})(?!\w)')

    # Same code, captured through to the end of its line (diagnostic fallback).
    _SMTP_CODE_LINE_RE = re.compile(r'(?<![\w.])([45]\d{2}(?!\w).*)$', re.MULTILINE)

    # Subtype keyed by the RFC 3463 "subject.detail" of an enhanced status code
    # (X.2.2 mailbox full, X.3.4 message too big); anything else is 'General'.
    _ENHANCED_SUBTYPES = {'2.2': 'MailboxFull', '3.4': 'MessageTooLarge'}

    # Ordered (codes, subtype) rules for bare SMTP reply codes; the first rule
    # with a matching code wins, no match leaves the subtype empty.
    _PERMANENT_SUBTYPES = (
        (('550', '551', '553'), 'General'),
        (('552',), 'MailboxFull'),
        (('554',), 'General'),  # RFC 5321: "Transaction failed", not a size error
    )
    _TRANSIENT_SUBTYPES = (
        (('452',), 'MailboxFull'),  # checked first, otherwise it is unreachable
        (('450', '451'), 'General'),
    )

    def __init__(self):
        self.bounce_patterns = self._compile_bounce_patterns()

    def _compile_bounce_patterns(self) -> List[re.Pattern]:
        """Compile the address-extraction patterns, most specific first.

        Every pattern has exactly one capture group (the address). The LAST
        entry is a catch-all matching any address; it is only consulted when
        none of the others match (see ``_select_bounced_addresses``).
        """
        addr = self._EMAIL
        sources = [
            # Standard bounce formats: SMTP reply code followed by the address
            *(code + r'.*?' + addr for code in ('550', '554', '553', '552', '551')),
            # Delivery Status Notification (DSN) format
            r'Final-Recipient:.*?' + addr,
            r'Original-Recipient:.*?' + addr,
            # Common bounce message patterns
            r'user.*?' + addr + r'.*?unknown',
            addr + r'.*?does not exist',
            addr + r'.*?not found',
            addr + r'.*?mailbox.*?unavailable',
            addr + r'.*?recipient.*?rejected',
            # Generic email extraction (fallback) -- must stay last
            addr,
        ]
        return [re.compile(source, re.IGNORECASE) for source in sources]

    def parse_bounce_email(self, email_content: str) -> List[Dict]:
        """Parse a raw bounce message into a list of bounce dicts.

        Args:
            email_content: The full message (headers and body) as a string.

        Returns:
            One dict per bounced address (see ``_analyze_bounce``); an empty
            list when the message has no body or parsing fails (the error is
            logged, not raised).
        """
        try:
            msg = email.message_from_string(email_content)

            body = self._get_email_body(msg)
            if not body:
                logger.warning("No email body found")
                return []

            logger.info(f"Processing email body (first 500 chars): {body[:500]}")

            bounces = []
            for address in self._extract_bounced_addresses(body):
                bounce_info = self._analyze_bounce(body, address)
                if bounce_info:
                    bounces.append(bounce_info)
            return bounces

        except Exception as e:
            # Boundary for one message: log and report "no bounces" instead of
            # propagating the error to the caller.
            logger.error(f"Error parsing bounce email: {str(e)}")
            return []

    def _get_email_body(self, msg: Message) -> Optional[str]:
        """Return the scannable text of *msg*, or None if there is none.

        For multipart messages this joins every text/plain and text/html part
        plus the fields of message/delivery-status parts (where DSN fields such
        as Final-Recipient live), skipping the returned original message. A
        non-multipart message contributes its decoded payload whatever its type.
        """
        if msg.is_multipart():
            chunks: List[str] = []
            self._collect_text(msg, chunks)
            body = "\n".join(chunks)
        else:
            body = self._decode_part(msg)
        return body.strip() or None

    def _collect_text(self, part: Message, chunks: List[str]) -> None:
        """Recursively append the scannable text of *part* to *chunks*."""
        content_type = part.get_content_type()
        if content_type in self._SKIPPED_CONTENT_TYPES:
            return
        if content_type == 'message/delivery-status':
            chunks.append(self._delivery_status_text(part))
        elif part.is_multipart():
            for subpart in part.get_payload():
                self._collect_text(subpart, chunks)
        elif content_type in ('text/plain', 'text/html'):
            text = self._decode_part(part)
            if text:
                chunks.append(text)

    def _delivery_status_text(self, part: Message) -> str:
        """Render a message/delivery-status part as "Name: value" lines.

        Python's email parser stores each blank-line-separated field group of
        such a part as a header-only sub-message, so the DSN text lives in
        headers rather than in a decodable payload. Folded values are joined
        onto one line; groups are separated by a blank line.
        """
        blocks = part.get_payload()
        if not isinstance(blocks, list):
            return self._decode_part(part)
        lines: List[str] = []
        for block in blocks:
            lines.extend(f"{name}: {' '.join(str(value).split())}" for name, value in block.items())
            lines.append("")
        return "\n".join(lines)

    @staticmethod
    def _decode_part(part: Message) -> str:
        """Decode one part's payload to text using its declared charset.

        ``get_payload(decode=True)`` undoes base64/quoted-printable; bytes that
        are invalid in the charset are dropped, and an unknown charset label
        falls back to UTF-8. Returns '' when there is no payload.
        """
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='ignore')
        except LookupError:
            return payload.decode('utf-8', errors='ignore')

    def _extract_bounced_addresses(self, body: str) -> List[str]:
        """Extract bounced email addresses from the bounce message (and log them)."""
        addresses = self._select_bounced_addresses(body)
        logger.info(f"Extracted addresses: {addresses}")
        return addresses

    def _select_bounced_addresses(self, body: str) -> List[str]:
        """Return addresses found by the specific patterns, else by the fallback.

        The catch-all fallback matches every address in the text (sender
        signatures, MTA contacts, ...), so it is used only when no specific
        bounce pattern produced an address.
        """
        specific, fallback = self.bounce_patterns[:-1], self.bounce_patterns[-1:]
        return self._match_addresses(body, specific) or self._match_addresses(body, fallback)

    def _match_addresses(self, body: str, patterns: List[re.Pattern]) -> List[str]:
        """Lower-cased, de-duplicated (first-seen order) valid addresses matched
        by *patterns*, excluding our own list domain."""
        found: Dict[str, None] = {}
        for pattern in patterns:
            for match in pattern.findall(body):
                email_addr = match.strip().lower()
                if self._is_valid_email(email_addr) and not email_addr.endswith(self.OWN_ADDRESS_SUFFIX):
                    found.setdefault(email_addr, None)
        return list(found)

    def _is_valid_email(self, email_addr: str) -> bool:
        """Validate email address format"""
        return bool(self._VALID_EMAIL_RE.match(email_addr))

    def _analyze_bounce(self, body: str, email_addr: str) -> Optional[Dict]:
        """Build the bounce record for *email_addr* from the bounce text.

        Returns a dict with ``email``; ``bounce_type`` ('Permanent',
        'Transient' or 'Undetermined'); ``bounce_subtype`` ('General',
        'MailboxFull', 'MessageTooLarge' or ''); ``diagnostic_code`` (at most
        500 chars, '' if none found); and ``timestamp`` (naive local time,
        'YYYY-MM-DD HH:MM:SS').
        """
        bounce_type, bounce_subtype = self._classify_bounce(body)
        return {
            'email': email_addr,
            'bounce_type': bounce_type,
            'bounce_subtype': bounce_subtype,
            'diagnostic_code': self._extract_diagnostic(body),
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }

    def _classify_bounce(self, body: str) -> Tuple[str, str]:
        """Return (bounce_type, bounce_subtype) for the bounce text.

        A DSN Status field decides when present; otherwise any standalone 5xx
        reply code makes the bounce Permanent, else any 4xx makes it
        Transient, else it is Undetermined.
        """
        status = self._DSN_STATUS_RE.search(body)
        if status:
            bounce_type = 'Permanent' if status.group(1) == '5' else 'Transient'
            detail = f"{status.group(2)}.{status.group(3)}"
            return bounce_type, self._ENHANCED_SUBTYPES.get(detail, 'General')

        codes = set(self._SMTP_CODE_RE.findall(body))
        if any(code.startswith('5') for code in codes):
            return 'Permanent', self._first_subtype(codes, self._PERMANENT_SUBTYPES)
        if any(code.startswith('4') for code in codes):
            return 'Transient', self._first_subtype(codes, self._TRANSIENT_SUBTYPES)
        return 'Undetermined', ''

    @staticmethod
    def _first_subtype(codes: set, rules: Tuple) -> str:
        """Subtype of the first rule sharing a code with *codes*, else ''."""
        for candidates, subtype in rules:
            if codes.intersection(candidates):
                return subtype
        return ''

    def _extract_diagnostic(self, body: str) -> str:
        """Return the DSN Diagnostic-Code value, else the text from the first
        standalone 4xx/5xx code to the end of its line; '' if neither exists.
        Limited to 500 characters."""
        match = self._DSN_DIAGNOSTIC_RE.search(body) or self._SMTP_CODE_LINE_RE.search(body)
        return match.group(1).strip()[:500] if match else ''

    def update_database(self, bounces: List[Dict]) -> None:
        """Record *bounces* in the database, one transaction per bounce.

        Each bounce is logged to ``bounce_logs`` and applied to its member row
        (see ``_update_member``), then committed; a failing bounce is rolled
        back and logged without affecting the others. Connection-level errors
        are logged, not raised, and the connection is always closed.
        """
        if not bounces:
            logger.info("No bounces to process")
            return

        connection = None
        try:
            connection = pymysql.connect(
                host=MYSQL_HOST,
                port=MYSQL_PORT,
                database=MYSQL_DATABASE,
                user=MYSQL_USER,
                password=MYSQL_PASSWORD,
                cursorclass=pymysql.cursors.DictCursor,
            )

            processed = 0
            with connection.cursor() as cursor:
                for bounce in bounces:
                    if self._apply_bounce(connection, cursor, bounce):
                        processed += 1

            logger.info(f"Successfully processed {processed} of {len(bounces)} bounces")

        except pymysql.Error as e:
            logger.error(f"Database connection error: {str(e)}")
        except Exception as e:
            logger.error(f"Unexpected error updating database: {str(e)}")
        finally:
            if connection is not None and connection.open:
                connection.close()

    def _apply_bounce(self, connection, cursor, bounce: Dict) -> bool:
        """Record one bounce and commit it; roll back and return False on error."""
        try:
            self._record_bounce(cursor, bounce)
            connection.commit()
            return True
        except Exception as e:
            logger.error(f"Database error processing bounce for {bounce.get('email')}: {str(e)}")
            # Undo this bounce's partial writes (e.g. log row inserted but the
            # member update failed). If rollback itself fails the connection is
            # unusable, and that error propagates to update_database.
            connection.rollback()
            return False

    def _record_bounce(self, cursor, bounce: Dict) -> None:
        """Insert the bounce into bounce_logs and update the member, if any."""
        email_addr = bounce['email']

        # Find member by email
        cursor.execute("SELECT member_id FROM members WHERE email = %s", (email_addr,))
        member = cursor.fetchone()
        member_id = member['member_id'] if member else None

        # Log the bounce
        cursor.execute(
            """
            INSERT INTO bounce_logs
            (member_id, email, bounce_type, bounce_subtype, diagnostic_code, timestamp, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, NOW())
            """,
            (member_id, email_addr, bounce['bounce_type'], bounce['bounce_subtype'],
             bounce['diagnostic_code'], bounce['timestamp']),
        )
        logger.info(f"Logged bounce for {email_addr}: {bounce['bounce_type']}")

        if member_id is not None:
            self._update_member(cursor, member_id, bounce)

    def _update_member(self, cursor, member_id, bounce: Dict) -> None:
        """Apply a bounce to the member row.

        Permanent: increment bounce_count, mark 'hard_bounce', deactivate.
        Transient: unless already 'hard_bounce', increment bounce_count and set
        status to 'soft_bounce' once it reaches SOFT_BOUNCE_THRESHOLD (else
        'clean'). Undetermined: only increment bounce_count.
        """
        email_addr = bounce['email']
        bounce_type = bounce['bounce_type']

        if bounce_type == 'Permanent':
            # Hard bounce - deactivate member
            cursor.execute(
                """
                UPDATE members
                SET bounce_count = bounce_count + 1,
                    last_bounce_at = %s,
                    bounce_status = 'hard_bounce',
                    active = 0
                WHERE member_id = %s
                """,
                (bounce['timestamp'], member_id),
            )
            logger.info(f"Deactivated member {email_addr} due to hard bounce")

        elif bounce_type == 'Transient':
            # Soft bounce - read-modify-write of the counter. FOR UPDATE locks
            # the row until commit (on transactional tables such as InnoDB) so
            # concurrent bounce processes cannot both read the same count.
            cursor.execute(
                "SELECT bounce_count, bounce_status FROM members WHERE member_id = %s FOR UPDATE",
                (member_id,),
            )
            current = cursor.fetchone()

            if current and current['bounce_status'] != 'hard_bounce':
                # Treat a NULL counter as 0 instead of failing on None + 1.
                new_count = (current['bounce_count'] or 0) + 1
                new_status = 'soft_bounce' if new_count >= self.SOFT_BOUNCE_THRESHOLD else 'clean'

                cursor.execute(
                    """
                    UPDATE members
                    SET bounce_count = %s,
                        last_bounce_at = %s,
                        bounce_status = %s
                    WHERE member_id = %s
                    """,
                    (new_count, bounce['timestamp'], new_status, member_id),
                )
                logger.info(f"Updated member {email_addr} bounce count to {new_count}, status: {new_status}")

        else:
            # Undetermined - just increment counter
            cursor.execute(
                """
                UPDATE members
                SET bounce_count = bounce_count + 1,
                    last_bounce_at = %s
                WHERE member_id = %s
                """,
                (bounce['timestamp'], member_id),
            )
            logger.info(f"Updated member {email_addr} bounce count (undetermined)")
|
|
|
|
def main():
    """Read one email and process any bounces it reports.

    With ``--test`` as the first argument, a built-in sample DSN is parsed and
    the resulting bounces are only logged. Otherwise the message is read from
    stdin and the bounces are written to the database. Unexpected errors are
    logged with a traceback and the process exits with status 1.
    """
    try:
        logger.info("Starting bounce processing")

        # Check if we're in test mode
        test_mode = len(sys.argv) > 1 and sys.argv[1] == '--test'

        if test_mode:
            # Test mode - use sample bounce email
            email_content = """From: Mail Delivery Subsystem <MAILER-DAEMON@example.com>
To: bounces@lists.sasalliance.org
Subject: Delivery Status Notification (Failure)
Content-Type: multipart/report; report-type=delivery-status; boundary="boundary123"

--boundary123
Content-Type: text/plain

This is a test bounce message.

The following address(es) failed:
testuser@example.com
SMTP error from remote mail server after RCPT TO:<testuser@example.com>:
550 5.1.1 User unknown

--boundary123
Content-Type: message/delivery-status

Reporting-MTA: dns; mail.example.com
Received-From-MTA: dns; localhost

Final-Recipient: rfc822; testuser@example.com
Action: failed
Status: 5.1.1
Diagnostic-Code: smtp; 550 5.1.1 User unknown

--boundary123--
"""
            logger.info("Running in test mode with sample bounce email")
        else:
            # Read raw bytes: mail may carry 8-bit data in any charset, so
            # decoding stdin with the locale codec can raise. ASCII with
            # surrogateescape (the same scheme email.parser.BytesParser uses)
            # maps every byte to exactly one character and round-trips it, so
            # each part's declared charset can be applied later.
            raw = sys.stdin.buffer.read()
            email_content = raw.decode('ascii', errors='surrogateescape')

        if not email_content.strip():
            logger.warning("No email content received")
            return

        # One character per input byte (see decoding above), so this is a byte count.
        logger.info(f"Received email content ({len(email_content)} bytes)")

        # Process the bounce
        processor = BounceProcessor()
        bounces = processor.parse_bounce_email(email_content)

        if test_mode:
            logger.info(f"Test mode - would process {len(bounces)} bounce(s):")
            for bounce in bounces:
                logger.info(f"  {bounce}")
        elif bounces:
            processor.update_database(bounces)
            logger.info(f"Processed {len(bounces)} bounce(s)")
        else:
            logger.info("No bounces detected in email")

    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"Error in main: {str(e)}")
        sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |