Files
mt-vet-temps/app/web.py
T
2026-06-02 04:02:00 -04:00

346 lines
11 KiB
Python

from __future__ import annotations
import csv
import io
import json
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo
from flask import Flask, Response, render_template, request
from sqlalchemy import Select, func, select
from sqlalchemy.orm import Session, joinedload
from app.config import config
from app.db import SessionLocal, init_db
from app.models import Device, Reading
# Device types the web UI treats as sensors: only devices of these types are
# listed on the dashboard and reports pages, and the device detail page
# answers 404 for any other type.
SENSOR_DEVICE_TYPES = {"WoIOSensor"}
def _sensor_devices(session: Session) -> list[Device]:
    """Return the devices whose type is in SENSOR_DEVICE_TYPES, ordered by name."""
    return list(
        session.scalars(
            select(Device)
            .where(Device.device_type.in_(SENSOR_DEVICE_TYPES))
            .order_by(Device.name)
        )
    )


def _parse_query_date(text: str) -> date | None:
    """Parse an ISO-format date from a query string; return None if malformed."""
    try:
        return date.fromisoformat(text)
    except ValueError:
        return None


def _invalid_date_response(param: str) -> Response:
    """Build the 400 response sent when a date query parameter cannot be parsed."""
    return Response(
        f"Invalid '{param}' query parameter; expected an ISO date (YYYY-MM-DD).",
        status=400,
        mimetype="text/plain",
    )


def create_app() -> Flask:
    """Create the Flask application and register its routes.

    Sets the secret key from ``config``, calls ``init_db()``, and registers
    four GET routes:

    * ``/`` - dashboard for the current local day.
    * ``/devices/<device_id>`` - readings of one sensor for one local day.
    * ``/reports`` - per-device aggregates over a local date range.
    * ``/reports.csv`` - the same aggregates as a CSV download.

    Date query parameters (``date``, ``start``, ``end``) that are not valid
    ISO dates produce a 400 response instead of an unhandled ``ValueError``.

    Returns:
        The configured ``Flask`` application.
    """
    app = Flask(__name__)
    app.secret_key = config.flask_secret_key
    init_db()

    @app.context_processor
    def inject_common_template_vars() -> dict[str, int]:
        """Make the collection interval available to every template."""
        return {"collect_interval_seconds": config.collect_interval_seconds}

    @app.get("/")
    def dashboard() -> str:
        """Render today's readings, latest values and stats for all sensors."""
        tz = ZoneInfo(config.app_timezone)
        today = datetime.now(tz).date()
        start_utc, end_utc = local_date_range_to_utc(today, today, tz)
        # Render while the session is open so the ORM objects handed to the
        # template are still attached.
        with SessionLocal() as session:
            devices = _sensor_devices(session)
            readings = load_readings(session, start_utc, end_utc)
            latest = latest_by_device(session)
            return render_template(
                "dashboard.html",
                devices=devices,
                latest=latest,
                latest_relative=latest_ages_for_display(latest, tz),
                stats=stats_by_device(readings),
                chart_json=json.dumps(chart_payload(readings, tz)),
                timezone=config.app_timezone,
            )

    @app.get("/devices/<device_id>")
    def device_detail(device_id: str) -> str | Response | tuple[str, int]:
        """Render one sensor's readings for the day given by ``?date=``.

        Defaults to the current local day. Responds 400 for a malformed date
        and 404 when the device is unknown or is not a sensor type.
        """
        tz = ZoneInfo(config.app_timezone)
        selected_date_text = request.args.get("date") or datetime.now(tz).date().isoformat()
        selected_date = _parse_query_date(selected_date_text)
        if selected_date is None:
            return _invalid_date_response("date")
        start_utc, end_utc = local_date_range_to_utc(selected_date, selected_date, tz)
        with SessionLocal() as session:
            device = session.get(Device, device_id)
            if device is None or device.device_type not in SENSOR_DEVICE_TYPES:
                return render_template("not_found.html"), 404
            readings = load_device_readings(session, device_id, start_utc, end_utc)
            return render_template(
                "device.html",
                device=device,
                date=selected_date_text,
                readings=readings,
                stats=stats_by_device(readings).get(device_id),
                chart_json=json.dumps(chart_payload(readings, tz)),
                timezone=config.app_timezone,
                local_readings=readings_for_display(readings, tz),
            )

    @app.get("/reports")
    def reports() -> str | Response:
        """Render aggregate rows for ``?start=``..``?end=`` (optionally one device).

        ``start`` defaults to the current local day and ``end`` defaults to
        ``start``. Responds 400 when either date is malformed.
        """
        tz = ZoneInfo(config.app_timezone)
        start_text = request.args.get("start") or datetime.now(tz).date().isoformat()
        end_text = request.args.get("end") or start_text
        device_id = request.args.get("device_id") or ""
        start_date = _parse_query_date(start_text)
        if start_date is None:
            return _invalid_date_response("start")
        end_date = _parse_query_date(end_text)
        if end_date is None:
            return _invalid_date_response("end")
        start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz)
        with SessionLocal() as session:
            devices = _sensor_devices(session)
            rows = report_rows(session, start_utc, end_utc, device_id)
            return render_template(
                "reports.html",
                devices=devices,
                rows=rows,
                start=start_text,
                end=end_text,
                device_id=device_id,
            )

    @app.get("/reports.csv")
    def reports_csv() -> Response:
        """Return the report aggregates as a CSV attachment.

        Accepts the same ``start``, ``end`` and ``device_id`` parameters as
        ``/reports`` and responds 400 when a date is malformed.
        """
        tz = ZoneInfo(config.app_timezone)
        start_date = _parse_query_date(
            request.args.get("start") or datetime.now(tz).date().isoformat()
        )
        if start_date is None:
            return _invalid_date_response("start")
        end_date = _parse_query_date(request.args.get("end") or start_date.isoformat())
        if end_date is None:
            return _invalid_date_response("end")
        device_id = request.args.get("device_id") or ""
        start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz)
        with SessionLocal() as session:
            rows = report_rows(session, start_utc, end_utc, device_id)

        # The rows are plain dicts, so the CSV can be built after the session
        # has been closed.
        columns = [
            "samples",
            "low_temp",
            "high_temp",
            "avg_temp",
            "low_humidity",
            "high_humidity",
            "avg_humidity",
        ]
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["device", *columns])
        for row in rows:
            writer.writerow([row["device_name"], *(row[column] for column in columns)])
        return Response(
            output.getvalue(),
            mimetype="text/csv",
            headers={"Content-Disposition": "attachment; filename=switchbot-report.csv"},
        )

    return app
