371 lines
13 KiB
Python
371 lines
13 KiB
Python
"""
Mailing List Management API

FastAPI-based REST API for managing mailing lists and members.
"""
|
|
import os
import secrets
from contextlib import contextmanager
from typing import List, Optional

import mysql.connector
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from mysql.connector import Error
from pydantic import BaseModel, EmailStr
|
|
|
|
# Configuration
|
|
# Every setting is read once at import time from the environment, with a
# fallback that suits a local/docker-compose style setup.
# NOTE(review): the API_TOKEN fallback is a well-known placeholder; deployments
# should always set API_TOKEN explicitly.
API_TOKEN = os.environ.get("API_TOKEN", "change-this-token")

# MySQL connection parameters consumed by get_db().
MYSQL_HOST = os.environ.get("MYSQL_HOST", "mysql")
MYSQL_PORT = int(os.environ.get("MYSQL_PORT", "3306"))
MYSQL_DATABASE = os.environ.get("MYSQL_DATABASE", "maillist")
MYSQL_USER = os.environ.get("MYSQL_USER", "maillist")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "")
|
|
|
|
# FastAPI app
|
|
# Metadata passed to the FastAPI constructor.
_app_metadata = {
    "title": "Mailing List Manager API",
    "description": "API for managing mailing lists and members",
    "version": "1.0.0",
}
app = FastAPI(**_app_metadata)
|
|
|
|
# Add CORS middleware
|
|
# Allowed origins default to "*" (the previous hard-coded value). Set
# CORS_ALLOW_ORIGINS to a comma-separated list of origins, e.g.
# "https://app.example.com,https://admin.example.com", to restrict them.
# An unset or blank variable falls back to the wildcard.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive -- confirm this is intended before deploying.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Bearer-token scheme used by verify_token() to extract the Authorization header.
security = HTTPBearer()
|
|
|
|
# Database connection
|
|
@contextmanager
def get_db():
    """Yield a MySQL connection and close it when the ``with`` block exits.

    The connection is opened with the module-level ``MYSQL_*`` settings.

    Yields:
        The open ``mysql.connector`` connection.

    Raises:
        HTTPException: status 500 if ``mysql.connector`` raises ``Error`` while
            connecting, or while the caller's ``with`` body runs and the caller
            did not handle it first.
    """
    # NOTE(review): this is a blocking call and the route handlers using it are
    # declared ``async def``, so each query runs on the event loop.
    connection = None
    try:
        connection = mysql.connector.connect(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            database=MYSQL_DATABASE,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
        )
        yield connection
    except Error as e:
        # NOTE(review): the driver's message is returned to the client verbatim;
        # consider logging it and returning a generic detail instead.
        # Chain the driver error so the original traceback is preserved.
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
    finally:
        # connection stays None when connect() itself failed.
        if connection is not None and connection.is_connected():
            connection.close()
