53 lines
1.5 KiB
Python
53 lines
1.5 KiB
Python
import json
|
|
|
|
import pytest
|
|
|
|
from app.main import ConnectionManager
|
|
|
|
|
|
def test_root_returns_api_metadata(client):
    """GET / answers 200 and reports the API name and docs path."""
    root_response = client.get("/")
    assert root_response.status_code == 200

    # Decode the body once and check both metadata fields against it.
    payload = root_response.json()
    assert payload["message"] == "Airfield PPR API"
    assert payload["docs"] == "/docs"
|
|
|
|
|
|
def test_health_check_reports_database_connection(client):
    """GET /health answers 200 with a healthy status and connected database."""
    health_response = client.get("/health")
    assert health_response.status_code == 200

    # Decode the body once and check both health fields against it.
    report = health_response.json()
    assert report["status"] == "healthy"
    assert report["database"] == "connected"
|
|
|
|
|
|
class FakeWebSocket:
    """In-memory websocket stand-in that records what is sent to it.

    When built with ``fail_send=True`` every ``send_text`` call raises
    ``RuntimeError("socket closed")`` instead of recording the message.
    """

    def __init__(self, fail_send=False):
        # Sent messages, in the order they were accepted by send_text().
        self.messages = []
        self.fail_send = fail_send
        # Flipped to True by accept().
        self.accepted = False

    async def accept(self):
        """Mark this socket as accepted."""
        self.accepted = True

    async def send_text(self, message):
        """Record ``message``, or raise RuntimeError if sends are set to fail."""
        if not self.fail_send:
            self.messages.append(message)
            return
        raise RuntimeError("socket closed")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_connection_manager_broadcasts_to_active_connections_and_removes_dead_ones():
    """Broadcast with one working and one failing socket connected.

    Asserts that both sockets were accepted, that the working socket
    received the JSON-encoded event, and that only the working socket is
    left in ``active_connections`` afterwards.
    """
    hub = ConnectionManager()
    healthy = FakeWebSocket()
    broken = FakeWebSocket(fail_send=True)

    # Connection order matters for the final list comparison: healthy first.
    for socket in (healthy, broken):
        await hub.connect(socket)
    await hub.broadcast({"type": "ppr_updated", "id": 123})

    assert healthy.accepted is True
    assert broken.accepted is True

    first_payload = json.loads(healthy.messages[0])
    assert first_payload == {"type": "ppr_updated", "id": 123}
    assert hub.active_connections == [healthy]
|