def local_date_range_to_utc(start: date, end: date, tz: ZoneInfo) -> tuple[datetime, datetime]:
    """Translate an inclusive range of local dates into naive UTC bounds.

    The first bound is local midnight at the beginning of ``start``; the
    second is local midnight at the beginning of the day after ``end``, so
    the pair describes a half-open interval covering every requested day.
    Both datetimes are converted to UTC and returned without ``tzinfo``.
    """

    def midnight_as_naive_utc(day: date) -> datetime:
        aware_midnight = datetime.combine(day, time.min, tzinfo=tz)
        return aware_midnight.astimezone(timezone.utc).replace(tzinfo=None)

    day_after_end = end + timedelta(days=1)
    return midnight_as_naive_utc(start), midnight_as_naive_utc(day_after_end)
def load_readings(session: Session, start_utc: datetime, end_utc: datetime) -> list[Reading]:
    """Fetch every reading recorded in ``[start_utc, end_utc)``, oldest first.

    Each reading is joined to its device, and the ``device`` relationship is
    eagerly loaded in the same query.
    """
    query = select(Reading).options(joinedload(Reading.device)).join(Reading.device)
    query = query.where(Reading.recorded_at >= start_utc)
    query = query.where(Reading.recorded_at < end_utc)
    query = query.order_by(Reading.recorded_at)
    return list(session.scalars(query))
def load_device_readings(
    session: Session,
    device_id: str,
    start_utc: datetime,
    end_utc: datetime,
) -> list[Reading]:
    """Fetch one device's readings recorded in ``[start_utc, end_utc)``, oldest first.

    The ``device`` relationship of each reading is eagerly loaded.
    """
    query = select(Reading).options(joinedload(Reading.device))
    query = query.where(Reading.device_id == device_id)
    query = query.where(Reading.recorded_at >= start_utc)
    query = query.where(Reading.recorded_at < end_utc)
    query = query.order_by(Reading.recorded_at)
    return list(session.scalars(query))
def latest_by_device(session: Session) -> dict[str, Reading]:
    """Map each device id to its most recently recorded reading.

    A grouped subquery finds the newest ``recorded_at`` per device; readings
    are then matched against it on both device id and timestamp.
    """
    newest = (
        select(Reading.device_id, func.max(Reading.recorded_at).label("latest_at"))
        .group_by(Reading.device_id)
        .subquery()
    )
    same_device = Reading.device_id == newest.c.device_id
    same_moment = Reading.recorded_at == newest.c.latest_at
    query = select(Reading).join(newest, same_device & same_moment).order_by(Reading.device_id)

    latest: dict[str, Reading] = {}
    for reading in session.scalars(query):
        latest[reading.device_id] = reading
    return latest
def stats_by_device(readings: list[Reading]) -> dict[str, dict[str, float | int]]:
    """Summarise temperatures per device.

    Returns a mapping of device id to ``{"low", "high", "samples"}``.
    Readings without a temperature are ignored, so a device whose readings
    all lack a temperature does not appear in the result.
    """
    summary: dict[str, dict[str, float | int]] = {}
    for entry in readings:
        temperature = entry.temperature
        if temperature is None:
            continue
        device_key = entry.device_id
        bucket = summary.get(device_key)
        if bucket is None:
            # First usable reading for this device seeds both extremes.
            bucket = {"low": temperature, "high": temperature, "samples": 0}
            summary[device_key] = bucket
        bucket["low"] = min(float(bucket["low"]), temperature)
        bucket["high"] = max(float(bucket["high"]), temperature)
        bucket["samples"] = int(bucket["samples"]) + 1
    return summary
