# Source listing: 93 lines, 3.4 KiB, Python
from typing import List, Optional
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_, func
|
|
from datetime import date, datetime
|
|
from app.models.movement import Movement, MovementType
|
|
from app.schemas.movement import MovementCreate
|
|
|
|
|
|
class CRUDMovement:
    """Query and persistence helpers for ``Movement`` rows.

    The class keeps no state of its own: every method receives the
    SQLAlchemy ``Session`` it should operate on.
    """

    # Escape character used when a caller-supplied fragment is embedded in a
    # LIKE/ILIKE pattern.
    _LIKE_ESCAPE = "/"

    @classmethod
    def _like_literal(cls, text: str) -> str:
        """Return *text* with LIKE wildcards escaped so it matches literally."""
        esc = cls._LIKE_ESCAPE
        # Double the escape character first; otherwise the escapes added for
        # "%" and "_" below would themselves be escaped again.
        return (
            text.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )

    @staticmethod
    def _persist(db: Session, db_obj: Movement) -> Movement:
        """Add, commit and refresh *db_obj*; roll back and re-raise on failure."""
        db.add(db_obj)
        try:
            db.commit()
        except Exception:
            # A failed flush/commit leaves the session's transaction inactive
            # until it is rolled back, so reset it before propagating.
            db.rollback()
            raise
        db.refresh(db_obj)
        return db_obj

    def get(self, db: Session, movement_id: int) -> Optional[Movement]:
        """Return the movement whose id is *movement_id*, or ``None``."""
        return db.query(Movement).filter(Movement.id == movement_id).first()

    def get_multi(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 100,
        movement_type: Optional[MovementType] = None,
        aircraft_registration: Optional[str] = None,
        date_from: Optional[date] = None,
        date_to: Optional[date] = None,
        entity_type: Optional[str] = None
    ) -> List[Movement]:
        """Return a page of movements, ordered by descending timestamp.

        Each optional argument adds a filter only when it is truthy.

        Args:
            db: Session used to run the query.
            skip: Number of rows to skip (SQL OFFSET).
            limit: Maximum number of rows to return (SQL LIMIT).
            movement_type: Keep only rows with this movement type.
            aircraft_registration: Case-insensitive substring to look for in
                the registration. ``%`` and ``_`` are matched literally.
            date_from: Keep rows whose timestamp date is on or after this day.
            date_to: Keep rows whose timestamp date is on or before this day.
            entity_type: Keep only rows with this entity type.
        """
        query = db.query(Movement)

        if movement_type:
            query = query.filter(Movement.movement_type == movement_type)

        if aircraft_registration:
            # Escape wildcards so the caller's text is a plain substring and
            # cannot widen the match with "%" or "_".
            pattern = f"%{self._like_literal(aircraft_registration)}%"
            query = query.filter(
                Movement.aircraft_registration.ilike(
                    pattern, escape=self._LIKE_ESCAPE
                )
            )

        if date_from:
            query = query.filter(func.date(Movement.timestamp) >= date_from)

        if date_to:
            query = query.filter(func.date(Movement.timestamp) <= date_to)

        if entity_type:
            query = query.filter(Movement.entity_type == entity_type)

        return query.order_by(Movement.timestamp.desc()).offset(skip).limit(limit).all()

    def create(self, db: Session, obj_in: MovementCreate) -> Movement:
        """Insert a new movement built from *obj_in* and return the stored row.

        The session is rolled back and the error re-raised if the commit
        fails.
        """
        db_obj = Movement(**obj_in.dict())
        return self._persist(db, db_obj)

    def update(self, db: Session, db_obj: Movement, obj_in: MovementCreate) -> Movement:
        """Overwrite *db_obj* with every field of *obj_in* and commit.

        All fields returned by ``obj_in.dict()`` are assigned, so this is a
        full replacement rather than a partial patch. The session is rolled
        back and the error re-raised if the commit fails.
        """
        for field, value in obj_in.dict().items():
            setattr(db_obj, field, value)
        return self._persist(db, db_obj)

    def find_daily_match(
        self,
        db: Session,
        target_date: date,
        aircraft_registration: str,
        movement_type: MovementType,
        entity_type: Optional[str] = None,
        entity_types: Optional[List[str]] = None
    ) -> Optional[Movement]:
        """Return the latest movement on *target_date* for an aircraft.

        Args:
            db: Session used to run the query.
            target_date: Day the movement's timestamp must fall on.
            aircraft_registration: Registration to match; compared after
                normalisation (see the note in the body).
            movement_type: Required movement type.
            entity_type: If truthy, require this exact entity type.
            entity_types: If truthy, require the entity type to be one of
                these. Applied in addition to ``entity_type`` when both are
                given.

        Returns:
            The matching row with the greatest timestamp, or ``None``.
        """
        # NOTE: the input is stripped of every non-alphanumeric character,
        # while the stored value only has "-" and " " removed. A stored
        # registration containing other punctuation will therefore not match.
        clean_reg = "".join(char for char in aircraft_registration.upper() if char.isalnum())
        clean_column = func.upper(
            func.replace(func.replace(Movement.aircraft_registration, "-", ""), " ", "")
        )

        query = db.query(Movement).filter(
            func.date(Movement.timestamp) == target_date,
            clean_column == clean_reg,
            Movement.movement_type == movement_type
        )
        if entity_type:
            query = query.filter(Movement.entity_type == entity_type)
        if entity_types:
            query = query.filter(Movement.entity_type.in_(entity_types))
        return query.order_by(Movement.timestamp.desc()).first()

    def get_movements_by_entity(self, db: Session, entity_type: str, entity_id: int) -> List[Movement]:
        """Return all movements of one entity, ordered by ascending timestamp."""
        return db.query(Movement).filter(
            and_(Movement.entity_type == entity_type, Movement.entity_id == entity_id)
        ).order_by(Movement.timestamp).all()

    def get_daily_movements(self, db: Session, target_date: date) -> List[Movement]:
        """Return all movements dated *target_date*, ordered by ascending timestamp."""
        return db.query(Movement).filter(
            func.date(Movement.timestamp) == target_date
        ).order_by(Movement.timestamp).all()
|
|
|
|
|
|
# Module-level CRUDMovement instance; presumably the object callers import
# and share — verify against the importing modules.
movement = CRUDMovement()
|