from __future__ import annotations import csv import io import json from datetime import date, datetime, time, timezone from zoneinfo import ZoneInfo from flask import Flask, Response, flash, redirect, render_template, request, url_for from sqlalchemy import Select, func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session, joinedload from app.config import config from app.db import SessionLocal, init_db from app.models import Device, Reading, ReportRecipient, ScheduledReport, ScheduledReportRun from app.reporting import cadence_label, send_temperature_report from app.smtp2go import EmailConfigurationError from app.web_time import local_date_range_to_utc SENSOR_DEVICE_TYPES = {"WoIOSensor"} def create_app() -> Flask: app = Flask(__name__) app.secret_key = config.flask_secret_key init_db() @app.context_processor def inject_common_template_vars() -> dict[str, int]: return {"collect_interval_seconds": config.collect_interval_seconds} @app.get("/") def dashboard() -> str: tz = ZoneInfo(config.app_timezone) today = datetime.now(tz).date() start_utc, end_utc = local_date_range_to_utc(today, today, tz) with SessionLocal() as session: devices = list( session.scalars( select(Device) .where(Device.device_type.in_(SENSOR_DEVICE_TYPES)) .order_by(Device.name) ) ) readings = load_readings(session, start_utc, end_utc) latest = latest_by_device(session) return render_template( "dashboard.html", devices=devices, latest=latest, latest_relative=latest_ages_for_display(latest, tz), stats=stats_by_device(readings), chart_json=json.dumps(chart_payload(readings, tz)), timezone=config.app_timezone, ) @app.get("/devices/") def device_detail(device_id: str) -> str: tz = ZoneInfo(config.app_timezone) selected_date_text = request.args.get("date") or datetime.now(tz).date().isoformat() selected_date = date.fromisoformat(selected_date_text) start_utc, end_utc = local_date_range_to_utc(selected_date, selected_date, tz) with SessionLocal() as session: device = session.get(Device, device_id) if device is None or device.device_type not in SENSOR_DEVICE_TYPES: return render_template("not_found.html"), 404 readings = load_device_readings(session, device_id, start_utc, end_utc) return render_template( "device.html", device=device, date=selected_date_text, readings=readings, stats=stats_by_device(readings).get(device_id), chart_json=json.dumps(chart_payload(readings, tz)), timezone=config.app_timezone, local_readings=readings_for_display(readings, tz), ) @app.get("/reports") def reports() -> str: tz = ZoneInfo(config.app_timezone) start_text = request.args.get("start") or datetime.now(tz).date().isoformat() end_text = request.args.get("end") or start_text device_id = request.args.get("device_id") or "" start_date = date.fromisoformat(start_text) end_date = date.fromisoformat(end_text) start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz) with SessionLocal() as session: devices = list( session.scalars( select(Device) .where(Device.device_type.in_(SENSOR_DEVICE_TYPES)) .order_by(Device.name) ) ) rows = report_rows(session, start_utc, end_utc, device_id) return render_template( "reports.html", devices=devices, rows=rows, start=start_text, end=end_text, device_id=device_id, ) @app.get("/reports.csv") def reports_csv() -> Response: tz = ZoneInfo(config.app_timezone) start_date = date.fromisoformat(request.args.get("start") or datetime.now(tz).date().isoformat()) end_date = date.fromisoformat(request.args.get("end") or start_date.isoformat()) device_id = request.args.get("device_id") or "" start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz) with SessionLocal() as session: rows = report_rows(session, start_utc, end_utc, device_id) output = io.StringIO() writer = csv.writer(output) writer.writerow( [ "device", "samples", "low_temp", "high_temp", "avg_temp", "low_humidity", "high_humidity", "avg_humidity", ] ) for row in rows: writer.writerow( [ row["device_name"], row["samples"], row["low_temp"], row["high_temp"], row["avg_temp"], row["low_humidity"], row["high_humidity"], row["avg_humidity"], ] ) return Response( output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=switchbot-report.csv"}, ) @app.post("/reports/email-pdf") def email_pdf_report() -> Response: tz = ZoneInfo(config.app_timezone) start_text = request.form.get("start") or datetime.now(tz).date().isoformat() end_text = request.form.get("end") or start_text device_id = request.form.get("device_id") or "" to_email = (request.form.get("email") or "").strip() if not to_email: flash("Enter an email address for the PDF report.", "error") return redirect(url_for("reports", start=start_text, end=end_text, device_id=device_id)) try: start_date = date.fromisoformat(start_text) end_date = date.fromisoformat(end_text) except ValueError: flash("Choose a valid report date range.", "error") return redirect(url_for("reports")) if end_date < start_date: flash("Report end date must be on or after the start date.", "error") return redirect(url_for("reports", start=start_text, end=end_text, device_id=device_id)) try: with SessionLocal() as session: result = send_temperature_report( session=session, to_email=to_email, start_date=start_date, end_date=end_date, tz=tz, device_id=device_id, ) except EmailConfigurationError as exc: flash(str(exc), "error") except Exception as exc: flash(f"Could not create or send the PDF report: {exc}", "error") else: flash(f"Sent PDF report for {result.period_label} to {to_email}.", "success") return redirect(url_for("reports", start=start_text, end=end_text, device_id=device_id)) @app.get("/report-settings") def report_settings() -> str: with SessionLocal() as session: recipients = list(session.scalars(select(ReportRecipient).order_by(ReportRecipient.email))) schedules = list( session.scalars( select(ScheduledReport) .options(joinedload(ScheduledReport.recipient), joinedload(ScheduledReport.device)) .order_by(ScheduledReport.cadence, ScheduledReport.send_time, ScheduledReport.id) ) ) devices = list( session.scalars( select(Device) .where(Device.device_type.in_(SENSOR_DEVICE_TYPES)) .order_by(Device.name) ) ) runs = list( session.scalars( select(ScheduledReportRun) .options( joinedload(ScheduledReportRun.scheduled_report).joinedload(ScheduledReport.recipient), joinedload(ScheduledReportRun.scheduled_report).joinedload(ScheduledReport.device), ) .order_by(ScheduledReportRun.created_at.desc()) .limit(20) ) ) return render_template( "report_settings.html", recipients=recipients, schedules=schedules, devices=devices, runs=runs, schedule_labels={schedule.id: cadence_label(schedule) for schedule in schedules}, weekdays=[ (0, "Monday"), (1, "Tuesday"), (2, "Wednesday"), (3, "Thursday"), (4, "Friday"), (5, "Saturday"), (6, "Sunday"), ], ) @app.post("/report-settings/recipients") def add_report_recipient() -> Response: email = (request.form.get("email") or "").strip().lower() name = (request.form.get("name") or "").strip() or None if not email or "@" not in email: flash("Enter a valid recipient email address.", "error") return redirect(url_for("report_settings")) with SessionLocal() as session: session.add(ReportRecipient(email=email, name=name)) try: session.commit() except IntegrityError: session.rollback() flash("That recipient already exists.", "error") else: flash(f"Added report recipient {email}.", "success") return redirect(url_for("report_settings")) @app.post("/report-settings/recipients//toggle") def toggle_report_recipient(recipient_id: int) -> Response: with SessionLocal() as session: recipient = session.get(ReportRecipient, recipient_id) if recipient is None: flash("Recipient not found.", "error") else: recipient.active = not recipient.active session.commit() flash(f"{'Enabled' if recipient.active else 'Paused'} {recipient.email}.", "success") return redirect(url_for("report_settings")) @app.post("/report-settings/schedules") def add_scheduled_report() -> Response: recipient_id = request.form.get("recipient_id", type=int) cadence = (request.form.get("cadence") or "").strip().lower() send_time = (request.form.get("send_time") or "08:00").strip() device_id = (request.form.get("device_id") or "").strip() or None if recipient_id is None or cadence not in {"daily", "weekly", "monthly"}: flash("Choose a recipient and cadence for the scheduled report.", "error") return redirect(url_for("report_settings")) if not valid_send_time(send_time): flash("Use a valid send time in HH:MM format.", "error") return redirect(url_for("report_settings")) weekday = None monthday = None if cadence == "weekly": weekday = request.form.get("weekday", type=int) if weekday is None or weekday < 0 or weekday > 6: flash("Choose a weekday for weekly reports.", "error") return redirect(url_for("report_settings")) if cadence == "monthly": monthday = request.form.get("monthday", type=int) if monthday is None or monthday < 1 or monthday > 31: flash("Choose a day from 1 to 31 for monthly reports.", "error") return redirect(url_for("report_settings")) with SessionLocal() as session: recipient = session.get(ReportRecipient, recipient_id) if recipient is None: flash("Recipient not found.", "error") return redirect(url_for("report_settings")) if device_id and session.get(Device, device_id) is None: flash("Device not found.", "error") return redirect(url_for("report_settings")) if scheduled_report_exists(session, recipient_id, cadence, send_time, weekday, monthday, device_id): flash("That scheduled report already exists.", "error") return redirect(url_for("report_settings")) session.add( ScheduledReport( recipient_id=recipient_id, cadence=cadence, send_time=send_time, weekday=weekday, monthday=monthday, device_id=device_id, ) ) session.commit() flash("Added scheduled report.", "success") return redirect(url_for("report_settings")) @app.post("/report-settings/schedules//toggle") def toggle_scheduled_report(schedule_id: int) -> Response: with SessionLocal() as session: schedule = session.get(ScheduledReport, schedule_id) if schedule is None: flash("Scheduled report not found.", "error") else: schedule.active = not schedule.active session.commit() flash(f"{'Enabled' if schedule.active else 'Paused'} scheduled report.", "success") return redirect(url_for("report_settings")) return app def load_readings(session: Session, start_utc: datetime, end_utc: datetime) -> list[Reading]: stmt = ( select(Reading) .options(joinedload(Reading.device)) .join(Reading.device) .where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc) .order_by(Reading.recorded_at) ) return list(session.scalars(stmt)) def load_device_readings( session: Session, device_id: str, start_utc: datetime, end_utc: datetime, ) -> list[Reading]: stmt = ( select(Reading) .options(joinedload(Reading.device)) .where( Reading.device_id == device_id, Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc, ) .order_by(Reading.recorded_at) ) return list(session.scalars(stmt)) def latest_by_device(session: Session) -> dict[str, Reading]: subquery = ( select(Reading.device_id, func.max(Reading.recorded_at).label("latest_at")) .group_by(Reading.device_id) .subquery() ) rows = session.scalars( select(Reading) .join(subquery, (Reading.device_id == subquery.c.device_id) & (Reading.recorded_at == subquery.c.latest_at)) .order_by(Reading.device_id) ) return {reading.device_id: reading for reading in rows} def stats_by_device(readings: list[Reading]) -> dict[str, dict[str, float | int]]: stats: dict[str, dict[str, float | int]] = {} for reading in readings: if reading.temperature is None: continue item = stats.setdefault( reading.device_id, { "low": reading.temperature, "high": reading.temperature, "samples": 0, }, ) item["low"] = min(float(item["low"]), reading.temperature) item["high"] = max(float(item["high"]), reading.temperature) item["samples"] = int(item["samples"]) + 1 return stats def chart_payload(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]: series: dict[str, dict[str, object]] = {} for reading in readings: if reading.temperature is None: continue item = series.setdefault( reading.device_id, { "deviceId": reading.device_id, "name": reading.device.name, "points": [], }, ) local_time = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz) item["points"].append( { "time": local_time.isoformat(timespec="minutes"), "temperature": reading.temperature, "humidity": reading.humidity, } ) return list(series.values()) def readings_for_display(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]: rows = [] for reading in readings: local_time = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz) rows.append( { "timestamp": local_time.strftime("%Y-%m-%d %H:%M"), "temperature": reading.temperature, "humidity": reading.humidity, "battery": reading.battery, } ) return rows def latest_ages_for_display(latest: dict[str, Reading], tz: ZoneInfo) -> dict[str, str]: now_local = datetime.now(tz) ages: dict[str, str] = {} for device_id, reading in latest.items(): collected_local = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz) delta_seconds = max(0, int((now_local - collected_local).total_seconds())) if delta_seconds < 60: ages[device_id] = "just now" continue minutes = delta_seconds // 60 if minutes < 60: ages[device_id] = f"{minutes} min ago" continue hours = minutes // 60 if hours < 24: ages[device_id] = f"{hours} h ago" continue days = hours // 24 ages[device_id] = f"{days} d ago" return ages def report_rows( session: Session, start_utc: datetime, end_utc: datetime, device_id: str = "", ) -> list[dict[str, object]]: stmt: Select[tuple] = ( select( Device.name, func.count(Reading.id), func.min(Reading.temperature), func.max(Reading.temperature), func.avg(Reading.temperature), func.min(Reading.humidity), func.max(Reading.humidity), func.avg(Reading.humidity), ) .join(Reading.device) .where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc) .group_by(Device.id, Device.name) .order_by(Device.name) ) if device_id: stmt = stmt.where(Device.id == device_id) rows = [] for row in session.execute(stmt): rows.append( { "device_name": row[0], "samples": row[1], "low_temp": rounded(row[2]), "high_temp": rounded(row[3]), "avg_temp": rounded(row[4]), "low_humidity": rounded(row[5]), "high_humidity": rounded(row[6]), "avg_humidity": rounded(row[7]), } ) return rows def valid_send_time(value: str) -> bool: try: time.fromisoformat(value) except ValueError: return False return len(value) == 5 def scheduled_report_exists( session: Session, recipient_id: int, cadence: str, send_time: str, weekday: int | None, monthday: int | None, device_id: str | None, ) -> bool: schedules = session.scalars( select(ScheduledReport).where( ScheduledReport.recipient_id == recipient_id, ScheduledReport.cadence == cadence, ScheduledReport.send_time == send_time, ) ) for schedule in schedules: if schedule.weekday == weekday and schedule.monthday == monthday and schedule.device_id == device_id: return True return False def rounded(value: float | None) -> float | None: if value is None: return None return round(float(value), 1) app = create_app()