557 lines
20 KiB
Python
557 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import json
|
|
from datetime import date, datetime, time, timezone
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from flask import Flask, Response, flash, redirect, render_template, request, url_for
|
|
from sqlalchemy import Select, func, select
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from app.config import config
|
|
from app.db import SessionLocal, init_db
|
|
from app.models import Device, Reading, ReportRecipient, ScheduledReport, ScheduledReportRun
|
|
from app.reporting import cadence_label, send_temperature_report
|
|
from app.smtp2go import EmailConfigurationError
|
|
from app.web_time import local_date_range_to_utc
|
|
|
|
# Device types the web views treat as sensors: device queries for the dashboard,
# reports, and report-settings pages filter on membership in this set, and the
# device detail page returns 404 for any other type.
SENSOR_DEVICE_TYPES = {"WoIOSensor"}
|
|
|
|
|
|
def _load_sensor_devices(session: Session) -> list[Device]:
    """Return every device whose type is in ``SENSOR_DEVICE_TYPES``, ordered by name."""
    return list(
        session.scalars(
            select(Device)
            .where(Device.device_type.in_(SENSOR_DEVICE_TYPES))
            .order_by(Device.name)
        )
    )


def create_app() -> Flask:
    """Build the Flask application, initialise the database, and register all routes.

    Returns:
        The configured ``Flask`` instance with the dashboard, device detail,
        report, and report-settings views attached.
    """
    app = Flask(__name__)
    app.secret_key = config.flask_secret_key
    init_db()

    @app.context_processor
    def inject_common_template_vars() -> dict[str, int]:
        """Expose the collection interval to every rendered template."""
        return {"collect_interval_seconds": config.collect_interval_seconds}

    @app.get("/")
    def dashboard() -> str:
        """Render today's overview for all sensor devices."""
        tz = ZoneInfo(config.app_timezone)
        today = datetime.now(tz).date()
        start_utc, end_utc = local_date_range_to_utc(today, today, tz)

        with SessionLocal() as session:
            devices = _load_sensor_devices(session)
            readings = load_readings(session, start_utc, end_utc)
            latest = latest_by_device(session)

        return render_template(
            "dashboard.html",
            devices=devices,
            latest=latest,
            latest_relative=latest_ages_for_display(latest, tz),
            stats=stats_by_device(readings),
            chart_json=json.dumps(chart_payload(readings, tz)),
            timezone=config.app_timezone,
        )

    @app.get("/devices/<device_id>")
    def device_detail(device_id: str) -> str | tuple[str, int] | Response:
        """Render one sensor's readings for the local day given by ``?date=`` (default today)."""
        tz = ZoneInfo(config.app_timezone)
        selected_date_text = request.args.get("date") or datetime.now(tz).date().isoformat()
        try:
            selected_date = date.fromisoformat(selected_date_text)
        except ValueError:
            # A malformed ?date= used to surface as a 500; fall back to today's view.
            flash("Choose a valid date.", "error")
            return redirect(url_for("device_detail", device_id=device_id))
        start_utc, end_utc = local_date_range_to_utc(selected_date, selected_date, tz)

        with SessionLocal() as session:
            device = session.get(Device, device_id)
            if device is None or device.device_type not in SENSOR_DEVICE_TYPES:
                return render_template("not_found.html"), 404
            readings = load_device_readings(session, device_id, start_utc, end_utc)

        return render_template(
            "device.html",
            device=device,
            date=selected_date_text,
            readings=readings,
            stats=stats_by_device(readings).get(device_id),
            chart_json=json.dumps(chart_payload(readings, tz)),
            timezone=config.app_timezone,
            local_readings=readings_for_display(readings, tz),
        )

    @app.get("/reports")
    def reports() -> str | Response:
        """Render the per-device summary table for the requested local date range."""
        tz = ZoneInfo(config.app_timezone)
        start_text = request.args.get("start") or datetime.now(tz).date().isoformat()
        end_text = request.args.get("end") or start_text
        device_id = request.args.get("device_id") or ""

        try:
            start_date = date.fromisoformat(start_text)
            end_date = date.fromisoformat(end_text)
        except ValueError:
            # Same handling as email_pdf_report: reject bad dates instead of raising.
            flash("Choose a valid report date range.", "error")
            return redirect(url_for("reports"))
        start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz)

        with SessionLocal() as session:
            devices = _load_sensor_devices(session)
            rows = report_rows(session, start_utc, end_utc, device_id)

        return render_template(
            "reports.html",
            devices=devices,
            rows=rows,
            start=start_text,
            end=end_text,
            device_id=device_id,
        )

    @app.get("/reports.csv")
    def reports_csv() -> Response:
        """Return the report summary for the requested range as a CSV attachment."""
        tz = ZoneInfo(config.app_timezone)
        today_text = datetime.now(tz).date().isoformat()
        try:
            start_date = date.fromisoformat(request.args.get("start") or today_text)
            end_date = date.fromisoformat(request.args.get("end") or start_date.isoformat())
        except ValueError:
            # A download endpoint has no page to flash on, so answer with a plain 400.
            return Response("Invalid report date range.", status=400, mimetype="text/plain")
        device_id = request.args.get("device_id") or ""
        start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz)

        with SessionLocal() as session:
            rows = report_rows(session, start_utc, end_utc, device_id)

        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(
            [
                "device",
                "samples",
                "low_temp",
                "high_temp",
                "avg_temp",
                "low_humidity",
                "high_humidity",
                "avg_humidity",
            ]
        )
        for row in rows:
            writer.writerow(
                [
                    row["device_name"],
                    row["samples"],
                    row["low_temp"],
                    row["high_temp"],
                    row["avg_temp"],
                    row["low_humidity"],
                    row["high_humidity"],
                    row["avg_humidity"],
                ]
            )

        return Response(
            output.getvalue(),
            mimetype="text/csv",
            headers={"Content-Disposition": "attachment; filename=switchbot-report.csv"},
        )

    @app.post("/reports/email-pdf")
    def email_pdf_report() -> Response:
        """Validate the form, send the PDF report by email, and redirect back to the reports page."""
        tz = ZoneInfo(config.app_timezone)
        start_text = request.form.get("start") or datetime.now(tz).date().isoformat()
        end_text = request.form.get("end") or start_text
        device_id = request.form.get("device_id") or ""
        to_email = (request.form.get("email") or "").strip()

        if not to_email:
            flash("Enter an email address for the PDF report.", "error")
            return redirect(url_for("reports", start=start_text, end=end_text, device_id=device_id))

        try:
            start_date = date.fromisoformat(start_text)
            end_date = date.fromisoformat(end_text)
        except ValueError:
            flash("Choose a valid report date range.", "error")
            return redirect(url_for("reports"))

        if end_date < start_date:
            flash("Report end date must be on or after the start date.", "error")
            return redirect(url_for("reports", start=start_text, end=end_text, device_id=device_id))

        try:
            with SessionLocal() as session:
                result = send_temperature_report(
                    session=session,
                    to_email=to_email,
                    start_date=start_date,
                    end_date=end_date,
                    tz=tz,
                    device_id=device_id,
                )
        except EmailConfigurationError as exc:
            flash(str(exc), "error")
        except Exception as exc:
            # Request boundary: keep the user-facing message, but record the
            # traceback so the failure is not lost.
            app.logger.exception("Could not create or send the PDF report")
            flash(f"Could not create or send the PDF report: {exc}", "error")
        else:
            flash(f"Sent PDF report for {result.period_label} to {to_email}.", "success")

        return redirect(url_for("reports", start=start_text, end=end_text, device_id=device_id))

    @app.get("/report-settings")
    def report_settings() -> str:
        """Render recipients, schedules, sensor devices, and the 20 most recent report runs."""
        with SessionLocal() as session:
            recipients = list(session.scalars(select(ReportRecipient).order_by(ReportRecipient.email)))
            schedules = list(
                session.scalars(
                    select(ScheduledReport)
                    .options(joinedload(ScheduledReport.recipient), joinedload(ScheduledReport.device))
                    .order_by(ScheduledReport.cadence, ScheduledReport.send_time, ScheduledReport.id)
                )
            )
            devices = _load_sensor_devices(session)
            runs = list(
                session.scalars(
                    select(ScheduledReportRun)
                    .options(
                        joinedload(ScheduledReportRun.scheduled_report).joinedload(ScheduledReport.recipient),
                        joinedload(ScheduledReportRun.scheduled_report).joinedload(ScheduledReport.device),
                    )
                    .order_by(ScheduledReportRun.created_at.desc())
                    .limit(20)
                )
            )

        return render_template(
            "report_settings.html",
            recipients=recipients,
            schedules=schedules,
            devices=devices,
            runs=runs,
            schedule_labels={schedule.id: cadence_label(schedule) for schedule in schedules},
            weekdays=[
                (0, "Monday"),
                (1, "Tuesday"),
                (2, "Wednesday"),
                (3, "Thursday"),
                (4, "Friday"),
                (5, "Saturday"),
                (6, "Sunday"),
            ],
        )

    @app.post("/report-settings/recipients")
    def add_report_recipient() -> Response:
        """Create a report recipient from the submitted form, rejecting duplicates."""
        email = (request.form.get("email") or "").strip().lower()
        name = (request.form.get("name") or "").strip() or None
        if not email or "@" not in email:
            flash("Enter a valid recipient email address.", "error")
            return redirect(url_for("report_settings"))

        with SessionLocal() as session:
            session.add(ReportRecipient(email=email, name=name))
            try:
                session.commit()
            except IntegrityError:
                session.rollback()
                flash("That recipient already exists.", "error")
            else:
                flash(f"Added report recipient {email}.", "success")
        return redirect(url_for("report_settings"))

    @app.post("/report-settings/recipients/<int:recipient_id>/toggle")
    def toggle_report_recipient(recipient_id: int) -> Response:
        """Flip a recipient's ``active`` flag."""
        with SessionLocal() as session:
            recipient = session.get(ReportRecipient, recipient_id)
            if recipient is None:
                flash("Recipient not found.", "error")
            else:
                recipient.active = not recipient.active
                session.commit()
                flash(f"{'Enabled' if recipient.active else 'Paused'} {recipient.email}.", "success")
        return redirect(url_for("report_settings"))

    @app.post("/report-settings/schedules")
    def add_scheduled_report() -> Response:
        """Validate the schedule form and store a new scheduled report."""
        recipient_id = request.form.get("recipient_id", type=int)
        cadence = (request.form.get("cadence") or "").strip().lower()
        send_time = (request.form.get("send_time") or "08:00").strip()
        device_id = (request.form.get("device_id") or "").strip() or None

        if recipient_id is None or cadence not in {"daily", "weekly", "monthly"}:
            flash("Choose a recipient and cadence for the scheduled report.", "error")
            return redirect(url_for("report_settings"))
        if not valid_send_time(send_time):
            flash("Use a valid send time in HH:MM format.", "error")
            return redirect(url_for("report_settings"))

        # Only the field matching the cadence is read; the other stays None.
        weekday = None
        monthday = None
        if cadence == "weekly":
            weekday = request.form.get("weekday", type=int)
            if weekday is None or not 0 <= weekday <= 6:
                flash("Choose a weekday for weekly reports.", "error")
                return redirect(url_for("report_settings"))
        if cadence == "monthly":
            monthday = request.form.get("monthday", type=int)
            if monthday is None or not 1 <= monthday <= 31:
                flash("Choose a day from 1 to 31 for monthly reports.", "error")
                return redirect(url_for("report_settings"))

        with SessionLocal() as session:
            recipient = session.get(ReportRecipient, recipient_id)
            if recipient is None:
                flash("Recipient not found.", "error")
                return redirect(url_for("report_settings"))
            if device_id and session.get(Device, device_id) is None:
                flash("Device not found.", "error")
                return redirect(url_for("report_settings"))
            if scheduled_report_exists(session, recipient_id, cadence, send_time, weekday, monthday, device_id):
                flash("That scheduled report already exists.", "error")
                return redirect(url_for("report_settings"))

            session.add(
                ScheduledReport(
                    recipient_id=recipient_id,
                    cadence=cadence,
                    send_time=send_time,
                    weekday=weekday,
                    monthday=monthday,
                    device_id=device_id,
                )
            )
            session.commit()
            flash("Added scheduled report.", "success")
        return redirect(url_for("report_settings"))

    @app.post("/report-settings/schedules/<int:schedule_id>/toggle")
    def toggle_scheduled_report(schedule_id: int) -> Response:
        """Flip a scheduled report's ``active`` flag."""
        with SessionLocal() as session:
            schedule = session.get(ScheduledReport, schedule_id)
            if schedule is None:
                flash("Scheduled report not found.", "error")
            else:
                schedule.active = not schedule.active
                session.commit()
                flash(f"{'Enabled' if schedule.active else 'Paused'} scheduled report.", "success")
        return redirect(url_for("report_settings"))

    return app
