Remove redis
This commit is contained in:
@@ -6,7 +6,6 @@ A modern, containerized Prior Permission Required (PPR) system for aircraft oper
|
|||||||
|
|
||||||
- **Backend**: FastAPI with Python 3.11
|
- **Backend**: FastAPI with Python 3.11
|
||||||
- **Database**: MySQL 8.0
|
- **Database**: MySQL 8.0
|
||||||
- **Cache**: Redis 7
|
|
||||||
- **Container**: Docker & Docker Compose
|
- **Container**: Docker & Docker Compose
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
@@ -63,7 +62,7 @@ The container automatically handles:
|
|||||||
- Database connection verification
|
- Database connection verification
|
||||||
- Schema creation/migration (Alembic)
|
- Schema creation/migration (Alembic)
|
||||||
- Reference data seeding (if needed)
|
- Reference data seeding (if needed)
|
||||||
- Production server startup (4 workers)
|
- Production server startup (single worker for in-process WebSocket broadcasts)
|
||||||
|
|
||||||
**Monitor deployment:**
|
**Monitor deployment:**
|
||||||
```bash
|
```bash
|
||||||
@@ -289,7 +288,6 @@ This ensures consistency across different time zones and complies with aviation
|
|||||||
|
|
||||||
- Database connection pooling
|
- Database connection pooling
|
||||||
- Indexed columns for fast queries
|
- Indexed columns for fast queries
|
||||||
- Redis caching (ready for implementation)
|
|
||||||
- Async/await for non-blocking operations
|
- Async/await for non-blocking operations
|
||||||
|
|
||||||
## Monitoring
|
## Monitoring
|
||||||
|
|||||||
@@ -1,5 +1,9 @@
|
|||||||
TODO
|
TODO
|
||||||
|
|
||||||
|
Allow corrections
|
||||||
|
|
||||||
|
Post-strip reporting
|
||||||
|
|
||||||
Implement mark's 'tick off the PPRs' in the old admin screen
|
Implement mark's 'tick off the PPRs' in the old admin screen
|
||||||
|
|
||||||
Define schema for 'movements' table. We generate movement records as they happen so as not to reply on maths
|
Define schema for 'movements' table. We generate movement records as they happen so as not to reply on maths
|
||||||
@@ -9,3 +13,4 @@ Flow to create an arrival and maybe departure from a PPR. Perhaps we need a corr
|
|||||||
Ability to add a position report to a strip
|
Ability to add a position report to a strip
|
||||||
|
|
||||||
Improve journaling
|
Improve journaling
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
from pydantic_settings import BaseSettings
|
from pydantic_settings import BaseSettings
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
|
|
||||||
class Settings(BaseSettings):
|
class Settings(BaseSettings):
|
||||||
@@ -36,9 +35,6 @@ class Settings(BaseSettings):
|
|||||||
# Public booking settings
|
# Public booking settings
|
||||||
allow_public_booking: bool = False # Enable/disable public flight booking
|
allow_public_booking: bool = False # Enable/disable public flight booking
|
||||||
|
|
||||||
# Redis settings (for future use)
|
|
||||||
redis_url: Optional[str] = None
|
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
env_file = ".env"
|
env_file = ".env"
|
||||||
case_sensitive = False
|
case_sensitive = False
|
||||||
|
|||||||
+5
-87
@@ -1,10 +1,8 @@
|
|||||||
from fastapi import FastAPI, Depends, HTTPException, WebSocket, WebSocketDisconnect
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from typing import List
|
from typing import List
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
|
||||||
import redis.asyncio as redis
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.api.api import api_router
|
from app.api.api import api_router
|
||||||
|
|
||||||
@@ -22,10 +20,6 @@ from app.models.drone_request import DroneRequest
|
|||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Redis client for pub/sub (cross-worker communication)
|
|
||||||
redis_client = None
|
|
||||||
pubsub = None
|
|
||||||
|
|
||||||
app = FastAPI(
|
app = FastAPI(
|
||||||
title=settings.project_name,
|
title=settings.project_name,
|
||||||
openapi_url=f"{settings.api_v1_str}/openapi.json",
|
openapi_url=f"{settings.api_v1_str}/openapi.json",
|
||||||
@@ -46,7 +40,6 @@ app.add_middleware(
|
|||||||
class ConnectionManager:
|
class ConnectionManager:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.active_connections: List[WebSocket] = []
|
self.active_connections: List[WebSocket] = []
|
||||||
self.redis_listener_task = None
|
|
||||||
|
|
||||||
async def connect(self, websocket: WebSocket):
|
async def connect(self, websocket: WebSocket):
|
||||||
await websocket.accept()
|
await websocket.accept()
|
||||||
@@ -61,10 +54,11 @@ class ConnectionManager:
|
|||||||
async def send_personal_message(self, message: str, websocket: WebSocket):
|
async def send_personal_message(self, message: str, websocket: WebSocket):
|
||||||
await websocket.send_text(message)
|
await websocket.send_text(message)
|
||||||
|
|
||||||
async def broadcast_local(self, message_str: str):
|
async def broadcast(self, message: dict):
|
||||||
"""Broadcast to connections on this worker only"""
|
"""Broadcast an update to every websocket connected to this process."""
|
||||||
|
message_str = json.dumps(message)
|
||||||
dead_connections = []
|
dead_connections = []
|
||||||
for connection in self.active_connections:
|
for connection in list(self.active_connections):
|
||||||
try:
|
try:
|
||||||
await connection.send_text(message_str)
|
await connection.send_text(message_str)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -79,84 +73,8 @@ class ConnectionManager:
|
|||||||
if dead_connections:
|
if dead_connections:
|
||||||
logger.info(f"Removed {len(dead_connections)} dead connections")
|
logger.info(f"Removed {len(dead_connections)} dead connections")
|
||||||
|
|
||||||
async def broadcast(self, message: dict):
|
|
||||||
"""Broadcast via Redis pub/sub to all workers"""
|
|
||||||
message_str = json.dumps(message)
|
|
||||||
print(f"Publishing message to Redis channel: {message.get('type', 'unknown')}")
|
|
||||||
logger.info(f"Publishing message to Redis channel: {message.get('type', 'unknown')}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
if redis_client:
|
|
||||||
await redis_client.publish('ppr_updates', message_str)
|
|
||||||
print(f"✓ Message published to Redis")
|
|
||||||
else:
|
|
||||||
# Fallback to local broadcast if Redis not available
|
|
||||||
print("⚠ Redis not available, falling back to local broadcast")
|
|
||||||
logger.warning("Redis not available, falling back to local broadcast")
|
|
||||||
await self.broadcast_local(message_str)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"✗ Failed to publish to Redis: {e}")
|
|
||||||
logger.error(f"Failed to publish to Redis: {e}")
|
|
||||||
# Fallback to local broadcast
|
|
||||||
await self.broadcast_local(message_str)
|
|
||||||
|
|
||||||
async def start_redis_listener(self):
|
|
||||||
"""Listen for Redis pub/sub messages and broadcast to local connections"""
|
|
||||||
global redis_client, pubsub
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Connect to Redis
|
|
||||||
redis_url = settings.redis_url or "redis://redis:6379"
|
|
||||||
print(f"Connecting to Redis at: {redis_url}")
|
|
||||||
redis_client = await redis.from_url(redis_url, encoding="utf-8", decode_responses=True)
|
|
||||||
pubsub = redis_client.pubsub()
|
|
||||||
await pubsub.subscribe('ppr_updates')
|
|
||||||
|
|
||||||
print("✓ Redis listener started for PPR updates")
|
|
||||||
logger.info("Redis listener started for PPR updates")
|
|
||||||
|
|
||||||
async for message in pubsub.listen():
|
|
||||||
if message['type'] == 'message':
|
|
||||||
message_data = message['data']
|
|
||||||
print(f"Received Redis message, broadcasting to {len(self.active_connections)} local connections")
|
|
||||||
logger.info(f"Received Redis message, broadcasting to {len(self.active_connections)} local connections")
|
|
||||||
await self.broadcast_local(message_data)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Redis listener error: {e}")
|
|
||||||
logger.error(f"Redis listener error: {e}")
|
|
||||||
await asyncio.sleep(5) # Wait before retry
|
|
||||||
# Retry connection
|
|
||||||
if self.redis_listener_task and not self.redis_listener_task.done():
|
|
||||||
asyncio.create_task(self.start_redis_listener())
|
|
||||||
|
|
||||||
manager = ConnectionManager()
|
manager = ConnectionManager()
|
||||||
|
|
||||||
@app.on_event("startup")
|
|
||||||
async def startup_event():
|
|
||||||
"""Start Redis listener when application starts"""
|
|
||||||
print("=" * 50)
|
|
||||||
print("STARTUP: Starting application and Redis listener...")
|
|
||||||
print("=" * 50)
|
|
||||||
logger.info("Starting application and Redis listener...")
|
|
||||||
manager.redis_listener_task = asyncio.create_task(manager.start_redis_listener())
|
|
||||||
|
|
||||||
@app.on_event("shutdown")
|
|
||||||
async def shutdown_event():
|
|
||||||
"""Clean up Redis connections on shutdown"""
|
|
||||||
logger.info("Shutting down application...")
|
|
||||||
global redis_client, pubsub
|
|
||||||
|
|
||||||
if manager.redis_listener_task:
|
|
||||||
manager.redis_listener_task.cancel()
|
|
||||||
|
|
||||||
if pubsub:
|
|
||||||
await pubsub.unsubscribe('ppr_updates')
|
|
||||||
await pubsub.close()
|
|
||||||
|
|
||||||
if redis_client:
|
|
||||||
await redis_client.close()
|
|
||||||
|
|
||||||
@app.websocket("/ws/tower-updates")
|
@app.websocket("/ws/tower-updates")
|
||||||
async def websocket_endpoint(websocket: WebSocket):
|
async def websocket_endpoint(websocket: WebSocket):
|
||||||
await manager.connect(websocket)
|
await manager.connect(websocket)
|
||||||
|
|||||||
@@ -15,6 +15,5 @@ pytest==7.4.3
|
|||||||
pytest-cov==4.1.0
|
pytest-cov==4.1.0
|
||||||
pytest-asyncio==0.21.1
|
pytest-asyncio==0.21.1
|
||||||
httpx==0.25.2
|
httpx==0.25.2
|
||||||
redis==5.0.1
|
|
||||||
aiosmtplib==3.0.1
|
aiosmtplib==3.0.1
|
||||||
jinja2==3.1.2
|
jinja2==3.1.2
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# Backend API Test Guide
|
# Backend API Test Guide
|
||||||
|
|
||||||
This directory contains the backend API test suite. The tests use pytest, FastAPI's `TestClient`, and an isolated in-memory SQLite database. The goal is to cover the business-critical API behaviour without relying on MySQL, Redis, SMTP, or a running browser.
|
This directory contains the backend API test suite. The tests use pytest, FastAPI's `TestClient`, and an isolated in-memory SQLite database. The goal is to cover the business-critical API behaviour without relying on MySQL, SMTP, or a running browser.
|
||||||
|
|
||||||
## How To Run
|
## How To Run
|
||||||
|
|
||||||
@@ -190,8 +190,8 @@ Why it matters:
|
|||||||
|
|
||||||
## Current Scope
|
## Current Scope
|
||||||
|
|
||||||
The suite intentionally focuses on API behaviour and database side effects. It does not deeply test:
|
The suite intentionally focuses on API behaviour, local WebSocket broadcast behaviour, and database side effects. It does not deeply test:
|
||||||
- WebSocket connection lifecycle and Redis pub/sub behaviour.
|
- Full browser WebSocket lifecycle.
|
||||||
- Real SMTP delivery.
|
- Real SMTP delivery.
|
||||||
- Browser UI behaviour.
|
- Browser UI behaviour.
|
||||||
- Every branch of low-level validators or helper functions.
|
- Every branch of low-level validators or helper functions.
|
||||||
|
|||||||
@@ -1,3 +1,10 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.main import ConnectionManager
|
||||||
|
|
||||||
|
|
||||||
def test_root_returns_api_metadata(client):
|
def test_root_returns_api_metadata(client):
|
||||||
response = client.get("/")
|
response = client.get("/")
|
||||||
|
|
||||||
@@ -12,3 +19,34 @@ def test_health_check_reports_database_connection(client):
|
|||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert response.json()["status"] == "healthy"
|
assert response.json()["status"] == "healthy"
|
||||||
assert response.json()["database"] == "connected"
|
assert response.json()["database"] == "connected"
|
||||||
|
|
||||||
|
|
||||||
|
class FakeWebSocket:
|
||||||
|
def __init__(self, fail_send=False):
|
||||||
|
self.accepted = False
|
||||||
|
self.fail_send = fail_send
|
||||||
|
self.messages = []
|
||||||
|
|
||||||
|
async def accept(self):
|
||||||
|
self.accepted = True
|
||||||
|
|
||||||
|
async def send_text(self, message):
|
||||||
|
if self.fail_send:
|
||||||
|
raise RuntimeError("socket closed")
|
||||||
|
self.messages.append(message)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_connection_manager_broadcasts_to_active_connections_and_removes_dead_ones():
|
||||||
|
manager = ConnectionManager()
|
||||||
|
active_socket = FakeWebSocket()
|
||||||
|
dead_socket = FakeWebSocket(fail_send=True)
|
||||||
|
|
||||||
|
await manager.connect(active_socket)
|
||||||
|
await manager.connect(dead_socket)
|
||||||
|
await manager.broadcast({"type": "ppr_updated", "id": 123})
|
||||||
|
|
||||||
|
assert active_socket.accepted is True
|
||||||
|
assert dead_socket.accepted is True
|
||||||
|
assert json.loads(active_socket.messages[0]) == {"type": "ppr_updated", "id": 123}
|
||||||
|
assert manager.active_connections == [active_socket]
|
||||||
|
|||||||
@@ -25,12 +25,10 @@ services:
|
|||||||
MAIL_FROM: ${MAIL_FROM}
|
MAIL_FROM: ${MAIL_FROM}
|
||||||
MAIL_FROM_NAME: ${MAIL_FROM_NAME}
|
MAIL_FROM_NAME: ${MAIL_FROM_NAME}
|
||||||
BASE_URL: ${BASE_URL}
|
BASE_URL: ${BASE_URL}
|
||||||
REDIS_URL: ${REDIS_URL}
|
|
||||||
TAG: ${TAG}
|
TAG: ${TAG}
|
||||||
TOP_BAR_BASE_COLOR: ${TOP_BAR_BASE_COLOR}
|
TOP_BAR_BASE_COLOR: ${TOP_BAR_BASE_COLOR}
|
||||||
ALLOW_PUBLIC_BOOKING: ${ALLOW_PUBLIC_BOOKING}
|
ALLOW_PUBLIC_BOOKING: ${ALLOW_PUBLIC_BOOKING}
|
||||||
ENVIRONMENT: production
|
ENVIRONMENT: production
|
||||||
WORKERS: "4"
|
|
||||||
ports:
|
ports:
|
||||||
- "${API_PORT_EXTERNAL}:8000"
|
- "${API_PORT_EXTERNAL}:8000"
|
||||||
volumes:
|
volumes:
|
||||||
@@ -56,18 +54,6 @@ services:
|
|||||||
retries: 3
|
retries: 3
|
||||||
start_period: 40s
|
start_period: 40s
|
||||||
|
|
||||||
# Redis for caching (optional)
|
|
||||||
redis:
|
|
||||||
image: redis:7-alpine
|
|
||||||
restart: always
|
|
||||||
networks:
|
|
||||||
- app_network
|
|
||||||
deploy:
|
|
||||||
resources:
|
|
||||||
limits:
|
|
||||||
cpus: '0.5'
|
|
||||||
memory: 512M
|
|
||||||
|
|
||||||
# Nginx web server for public frontend
|
# Nginx web server for public frontend
|
||||||
web:
|
web:
|
||||||
image: nginx:alpine
|
image: nginx:alpine
|
||||||
|
|||||||
@@ -37,7 +37,6 @@ services:
|
|||||||
MAIL_FROM: ${MAIL_FROM}
|
MAIL_FROM: ${MAIL_FROM}
|
||||||
MAIL_FROM_NAME: ${MAIL_FROM_NAME}
|
MAIL_FROM_NAME: ${MAIL_FROM_NAME}
|
||||||
BASE_URL: ${BASE_URL}
|
BASE_URL: ${BASE_URL}
|
||||||
REDIS_URL: ${REDIS_URL}
|
|
||||||
TOWER_NAME: ${TOWER_NAME}
|
TOWER_NAME: ${TOWER_NAME}
|
||||||
TOP_BAR_BASE_COLOR: ${TOP_BAR_BASE_COLOR}
|
TOP_BAR_BASE_COLOR: ${TOP_BAR_BASE_COLOR}
|
||||||
ENVIRONMENT: ${ENVIRONMENT}
|
ENVIRONMENT: ${ENVIRONMENT}
|
||||||
@@ -72,14 +71,6 @@ services:
|
|||||||
networks:
|
networks:
|
||||||
- public_network
|
- public_network
|
||||||
|
|
||||||
# Redis for caching (optional for now)
|
|
||||||
redis:
|
|
||||||
image: redis:7-alpine
|
|
||||||
container_name: ppr_nextgen_redis
|
|
||||||
restart: unless-stopped
|
|
||||||
networks:
|
|
||||||
- private_network
|
|
||||||
|
|
||||||
# phpMyAdmin for database management
|
# phpMyAdmin for database management
|
||||||
phpmyadmin:
|
phpmyadmin:
|
||||||
image: phpmyadmin/phpmyadmin
|
image: phpmyadmin/phpmyadmin
|
||||||
|
|||||||
Reference in New Issue
Block a user