|
|
|
|
# Authentication
|
|
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token of the current request against ``API_TOKEN``.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The presented token string when it matches ``API_TOKEN``.

    Raises:
        HTTPException: status 401 when the token does not match.
    """
    # Compare as bytes in constant time: a plain ``!=`` can leak how many
    # leading characters matched, and compare_digest() only accepts str
    # arguments that are pure ASCII.
    presented = credentials.credentials.encode("utf-8")
    expected = API_TOKEN.encode("utf-8")
    if not secrets.compare_digest(presented, expected):
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication token",
            # A 401 response should tell the client which scheme to use.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return credentials.credentials
|
|
|
|
# Pydantic models
|
|
class MailingList(BaseModel):
    # Schema for a mailing list, used both as the request body of
    # POST /lists and as the response model of the /lists endpoints.

    # Optional on input; create_list fills it in from the inserted row's id.
    list_id: Optional[int] = None
    list_name: str
    list_email: EmailStr
    description: Optional[str] = None
    active: bool = True
|
|
|
|
class MailingListUpdate(BaseModel):
    # Partial-update payload for PATCH /lists/{list_id}.
    # A field left as None is treated by update_list as "do not change".

    list_name: Optional[str] = None
    description: Optional[str] = None
    active: Optional[bool] = None
|
|
|
|
class Member(BaseModel):
    # Schema for a member, used both as the request body of POST /members
    # and as the response model of the member endpoints.

    # Optional on input; create_member fills it in from the inserted row's id.
    member_id: Optional[int] = None
    name: str
    email: EmailStr
    active: bool = True
|
|
|
|
class MemberUpdate(BaseModel):
    # Partial-update payload for PATCH /members/{member_id}.
    # A field left as None is treated by update_member as "do not change".

    name: Optional[str] = None
    active: Optional[bool] = None
|
|
|
|
class Subscription(BaseModel):
    # Request body of POST /subscriptions: identifies the list and the member
    # by e-mail address rather than by numeric id.

    list_email: EmailStr
    member_email: EmailStr
    # Stored in the `active` column of the list_members row.
    active: bool = True
|
|
|
|
# Routes
|
|
@app.get("/")
async def root():
    """API information"""
    # Static service descriptor: needs neither authentication nor the database.
    info = dict(
        name="Mailing List Manager API",
        version="1.0.0",
        status="running",
    )
    return info
|
|
|
|
@app.get("/health")
async def health():
    """Health check endpoint"""
    # Runs a trivial query to prove the database is reachable. Any failure
    # (including the HTTPException raised by get_db) is reported as 503.
    try:
        with get_db() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT 1")
                cursor.fetchone()
            finally:
                # Release the cursor even when the probe query fails.
                cursor.close()
    except Exception as e:
        # Broad catch is deliberate: this is the boundary that turns every
        # failure mode into an "unhealthy" response. Chain for the traceback.
        raise HTTPException(status_code=503, detail=f"Unhealthy: {str(e)}") from e
    return {"status": "healthy", "database": "connected"}
|
|
|
|
# Mailing Lists endpoints
|
|
@app.get("/lists", response_model=List[MailingList])
async def get_lists(token: str = Depends(verify_token)):
    """Get all mailing lists"""
    # `token` is not used in the body; declaring it makes verify_token run first.
    with get_db() as db:
        # dictionary=True: each row is returned as a dict keyed by column name.
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM lists ORDER BY list_name")
        rows = cur.fetchall()
        cur.close()
    return rows
|
|
|
|
@app.get("/lists/{list_id}", response_model=MailingList)
async def get_list(list_id: int, token: str = Depends(verify_token)):
    """Get a specific mailing list"""
    # `token` is not used in the body; declaring it makes verify_token run first.
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,))
        row = cur.fetchone()
        cur.close()

    # fetchone() yields None when no row has this id.
    if row:
        return row
    raise HTTPException(status_code=404, detail="List not found")
|
|
|
|
@app.post("/lists", response_model=MailingList, status_code=201)
async def create_list(mailing_list: MailingList, token: str = Depends(verify_token)):
    """Create a new mailing list"""
    # Inserts the list and echoes the payload back with list_id set to the new
    # row's id. Any mysql.connector Error during the insert becomes a 400.
    with get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "INSERT INTO lists (list_name, list_email, description, active) VALUES (%s, %s, %s, %s)",
                (
                    mailing_list.list_name,
                    mailing_list.list_email,
                    mailing_list.description,
                    mailing_list.active,
                ),
            )
            conn.commit()
            # Read the generated id while the cursor is still open.
            mailing_list.list_id = cursor.lastrowid
        except Error as e:
            # NOTE(review): the driver's message is returned to the client verbatim.
            raise HTTPException(status_code=400, detail=f"Failed to create list: {str(e)}") from e
        finally:
            # Previously the cursor stayed open when the insert failed.
            cursor.close()
    return mailing_list
|
|
|
|
@app.patch("/lists/{list_id}", response_model=MailingList)
async def update_list(list_id: int, updates: MailingListUpdate, token: str = Depends(verify_token)):
    """Update a mailing list"""
    # Only fields the client supplied (non-None) are written. The request is
    # validated before a database connection is opened, so an empty payload
    # is rejected with 400 without touching MySQL.
    candidates = (
        ("list_name", updates.list_name),
        ("description", updates.description),
        ("active", updates.active),
    )
    changes = [(column, value) for column, value in candidates if value is not None]
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    # Column names come from the fixed tuple above, never from user input;
    # the values themselves are bound as parameters.
    set_clause = ", ".join(f"{column} = %s" for column, _ in changes)
    params = [value for _, value in changes]
    params.append(list_id)

    with get_db() as conn:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(f"UPDATE lists SET {set_clause} WHERE list_id = %s", params)
            conn.commit()

            # Re-read the row: this both produces the response body and
            # detects a non-existent list_id.
            cursor.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,))
            updated_list = cursor.fetchone()
        finally:
            cursor.close()

    if not updated_list:
        raise HTTPException(status_code=404, detail="List not found")
    return updated_list
|
|
|
|
@app.delete("/lists/{list_id}", status_code=204)
async def delete_list(list_id: int, token: str = Depends(verify_token)):
    """Delete a mailing list"""
    # Responds 204 with no body on success, 404 when no row had this id.
    with get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute("DELETE FROM lists WHERE list_id = %s", (list_id,))
            conn.commit()
            # Capture the affected-row count while the cursor is still open.
            deleted = cursor.rowcount
        finally:
            # Previously the cursor was left open on the 404 path.
            cursor.close()

    if deleted == 0:
        raise HTTPException(status_code=404, detail="List not found")
|
|
|
|
# Members endpoints
|
|
@app.get("/members", response_model=List[Member])
async def get_members(token: str = Depends(verify_token)):
    """Get all members"""
    # `token` is not used in the body; declaring it makes verify_token run first.
    with get_db() as db:
        # dictionary=True: each row is returned as a dict keyed by column name.
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM members ORDER BY name")
        rows = cur.fetchall()
        cur.close()
    return rows
|
|
|
|
@app.get("/members/{member_id}", response_model=Member)
async def get_member(member_id: int, token: str = Depends(verify_token)):
    """Get a specific member"""
    # `token` is not used in the body; declaring it makes verify_token run first.
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM members WHERE member_id = %s", (member_id,))
        row = cur.fetchone()
        cur.close()

    # fetchone() yields None when no row has this id.
    if row:
        return row
    raise HTTPException(status_code=404, detail="Member not found")
|
|
|
|
@app.post("/members", response_model=Member, status_code=201)
async def create_member(member: Member, token: str = Depends(verify_token)):
    """Create a new member"""
    # Inserts the member and echoes the payload back with member_id set to the
    # new row's id. Any mysql.connector Error during the insert becomes a 400.
    with get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "INSERT INTO members (name, email, active) VALUES (%s, %s, %s)",
                (member.name, member.email, member.active),
            )
            conn.commit()
            # Read the generated id while the cursor is still open.
            member.member_id = cursor.lastrowid
        except Error as e:
            # NOTE(review): the driver's message is returned to the client verbatim.
            raise HTTPException(status_code=400, detail=f"Failed to create member: {str(e)}") from e
        finally:
            # Previously the cursor stayed open when the insert failed.
            cursor.close()
    return member
|
|
|
|
@app.patch("/members/{member_id}", response_model=Member)
async def update_member(member_id: int, updates: MemberUpdate, token: str = Depends(verify_token)):
    """Update a member"""
    # Only fields the client supplied (non-None) are written. The request is
    # validated before a database connection is opened, so an empty payload
    # is rejected with 400 without touching MySQL.
    candidates = (
        ("name", updates.name),
        ("active", updates.active),
    )
    changes = [(column, value) for column, value in candidates if value is not None]
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    # Column names come from the fixed tuple above, never from user input;
    # the values themselves are bound as parameters.
    set_clause = ", ".join(f"{column} = %s" for column, _ in changes)
    params = [value for _, value in changes]
    params.append(member_id)

    with get_db() as conn:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(f"UPDATE members SET {set_clause} WHERE member_id = %s", params)
            conn.commit()

            # Re-read the row: this both produces the response body and
            # detects a non-existent member_id.
            cursor.execute("SELECT * FROM members WHERE member_id = %s", (member_id,))
            updated_member = cursor.fetchone()
        finally:
            cursor.close()

    if not updated_member:
        raise HTTPException(status_code=404, detail="Member not found")
    return updated_member
|
|
|
|
@app.delete("/members/{member_id}", status_code=204)
async def delete_member(member_id: int, token: str = Depends(verify_token)):
    """Delete a member"""
    # Responds 204 with no body on success, 404 when no row had this id.
    with get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute("DELETE FROM members WHERE member_id = %s", (member_id,))
            conn.commit()
            # Capture the affected-row count while the cursor is still open.
            deleted = cursor.rowcount
        finally:
            # Previously the cursor was left open on the 404 path.
            cursor.close()

    if deleted == 0:
        raise HTTPException(status_code=404, detail="Member not found")
|
|
|
|
# Subscription endpoints
|
|
@app.get("/lists/{list_id}/members", response_model=List[Member])
async def get_list_members(list_id: int, token: str = Depends(verify_token)):
    """Get all members of a specific list"""
    # Only subscriptions whose list_members.active flag is 1 are returned.
    # An unknown list_id simply produces an empty result, not a 404.
    members_of_list_sql = """
        SELECT m.*
        FROM members m
        JOIN list_members lm ON m.member_id = lm.member_id
        WHERE lm.list_id = %s AND lm.active = 1
        ORDER BY m.name
    """
    with get_db() as db:
        cur = db.cursor(dictionary=True)
        cur.execute(members_of_list_sql, (list_id,))
        rows = cur.fetchall()
        cur.close()
    return rows
|
|
|
|
# MySQL server error number for a unique-key violation (ER_DUP_ENTRY).
_ER_DUP_ENTRY = 1062


@app.post("/subscriptions", status_code=201)
async def subscribe_member(subscription: Subscription, token: str = Depends(verify_token)):
    """Subscribe a member to a list"""
    # Resolves both e-mail addresses to ids, then inserts the list_members row.
    # 404 when either address is unknown; 400 on a duplicate subscription or
    # any other mysql.connector Error.
    with get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT list_id FROM lists WHERE list_email = %s", (subscription.list_email,))
            list_result = cursor.fetchone()
            if not list_result:
                raise HTTPException(status_code=404, detail="List not found")

            cursor.execute("SELECT member_id FROM members WHERE email = %s", (subscription.member_email,))
            member_result = cursor.fetchone()
            if not member_result:
                raise HTTPException(status_code=404, detail="Member not found")

            # Tuple cursor: the id is the only selected column.
            list_id = list_result[0]
            member_id = member_result[0]

            cursor.execute(
                "INSERT INTO list_members (list_id, member_id, active) VALUES (%s, %s, %s)",
                (list_id, member_id, subscription.active),
            )
            conn.commit()
        except Error as e:
            # Prefer the numeric error code; keep the message check as a
            # fallback so previously recognised duplicates are still matched.
            is_duplicate = getattr(e, "errno", None) == _ER_DUP_ENTRY or "Duplicate entry" in str(e)
            if is_duplicate:
                raise HTTPException(status_code=400, detail="Member already subscribed to this list") from e
            raise HTTPException(status_code=400, detail=f"Failed to create subscription: {str(e)}") from e
        finally:
            # Previously the cursor stayed open on the 404 and error paths.
            cursor.close()

    return {
        "message": "Subscription created",
        "list_email": subscription.list_email,
        "member_email": subscription.member_email,
    }
|
|
|
|
@app.delete("/subscriptions")
async def unsubscribe_member(list_email: EmailStr, member_email: EmailStr, token: str = Depends(verify_token)):
    """Unsubscribe a member from a list"""
    # Both addresses arrive as query parameters. The multi-table DELETE removes
    # the list_members row joining that list and that member; 404 when no such
    # row existed.
    with get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute("""
                DELETE lm FROM list_members lm
                JOIN lists l ON lm.list_id = l.list_id
                JOIN members m ON lm.member_id = m.member_id
                WHERE l.list_email = %s AND m.email = %s
            """, (list_email, member_email))
            conn.commit()
            # Capture the affected-row count while the cursor is still open.
            removed = cursor.rowcount
        finally:
            # Previously the cursor was left open on the 404 path.
            cursor.close()

    if removed == 0:
        raise HTTPException(status_code=404, detail="Subscription not found")
    return {"message": "Unsubscribed successfully"}
|
|
|
|
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on all
    # interfaces, port 8000. uvicorn is imported here so that merely importing
    # this module does not require it.
    import uvicorn

    bind = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **bind)
|