|
|
|
|
|
|
def load_readings(session: Session, start_utc: datetime, end_utc: datetime) -> list[Reading]:
    """Fetch readings recorded in ``[start_utc, end_utc)``, oldest first.

    Each reading's ``device`` relationship is eager-loaded so callers can use
    it after the session is closed.
    """
    not_before = Reading.recorded_at >= start_utc
    strictly_before = Reading.recorded_at < end_utc
    query = (
        select(Reading)
        .join(Reading.device)
        .options(joinedload(Reading.device))
        .where(not_before, strictly_before)
        .order_by(Reading.recorded_at)
    )
    return list(session.scalars(query))
|
|
|
|
|
|
def load_device_readings(
    session: Session,
    device_id: str,
    start_utc: datetime,
    end_utc: datetime,
) -> list[Reading]:
    """Fetch one device's readings recorded in ``[start_utc, end_utc)``, oldest first.

    The ``device`` relationship is eager-loaded on every returned reading.
    """
    conditions = (
        Reading.device_id == device_id,
        Reading.recorded_at >= start_utc,
        Reading.recorded_at < end_utc,
    )
    query = (
        select(Reading)
        .options(joinedload(Reading.device))
        .where(*conditions)
        .order_by(Reading.recorded_at)
    )
    return list(session.scalars(query))
