"""Email bounce tracking: record, query and expire bounce records."""

from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterator, List, Optional

from sqlalchemy.orm import Session

from ..core.database import get_db
from ..models.models import BounceType, EmailBounce

# Webhook event names (lower-cased) mapped to the bounce type stored for them.
# Event names that are not listed here are ignored by the webhook processor.
_EVENT_TYPE_TO_BOUNCE_TYPE = {
    'bounced': BounceType.HARD,
    'hard_bounce': BounceType.HARD,
    'permanent_bounce': BounceType.HARD,
    'soft_bounce': BounceType.SOFT,
    'transient_bounce': BounceType.SOFT,
    'complained': BounceType.COMPLAINT,
    'spam_complaint': BounceType.COMPLAINT,
    'unsubscribed': BounceType.UNSUBSCRIBE,
    'unsubscribe': BounceType.UNSUBSCRIBE,
}


def _utcnow() -> datetime:
    """Return the current time as a naive UTC datetime.

    Equivalent to the deprecated ``datetime.utcnow()``; the value stays naive
    so it remains comparable with the naive timestamps this module already
    writes and compares against.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _parse_event_timestamp(timestamp: Any) -> Optional[datetime]:
    """Convert a webhook timestamp into a naive UTC datetime.

    Args:
        timestamp: Unix timestamp (int/float) or ISO-8601 string.

    Returns:
        Optional[datetime]: Naive UTC datetime, or None when the value is
        missing, falsy, of an unsupported type, or cannot be parsed. Callers
        treat None as "use the current time".
    """
    # bool is a subclass of int; a True/False flag is not a timestamp.
    if not timestamp or isinstance(timestamp, bool):
        return None

    try:
        if isinstance(timestamp, (int, float)):
            # Interpret the epoch value in UTC rather than server-local time
            # so it agrees with the naive-UTC values produced by _utcnow().
            parsed = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        elif isinstance(timestamp, str):
            parsed = datetime.fromisoformat(
                timestamp.strip().replace('Z', '+00:00')
            )
        else:
            return None
    except (ValueError, TypeError, OverflowError, OSError):
        # Malformed or out-of-range timestamp: fall back to "now" upstream.
        return None

    if parsed.tzinfo is not None:
        # Normalise aware values to naive UTC so aware and naive datetimes
        # are never mixed in the same column.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    # NOTE(review): ISO strings without an offset are kept as-is, i.e.
    # assumed to already be UTC - confirm against the webhook sender.
    return parsed


@contextmanager
def _session_scope(db: Optional[Session]) -> Iterator[Session]:
    """Yield a usable session, releasing it afterwards only if we created it.

    A session supplied by the caller is yielded untouched and its lifecycle
    stays with the caller. When no session is supplied, one is pulled from
    ``get_db()`` and the provider is closed on exit.

    NOTE(review): assumes ``get_db()`` is a generator-style provider whose
    cleanup (e.g. closing the session) runs when the generator is closed -
    confirm against core.database. Objects returned from a self-acquired
    session may be detached from it once this scope exits.
    """
    if db is not None:
        yield db
        return

    provider = get_db()
    try:
        yield next(provider)
    finally:
        # Run the provider's cleanup instead of leaving it to garbage
        # collection; tolerate plain iterators that have no close().
        close = getattr(provider, 'close', None)
        if callable(close):
            close()


class BounceService:
    """Service for managing email bounce tracking and prevention"""

    def record_bounce(
        self,
        email: str,
        bounce_type: BounceType,
        bounce_reason: Optional[str] = None,
        smtp2go_message_id: Optional[str] = None,
        bounce_date: Optional[datetime] = None,
        db: Optional[Session] = None,
    ) -> EmailBounce:
        """
        Record an email bounce in the database

        If an active bounce already exists for the same email and bounce
        type, that record is updated instead of creating a duplicate.

        Args:
            email: The email address that bounced
            bounce_type: Type of bounce (hard, soft, complaint, unsubscribe)
            bounce_reason: Reason for the bounce (optional)
            smtp2go_message_id: SMTP2GO message ID (optional)
            bounce_date: When the bounce occurred (defaults to now, UTC)
            db: Database session (one is acquired via get_db() when omitted)

        Returns:
            EmailBounce: The created or updated bounce record
        """
        if bounce_date is None:
            bounce_date = _utcnow()

        with _session_scope(db) as session:
            existing = (
                session.query(EmailBounce)
                .filter(
                    EmailBounce.email == email,
                    EmailBounce.bounce_type == bounce_type,
                    EmailBounce.is_active.is_(True),
                )
                .first()
            )

            if existing is not None:
                # Only overwrite optional details when new values were given,
                # so a sparse repeat event does not erase earlier information.
                if bounce_reason:
                    existing.bounce_reason = bounce_reason
                if smtp2go_message_id:
                    existing.smtp2go_message_id = smtp2go_message_id
                existing.bounce_date = bounce_date
                existing.updated_at = _utcnow()
                session.commit()
                session.refresh(existing)
                return existing

            bounce = EmailBounce(
                email=email,
                bounce_type=bounce_type,
                bounce_reason=bounce_reason,
                smtp2go_message_id=smtp2go_message_id,
                bounce_date=bounce_date,
                is_active=True,
            )
            session.add(bounce)
            session.commit()
            session.refresh(bounce)
            return bounce

    def is_email_bounced(self, email: str, db: Optional[Session] = None) -> bool:
        """
        Check if an email address has any active bounces

        Args:
            email: Email address to check
            db: Database session (one is acquired via get_db() when omitted)

        Returns:
            bool: True if email has active bounces
        """
        with _session_scope(db) as session:
            active_bounce = (
                session.query(EmailBounce)
                .filter(
                    EmailBounce.email == email,
                    EmailBounce.is_active.is_(True),
                )
                .first()
            )
            return active_bounce is not None

    def get_bounce_history(
        self, email: str, db: Optional[Session] = None
    ) -> List[EmailBounce]:
        """
        Get bounce history for an email address

        Args:
            email: Email address to check
            db: Database session (one is acquired via get_db() when omitted)

        Returns:
            List[EmailBounce]: Bounce records (active and inactive), most
            recent bounce date first
        """
        with _session_scope(db) as session:
            return (
                session.query(EmailBounce)
                .filter(EmailBounce.email == email)
                .order_by(EmailBounce.bounce_date.desc())
                .all()
            )

    def deactivate_bounce(self, bounce_id: int, db: Optional[Session] = None) -> bool:
        """
        Deactivate a bounce record (mark as resolved)

        Args:
            bounce_id: ID of the bounce record
            db: Database session (one is acquired via get_db() when omitted)

        Returns:
            bool: True if the record was found and deactivated, False if no
            record has that ID
        """
        with _session_scope(db) as session:
            bounce = (
                session.query(EmailBounce)
                .filter(EmailBounce.id == bounce_id)
                .first()
            )
            if bounce is None:
                return False

            bounce.is_active = False
            bounce.updated_at = _utcnow()
            session.commit()
            return True

    def process_smtp2go_webhook(
        self, webhook_data: Dict[str, Any], db: Optional[Session] = None
    ) -> List[EmailBounce]:
        """
        Process webhook data from SMTP2GO bounce notifications

        Accepts either a batch payload (``{'events': [...]}``) or a single
        event dict. Events without a recipient address or with an
        unrecognised event type are skipped.

        Args:
            webhook_data: Webhook payload from SMTP2GO
            db: Database session (one is acquired via get_db() when omitted)

        Returns:
            List[EmailBounce]: List of created/updated bounce records
        """
        # SMTP2GO webhook structure may vary, but typically includes:
        # - email: recipient email
        # - event: 'bounced', 'complained', etc.
        # - reason: bounce reason
        # - message_id: SMTP2GO message ID
        # - timestamp: when it occurred
        events = webhook_data.get('events')
        if not isinstance(events, list):
            # No usable 'events' list: treat the payload itself as one event.
            events = [webhook_data]

        recorded: List[EmailBounce] = []
        with _session_scope(db) as session:
            for event in events:
                if not isinstance(event, dict):
                    continue  # Malformed entry; nothing to read from it.

                email = event.get('email') or event.get('recipient')
                if not email:
                    continue

                # 'event' may be present but null, hence the `or ''`.
                event_type = str(event.get('event') or '').lower()
                bounce_type = _EVENT_TYPE_TO_BOUNCE_TYPE.get(event_type)
                if bounce_type is None:
                    continue

                recorded.append(
                    self.record_bounce(
                        email=email,
                        bounce_type=bounce_type,
                        bounce_reason=(
                            event.get('reason') or event.get('bounce_reason')
                        ),
                        smtp2go_message_id=(
                            event.get('message_id')
                            or event.get('smtp2go_message_id')
                        ),
                        bounce_date=_parse_event_timestamp(
                            event.get('timestamp')
                        ),
                        db=session,
                    )
                )

        return recorded

    def get_bounce_statistics(self, db: Optional[Session] = None) -> Dict[str, Any]:
        """
        Get bounce statistics for reporting

        Args:
            db: Database session (one is acquired via get_db() when omitted)

        Returns:
            Dict[str, Any]: ``total_bounces`` (all records),
            ``active_bounces`` (active records) and ``bounce_types``
            (active record count keyed by bounce type value)
        """
        with _session_scope(db) as session:
            total_bounces = session.query(EmailBounce).count()
            active_query = session.query(EmailBounce).filter(
                EmailBounce.is_active.is_(True)
            )
            active_bounces = active_query.count()

            bounce_type_counts = {
                bounce_type.value: active_query.filter(
                    EmailBounce.bounce_type == bounce_type
                ).count()
                for bounce_type in BounceType
            }

            return {
                'total_bounces': total_bounces,
                'active_bounces': active_bounces,
                'bounce_types': bounce_type_counts,
            }

    def cleanup_old_bounces(
        self, days_old: int = 365, db: Optional[Session] = None
    ) -> int:
        """
        Deactivate bounce records older than specified days
        (for soft bounces that may be resolved)

        Args:
            days_old: Number of days after which to deactivate bounces
            db: Database session (one is acquired via get_db() when omitted)

        Returns:
            int: Number of bounces deactivated
        """
        now = _utcnow()
        cutoff_date = now - timedelta(days=days_old)

        with _session_scope(db) as session:
            # Only deactivate soft bounces, keep hard bounces and complaints
            # active
            deactivated = (
                session.query(EmailBounce)
                .filter(
                    EmailBounce.bounce_type == BounceType.SOFT,
                    EmailBounce.is_active.is_(True),
                    EmailBounce.bounce_date < cutoff_date,
                )
                .update({'is_active': False, 'updated_at': now})
            )
            session.commit()
            return deactivated


# Create a singleton instance
bounce_service = BounceService()