Files
ppr-ng/backend/app/crud/crud_departure.py

148 lines
4.8 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func, desc
from datetime import date, datetime
from app.models.departure import Departure, DepartureStatus
from app.schemas.departure import DepartureCreate, DepartureUpdate, DepartureStatusUpdate
from app.models.journal import EntityType
from app.crud.crud_journal import journal
class CRUDDeparture:
def get(self, db: Session, departure_id: int) -> Optional[Departure]:
return db.query(Departure).filter(Departure.id == departure_id).first()
def get_multi(
self,
db: Session,
skip: int = 0,
limit: int = 100,
status: Optional[DepartureStatus] = None,
date_from: Optional[date] = None,
date_to: Optional[date] = None
) -> List[Departure]:
query = db.query(Departure)
if status:
query = query.filter(Departure.status == status)
if date_from:
query = query.filter(func.date(Departure.created_dt) >= date_from)
if date_to:
query = query.filter(func.date(Departure.created_dt) <= date_to)
return query.order_by(desc(Departure.created_dt)).offset(skip).limit(limit).all()
def get_departures_today(self, db: Session) -> List[Departure]:
"""Get today's departures (booked out or departed)"""
today = date.today()
return db.query(Departure).filter(
and_(
func.date(Departure.created_dt) == today,
or_(
Departure.status == DepartureStatus.BOOKED_OUT,
Departure.status == DepartureStatus.DEPARTED
)
)
).order_by(Departure.created_dt).all()
def create(self, db: Session, obj_in: DepartureCreate, created_by: str) -> Departure:
db_obj = Departure(
**obj_in.dict(),
created_by=created_by,
status=DepartureStatus.BOOKED_OUT
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(self, db: Session, db_obj: Departure, obj_in: DepartureUpdate, user: str = "system", user_ip: Optional[str] = None) -> Departure:
from datetime import datetime as dt
update_data = obj_in.dict(exclude_unset=True)
changes = []
for field, value in update_data.items():
old_value = getattr(db_obj, field)
# Normalize datetime values for comparison (ignore timezone differences)
if isinstance(old_value, dt) and isinstance(value, dt):
# Compare only the date and time, ignoring timezone
old_normalized = old_value.replace(tzinfo=None) if old_value.tzinfo else old_value
new_normalized = value.replace(tzinfo=None) if value.tzinfo else value
if old_normalized == new_normalized:
continue # Skip if datetimes are the same
if old_value != value:
changes.append(f"{field} changed from '{old_value}' to '{value}'")
setattr(db_obj, field, value)
if changes:
db.add(db_obj)
db.commit()
db.refresh(db_obj)
# Log changes in journal
for change in changes:
journal.log_change(
db,
EntityType.DEPARTURE,
db_obj.id,
change,
user,
user_ip
)
return db_obj
def update_status(
self,
db: Session,
departure_id: int,
status: DepartureStatus,
timestamp: Optional[datetime] = None,
user: str = "system",
user_ip: Optional[str] = None
) -> Optional[Departure]:
db_obj = self.get(db, departure_id)
if not db_obj:
return None
old_status = db_obj.status
db_obj.status = status
if status == DepartureStatus.DEPARTED and timestamp:
db_obj.departed_dt = timestamp
db.add(db_obj)
db.commit()
db.refresh(db_obj)
# Log status change in journal
journal.log_change(
db,
EntityType.DEPARTURE,
departure_id,
f"Status changed from {old_status.value} to {status.value}",
user,
user_ip
)
return db_obj
def cancel(self, db: Session, departure_id: int) -> Optional[Departure]:
db_obj = self.get(db, departure_id)
if not db_obj:
return None
db_obj.status = DepartureStatus.CANCELLED
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
departure = CRUDDeparture()