Files
sasa-maillist/api/main.py
2025-10-12 20:55:13 +00:00

371 lines
13 KiB
Python

"""
Mailing List Management API
FastAPI-based REST API for managing mailing lists and members
"""
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr
from typing import List, Optional
import mysql.connector
from mysql.connector import Error
import os
from contextlib import contextmanager
# Configuration
API_TOKEN = os.getenv('API_TOKEN', 'change-this-token')
MYSQL_HOST = os.getenv('MYSQL_HOST', 'mysql')
MYSQL_PORT = int(os.getenv('MYSQL_PORT', 3306))
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'maillist')
MYSQL_USER = os.getenv('MYSQL_USER', 'maillist')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', '')
# FastAPI app
app = FastAPI(
title="Mailing List Manager API",
description="API for managing mailing lists and members",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify your frontend domain
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
security = HTTPBearer()
# Database connection
@contextmanager
def get_db():
"""Database connection context manager"""
connection = None
try:
connection = mysql.connector.connect(
host=MYSQL_HOST,
port=MYSQL_PORT,
database=MYSQL_DATABASE,
user=MYSQL_USER,
password=MYSQL_PASSWORD
)
yield connection
except Error as e:
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
finally:
if connection and connection.is_connected():
connection.close()
# Authentication
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify API token"""
if credentials.credentials != API_TOKEN:
raise HTTPException(status_code=401, detail="Invalid authentication token")
return credentials.credentials
# Pydantic models
class MailingList(BaseModel):
list_id: Optional[int] = None
list_name: str
list_email: EmailStr
description: Optional[str] = None
active: bool = True
class MailingListUpdate(BaseModel):
list_name: Optional[str] = None
description: Optional[str] = None
active: Optional[bool] = None
class Member(BaseModel):
member_id: Optional[int] = None
name: str
email: EmailStr
active: bool = True
class MemberUpdate(BaseModel):
name: Optional[str] = None
active: Optional[bool] = None
class Subscription(BaseModel):
list_email: EmailStr
member_email: EmailStr
active: bool = True
# Routes
@app.get("/")
async def root():
"""API information"""
return {
"name": "Mailing List Manager API",
"version": "1.0.0",
"status": "running"
}
@app.get("/health")
async def health():
"""Health check endpoint"""
try:
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
return {"status": "healthy", "database": "connected"}
except Exception as e:
raise HTTPException(status_code=503, detail=f"Unhealthy: {str(e)}")
# Mailing Lists endpoints
@app.get("/lists", response_model=List[MailingList])
async def get_lists(token: str = Depends(verify_token)):
"""Get all mailing lists"""
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM lists ORDER BY list_name")
lists = cursor.fetchall()
cursor.close()
return lists
@app.get("/lists/{list_id}", response_model=MailingList)
async def get_list(list_id: int, token: str = Depends(verify_token)):
"""Get a specific mailing list"""
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,))
mailing_list = cursor.fetchone()
cursor.close()
if not mailing_list:
raise HTTPException(status_code=404, detail="List not found")
return mailing_list
@app.post("/lists", response_model=MailingList, status_code=201)
async def create_list(mailing_list: MailingList, token: str = Depends(verify_token)):
"""Create a new mailing list"""
with get_db() as conn:
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO lists (list_name, list_email, description, active) VALUES (%s, %s, %s, %s)",
(mailing_list.list_name, mailing_list.list_email, mailing_list.description, mailing_list.active)
)
conn.commit()
mailing_list.list_id = cursor.lastrowid
cursor.close()
return mailing_list
except Error as e:
raise HTTPException(status_code=400, detail=f"Failed to create list: {str(e)}")
@app.patch("/lists/{list_id}", response_model=MailingList)
async def update_list(list_id: int, updates: MailingListUpdate, token: str = Depends(verify_token)):
"""Update a mailing list"""
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
# Build update query dynamically
update_fields = []
values = []
if updates.list_name is not None:
update_fields.append("list_name = %s")
values.append(updates.list_name)
if updates.description is not None:
update_fields.append("description = %s")
values.append(updates.description)
if updates.active is not None:
update_fields.append("active = %s")
values.append(updates.active)
if not update_fields:
raise HTTPException(status_code=400, detail="No fields to update")
values.append(list_id)
query = f"UPDATE lists SET {', '.join(update_fields)} WHERE list_id = %s"
cursor.execute(query, values)
conn.commit()
# Return updated list
cursor.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,))
updated_list = cursor.fetchone()
cursor.close()
if not updated_list:
raise HTTPException(status_code=404, detail="List not found")
return updated_list
@app.delete("/lists/{list_id}", status_code=204)
async def delete_list(list_id: int, token: str = Depends(verify_token)):
"""Delete a mailing list"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM lists WHERE list_id = %s", (list_id,))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="List not found")
cursor.close()
# Members endpoints
@app.get("/members", response_model=List[Member])
async def get_members(token: str = Depends(verify_token)):
"""Get all members"""
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM members ORDER BY name")
members = cursor.fetchall()
cursor.close()
return members
@app.get("/members/{member_id}", response_model=Member)
async def get_member(member_id: int, token: str = Depends(verify_token)):
"""Get a specific member"""
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM members WHERE member_id = %s", (member_id,))
member = cursor.fetchone()
cursor.close()
if not member:
raise HTTPException(status_code=404, detail="Member not found")
return member
@app.post("/members", response_model=Member, status_code=201)
async def create_member(member: Member, token: str = Depends(verify_token)):
"""Create a new member"""
with get_db() as conn:
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO members (name, email, active) VALUES (%s, %s, %s)",
(member.name, member.email, member.active)
)
conn.commit()
member.member_id = cursor.lastrowid
cursor.close()
return member
except Error as e:
raise HTTPException(status_code=400, detail=f"Failed to create member: {str(e)}")
@app.patch("/members/{member_id}", response_model=Member)
async def update_member(member_id: int, updates: MemberUpdate, token: str = Depends(verify_token)):
"""Update a member"""
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
update_fields = []
values = []
if updates.name is not None:
update_fields.append("name = %s")
values.append(updates.name)
if updates.active is not None:
update_fields.append("active = %s")
values.append(updates.active)
if not update_fields:
raise HTTPException(status_code=400, detail="No fields to update")
values.append(member_id)
query = f"UPDATE members SET {', '.join(update_fields)} WHERE member_id = %s"
cursor.execute(query, values)
conn.commit()
cursor.execute("SELECT * FROM members WHERE member_id = %s", (member_id,))
updated_member = cursor.fetchone()
cursor.close()
if not updated_member:
raise HTTPException(status_code=404, detail="Member not found")
return updated_member
@app.delete("/members/{member_id}", status_code=204)
async def delete_member(member_id: int, token: str = Depends(verify_token)):
"""Delete a member"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM members WHERE member_id = %s", (member_id,))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Member not found")
cursor.close()
# Subscription endpoints
@app.get("/lists/{list_id}/members", response_model=List[Member])
async def get_list_members(list_id: int, token: str = Depends(verify_token)):
"""Get all members of a specific list"""
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT m.*
FROM members m
JOIN list_members lm ON m.member_id = lm.member_id
WHERE lm.list_id = %s AND lm.active = 1
ORDER BY m.name
""", (list_id,))
members = cursor.fetchall()
cursor.close()
return members
@app.post("/subscriptions", status_code=201)
async def subscribe_member(subscription: Subscription, token: str = Depends(verify_token)):
"""Subscribe a member to a list"""
with get_db() as conn:
cursor = conn.cursor()
try:
# Get list_id and member_id
cursor.execute("SELECT list_id FROM lists WHERE list_email = %s", (subscription.list_email,))
list_result = cursor.fetchone()
if not list_result:
raise HTTPException(status_code=404, detail="List not found")
cursor.execute("SELECT member_id FROM members WHERE email = %s", (subscription.member_email,))
member_result = cursor.fetchone()
if not member_result:
raise HTTPException(status_code=404, detail="Member not found")
list_id = list_result[0]
member_id = member_result[0]
# Insert subscription
cursor.execute(
"INSERT INTO list_members (list_id, member_id, active) VALUES (%s, %s, %s)",
(list_id, member_id, subscription.active)
)
conn.commit()
cursor.close()
return {"message": "Subscription created", "list_email": subscription.list_email, "member_email": subscription.member_email}
except Error as e:
if "Duplicate entry" in str(e):
raise HTTPException(status_code=400, detail="Member already subscribed to this list")
raise HTTPException(status_code=400, detail=f"Failed to create subscription: {str(e)}")
@app.delete("/subscriptions")
async def unsubscribe_member(list_email: EmailStr, member_email: EmailStr, token: str = Depends(verify_token)):
"""Unsubscribe a member from a list"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("""
DELETE lm FROM list_members lm
JOIN lists l ON lm.list_id = l.list_id
JOIN members m ON lm.member_id = m.member_id
WHERE l.list_email = %s AND m.email = %s
""", (list_email, member_email))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Subscription not found")
cursor.close()
return {"message": "Unsubscribed successfully"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)