Files
ppr-ng/backend/app/crud/crud_ppr.py
2025-10-21 20:23:58 +00:00

165 lines
5.0 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func, desc
from datetime import date, datetime
from app.models.ppr import PPRRecord, PPRStatus
from app.schemas.ppr import PPRCreate, PPRUpdate
from app.crud.crud_journal import journal as crud_journal
class CRUDPPR:
def get(self, db: Session, ppr_id: int) -> Optional[PPRRecord]:
return db.query(PPRRecord).filter(PPRRecord.id == ppr_id).first()
def get_multi(
self,
db: Session,
skip: int = 0,
limit: int = 100,
status: Optional[PPRStatus] = None,
date_from: Optional[date] = None,
date_to: Optional[date] = None
) -> List[PPRRecord]:
query = db.query(PPRRecord)
if status:
query = query.filter(PPRRecord.status == status)
if date_from:
query = query.filter(
or_(
func.date(PPRRecord.eta) >= date_from,
func.date(PPRRecord.etd) >= date_from
)
)
if date_to:
query = query.filter(
or_(
func.date(PPRRecord.eta) <= date_to,
func.date(PPRRecord.etd) <= date_to
)
)
return query.order_by(desc(PPRRecord.submitted_dt)).offset(skip).limit(limit).all()
def get_arrivals_today(self, db: Session) -> List[PPRRecord]:
"""Get today's arrivals"""
today = date.today()
return db.query(PPRRecord).filter(
and_(
func.date(PPRRecord.eta) == today,
PPRRecord.status == PPRStatus.NEW
)
).order_by(PPRRecord.eta).all()
def get_departures_today(self, db: Session) -> List[PPRRecord]:
"""Get today's departures"""
today = date.today()
return db.query(PPRRecord).filter(
and_(
func.date(PPRRecord.etd) == today,
PPRRecord.status == PPRStatus.LANDED
)
).order_by(PPRRecord.etd).all()
def create(self, db: Session, obj_in: PPRCreate, created_by: str, user_ip: str = "127.0.0.1") -> PPRRecord:
db_obj = PPRRecord(
**obj_in.dict(),
created_by=created_by,
status=PPRStatus.NEW
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
# Log creation in journal
crud_journal.log_change(
db,
db_obj.id,
f"PPR created for {db_obj.ac_reg}",
created_by,
user_ip
)
return db_obj
def update(self, db: Session, db_obj: PPRRecord, obj_in: PPRUpdate, user: str = "system", user_ip: str = "127.0.0.1") -> PPRRecord:
update_data = obj_in.dict(exclude_unset=True)
changes = []
for field, value in update_data.items():
old_value = getattr(db_obj, field)
if old_value != value:
changes.append(f"{field} changed from '{old_value}' to '{value}'")
setattr(db_obj, field, value)
if changes:
db.add(db_obj)
db.commit()
db.refresh(db_obj)
# Log changes in journal
for change in changes:
crud_journal.log_change(db, db_obj.id, change, user, user_ip)
return db_obj
def update_status(
self,
db: Session,
ppr_id: int,
status: PPRStatus,
user: str = "system",
user_ip: str = "127.0.0.1"
) -> Optional[PPRRecord]:
db_obj = self.get(db, ppr_id)
if not db_obj:
return None
old_status = db_obj.status
db_obj.status = status
# Set timestamps based on status
if status == PPRStatus.LANDED:
db_obj.landed_dt = datetime.utcnow()
elif status == PPRStatus.DEPARTED:
db_obj.departed_dt = datetime.utcnow()
db.add(db_obj)
db.commit()
db.refresh(db_obj)
# Log status change in journal
crud_journal.log_change(
db,
db_obj.id,
f"Status changed from {old_status.value} to {status.value}",
user,
user_ip
)
return db_obj
def delete(self, db: Session, ppr_id: int, user: str = "system", user_ip: str = "127.0.0.1") -> Optional[PPRRecord]:
db_obj = self.get(db, ppr_id)
if db_obj:
old_status = db_obj.status
# Soft delete by setting status
db_obj.status = PPRStatus.DELETED
db.add(db_obj)
db.commit()
db.refresh(db_obj)
# Log the deletion in journal
crud_journal.log_change(
db,
db_obj.id,
f"PPR marked as DELETED (was {old_status.value})",
user,
user_ip
)
return db_obj
ppr = CRUDPPR()