def chart_payload(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]:
    """Group readings into one chart series per device.

    Each series carries ``deviceId``, ``name`` and a ``points`` list whose
    entries hold the local ISO timestamp (minute precision), temperature and
    humidity. Readings without a temperature are left out. ``recorded_at`` is
    interpreted as naive UTC before conversion to ``tz``.
    """
    by_device: dict[str, dict[str, object]] = {}
    for reading in readings:
        if reading.temperature is None:
            continue
        device_key = reading.device_id
        device_name = reading.device.name
        if device_key not in by_device:
            by_device[device_key] = {
                "deviceId": device_key,
                "name": device_name,
                "points": [],
            }
        as_utc = reading.recorded_at.replace(tzinfo=timezone.utc)
        point = {
            "time": as_utc.astimezone(tz).isoformat(timespec="minutes"),
            "temperature": reading.temperature,
            "humidity": reading.humidity,
        }
        by_device[device_key]["points"].append(point)
    return list(by_device.values())
def readings_for_display(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]:
    """Convert readings into template-friendly rows, keeping their order.

    Each row has a ``timestamp`` formatted as ``YYYY-MM-DD HH:MM`` in ``tz``
    (``recorded_at`` is interpreted as naive UTC), plus the reading's
    temperature, humidity and battery values.
    """

    def as_row(reading: Reading) -> dict[str, object]:
        shown_at = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
        return {
            "timestamp": shown_at.strftime("%Y-%m-%d %H:%M"),
            "temperature": reading.temperature,
            "humidity": reading.humidity,
            "battery": reading.battery,
        }

    return [as_row(reading) for reading in readings]
def latest_ages_for_display(latest: dict[str, Reading], tz: ZoneInfo) -> dict[str, str]:
    """Describe how long ago each device's latest reading was recorded.

    Returns a mapping of device id to one of ``"just now"`` (under a minute,
    including timestamps in the future), ``"N min ago"``, ``"N h ago"`` or
    ``"N d ago"``. ``recorded_at`` is interpreted as naive UTC.
    """

    def describe(elapsed_seconds: int) -> str:
        if elapsed_seconds < 60:
            return "just now"
        minutes = elapsed_seconds // 60
        if minutes < 60:
            return f"{minutes} min ago"
        hours = minutes // 60
        if hours < 24:
            return f"{hours} h ago"
        days = hours // 24
        return f"{days} d ago"

    now_local = datetime.now(tz)
    labels: dict[str, str] = {}
    for device_id, reading in latest.items():
        recorded_local = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
        elapsed = int((now_local - recorded_local).total_seconds())
        # Clamp so clock skew never yields a negative age.
        labels[device_id] = describe(max(0, elapsed))
    return labels
def report_rows(
    session: Session,
    start_utc: datetime,
    end_utc: datetime,
    device_id: str = "",
) -> list[dict[str, object]]:
    """Aggregate readings per device over ``[start_utc, end_utc)``.

    Each returned dict holds the device name, the sample count, and the
    min / max / average of temperature and humidity rounded to one decimal.
    Rows are ordered by device name. A non-empty ``device_id`` restricts the
    result to that device.
    """
    temperature = Reading.temperature
    humidity = Reading.humidity
    query: Select[tuple] = (
        select(
            Device.name,
            func.count(Reading.id),
            func.min(temperature),
            func.max(temperature),
            func.avg(temperature),
            func.min(humidity),
            func.max(humidity),
            func.avg(humidity),
        )
        .join(Reading.device)
        .where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc)
        .group_by(Device.id, Device.name)
        .order_by(Device.name)
    )
    if device_id:
        query = query.where(Device.id == device_id)

    return [
        {
            "device_name": name,
            "samples": samples,
            "low_temp": rounded(temp_low),
            "high_temp": rounded(temp_high),
            "avg_temp": rounded(temp_avg),
            "low_humidity": rounded(hum_low),
            "high_humidity": rounded(hum_high),
            "avg_humidity": rounded(hum_avg),
        }
        for (
            name,
            samples,
            temp_low,
            temp_high,
            temp_avg,
            hum_low,
            hum_high,
            hum_avg,
        ) in session.execute(query)
    ]
def rounded(value: float | None) -> float | None:
    """Return ``value`` as a float rounded to one decimal place, or None if it is None."""
    return None if value is None else round(float(value), 1)
# Module-level application instance, built when this module is imported; note
# that create_app() also calls init_db() as a side effect.
# NOTE(review): presumably this is the object a WSGI server imports — confirm
# against the deployment configuration.
app = create_app()