from typing import List, Optional from sqlalchemy.orm import Session from sqlalchemy import and_, func from datetime import date, datetime from app.models.movement import Movement, MovementType from app.schemas.movement import MovementCreate class CRUDMovement: def get(self, db: Session, movement_id: int) -> Optional[Movement]: return db.query(Movement).filter(Movement.id == movement_id).first() def get_multi( self, db: Session, skip: int = 0, limit: int = 100, movement_type: Optional[MovementType] = None, aircraft_registration: Optional[str] = None, date_from: Optional[date] = None, date_to: Optional[date] = None, entity_type: Optional[str] = None ) -> List[Movement]: query = db.query(Movement) if movement_type: query = query.filter(Movement.movement_type == movement_type) if aircraft_registration: query = query.filter(Movement.aircraft_registration.ilike(f"%{aircraft_registration}%")) if date_from: query = query.filter(func.date(Movement.timestamp) >= date_from) if date_to: query = query.filter(func.date(Movement.timestamp) <= date_to) if entity_type: query = query.filter(Movement.entity_type == entity_type) return query.order_by(Movement.timestamp.desc()).offset(skip).limit(limit).all() def create(self, db: Session, obj_in: MovementCreate) -> Movement: db_obj = Movement(**obj_in.dict()) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def update(self, db: Session, db_obj: Movement, obj_in: MovementCreate) -> Movement: update_data = obj_in.dict() for field, value in update_data.items(): setattr(db_obj, field, value) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def find_daily_match( self, db: Session, target_date: date, aircraft_registration: str, movement_type: MovementType, entity_type: Optional[str] = None, entity_types: Optional[List[str]] = None ) -> Optional[Movement]: clean_reg = "".join(char for char in aircraft_registration.upper() if char.isalnum()) clean_column = func.upper(func.replace(func.replace(Movement.aircraft_registration, "-", ""), " ", "")) query = db.query(Movement).filter( func.date(Movement.timestamp) == target_date, clean_column == clean_reg, Movement.movement_type == movement_type ) if entity_type: query = query.filter(Movement.entity_type == entity_type) if entity_types: query = query.filter(Movement.entity_type.in_(entity_types)) return query.order_by(Movement.timestamp.desc()).first() def get_movements_by_entity(self, db: Session, entity_type: str, entity_id: int) -> List[Movement]: return db.query(Movement).filter( and_(Movement.entity_type == entity_type, Movement.entity_id == entity_id) ).order_by(Movement.timestamp).all() def get_daily_movements(self, db: Session, target_date: date) -> List[Movement]: return db.query(Movement).filter( func.date(Movement.timestamp) == target_date ).order_by(Movement.timestamp).all() movement = CRUDMovement()