|
|
|
|
|
|
def latest_by_device(session: Session) -> dict[str, Reading]:
    """Map each device id to the reading carrying that device's greatest ``recorded_at``."""
    # Per-device maximum timestamp, joined back to pick the matching full rows.
    newest = (
        select(Reading.device_id, func.max(Reading.recorded_at).label("latest_at"))
        .group_by(Reading.device_id)
        .subquery()
    )
    is_newest_row = (Reading.device_id == newest.c.device_id) & (Reading.recorded_at == newest.c.latest_at)
    query = select(Reading).join(newest, is_newest_row).order_by(Reading.device_id)

    latest: dict[str, Reading] = {}
    for reading in session.scalars(query):
        latest[reading.device_id] = reading
    return latest
|
|
|
|
|
|
def stats_by_device(readings: list[Reading]) -> dict[str, dict[str, float | int]]:
    """Summarise temperature per device.

    Readings without a temperature are ignored. Each device id maps to a dict
    with the ``low`` and ``high`` temperature seen and the ``samples`` count.
    """
    summary: dict[str, dict[str, float | int]] = {}
    for entry in readings:
        temperature = entry.temperature
        if temperature is None:
            continue
        bucket = summary.get(entry.device_id)
        if bucket is None:
            # First usable reading for this device seeds both extremes.
            bucket = {"low": temperature, "high": temperature, "samples": 0}
            summary[entry.device_id] = bucket
        bucket["low"] = min(float(bucket["low"]), temperature)
        bucket["high"] = max(float(bucket["high"]), temperature)
        bucket["samples"] = int(bucket["samples"]) + 1
    return summary
