179 lines
5.7 KiB
Python
179 lines
5.7 KiB
Python
import secrets
from datetime import date, datetime
from datetime import timezone
from typing import List, Optional

from sqlalchemy import desc, func
from sqlalchemy.orm import Session

from app.crud.crud_journal import journal as crud_journal
from app.models.drone_request import DroneRequest, DroneRequestStatus
from app.models.journal import EntityType
from app.schemas.drone_request import DroneRequestCreate, DroneRequestUpdate
|
|
|
|
|
|
class CRUDDroneRequest:
    """Query and mutation helpers for ``DroneRequest`` rows.

    Every mutating method commits the session itself and then records one or
    more entries through ``crud_journal.log_change`` under
    ``EntityType.DRONE_REQUEST``. The class keeps no instance state.
    """

    # How many consecutive sequence numbers ``_generate_reference`` probes
    # before falling back to the next number without checking it.
    _REFERENCE_ATTEMPTS = 10

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Yields the same value ``datetime.utcnow()`` would, without calling the
        API that is deprecated since Python 3.12.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _status_label(status: Optional[object]) -> Optional[object]:
        """Return ``status.value`` when present, otherwise ``status`` itself.

        Lets the journal text be built even if a stored status is ``None`` or a
        plain string instead of an enum member.
        """
        return getattr(status, "value", status)

    def get(self, db: Session, request_id: int) -> Optional[DroneRequest]:
        """Return the first request whose ``id`` equals ``request_id``, or ``None``."""
        return db.query(DroneRequest).filter(DroneRequest.id == request_id).first()

    def get_by_reference(self, db: Session, reference_number: str) -> Optional[DroneRequest]:
        """Return the first request with the given ``reference_number``, or ``None``."""
        return (
            db.query(DroneRequest)
            .filter(DroneRequest.reference_number == reference_number)
            .first()
        )

    def get_by_public_token(self, db: Session, token: str) -> Optional[DroneRequest]:
        """Return the first request whose ``public_token`` equals ``token``, or ``None``."""
        return db.query(DroneRequest).filter(DroneRequest.public_token == token).first()

    def get_multi(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 100,
        status: Optional[DroneRequestStatus] = None,
        date_from: Optional[date] = None,
        date_to: Optional[date] = None,
    ) -> List[DroneRequest]:
        """Return a page of requests, newest ``submitted_at`` first.

        Args:
            db: Session used for the query.
            skip: Number of rows to skip (SQL ``OFFSET``).
            limit: Maximum number of rows to return.
            status: When given, keep only requests with this status.
            date_from: When given, keep requests whose ``estimated_takeoff_at``
                date is on or after this day.
            date_to: When given, keep requests whose ``estimated_takeoff_at``
                date is on or before this day.

        Returns:
            The matching requests as a list.
        """
        query = db.query(DroneRequest)

        # Compare against None explicitly so that a filter value that happens
        # to be falsy is still applied rather than silently ignored.
        if status is not None:
            query = query.filter(DroneRequest.status == status)
        if date_from is not None:
            query = query.filter(func.date(DroneRequest.estimated_takeoff_at) >= date_from)
        if date_to is not None:
            query = query.filter(func.date(DroneRequest.estimated_takeoff_at) <= date_to)

        return (
            query.order_by(desc(DroneRequest.submitted_at))
            .offset(skip)
            .limit(limit)
            .all()
        )

    def create(
        self,
        db: Session,
        obj_in: DroneRequestCreate,
        created_by: str = "public",
        user_ip: str = "127.0.0.1",
        submitted_via: str = "PUBLIC",
    ) -> DroneRequest:
        """Persist a new request and journal its creation.

        The new row gets a generated reference number, a random URL-safe
        public token and the ``NEW`` status. The schema's ``notes`` field is
        stored on the model as ``applicant_notes``.

        Args:
            db: Session used for the insert; committed by this method.
            obj_in: Validated creation payload.
            created_by: Stored as ``created_by`` and used as the journal user.
            user_ip: Stored as ``submitted_ip`` and passed to the journal.
            submitted_via: Stored as ``submitted_via``.

        Returns:
            The refreshed, persisted request.
        """
        reference_number = self._generate_reference(db)
        payload = obj_in.dict()
        # The model has no ``notes`` keyword; the value goes to ``applicant_notes``.
        notes = payload.pop("notes", None)

        db_obj = DroneRequest(
            **payload,
            applicant_notes=notes,
            reference_number=reference_number,
            public_token=secrets.token_urlsafe(64),
            status=DroneRequestStatus.NEW,
            created_by=created_by,
            submitted_ip=user_ip,
            submitted_via=submitted_via,
        )
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)

        crud_journal.log_change(
            db,
            EntityType.DRONE_REQUEST,
            db_obj.id,
            f"Drone request {db_obj.reference_number} created",
            created_by,
            user_ip,
        )
        return db_obj

    def update(
        self,
        db: Session,
        db_obj: DroneRequest,
        obj_in: DroneRequestUpdate,
        user: str = "system",
        user_ip: str = "127.0.0.1",
    ) -> DroneRequest:
        """Apply the explicitly-set fields of ``obj_in`` to ``db_obj``.

        Only fields whose value actually differs are written. When at least one
        field changed, the session is committed and one journal entry is
        recorded per changed field; otherwise nothing is committed or logged.

        Args:
            db: Session used for the update.
            db_obj: The request to modify.
            obj_in: Update payload; unset fields are ignored.
            user: User name recorded in the journal.
            user_ip: IP address recorded in the journal.

        Returns:
            ``db_obj``, refreshed if anything changed.
        """
        update_data = obj_in.dict(exclude_unset=True)
        # The schema calls the field ``notes``; the model calls it ``applicant_notes``.
        if "notes" in update_data:
            update_data["applicant_notes"] = update_data.pop("notes")

        changes = []
        for field, value in update_data.items():
            old_value = getattr(db_obj, field)
            if old_value != value:
                changes.append(f"{field} changed from '{old_value}' to '{value}'")
                setattr(db_obj, field, value)

        if changes:
            db.add(db_obj)
            db.commit()
            db.refresh(db_obj)
            for change in changes:
                crud_journal.log_change(
                    db, EntityType.DRONE_REQUEST, db_obj.id, change, user, user_ip
                )

        return db_obj

    def update_status(
        self,
        db: Session,
        request_id: int,
        status: DroneRequestStatus,
        comment: Optional[str] = None,
        user: str = "system",
        user_ip: str = "127.0.0.1",
    ) -> Optional[DroneRequest]:
        """Set a request's status and journal the transition.

        Also stamps ``status_changed_at`` (naive UTC) and ``status_changed_by``.
        A non-empty ``comment`` replaces ``operator_comments`` and is appended
        to the journal text.

        Args:
            db: Session used for the update; committed by this method.
            request_id: ``id`` of the request to change.
            status: The new status.
            comment: Optional operator comment.
            user: Stored as ``status_changed_by`` and used as the journal user.
            user_ip: IP address recorded in the journal.

        Returns:
            The refreshed request, or ``None`` when no request has that id.
        """
        db_obj = self.get(db, request_id)
        if not db_obj:
            return None

        old_status = db_obj.status
        db_obj.status = status
        db_obj.status_changed_at = self._utcnow()
        db_obj.status_changed_by = user
        if comment:
            db_obj.operator_comments = comment

        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)

        # The change is already committed at this point, so building the
        # journal text must not raise on a missing or non-enum previous status.
        entry = (
            f"Status changed from {self._status_label(old_status)} "
            f"to {self._status_label(status)}"
        )
        if comment:
            entry = f"{entry}: {comment}"
        crud_journal.log_change(db, EntityType.DRONE_REQUEST, db_obj.id, entry, user, user_ip)
        return db_obj

    def add_comment(
        self,
        db: Session,
        request_id: int,
        comment: str,
        user: str = "system",
        user_ip: str = "127.0.0.1",
    ) -> Optional[DroneRequest]:
        """Set the operator comment of a request and journal it.

        The value is assigned to ``operator_comments``, replacing whatever was
        stored there before.

        Args:
            db: Session used for the update; committed by this method.
            request_id: ``id`` of the request to comment on.
            comment: The comment text to store.
            user: User name recorded in the journal.
            user_ip: IP address recorded in the journal.

        Returns:
            The refreshed request, or ``None`` when no request has that id.
        """
        db_obj = self.get(db, request_id)
        if not db_obj:
            return None

        db_obj.operator_comments = comment
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)

        crud_journal.log_change(
            db,
            EntityType.DRONE_REQUEST,
            db_obj.id,
            f"Comment added: {comment}",
            user,
            user_ip,
        )
        return db_obj

    def _generate_reference(self, db: Session) -> str:
        """Return the next ``DRN-<yymmdd>-<n>`` reference for today's UTC date.

        ``n`` is one more than the highest numeric suffix already stored for
        today's prefix. Up to ``_REFERENCE_ATTEMPTS`` consecutive candidates are
        checked for existence; if all are taken, the following number is
        returned without a check.

        NOTE(review): this is a read-then-insert sequence with no locking
        visible in this file, so two concurrent callers could receive the same
        reference. Confirm that a unique constraint on ``reference_number``
        exists and that callers handle the resulting error.
        """
        prefix = f"DRN-{self._utcnow().strftime('%y%m%d')}"
        rows = (
            db.query(DroneRequest.reference_number)
            .filter(DroneRequest.reference_number.like(f"{prefix}-%"))
            .all()
        )

        suffixes = (reference.rsplit("-", 1)[-1] for (reference,) in rows)
        # isdecimal() only accepts characters that int() can parse, unlike
        # isdigit(), which also accepts e.g. superscript digits.
        highest_sequence = max(
            (int(suffix) for suffix in suffixes if suffix.isdecimal()),
            default=0,
        )

        first_candidate = highest_sequence + 1
        for sequence in range(first_candidate, first_candidate + self._REFERENCE_ATTEMPTS):
            candidate = f"{prefix}-{sequence}"
            if self.get_by_reference(db, candidate) is None:
                return candidate

        return f"{prefix}-{first_candidate + self._REFERENCE_ATTEMPTS}"
|
|
|
|
|
|
# Shared module-level instance of the CRUD helper; the class defines no
# per-instance state in this file, so a single object can be reused.
drone_request = CRUDDroneRequest()
|