Files
ppr-ng/backend/app/crud/crud_local_flight.py
T
2026-06-28 08:35:39 +00:00

272 lines
10 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func, desc
from datetime import date, datetime
from app.models.local_flight import LocalFlight, LocalFlightStatus, LocalFlightType
from app.schemas.local_flight import LocalFlightCreate, LocalFlightUpdate, LocalFlightStatusUpdate
from app.models.journal import EntityType
from app.models.circuit import Circuit
from app.crud.crud_journal import journal
from app.crud.crud_movement import movement as movement_crud
from app.schemas.movement import MovementCreate
from app.models.movement import MovementType
class CRUDLocalFlight:
    """Query and mutation helpers for ``LocalFlight`` rows.

    The mutating methods (``create``, ``update``, ``update_status`` and
    ``cancel``) commit the session themselves and then record what happened
    through ``journal.log_change``.
    """

    def get(self, db: Session, flight_id: int) -> Optional[LocalFlight]:
        """Return the flight whose id is ``flight_id``, or ``None`` if absent."""
        return db.query(LocalFlight).filter(LocalFlight.id == flight_id).first()

    def get_multi(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 100,
        status: Optional[LocalFlightStatus] = None,
        flight_type: Optional[LocalFlightType] = None,
        date_from: Optional[date] = None,
        date_to: Optional[date] = None
    ) -> List[LocalFlight]:
        """Return a page of flights, newest ``created_dt`` first.

        Args:
            db: Session used to run the query.
            skip: Number of rows to skip (query offset).
            limit: Maximum number of rows to return.
            status: Optional status filter. ``CIRCUIT`` is treated as an
                umbrella value and also matches the downwind, base and
                final circuit statuses.
            flight_type: Optional flight type filter.
            date_from: Keep flights whose ``created_dt`` date is on or after
                this date.
            date_to: Keep flights whose ``created_dt`` date is on or before
                this date.
        """
        query = db.query(LocalFlight)

        if status:
            if status == LocalFlightStatus.CIRCUIT:
                # Special case: requesting CIRCUIT returns every
                # circuit-related status, not just the exact match.
                circuit_statuses = [
                    LocalFlightStatus.CIRCUIT,
                    LocalFlightStatus.CIRCUIT_DOWNWIND,
                    LocalFlightStatus.CIRCUIT_BASE,
                    LocalFlightStatus.CIRCUIT_FINAL,
                ]
                query = query.filter(LocalFlight.status.in_(circuit_statuses))
            else:
                query = query.filter(LocalFlight.status == status)

        if flight_type:
            query = query.filter(LocalFlight.flight_type == flight_type)

        if date_from:
            query = query.filter(func.date(LocalFlight.created_dt) >= date_from)

        if date_to:
            query = query.filter(func.date(LocalFlight.created_dt) <= date_to)

        return query.order_by(desc(LocalFlight.created_dt)).offset(skip).limit(limit).all()

    def get_active_flights(self, db: Session) -> List[LocalFlight]:
        """Get currently active (booked out or departed) flights, newest first."""
        active_statuses = [LocalFlightStatus.BOOKED_OUT, LocalFlightStatus.DEPARTED]
        return (
            db.query(LocalFlight)
            .filter(LocalFlight.status.in_(active_statuses))
            .order_by(desc(LocalFlight.created_dt))
            .all()
        )

    def _created_today(self, db: Session, statuses: List[LocalFlightStatus]) -> List[LocalFlight]:
        """Return flights created today with one of ``statuses``, oldest first."""
        # NOTE(review): ``date.today()`` is the server's local date, while
        # update_status() stamps times with ``datetime.utcnow()`` -- confirm
        # that created_dt and the server clock agree on the time zone.
        today = date.today()
        return (
            db.query(LocalFlight)
            .filter(
                func.date(LocalFlight.created_dt) == today,
                LocalFlight.status.in_(statuses),
            )
            .order_by(LocalFlight.created_dt)
            .all()
        )

    def get_departures_today(self, db: Session) -> List[LocalFlight]:
        """Get today's departures (booked out or departed)"""
        return self._created_today(
            db, [LocalFlightStatus.BOOKED_OUT, LocalFlightStatus.DEPARTED]
        )

    def get_booked_out_today(self, db: Session) -> List[LocalFlight]:
        """Get flights created today whose status is BOOKED_OUT or LANDED."""
        return self._created_today(
            db, [LocalFlightStatus.BOOKED_OUT, LocalFlightStatus.LANDED]
        )

    def create(self, db: Session, obj_in: LocalFlightCreate, created_by: str, submitted_via: str = "ADMIN", user_ip: Optional[str] = None) -> LocalFlight:
        """Insert a new flight, commit it and journal the creation.

        Args:
            db: Session used for the insert.
            obj_in: Payload whose fields are copied onto the new row.
            created_by: User recorded as creator and as journal author.
            submitted_via: Submission source; flights submitted via ADMIN
                start in GROUND, all others start in BOOKED_OUT.
            user_ip: Optional client address passed to the journal.

        Returns:
            The refreshed, persisted flight.
        """
        from app.models.local_flight import SubmissionSource

        # Set initial status based on submission source.
        if submitted_via == SubmissionSource.ADMIN:
            initial_status = LocalFlightStatus.GROUND
        else:
            initial_status = LocalFlightStatus.BOOKED_OUT

        db_obj = LocalFlight(
            **obj_in.dict(),
            created_by=created_by,
            status=initial_status,
            submitted_via=submitted_via
        )
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)

        # Log creation in journal
        journal.log_change(
            db,
            EntityType.LOCAL_FLIGHT,
            db_obj.id,
            f"Local flight created: {db_obj.registration} ({db_obj.flight_type.value})",
            created_by,
            user_ip
        )
        return db_obj

    def update(self, db: Session, db_obj: LocalFlight, obj_in: LocalFlightUpdate, user: str = "system", user_ip: Optional[str] = None) -> LocalFlight:
        """Apply the explicitly-set fields of ``obj_in`` to ``db_obj``.

        Only fields whose value actually differs are written. If at least
        one field changed, the row is committed and one journal entry is
        written per changed field; otherwise nothing is committed.

        Returns:
            ``db_obj`` (refreshed when a commit happened).
        """
        update_data = obj_in.dict(exclude_unset=True)
        changes = []

        for field, value in update_data.items():
            old_value = getattr(db_obj, field)

            # Two datetimes that differ only in whether they carry tzinfo
            # are treated as unchanged: compare their naive wall-clock values.
            if isinstance(old_value, datetime) and isinstance(value, datetime):
                if old_value.replace(tzinfo=None) == value.replace(tzinfo=None):
                    continue

            if old_value != value:
                changes.append(f"{field} changed from '{old_value}' to '{value}'")
                setattr(db_obj, field, value)

        if changes:
            db.add(db_obj)
            db.commit()
            db.refresh(db_obj)

            # Log changes in journal
            for change in changes:
                journal.log_change(
                    db,
                    EntityType.LOCAL_FLIGHT,
                    db_obj.id,
                    change,
                    user,
                    user_ip
                )

        return db_obj

    def _record_movement(
        self,
        db: Session,
        db_obj: LocalFlight,
        movement_type: MovementType,
        timestamp: datetime,
        user: str,
        user_ip: Optional[str],
    ) -> None:
        """Create one movement record of ``movement_type`` for ``db_obj``."""
        movement_data = MovementCreate(
            movement_type=movement_type,
            aircraft_registration=db_obj.registration,
            aircraft_type=db_obj.type,
            callsign=db_obj.callsign,
            timestamp=timestamp,
            entity_type="LOCAL_FLIGHT",
            entity_id=db_obj.id,
            created_by=user,
            ip_address=user_ip
        )
        movement_crud.create(db, movement_data)

    def update_status(
        self,
        db: Session,
        flight_id: int,
        status: LocalFlightStatus,
        timestamp: Optional[datetime] = None,
        user: str = "system",
        user_ip: Optional[str] = None
    ) -> Optional[LocalFlight]:
        """Move a flight to ``status`` and maintain its derived fields.

        Besides storing the new status this method:

        * sets ``flight_type`` when the status is LOCAL or CIRCUIT;
        * stamps ``contact_dt`` / ``departed_dt`` / ``landed_dt`` /
          ``takeoff_dt`` (the last three only if still empty);
        * stores the number of ``Circuit`` rows of the flight in
          ``circuits`` when ``landed_dt`` is first stamped;
        * creates TAKEOFF / LANDING movement records;
        * writes a journal entry for the status change.

        Args:
            db: Session used for all reads and writes.
            flight_id: Id of the flight to update.
            status: Target status; a plain string is converted to
                ``LocalFlightStatus``.
            timestamp: Time to stamp; defaults to ``datetime.utcnow()``.
            user: User recorded on movements and in the journal.
            user_ip: Optional client address recorded alongside ``user``.

        Returns:
            The refreshed flight, or ``None`` when ``flight_id`` is unknown.
        """
        db_obj = self.get(db, flight_id)
        if not db_obj:
            return None

        # Ensure status is a LocalFlightStatus enum
        if isinstance(status, str):
            status = LocalFlightStatus(status)

        old_status = db_obj.status
        db_obj.status = status

        # Update flight_type based on status changes
        if status == LocalFlightStatus.LOCAL:
            db_obj.flight_type = LocalFlightType.LOCAL
        elif status == LocalFlightStatus.CIRCUIT:
            db_obj.flight_type = LocalFlightType.CIRCUITS

        # Set timestamps based on status
        current_time = timestamp if timestamp is not None else datetime.utcnow()
        if status == LocalFlightStatus.GROUND:
            db_obj.contact_dt = current_time
        elif status == LocalFlightStatus.DEPARTED and not db_obj.departed_dt:
            db_obj.departed_dt = current_time
        elif status == LocalFlightStatus.LANDED and not db_obj.landed_dt:
            db_obj.landed_dt = current_time
            # Count circuits from the circuits table and populate the circuits column
            circuit_count = db.query(func.count(Circuit.id)).filter(
                Circuit.local_flight_id == flight_id
            ).scalar()
            db_obj.circuits = circuit_count

        # Leaving GROUND for an airborne status counts as the takeoff.
        left_ground = old_status == LocalFlightStatus.GROUND and status in (
            LocalFlightStatus.DEPARTED,
            LocalFlightStatus.LOCAL,
            LocalFlightStatus.CIRCUIT,
        )
        if left_ground and not db_obj.takeoff_dt:
            db_obj.takeoff_dt = current_time
            if not db_obj.departed_dt:
                db_obj.departed_dt = current_time

        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)

        # Create movement record if applicable
        if db_obj.takeoff_dt and left_ground:
            self._record_movement(
                db, db_obj, MovementType.TAKEOFF, db_obj.takeoff_dt, user, user_ip
            )

        # Only an actual transition into LANDED records a landing; repeating
        # LANDED on an already-landed flight must not add a second movement.
        entered_landed = (
            status == LocalFlightStatus.LANDED
            and old_status != LocalFlightStatus.LANDED
        )
        if db_obj.landed_dt and entered_landed:
            self._record_movement(
                db, db_obj, MovementType.LANDING, db_obj.landed_dt, user, user_ip
            )

        # Log status change in journal
        journal.log_change(
            db,
            EntityType.LOCAL_FLIGHT,
            flight_id,
            f"Status changed from {old_status.value} to {status.value}",
            user,
            user_ip
        )
        return db_obj

    def cancel(self, db: Session, flight_id: int, user: str = "system", user_ip: Optional[str] = None) -> Optional[LocalFlight]:
        """Set a flight's status to CANCELLED, commit and journal the change.

        Returns:
            The refreshed flight, or ``None`` when ``flight_id`` is unknown.
        """
        db_obj = self.get(db, flight_id)
        if not db_obj:
            return db_obj

        old_status = db_obj.status
        db_obj.status = LocalFlightStatus.CANCELLED
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)

        # Log cancellation in journal
        journal.log_change(
            db,
            EntityType.LOCAL_FLIGHT,
            flight_id,
            f"Status changed from {old_status.value} to CANCELLED",
            user,
            user_ip
        )
        return db_obj
# Module-level CRUDLocalFlight instance, created once at import time.
# Presumably this is the object other modules import to reach the CRUD
# methods -- verify against callers.
local_flight = CRUDLocalFlight()