|
|
|
|
|
|
def chart_payload(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]:
    """Group readings into one chart series per device.

    Readings without a temperature are skipped. Each series holds the device
    id, the device name, and a list of points whose ``time`` is the reading's
    ``recorded_at`` stamped as UTC and converted to ``tz``, to the minute.
    """
    by_device: dict[str, dict[str, object]] = {}
    for entry in readings:
        if entry.temperature is None:
            continue
        device_key = entry.device_id
        fresh_series = {"deviceId": device_key, "name": entry.device.name, "points": []}
        points = by_device.setdefault(device_key, fresh_series)["points"]
        stamp = entry.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
        point = {
            "time": stamp.isoformat(timespec="minutes"),
            "temperature": entry.temperature,
            "humidity": entry.humidity,
        }
        points.append(point)
    return [*by_device.values()]
|
|
|
|
|
|
def readings_for_display(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]:
    """Turn readings into table rows with a ``YYYY-MM-DD HH:MM`` timestamp in ``tz``.

    ``recorded_at`` is stamped as UTC before conversion. Temperature, humidity
    and battery values are passed through unchanged.
    """
    return [
        {
            "timestamp": entry.recorded_at.replace(tzinfo=timezone.utc)
            .astimezone(tz)
            .strftime("%Y-%m-%d %H:%M"),
            "temperature": entry.temperature,
            "humidity": entry.humidity,
            "battery": entry.battery,
        }
        for entry in readings
    ]
