from __future__ import annotations import csv import io import json from datetime import date, datetime, time, timedelta, timezone from zoneinfo import ZoneInfo from flask import Flask, Response, render_template, request from sqlalchemy import Select, func, select from sqlalchemy.orm import Session, joinedload from app.config import config from app.db import SessionLocal, init_db from app.models import Device, Reading SENSOR_DEVICE_TYPES = {"WoIOSensor"} def create_app() -> Flask: app = Flask(__name__) app.secret_key = config.flask_secret_key init_db() @app.get("/") def dashboard() -> str: tz = ZoneInfo(config.app_timezone) today = datetime.now(tz).date() start_utc, end_utc = local_date_range_to_utc(today, today, tz) with SessionLocal() as session: devices = list( session.scalars( select(Device) .where(Device.device_type.in_(SENSOR_DEVICE_TYPES)) .order_by(Device.name) ) ) readings = load_readings(session, start_utc, end_utc) latest = latest_by_device(session) return render_template( "dashboard.html", devices=devices, latest=latest, stats=stats_by_device(readings), chart_json=json.dumps(chart_payload(readings, tz)), timezone=config.app_timezone, collect_interval_seconds=config.collect_interval_seconds, ) @app.get("/devices/") def device_detail(device_id: str) -> str: tz = ZoneInfo(config.app_timezone) selected_date_text = request.args.get("date") or datetime.now(tz).date().isoformat() selected_date = date.fromisoformat(selected_date_text) start_utc, end_utc = local_date_range_to_utc(selected_date, selected_date, tz) with SessionLocal() as session: device = session.get(Device, device_id) if device is None or device.device_type not in SENSOR_DEVICE_TYPES: return render_template("not_found.html"), 404 readings = load_device_readings(session, device_id, start_utc, end_utc) return render_template( "device.html", device=device, date=selected_date_text, readings=readings, stats=stats_by_device(readings).get(device_id), chart_json=json.dumps(chart_payload(readings, tz)), timezone=config.app_timezone, local_readings=readings_for_display(readings, tz), ) @app.get("/reports") def reports() -> str: tz = ZoneInfo(config.app_timezone) start_text = request.args.get("start") or datetime.now(tz).date().isoformat() end_text = request.args.get("end") or start_text device_id = request.args.get("device_id") or "" start_date = date.fromisoformat(start_text) end_date = date.fromisoformat(end_text) start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz) with SessionLocal() as session: devices = list( session.scalars( select(Device) .where(Device.device_type.in_(SENSOR_DEVICE_TYPES)) .order_by(Device.name) ) ) rows = report_rows(session, start_utc, end_utc, device_id) return render_template( "reports.html", devices=devices, rows=rows, start=start_text, end=end_text, device_id=device_id, ) @app.get("/reports.csv") def reports_csv() -> Response: tz = ZoneInfo(config.app_timezone) start_date = date.fromisoformat(request.args.get("start") or datetime.now(tz).date().isoformat()) end_date = date.fromisoformat(request.args.get("end") or start_date.isoformat()) device_id = request.args.get("device_id") or "" start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz) with SessionLocal() as session: rows = report_rows(session, start_utc, end_utc, device_id) output = io.StringIO() writer = csv.writer(output) writer.writerow( [ "device", "samples", "low_temp", "high_temp", "avg_temp", "low_humidity", "high_humidity", "avg_humidity", ] ) for row in rows: writer.writerow( [ row["device_name"], row["samples"], row["low_temp"], row["high_temp"], row["avg_temp"], row["low_humidity"], row["high_humidity"], row["avg_humidity"], ] ) return Response( output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=switchbot-report.csv"}, ) return app def local_date_range_to_utc(start: date, end: date, tz: ZoneInfo) -> tuple[datetime, datetime]: local_start = datetime.combine(start, time.min, tzinfo=tz) local_end = datetime.combine(end + timedelta(days=1), time.min, tzinfo=tz) return ( local_start.astimezone(timezone.utc).replace(tzinfo=None), local_end.astimezone(timezone.utc).replace(tzinfo=None), ) def load_readings(session: Session, start_utc: datetime, end_utc: datetime) -> list[Reading]: stmt = ( select(Reading) .options(joinedload(Reading.device)) .join(Reading.device) .where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc) .order_by(Reading.recorded_at) ) return list(session.scalars(stmt)) def load_device_readings( session: Session, device_id: str, start_utc: datetime, end_utc: datetime, ) -> list[Reading]: stmt = ( select(Reading) .options(joinedload(Reading.device)) .where( Reading.device_id == device_id, Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc, ) .order_by(Reading.recorded_at) ) return list(session.scalars(stmt)) def latest_by_device(session: Session) -> dict[str, Reading]: subquery = ( select(Reading.device_id, func.max(Reading.recorded_at).label("latest_at")) .group_by(Reading.device_id) .subquery() ) rows = session.scalars( select(Reading) .join(subquery, (Reading.device_id == subquery.c.device_id) & (Reading.recorded_at == subquery.c.latest_at)) .order_by(Reading.device_id) ) return {reading.device_id: reading for reading in rows} def stats_by_device(readings: list[Reading]) -> dict[str, dict[str, float | int]]: stats: dict[str, dict[str, float | int]] = {} for reading in readings: if reading.temperature is None: continue item = stats.setdefault( reading.device_id, { "low": reading.temperature, "high": reading.temperature, "samples": 0, }, ) item["low"] = min(float(item["low"]), reading.temperature) item["high"] = max(float(item["high"]), reading.temperature) item["samples"] = int(item["samples"]) + 1 return stats def chart_payload(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]: series: dict[str, dict[str, object]] = {} for reading in readings: if reading.temperature is None: continue item = series.setdefault( reading.device_id, { "deviceId": reading.device_id, "name": reading.device.name, "points": [], }, ) local_time = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz) item["points"].append( { "time": local_time.isoformat(timespec="minutes"), "temperature": reading.temperature, "humidity": reading.humidity, } ) return list(series.values()) def readings_for_display(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]: rows = [] for reading in readings: local_time = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz) rows.append( { "timestamp": local_time.strftime("%Y-%m-%d %H:%M"), "temperature": reading.temperature, "humidity": reading.humidity, "battery": reading.battery, } ) return rows def report_rows( session: Session, start_utc: datetime, end_utc: datetime, device_id: str = "", ) -> list[dict[str, object]]: stmt: Select[tuple] = ( select( Device.name, func.count(Reading.id), func.min(Reading.temperature), func.max(Reading.temperature), func.avg(Reading.temperature), func.min(Reading.humidity), func.max(Reading.humidity), func.avg(Reading.humidity), ) .join(Reading.device) .where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc) .group_by(Device.id, Device.name) .order_by(Device.name) ) if device_id: stmt = stmt.where(Device.id == device_id) rows = [] for row in session.execute(stmt): rows.append( { "device_name": row[0], "samples": row[1], "low_temp": rounded(row[2]), "high_temp": rounded(row[3]), "avg_temp": rounded(row[4]), "low_humidity": rounded(row[5]), "high_humidity": rounded(row[6]), "avg_humidity": rounded(row[7]), } ) return rows def rounded(value: float | None) -> float | None: if value is None: return None return round(float(value), 1) app = create_app()