346 lines
11 KiB
Python
346 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import json
|
|
from datetime import date, datetime, time, timedelta, timezone
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from flask import Flask, Response, render_template, request
|
|
from sqlalchemy import Select, func, select
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from app.config import config
|
|
from app.db import SessionLocal, init_db
|
|
from app.models import Device, Reading
|
|
|
|
SENSOR_DEVICE_TYPES = {"WoIOSensor"}
|
|
|
|
|
|
def create_app() -> Flask:
|
|
app = Flask(__name__)
|
|
app.secret_key = config.flask_secret_key
|
|
init_db()
|
|
|
|
@app.context_processor
|
|
def inject_common_template_vars() -> dict[str, int]:
|
|
return {"collect_interval_seconds": config.collect_interval_seconds}
|
|
|
|
@app.get("/")
|
|
def dashboard() -> str:
|
|
tz = ZoneInfo(config.app_timezone)
|
|
today = datetime.now(tz).date()
|
|
start_utc, end_utc = local_date_range_to_utc(today, today, tz)
|
|
|
|
with SessionLocal() as session:
|
|
devices = list(
|
|
session.scalars(
|
|
select(Device)
|
|
.where(Device.device_type.in_(SENSOR_DEVICE_TYPES))
|
|
.order_by(Device.name)
|
|
)
|
|
)
|
|
readings = load_readings(session, start_utc, end_utc)
|
|
latest = latest_by_device(session)
|
|
|
|
return render_template(
|
|
"dashboard.html",
|
|
devices=devices,
|
|
latest=latest,
|
|
latest_relative=latest_ages_for_display(latest, tz),
|
|
stats=stats_by_device(readings),
|
|
chart_json=json.dumps(chart_payload(readings, tz)),
|
|
timezone=config.app_timezone,
|
|
)
|
|
|
|
@app.get("/devices/<device_id>")
|
|
def device_detail(device_id: str) -> str:
|
|
tz = ZoneInfo(config.app_timezone)
|
|
selected_date_text = request.args.get("date") or datetime.now(tz).date().isoformat()
|
|
selected_date = date.fromisoformat(selected_date_text)
|
|
start_utc, end_utc = local_date_range_to_utc(selected_date, selected_date, tz)
|
|
|
|
with SessionLocal() as session:
|
|
device = session.get(Device, device_id)
|
|
if device is None or device.device_type not in SENSOR_DEVICE_TYPES:
|
|
return render_template("not_found.html"), 404
|
|
readings = load_device_readings(session, device_id, start_utc, end_utc)
|
|
|
|
return render_template(
|
|
"device.html",
|
|
device=device,
|
|
date=selected_date_text,
|
|
readings=readings,
|
|
stats=stats_by_device(readings).get(device_id),
|
|
chart_json=json.dumps(chart_payload(readings, tz)),
|
|
timezone=config.app_timezone,
|
|
local_readings=readings_for_display(readings, tz),
|
|
)
|
|
|
|
@app.get("/reports")
|
|
def reports() -> str:
|
|
tz = ZoneInfo(config.app_timezone)
|
|
start_text = request.args.get("start") or datetime.now(tz).date().isoformat()
|
|
end_text = request.args.get("end") or start_text
|
|
device_id = request.args.get("device_id") or ""
|
|
|
|
start_date = date.fromisoformat(start_text)
|
|
end_date = date.fromisoformat(end_text)
|
|
start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz)
|
|
|
|
with SessionLocal() as session:
|
|
devices = list(
|
|
session.scalars(
|
|
select(Device)
|
|
.where(Device.device_type.in_(SENSOR_DEVICE_TYPES))
|
|
.order_by(Device.name)
|
|
)
|
|
)
|
|
rows = report_rows(session, start_utc, end_utc, device_id)
|
|
|
|
return render_template(
|
|
"reports.html",
|
|
devices=devices,
|
|
rows=rows,
|
|
start=start_text,
|
|
end=end_text,
|
|
device_id=device_id,
|
|
)
|
|
|
|
@app.get("/reports.csv")
|
|
def reports_csv() -> Response:
|
|
tz = ZoneInfo(config.app_timezone)
|
|
start_date = date.fromisoformat(request.args.get("start") or datetime.now(tz).date().isoformat())
|
|
end_date = date.fromisoformat(request.args.get("end") or start_date.isoformat())
|
|
device_id = request.args.get("device_id") or ""
|
|
start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz)
|
|
|
|
with SessionLocal() as session:
|
|
rows = report_rows(session, start_utc, end_utc, device_id)
|
|
|
|
output = io.StringIO()
|
|
writer = csv.writer(output)
|
|
writer.writerow(
|
|
[
|
|
"device",
|
|
"samples",
|
|
"low_temp",
|
|
"high_temp",
|
|
"avg_temp",
|
|
"low_humidity",
|
|
"high_humidity",
|
|
"avg_humidity",
|
|
]
|
|
)
|
|
for row in rows:
|
|
writer.writerow(
|
|
[
|
|
row["device_name"],
|
|
row["samples"],
|
|
row["low_temp"],
|
|
row["high_temp"],
|
|
row["avg_temp"],
|
|
row["low_humidity"],
|
|
row["high_humidity"],
|
|
row["avg_humidity"],
|
|
]
|
|
)
|
|
|
|
return Response(
|
|
output.getvalue(),
|
|
mimetype="text/csv",
|
|
headers={"Content-Disposition": "attachment; filename=switchbot-report.csv"},
|
|
)
|
|
|
|
return app
|
|
|
|
|
|
def local_date_range_to_utc(start: date, end: date, tz: ZoneInfo) -> tuple[datetime, datetime]:
|
|
local_start = datetime.combine(start, time.min, tzinfo=tz)
|
|
local_end = datetime.combine(end + timedelta(days=1), time.min, tzinfo=tz)
|
|
return (
|
|
local_start.astimezone(timezone.utc).replace(tzinfo=None),
|
|
local_end.astimezone(timezone.utc).replace(tzinfo=None),
|
|
)
|
|
|
|
|
|
def load_readings(session: Session, start_utc: datetime, end_utc: datetime) -> list[Reading]:
|
|
stmt = (
|
|
select(Reading)
|
|
.options(joinedload(Reading.device))
|
|
.join(Reading.device)
|
|
.where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc)
|
|
.order_by(Reading.recorded_at)
|
|
)
|
|
return list(session.scalars(stmt))
|
|
|
|
|
|
def load_device_readings(
|
|
session: Session,
|
|
device_id: str,
|
|
start_utc: datetime,
|
|
end_utc: datetime,
|
|
) -> list[Reading]:
|
|
stmt = (
|
|
select(Reading)
|
|
.options(joinedload(Reading.device))
|
|
.where(
|
|
Reading.device_id == device_id,
|
|
Reading.recorded_at >= start_utc,
|
|
Reading.recorded_at < end_utc,
|
|
)
|
|
.order_by(Reading.recorded_at)
|
|
)
|
|
return list(session.scalars(stmt))
|
|
|
|
|
|
def latest_by_device(session: Session) -> dict[str, Reading]:
|
|
subquery = (
|
|
select(Reading.device_id, func.max(Reading.recorded_at).label("latest_at"))
|
|
.group_by(Reading.device_id)
|
|
.subquery()
|
|
)
|
|
rows = session.scalars(
|
|
select(Reading)
|
|
.join(subquery, (Reading.device_id == subquery.c.device_id) & (Reading.recorded_at == subquery.c.latest_at))
|
|
.order_by(Reading.device_id)
|
|
)
|
|
return {reading.device_id: reading for reading in rows}
|
|
|
|
|
|
def stats_by_device(readings: list[Reading]) -> dict[str, dict[str, float | int]]:
|
|
stats: dict[str, dict[str, float | int]] = {}
|
|
for reading in readings:
|
|
if reading.temperature is None:
|
|
continue
|
|
item = stats.setdefault(
|
|
reading.device_id,
|
|
{
|
|
"low": reading.temperature,
|
|
"high": reading.temperature,
|
|
"samples": 0,
|
|
},
|
|
)
|
|
item["low"] = min(float(item["low"]), reading.temperature)
|
|
item["high"] = max(float(item["high"]), reading.temperature)
|
|
item["samples"] = int(item["samples"]) + 1
|
|
return stats
|
|
|
|
|
|
def chart_payload(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]:
|
|
series: dict[str, dict[str, object]] = {}
|
|
for reading in readings:
|
|
if reading.temperature is None:
|
|
continue
|
|
item = series.setdefault(
|
|
reading.device_id,
|
|
{
|
|
"deviceId": reading.device_id,
|
|
"name": reading.device.name,
|
|
"points": [],
|
|
},
|
|
)
|
|
local_time = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
|
|
item["points"].append(
|
|
{
|
|
"time": local_time.isoformat(timespec="minutes"),
|
|
"temperature": reading.temperature,
|
|
"humidity": reading.humidity,
|
|
}
|
|
)
|
|
return list(series.values())
|
|
|
|
|
|
def readings_for_display(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]:
|
|
rows = []
|
|
for reading in readings:
|
|
local_time = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
|
|
rows.append(
|
|
{
|
|
"timestamp": local_time.strftime("%Y-%m-%d %H:%M"),
|
|
"temperature": reading.temperature,
|
|
"humidity": reading.humidity,
|
|
"battery": reading.battery,
|
|
}
|
|
)
|
|
return rows
|
|
|
|
|
|
def latest_ages_for_display(latest: dict[str, Reading], tz: ZoneInfo) -> dict[str, str]:
|
|
now_local = datetime.now(tz)
|
|
ages: dict[str, str] = {}
|
|
|
|
for device_id, reading in latest.items():
|
|
collected_local = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
|
|
delta_seconds = max(0, int((now_local - collected_local).total_seconds()))
|
|
|
|
if delta_seconds < 60:
|
|
ages[device_id] = "just now"
|
|
continue
|
|
|
|
minutes = delta_seconds // 60
|
|
if minutes < 60:
|
|
ages[device_id] = f"{minutes} min ago"
|
|
continue
|
|
|
|
hours = minutes // 60
|
|
if hours < 24:
|
|
ages[device_id] = f"{hours} h ago"
|
|
continue
|
|
|
|
days = hours // 24
|
|
ages[device_id] = f"{days} d ago"
|
|
|
|
return ages
|
|
|
|
|
|
def report_rows(
|
|
session: Session,
|
|
start_utc: datetime,
|
|
end_utc: datetime,
|
|
device_id: str = "",
|
|
) -> list[dict[str, object]]:
|
|
stmt: Select[tuple] = (
|
|
select(
|
|
Device.name,
|
|
func.count(Reading.id),
|
|
func.min(Reading.temperature),
|
|
func.max(Reading.temperature),
|
|
func.avg(Reading.temperature),
|
|
func.min(Reading.humidity),
|
|
func.max(Reading.humidity),
|
|
func.avg(Reading.humidity),
|
|
)
|
|
.join(Reading.device)
|
|
.where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc)
|
|
.group_by(Device.id, Device.name)
|
|
.order_by(Device.name)
|
|
)
|
|
if device_id:
|
|
stmt = stmt.where(Device.id == device_id)
|
|
|
|
rows = []
|
|
for row in session.execute(stmt):
|
|
rows.append(
|
|
{
|
|
"device_name": row[0],
|
|
"samples": row[1],
|
|
"low_temp": rounded(row[2]),
|
|
"high_temp": rounded(row[3]),
|
|
"avg_temp": rounded(row[4]),
|
|
"low_humidity": rounded(row[5]),
|
|
"high_humidity": rounded(row[6]),
|
|
"avg_humidity": rounded(row[7]),
|
|
}
|
|
)
|
|
return rows
|
|
|
|
|
|
def rounded(value: float | None) -> float | None:
|
|
if value is None:
|
|
return None
|
|
return round(float(value), 1)
|
|
|
|
|
|
app = create_app()
|