|
|
|
|
|
|
def latest_ages_for_display(latest: dict[str, Reading], tz: ZoneInfo) -> dict[str, str]:
    """Describe how long ago each device's latest reading was recorded.

    Ages are whole units rounded down: ``just now`` under a minute, then
    ``N min ago``, ``N h ago``, and ``N d ago``. A reading stamped in the
    future is clamped to zero seconds and reported as ``just now``.
    """
    reference = datetime.now(tz)

    def describe(elapsed_seconds: int) -> str:
        if elapsed_seconds < 60:
            return "just now"
        whole_minutes = elapsed_seconds // 60
        if whole_minutes < 60:
            return f"{whole_minutes} min ago"
        whole_hours = whole_minutes // 60
        if whole_hours < 24:
            return f"{whole_hours} h ago"
        return f"{whole_hours // 24} d ago"

    labels: dict[str, str] = {}
    for device_id, reading in latest.items():
        recorded_local = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
        elapsed = int((reference - recorded_local).total_seconds())
        labels[device_id] = describe(max(0, elapsed))
    return labels
|
|
|
|
|
|
def report_rows(
    session: Session,
    start_utc: datetime,
    end_utc: datetime,
    device_id: str = "",
) -> list[dict[str, object]]:
    """Aggregate readings per device over ``[start_utc, end_utc)``.

    Each returned dict holds the device name, the sample count, and the
    min/max/average temperature and humidity rounded via ``rounded``. Rows are
    ordered by device name. A non-empty ``device_id`` restricts the result to
    that device.
    """
    filters = [Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc]
    if device_id:
        filters.append(Device.id == device_id)

    query: Select[tuple] = (
        select(
            Device.name,
            func.count(Reading.id),
            func.min(Reading.temperature),
            func.max(Reading.temperature),
            func.avg(Reading.temperature),
            func.min(Reading.humidity),
            func.max(Reading.humidity),
            func.avg(Reading.humidity),
        )
        .join(Reading.device)
        .where(*filters)
        .group_by(Device.id, Device.name)
        .order_by(Device.name)
    )

    # Keys for the six aggregate columns, in the order they are selected.
    aggregate_keys = (
        "low_temp",
        "high_temp",
        "avg_temp",
        "low_humidity",
        "high_humidity",
        "avg_humidity",
    )
    summaries: list[dict[str, object]] = []
    for device_name, sample_count, *aggregates in session.execute(query):
        summary: dict[str, object] = {"device_name": device_name, "samples": sample_count}
        for key, value in zip(aggregate_keys, aggregates):
            summary[key] = rounded(value)
        summaries.append(summary)
    return summaries
|
|
|
|
|
|
def valid_send_time(value: str) -> bool:
    """Return True only for a 24-hour ``HH:MM`` string such as ``"08:00"``.

    The check is deliberately strict: exactly two ASCII digits, a colon, and
    two ASCII digits, with hours 00-23 and minutes 00-59. Relying on
    ``time.fromisoformat`` plus a length check is not enough, because Python
    3.11+ also parses other five-character ISO 8601 spellings (for example
    ``"T0800"``) that are not in ``HH:MM`` form.
    """
    if len(value) != 5 or value[2] != ":":
        return False
    hours, minutes = value[:2], value[3:]
    # isascii() guards against non-ASCII digits, which str.isdigit() accepts.
    if not (hours.isascii() and hours.isdigit() and minutes.isascii() and minutes.isdigit()):
        return False
    return int(hours) < 24 and int(minutes) < 60
|
|
|
|
|
|
def scheduled_report_exists(
    session: Session,
    recipient_id: int,
    cadence: str,
    send_time: str,
    weekday: int | None,
    monthday: int | None,
    device_id: str | None,
) -> bool:
    """Report whether an identical scheduled report is already stored.

    Recipient, cadence, and send time are matched in the query. Weekday,
    monthday, and device id are compared in Python, where ``None`` equals
    ``None``.
    """
    candidates = session.scalars(
        select(ScheduledReport).where(
            ScheduledReport.recipient_id == recipient_id,
            ScheduledReport.cadence == cadence,
            ScheduledReport.send_time == send_time,
        )
    )
    return any(
        candidate.weekday == weekday
        and candidate.monthday == monthday
        and candidate.device_id == device_id
        for candidate in candidates
    )
|
|
|
|
|
|
def rounded(value: float | None) -> float | None:
    """Convert ``value`` to ``float`` and round it to one decimal place; ``None`` passes through."""
    return None if value is None else round(float(value), 1)
|
|
|
|
|
|
# Application instance built at import time; importing this module therefore
# runs create_app(), including its init_db() call.
# NOTE(review): presumably the object a WSGI server or `flask run` targets — confirm.
app = create_app()
|