CSV Import

This commit is contained in:
James Pattinson
2025-10-13 13:28:12 +00:00
parent 9b6a6dab06
commit f721be7280
6 changed files with 1237 additions and 11 deletions

View File

@@ -10,6 +10,8 @@ from typing import List, Optional
import mysql.connector
from mysql.connector import Error
import os
import csv
import io
from contextlib import contextmanager
# Configuration
@@ -93,6 +95,18 @@ class Subscription(BaseModel):
member_email: EmailStr
active: bool = True
class BulkImportRequest(BaseModel):
csv_data: str
list_ids: List[int]
class BulkImportResult(BaseModel):
total_rows: int
processed_rows: int
created_members: int
updated_members: int
subscriptions_added: int
errors: List[str]
# Routes
@app.get("/")
async def root():
@@ -365,6 +379,118 @@ async def unsubscribe_member(list_email: EmailStr, member_email: EmailStr, token
cursor.close()
return {"message": "Unsubscribed successfully"}
@app.post("/bulk-import", response_model=BulkImportResult)
async def bulk_import_members(bulk_request: BulkImportRequest, token: str = Depends(verify_token)):
"""Bulk import members from CSV data and subscribe them to specified lists"""
result = BulkImportResult(
total_rows=0,
processed_rows=0,
created_members=0,
updated_members=0,
subscriptions_added=0,
errors=[]
)
with get_db() as conn:
cursor = conn.cursor(dictionary=True)
try:
# Verify all list_ids exist
if bulk_request.list_ids:
placeholders = ','.join(['%s'] * len(bulk_request.list_ids))
cursor.execute(f"SELECT list_id FROM lists WHERE list_id IN ({placeholders})", bulk_request.list_ids)
existing_lists = [row['list_id'] for row in cursor.fetchall()]
invalid_lists = set(bulk_request.list_ids) - set(existing_lists)
if invalid_lists:
raise HTTPException(status_code=400, detail=f"Invalid list IDs: {list(invalid_lists)}")
# Parse CSV data
csv_reader = csv.DictReader(io.StringIO(bulk_request.csv_data))
# Validate CSV headers - we need at least Name and Email
if not csv_reader.fieldnames or 'Name' not in csv_reader.fieldnames or 'Email' not in csv_reader.fieldnames:
raise HTTPException(status_code=400, detail="CSV must contain 'Name' and 'Email' columns")
for row_num, row in enumerate(csv_reader, start=1):
result.total_rows += 1
try:
name = row.get('Name', '').strip()
email = row.get('Email', '').strip()
# Skip rows without email (name is optional)
if not email:
result.errors.append(f"Row {row_num}: Missing email address")
continue
# Basic email validation
if '@' not in email or '.' not in email.split('@')[1]:
result.errors.append(f"Row {row_num}: Invalid email format: {email}")
continue
# Use email as name if no name provided
if not name:
name = email.split('@')[0] # Use the part before @ as a default name
# Check if member exists
cursor.execute("SELECT member_id FROM members WHERE email = %s", (email,))
existing_member = cursor.fetchone()
if existing_member:
# Update existing member's name if we have a better name (not auto-generated from email)
should_update_name = (
row.get('Name', '').strip() and # We have a name in the CSV
name != email.split('@')[0] # It's not the auto-generated name
)
if should_update_name:
cursor.execute("UPDATE members SET name = %s WHERE member_id = %s", (name, existing_member['member_id']))
if cursor.rowcount > 0:
result.updated_members += 1
member_id = existing_member['member_id']
else:
# Create new member
cursor.execute(
"INSERT INTO members (name, email, active) VALUES (%s, %s, %s)",
(name, email, True)
)
member_id = cursor.lastrowid
result.created_members += 1
# Subscribe to selected lists
for list_id in bulk_request.list_ids:
try:
cursor.execute(
"INSERT INTO list_members (list_id, member_id, active) VALUES (%s, %s, %s)",
(list_id, member_id, True)
)
result.subscriptions_added += 1
except Error as e:
if "Duplicate entry" in str(e):
# Member already subscribed to this list - not an error
pass
else:
result.errors.append(f"Row {row_num}: Failed to subscribe to list {list_id}: {str(e)}")
result.processed_rows += 1
except Exception as e:
result.errors.append(f"Row {row_num}: {str(e)}")
continue
# Commit all changes
conn.commit()
cursor.close()
return result
except HTTPException:
raise
except Exception as e:
cursor.close()
raise HTTPException(status_code=500, detail=f"Bulk import failed: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)