Now we have an API
This commit is contained in:
360
api/main.py
Normal file
360
api/main.py
Normal file
@@ -0,0 +1,360 @@
|
||||
"""
|
||||
Mailing List Management API
|
||||
FastAPI-based REST API for managing mailing lists and members
|
||||
"""
|
||||
from fastapi import FastAPI, HTTPException, Depends, Header
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from pydantic import BaseModel, EmailStr
|
||||
from typing import List, Optional
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
|
||||
# Configuration
|
||||
API_TOKEN = os.getenv('API_TOKEN', 'change-this-token')
|
||||
MYSQL_HOST = os.getenv('MYSQL_HOST', 'mysql')
|
||||
MYSQL_PORT = int(os.getenv('MYSQL_PORT', 3306))
|
||||
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'maillist')
|
||||
MYSQL_USER = os.getenv('MYSQL_USER', 'maillist')
|
||||
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', '')
|
||||
|
||||
# FastAPI app
|
||||
app = FastAPI(
|
||||
title="Mailing List Manager API",
|
||||
description="API for managing mailing lists and members",
|
||||
version="1.0.0"
|
||||
)
|
||||
|
||||
security = HTTPBearer()
|
||||
|
||||
# Database connection
|
||||
@contextmanager
|
||||
def get_db():
|
||||
"""Database connection context manager"""
|
||||
connection = None
|
||||
try:
|
||||
connection = mysql.connector.connect(
|
||||
host=MYSQL_HOST,
|
||||
port=MYSQL_PORT,
|
||||
database=MYSQL_DATABASE,
|
||||
user=MYSQL_USER,
|
||||
password=MYSQL_PASSWORD
|
||||
)
|
||||
yield connection
|
||||
except Error as e:
|
||||
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
|
||||
finally:
|
||||
if connection and connection.is_connected():
|
||||
connection.close()
|
||||
|
||||
# Authentication
|
||||
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
|
||||
"""Verify API token"""
|
||||
if credentials.credentials != API_TOKEN:
|
||||
raise HTTPException(status_code=401, detail="Invalid authentication token")
|
||||
return credentials.credentials
|
||||
|
||||
# Pydantic models
|
||||
class MailingList(BaseModel):
|
||||
list_id: Optional[int] = None
|
||||
list_name: str
|
||||
list_email: EmailStr
|
||||
description: Optional[str] = None
|
||||
active: bool = True
|
||||
|
||||
class MailingListUpdate(BaseModel):
|
||||
list_name: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
active: Optional[bool] = None
|
||||
|
||||
class Member(BaseModel):
|
||||
member_id: Optional[int] = None
|
||||
name: str
|
||||
email: EmailStr
|
||||
active: bool = True
|
||||
|
||||
class MemberUpdate(BaseModel):
|
||||
name: Optional[str] = None
|
||||
active: Optional[bool] = None
|
||||
|
||||
class Subscription(BaseModel):
|
||||
list_email: EmailStr
|
||||
member_email: EmailStr
|
||||
active: bool = True
|
||||
|
||||
# Routes
|
||||
@app.get("/")
|
||||
async def root():
|
||||
"""API information"""
|
||||
return {
|
||||
"name": "Mailing List Manager API",
|
||||
"version": "1.0.0",
|
||||
"status": "running"
|
||||
}
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
"""Health check endpoint"""
|
||||
try:
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT 1")
|
||||
cursor.fetchone()
|
||||
cursor.close()
|
||||
return {"status": "healthy", "database": "connected"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=503, detail=f"Unhealthy: {str(e)}")
|
||||
|
||||
# Mailing Lists endpoints
|
||||
@app.get("/lists", response_model=List[MailingList])
|
||||
async def get_lists(token: str = Depends(verify_token)):
|
||||
"""Get all mailing lists"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("SELECT * FROM lists ORDER BY list_name")
|
||||
lists = cursor.fetchall()
|
||||
cursor.close()
|
||||
return lists
|
||||
|
||||
@app.get("/lists/{list_id}", response_model=MailingList)
|
||||
async def get_list(list_id: int, token: str = Depends(verify_token)):
|
||||
"""Get a specific mailing list"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,))
|
||||
mailing_list = cursor.fetchone()
|
||||
cursor.close()
|
||||
|
||||
if not mailing_list:
|
||||
raise HTTPException(status_code=404, detail="List not found")
|
||||
return mailing_list
|
||||
|
||||
@app.post("/lists", response_model=MailingList, status_code=201)
|
||||
async def create_list(mailing_list: MailingList, token: str = Depends(verify_token)):
|
||||
"""Create a new mailing list"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
try:
|
||||
cursor.execute(
|
||||
"INSERT INTO lists (list_name, list_email, description, active) VALUES (%s, %s, %s, %s)",
|
||||
(mailing_list.list_name, mailing_list.list_email, mailing_list.description, mailing_list.active)
|
||||
)
|
||||
conn.commit()
|
||||
mailing_list.list_id = cursor.lastrowid
|
||||
cursor.close()
|
||||
return mailing_list
|
||||
except Error as e:
|
||||
raise HTTPException(status_code=400, detail=f"Failed to create list: {str(e)}")
|
||||
|
||||
@app.patch("/lists/{list_id}", response_model=MailingList)
|
||||
async def update_list(list_id: int, updates: MailingListUpdate, token: str = Depends(verify_token)):
|
||||
"""Update a mailing list"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
# Build update query dynamically
|
||||
update_fields = []
|
||||
values = []
|
||||
|
||||
if updates.list_name is not None:
|
||||
update_fields.append("list_name = %s")
|
||||
values.append(updates.list_name)
|
||||
if updates.description is not None:
|
||||
update_fields.append("description = %s")
|
||||
values.append(updates.description)
|
||||
if updates.active is not None:
|
||||
update_fields.append("active = %s")
|
||||
values.append(updates.active)
|
||||
|
||||
if not update_fields:
|
||||
raise HTTPException(status_code=400, detail="No fields to update")
|
||||
|
||||
values.append(list_id)
|
||||
query = f"UPDATE lists SET {', '.join(update_fields)} WHERE list_id = %s"
|
||||
|
||||
cursor.execute(query, values)
|
||||
conn.commit()
|
||||
|
||||
# Return updated list
|
||||
cursor.execute("SELECT * FROM lists WHERE list_id = %s", (list_id,))
|
||||
updated_list = cursor.fetchone()
|
||||
cursor.close()
|
||||
|
||||
if not updated_list:
|
||||
raise HTTPException(status_code=404, detail="List not found")
|
||||
return updated_list
|
||||
|
||||
@app.delete("/lists/{list_id}", status_code=204)
|
||||
async def delete_list(list_id: int, token: str = Depends(verify_token)):
|
||||
"""Delete a mailing list"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DELETE FROM lists WHERE list_id = %s", (list_id,))
|
||||
conn.commit()
|
||||
|
||||
if cursor.rowcount == 0:
|
||||
raise HTTPException(status_code=404, detail="List not found")
|
||||
cursor.close()
|
||||
|
||||
# Members endpoints
|
||||
@app.get("/members", response_model=List[Member])
|
||||
async def get_members(token: str = Depends(verify_token)):
|
||||
"""Get all members"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("SELECT * FROM members ORDER BY name")
|
||||
members = cursor.fetchall()
|
||||
cursor.close()
|
||||
return members
|
||||
|
||||
@app.get("/members/{member_id}", response_model=Member)
|
||||
async def get_member(member_id: int, token: str = Depends(verify_token)):
|
||||
"""Get a specific member"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("SELECT * FROM members WHERE member_id = %s", (member_id,))
|
||||
member = cursor.fetchone()
|
||||
cursor.close()
|
||||
|
||||
if not member:
|
||||
raise HTTPException(status_code=404, detail="Member not found")
|
||||
return member
|
||||
|
||||
@app.post("/members", response_model=Member, status_code=201)
|
||||
async def create_member(member: Member, token: str = Depends(verify_token)):
|
||||
"""Create a new member"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
try:
|
||||
cursor.execute(
|
||||
"INSERT INTO members (name, email, active) VALUES (%s, %s, %s)",
|
||||
(member.name, member.email, member.active)
|
||||
)
|
||||
conn.commit()
|
||||
member.member_id = cursor.lastrowid
|
||||
cursor.close()
|
||||
return member
|
||||
except Error as e:
|
||||
raise HTTPException(status_code=400, detail=f"Failed to create member: {str(e)}")
|
||||
|
||||
@app.patch("/members/{member_id}", response_model=Member)
|
||||
async def update_member(member_id: int, updates: MemberUpdate, token: str = Depends(verify_token)):
|
||||
"""Update a member"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
update_fields = []
|
||||
values = []
|
||||
|
||||
if updates.name is not None:
|
||||
update_fields.append("name = %s")
|
||||
values.append(updates.name)
|
||||
if updates.active is not None:
|
||||
update_fields.append("active = %s")
|
||||
values.append(updates.active)
|
||||
|
||||
if not update_fields:
|
||||
raise HTTPException(status_code=400, detail="No fields to update")
|
||||
|
||||
values.append(member_id)
|
||||
query = f"UPDATE members SET {', '.join(update_fields)} WHERE member_id = %s"
|
||||
|
||||
cursor.execute(query, values)
|
||||
conn.commit()
|
||||
|
||||
cursor.execute("SELECT * FROM members WHERE member_id = %s", (member_id,))
|
||||
updated_member = cursor.fetchone()
|
||||
cursor.close()
|
||||
|
||||
if not updated_member:
|
||||
raise HTTPException(status_code=404, detail="Member not found")
|
||||
return updated_member
|
||||
|
||||
@app.delete("/members/{member_id}", status_code=204)
|
||||
async def delete_member(member_id: int, token: str = Depends(verify_token)):
|
||||
"""Delete a member"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DELETE FROM members WHERE member_id = %s", (member_id,))
|
||||
conn.commit()
|
||||
|
||||
if cursor.rowcount == 0:
|
||||
raise HTTPException(status_code=404, detail="Member not found")
|
||||
cursor.close()
|
||||
|
||||
# Subscription endpoints
|
||||
@app.get("/lists/{list_id}/members", response_model=List[Member])
|
||||
async def get_list_members(list_id: int, token: str = Depends(verify_token)):
|
||||
"""Get all members of a specific list"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("""
|
||||
SELECT m.*
|
||||
FROM members m
|
||||
JOIN list_members lm ON m.member_id = lm.member_id
|
||||
WHERE lm.list_id = %s AND lm.active = 1
|
||||
ORDER BY m.name
|
||||
""", (list_id,))
|
||||
members = cursor.fetchall()
|
||||
cursor.close()
|
||||
return members
|
||||
|
||||
@app.post("/subscriptions", status_code=201)
|
||||
async def subscribe_member(subscription: Subscription, token: str = Depends(verify_token)):
|
||||
"""Subscribe a member to a list"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
try:
|
||||
# Get list_id and member_id
|
||||
cursor.execute("SELECT list_id FROM lists WHERE list_email = %s", (subscription.list_email,))
|
||||
list_result = cursor.fetchone()
|
||||
if not list_result:
|
||||
raise HTTPException(status_code=404, detail="List not found")
|
||||
|
||||
cursor.execute("SELECT member_id FROM members WHERE email = %s", (subscription.member_email,))
|
||||
member_result = cursor.fetchone()
|
||||
if not member_result:
|
||||
raise HTTPException(status_code=404, detail="Member not found")
|
||||
|
||||
list_id = list_result[0]
|
||||
member_id = member_result[0]
|
||||
|
||||
# Insert subscription
|
||||
cursor.execute(
|
||||
"INSERT INTO list_members (list_id, member_id, active) VALUES (%s, %s, %s)",
|
||||
(list_id, member_id, subscription.active)
|
||||
)
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
return {"message": "Subscription created", "list_email": subscription.list_email, "member_email": subscription.member_email}
|
||||
except Error as e:
|
||||
if "Duplicate entry" in str(e):
|
||||
raise HTTPException(status_code=400, detail="Member already subscribed to this list")
|
||||
raise HTTPException(status_code=400, detail=f"Failed to create subscription: {str(e)}")
|
||||
|
||||
@app.delete("/subscriptions")
|
||||
async def unsubscribe_member(list_email: EmailStr, member_email: EmailStr, token: str = Depends(verify_token)):
|
||||
"""Unsubscribe a member from a list"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("""
|
||||
DELETE lm FROM list_members lm
|
||||
JOIN lists l ON lm.list_id = l.list_id
|
||||
JOIN members m ON lm.member_id = m.member_id
|
||||
WHERE l.list_email = %s AND m.email = %s
|
||||
""", (list_email, member_email))
|
||||
|
||||
conn.commit()
|
||||
|
||||
if cursor.rowcount == 0:
|
||||
raise HTTPException(status_code=404, detail="Subscription not found")
|
||||
|
||||
cursor.close()
|
||||
return {"message": "Unsubscribed successfully"}
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
Reference in New Issue
Block a user