Initial Commit

This commit is contained in:
2026-06-01 21:02:36 +01:00
commit 4224a535ef
20 changed files with 1512 additions and 0 deletions
+7
View File
@@ -0,0 +1,7 @@
.git
.env
__pycache__/
*.py[cod]
.DS_Store
.venv/
mysql_data/
+11
View File
@@ -0,0 +1,11 @@
SWITCHBOT_TOKEN=replace-me
SWITCHBOT_SECRET=replace-me
MYSQL_DATABASE=switchbot
MYSQL_USER=switchbot
MYSQL_PASSWORD=switchbot_password
MYSQL_ROOT_PASSWORD=root_password
COLLECT_INTERVAL_SECONDS=900
APP_TIMEZONE=Europe/London
FLASK_SECRET_KEY=replace-with-a-random-string
+6
View File
@@ -0,0 +1,6 @@
.env
__pycache__/
*.py[cod]
.DS_Store
.venv/
mysql_data/
+13
View File
@@ -0,0 +1,13 @@
FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
+140
View File
@@ -0,0 +1,140 @@
# SwitchBot Temperature Dashboard
Dockerised SwitchBot temperature monitor with:
- Flask web dashboard for desktop and mobile.
- Python collector polling the SwitchBot API v1.1.
- MySQL storage.
- Auto-populated device names from the SwitchBot API.
- Configurable collection interval, defaulting to 15 minutes.
- Day-so-far temperature graph with high/low cards.
- Date-range reports with CSV export.
## Quick Start
Create your local environment file:
```sh
cp .env.example .env
```
Edit `.env` and set:
```sh
SWITCHBOT_TOKEN=your-token
SWITCHBOT_SECRET=your-secret
FLASK_SECRET_KEY=a-random-local-secret
```
Start everything:
```sh
docker compose up --build
```
Open:
```text
http://localhost:8000
```
The collector service will create database tables, discover devices, and begin
recording temperature/humidity readings. To change the polling interval, update
`COLLECT_INTERVAL_SECONDS` in `.env`; `900` is 15 minutes.
Compose builds the internal database URL from `MYSQL_USER`, `MYSQL_PASSWORD`,
and `MYSQL_DATABASE`. Set `DATABASE_URL` only if you want to point the app at a
different database.
## Reports
Open `/reports` in the web app to generate a date-range summary. Use the
download button for a CSV export.
## Services
- `db`: MySQL 8.4 with persistent `mysql_data` volume.
- `web`: Flask app served by Gunicorn on port 8000.
- `collector`: SwitchBot polling loop.
## POC Script
The original proof-of-concept script is still available:
This demo signs a SwitchBot API v1.1 request and prints the JSON response.
## Usage
Set your credentials as environment variables:
```sh
export SWITCHBOT_TOKEN="your-token"
export SWITCHBOT_SECRET="your-secret"
```
List devices:
```sh
python3 switchbot_poc.py
```
List devices with an empty-inventory diagnostic:
```sh
python3 switchbot_poc.py --diagnose
```
List scenes:
```sh
python3 switchbot_poc.py --endpoint /scenes
```
Get a device status:
```sh
python3 switchbot_poc.py --endpoint /devices/YOUR_DEVICE_ID/status
```
You can also pass credentials directly:
```sh
python3 switchbot_poc.py --token "your-token" --secret "your-secret"
```
The v1.1 signature is:
```text
Base64(HMAC-SHA256(secret, token + timestamp_ms + nonce))
```
## Empty Device List
If `/devices` returns:
```json
{
"statusCode": 100,
"message": "success",
"body": {
"deviceList": [],
"infraredRemoteList": []
}
}
```
the signed API request is working, but the account behind that token has no
Cloud API-visible devices.
Things to check:
- The token and secret were generated from the same SwitchBot app account that
owns the devices.
- You are not logged into a different SwitchBot account, Apple/Google login, or
home/family than the one containing the devices.
- The devices appear in the SwitchBot mobile app while logged into that account.
- Bluetooth-only devices such as Bots, Locks, Curtains, and Meters generally
need to be reachable through a SwitchBot Hub for cloud/API access.
- If using app version 9 or newer, SwitchBot says the old manual "Cloud Service"
toggle was removed for some devices; keep the device near a Hub and verify it
is remotely controllable from the app.
+1
View File
@@ -0,0 +1 @@
"""SwitchBot temperature dashboard."""
+116
View File
@@ -0,0 +1,116 @@
from __future__ import annotations
import logging
import time
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import select
from app.config import config
from app.db import init_db, session_scope
from app.models import Device, Reading
from app.switchbot import SwitchBotClient, SwitchBotError
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
SENSOR_DEVICE_TYPES = {"WoIOSensor"}
def upsert_device(device_data: dict[str, Any]) -> None:
device_id = device_data["deviceId"]
with session_scope() as session:
device = session.get(Device, device_id)
if device is None:
device = Device(id=device_id, name=device_data.get("deviceName", device_id), device_type="")
session.add(device)
device.name = device_data.get("deviceName", device_id)
device.device_type = device_data.get("deviceType", "")
device.enable_cloud_service = bool(device_data.get("enableCloudService"))
device.hub_device_id = device_data.get("hubDeviceId")
device.last_seen_at = datetime.now(timezone.utc).replace(tzinfo=None)
def sync_devices(client: SwitchBotClient) -> list[Device]:
logger.info("Syncing SwitchBot device list")
devices = client.devices()
logger.info("SwitchBot returned %s devices", len(devices))
for device_data in devices:
upsert_device(device_data)
with session_scope() as session:
return list(session.scalars(select(Device).order_by(Device.name)))
def save_reading(device: Device, status: dict[str, Any]) -> None:
if "temperature" not in status and "humidity" not in status:
logger.info("Skipping %s (%s): status has no temperature/humidity", device.name, device.id)
return
reading = Reading(
device_id=device.id,
recorded_at=datetime.now(timezone.utc).replace(tzinfo=None),
temperature=status.get("temperature"),
humidity=status.get("humidity"),
battery=status.get("battery"),
version=status.get("version"),
)
with session_scope() as session:
session.add(reading)
logger.info(
"Recorded %s: temp=%s humidity=%s battery=%s",
device.name,
reading.temperature,
reading.humidity,
reading.battery,
)
def collect_once(client: SwitchBotClient) -> None:
logger.info("Starting collection cycle")
devices = sync_devices(client)
for device in devices:
if device.device_type not in SENSOR_DEVICE_TYPES:
logger.info("Skipping %s (%s): %s is not a sensor", device.name, device.id, device.device_type)
continue
if not device.enable_cloud_service:
logger.info("Skipping %s (%s): cloud service disabled", device.name, device.id)
continue
try:
status = client.status(device.id)
except SwitchBotError as exc:
logger.warning("Could not read %s (%s): %s", device.name, device.id, exc)
continue
save_reading(device, status)
logger.info("Collection cycle complete")
def main() -> int:
if not config.switchbot_token or not config.switchbot_secret:
logger.error("SWITCHBOT_TOKEN and SWITCHBOT_SECRET are required")
return 2
init_db()
client = SwitchBotClient(config.switchbot_token, config.switchbot_secret)
interval = max(60, config.collect_interval_seconds)
logger.info("Collector started with %s second interval", interval)
while True:
started = time.monotonic()
try:
collect_once(client)
except Exception:
logger.exception("Collector cycle failed")
elapsed = time.monotonic() - started
time.sleep(max(1, interval - elapsed))
if __name__ == "__main__":
raise SystemExit(main())
+25
View File
@@ -0,0 +1,25 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass(frozen=True)
class Config:
database_url: str = os.getenv(
"DATABASE_URL",
"mysql+pymysql://switchbot:switchbot_password@localhost:3306/switchbot",
)
switchbot_token: str | None = os.getenv("SWITCHBOT_TOKEN")
switchbot_secret: str | None = os.getenv("SWITCHBOT_SECRET")
collect_interval_seconds: int = int(os.getenv("COLLECT_INTERVAL_SECONDS", "900"))
app_timezone: str = os.getenv("APP_TIMEZONE", "Europe/London")
flask_secret_key: str = os.getenv("FLASK_SECRET_KEY", "dev-only-secret")
config = Config()
+36
View File
@@ -0,0 +1,36 @@
from __future__ import annotations
from contextlib import contextmanager
from typing import Iterator
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from app.config import config
class Base(DeclarativeBase):
pass
engine = create_engine(config.database_url, pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, expire_on_commit=False, future=True)
def init_db() -> None:
from app import models # noqa: F401
Base.metadata.create_all(bind=engine)
@contextmanager
def session_scope() -> Iterator[Session]:
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
+46
View File
@@ -0,0 +1,46 @@
from __future__ import annotations
from datetime import datetime
from sqlalchemy import Boolean, DateTime, Float, ForeignKey, Index, Integer, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.db import Base
class Device(Base):
__tablename__ = "devices"
id: Mapped[str] = mapped_column(String(64), primary_key=True)
name: Mapped[str] = mapped_column(String(255), nullable=False)
device_type: Mapped[str] = mapped_column(String(64), nullable=False)
enable_cloud_service: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
hub_device_id: Mapped[str | None] = mapped_column(String(64))
last_seen_at: Mapped[datetime | None] = mapped_column(DateTime)
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
server_default=func.now(),
onupdate=func.now(),
nullable=False,
)
readings: Mapped[list["Reading"]] = relationship(back_populates="device")
class Reading(Base):
__tablename__ = "readings"
__table_args__ = (
Index("ix_readings_device_recorded", "device_id", "recorded_at"),
Index("ix_readings_recorded", "recorded_at"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
device_id: Mapped[str] = mapped_column(ForeignKey("devices.id"), nullable=False)
recorded_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
temperature: Mapped[float | None] = mapped_column(Float)
humidity: Mapped[float | None] = mapped_column(Float)
battery: Mapped[int | None] = mapped_column(Integer)
version: Mapped[str | None] = mapped_column(String(32))
device: Mapped[Device] = relationship(back_populates="readings")
+110
View File
@@ -0,0 +1,110 @@
(function () {
const dataEl = document.getElementById("chart-data");
const canvas = document.getElementById("temperatureChart");
if (!dataEl || !canvas) return;
const series = JSON.parse(dataEl.textContent || "[]");
const colors = ["#166b5b", "#cf5a24", "#355c9a", "#7a4e9f", "#9a6235"];
const ctx = canvas.getContext("2d");
function resizeCanvas() {
const ratio = window.devicePixelRatio || 1;
const rect = canvas.getBoundingClientRect();
canvas.width = Math.max(320, Math.floor(rect.width * ratio));
canvas.height = Math.floor(360 * ratio);
ctx.setTransform(ratio, 0, 0, ratio, 0, 0);
}
function draw() {
resizeCanvas();
const width = canvas.clientWidth;
const height = 360;
const pad = { top: 24, right: 24, bottom: 44, left: 54 };
const plotW = width - pad.left - pad.right;
const plotH = height - pad.top - pad.bottom;
ctx.clearRect(0, 0, width, height);
const points = series.flatMap((item) =>
item.points.map((point) => ({
...point,
date: new Date(point.time),
}))
);
if (!points.length) {
ctx.fillStyle = "#64706b";
ctx.font = "16px system-ui, sans-serif";
ctx.fillText("Waiting for readings for today.", pad.left, pad.top + 40);
return;
}
const minTime = Math.min(...points.map((point) => point.date.getTime()));
const maxTime = Math.max(...points.map((point) => point.date.getTime()));
const temps = points.map((point) => Number(point.temperature));
const minTemp = Math.floor(Math.min(...temps) - 1);
const maxTemp = Math.ceil(Math.max(...temps) + 1);
const timeSpan = Math.max(1, maxTime - minTime);
const tempSpan = Math.max(1, maxTemp - minTemp);
function xFor(date) {
return pad.left + ((date.getTime() - minTime) / timeSpan) * plotW;
}
function yFor(temp) {
return pad.top + plotH - ((temp - minTemp) / tempSpan) * plotH;
}
ctx.strokeStyle = "#dfe5df";
ctx.lineWidth = 1;
ctx.fillStyle = "#64706b";
ctx.font = "12px system-ui, sans-serif";
for (let i = 0; i <= 4; i += 1) {
const y = pad.top + (plotH / 4) * i;
const temp = maxTemp - (tempSpan / 4) * i;
ctx.beginPath();
ctx.moveTo(pad.left, y);
ctx.lineTo(width - pad.right, y);
ctx.stroke();
ctx.fillText(`${temp.toFixed(1)}C`, 8, y + 4);
}
series.forEach((item, index) => {
const color = colors[index % colors.length];
const sorted = item.points
.map((point) => ({ ...point, date: new Date(point.time) }))
.sort((a, b) => a.date - b.date);
ctx.strokeStyle = color;
ctx.lineWidth = 3;
ctx.beginPath();
sorted.forEach((point, pointIndex) => {
const x = xFor(point.date);
const y = yFor(Number(point.temperature));
if (pointIndex === 0) ctx.moveTo(x, y);
else ctx.lineTo(x, y);
});
ctx.stroke();
sorted.forEach((point) => {
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(xFor(point.date), yFor(Number(point.temperature)), 3, 0, Math.PI * 2);
ctx.fill();
});
});
const legendY = height - 18;
series.forEach((item, index) => {
const x = pad.left + index * 150;
ctx.fillStyle = colors[index % colors.length];
ctx.fillRect(x, legendY - 9, 10, 10);
ctx.fillStyle = "#1d2522";
ctx.font = "12px system-ui, sans-serif";
ctx.fillText(item.name, x + 16, legendY);
});
}
window.addEventListener("resize", draw);
draw();
})();
+327
View File
@@ -0,0 +1,327 @@
:root {
color-scheme: light;
--bg: #f6f7f4;
--panel: #ffffff;
--ink: #1d2522;
--muted: #64706b;
--line: #dfe5df;
--brand: #166b5b;
--brand-strong: #0d4d41;
--accent: #cf5a24;
--shadow: 0 10px 30px rgba(22, 37, 32, 0.08);
}
* {
box-sizing: border-box;
}
body {
margin: 0;
background: var(--bg);
color: var(--ink);
font-family: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
}
a {
color: inherit;
}
.topbar {
position: sticky;
top: 0;
z-index: 10;
display: flex;
align-items: center;
justify-content: space-between;
gap: 16px;
padding: 14px clamp(16px, 4vw, 44px);
border-bottom: 1px solid var(--line);
background: rgba(246, 247, 244, 0.92);
backdrop-filter: blur(10px);
}
.brand {
font-weight: 800;
text-decoration: none;
}
.nav {
display: flex;
gap: 8px;
}
.nav a {
border-radius: 999px;
padding: 8px 12px;
color: var(--muted);
text-decoration: none;
}
.nav a[aria-current="page"] {
background: var(--ink);
color: #fff;
}
.page {
width: min(1180px, calc(100vw - 32px));
margin: 0 auto;
padding: 28px 0 48px;
}
.hero {
display: flex;
align-items: end;
justify-content: space-between;
gap: 20px;
margin-bottom: 22px;
}
.eyebrow {
margin: 0 0 6px;
color: var(--accent);
font-size: 0.78rem;
font-weight: 800;
letter-spacing: 0.08em;
text-transform: uppercase;
}
h1,
h2,
p {
margin-top: 0;
}
h1 {
margin-bottom: 8px;
font-size: clamp(2rem, 4vw, 3.4rem);
line-height: 1;
}
h2 {
margin-bottom: 6px;
font-size: 1.05rem;
}
.muted {
color: var(--muted);
}
.button {
display: inline-flex;
align-items: center;
justify-content: center;
min-height: 42px;
border: 0;
border-radius: 8px;
padding: 0 16px;
background: var(--brand);
color: #fff;
font-weight: 700;
text-decoration: none;
cursor: pointer;
}
.button:hover {
background: var(--brand-strong);
}
.button.secondary {
border: 1px solid var(--line);
background: #fff;
color: var(--ink);
}
.cards {
display: grid;
grid-template-columns: repeat(4, minmax(0, 1fr));
gap: 14px;
margin-bottom: 18px;
}
.metric-card,
.panel,
.empty-state {
border: 1px solid var(--line);
border-radius: 8px;
background: var(--panel);
box-shadow: var(--shadow);
}
.metric-card {
padding: 16px;
}
.card-heading {
display: flex;
align-items: start;
justify-content: space-between;
gap: 12px;
margin-bottom: 18px;
}
.card-heading span {
flex: 0 0 auto;
border-radius: 999px;
padding: 4px 8px;
background: #edf3ef;
color: var(--muted);
font-size: 0.74rem;
font-weight: 700;
}
.reading-row {
display: flex;
align-items: baseline;
justify-content: space-between;
gap: 12px;
margin-bottom: 18px;
}
.reading-row strong {
font-size: 2.1rem;
line-height: 1;
}
.reading-row span {
color: var(--muted);
font-weight: 700;
}
.mini-stats {
display: grid;
grid-template-columns: repeat(3, minmax(0, 1fr));
gap: 10px;
margin: 0;
}
.mini-stats dt {
color: var(--muted);
font-size: 0.75rem;
}
.mini-stats dd {
margin: 2px 0 0;
font-weight: 800;
}
.panel {
padding: 18px;
}
.panel-heading {
display: flex;
align-items: center;
justify-content: space-between;
gap: 16px;
margin-bottom: 12px;
}
.chart-wrap {
min-height: 360px;
}
#temperatureChart {
display: block;
width: 100%;
min-height: 360px;
}
.empty-state {
padding: 28px;
}
.empty {
color: var(--muted);
text-align: center;
}
.report-form {
display: grid;
grid-template-columns: repeat(5, minmax(0, auto));
align-items: end;
gap: 12px;
margin-bottom: 18px;
}
.report-form label {
display: grid;
gap: 6px;
color: var(--muted);
font-size: 0.85rem;
font-weight: 700;
}
input,
select {
min-height: 42px;
border: 1px solid var(--line);
border-radius: 8px;
padding: 0 12px;
background: #fff;
color: var(--ink);
font: inherit;
}
.table-wrap {
overflow-x: auto;
}
table {
width: 100%;
border-collapse: collapse;
}
th,
td {
border-bottom: 1px solid var(--line);
padding: 12px 10px;
text-align: left;
white-space: nowrap;
}
th {
color: var(--muted);
font-size: 0.78rem;
text-transform: uppercase;
}
@media (max-width: 900px) {
.cards {
grid-template-columns: repeat(2, minmax(0, 1fr));
}
.report-form {
grid-template-columns: repeat(2, minmax(0, 1fr));
}
}
@media (max-width: 620px) {
.topbar,
.hero,
.panel-heading {
align-items: stretch;
flex-direction: column;
}
.nav {
width: 100%;
}
.nav a {
flex: 1;
text-align: center;
}
.cards,
.report-form {
grid-template-columns: 1fr;
}
.button {
width: 100%;
}
.reading-row strong {
font-size: 1.8rem;
}
}
+72
View File
@@ -0,0 +1,72 @@
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import time
import urllib.error
import urllib.request
import uuid
from typing import Any
BASE_URL = "https://api.switch-bot.com/v1.1"
class SwitchBotError(RuntimeError):
pass
class SwitchBotClient:
def __init__(self, token: str, secret: str) -> None:
self.token = token
self.secret = secret
def _headers(self) -> dict[str, str]:
timestamp_ms = str(int(time.time() * 1000))
nonce = str(uuid.uuid4())
message = f"{self.token}{timestamp_ms}{nonce}".encode("utf-8")
digest = hmac.new(self.secret.encode("utf-8"), message, hashlib.sha256).digest()
signature = base64.b64encode(digest).decode("utf-8")
return {
"Authorization": self.token,
"Content-Type": "application/json; charset=utf8",
"sign": signature,
"t": timestamp_ms,
"nonce": nonce,
}
def get(self, path: str) -> dict[str, Any]:
endpoint = path if path.startswith("/") else f"/{path}"
request = urllib.request.Request(
f"{BASE_URL}{endpoint}",
headers=self._headers(),
method="GET",
)
try:
with urllib.request.urlopen(request, timeout=20) as response:
body = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
details = exc.read().decode("utf-8", errors="replace")
raise SwitchBotError(f"HTTP {exc.code}: {details}") from exc
except urllib.error.URLError as exc:
raise SwitchBotError(f"Could not reach SwitchBot API: {exc.reason}") from exc
try:
payload = json.loads(body)
except json.JSONDecodeError as exc:
raise SwitchBotError(f"SwitchBot returned invalid JSON: {body}") from exc
if payload.get("statusCode") != 100:
raise SwitchBotError(f"SwitchBot API error: {payload}")
return payload
def devices(self) -> list[dict[str, Any]]:
payload = self.get("/devices")
body = payload.get("body", {})
return body.get("deviceList", [])
def status(self, device_id: str) -> dict[str, Any]:
payload = self.get(f"/devices/{device_id}/status")
return payload.get("body", {})
+21
View File
@@ -0,0 +1,21 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{% block title %}SwitchBot Temps{% endblock %}</title>
<link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}">
</head>
<body>
<header class="topbar">
<a class="brand" href="/">SwitchBot Temps</a>
<nav class="nav">
<a href="/" {% if request.path == "/" %}aria-current="page"{% endif %}>Dashboard</a>
<a href="/reports" {% if request.path == "/reports" %}aria-current="page"{% endif %}>Reports</a>
</nav>
</header>
<main class="page">
{% block content %}{% endblock %}
</main>
</body>
</html>
+70
View File
@@ -0,0 +1,70 @@
{% extends "base.html" %}
{% block title %}Dashboard - SwitchBot Temps{% endblock %}
{% block content %}
<section class="hero">
<div>
<p class="eyebrow">Today so far</p>
<h1>Temperature dashboard</h1>
<p class="muted">Local timezone: {{ timezone }}. Collector interval: {{ collect_interval_seconds // 60 }} min.</p>
</div>
<a class="button" href="/reports">Make report</a>
</section>
{% if devices %}
<section class="cards">
{% for device in devices %}
{% set reading = latest.get(device.id) %}
{% set stat = stats.get(device.id) %}
<article class="metric-card">
<div class="card-heading">
<h2>{{ device.name }}</h2>
<span>{{ device.device_type }}</span>
</div>
{% if reading %}
<div class="reading-row">
<strong>{{ "%.1f"|format(reading.temperature) if reading.temperature is not none else "n/a" }}&deg;C</strong>
<span>{{ reading.humidity if reading.humidity is not none else "n/a" }}% RH</span>
</div>
<dl class="mini-stats">
<div>
<dt>Day low</dt>
<dd>{{ "%.1f"|format(stat.low) if stat else "n/a" }}&deg;C</dd>
</div>
<div>
<dt>Day high</dt>
<dd>{{ "%.1f"|format(stat.high) if stat else "n/a" }}&deg;C</dd>
</div>
<div>
<dt>Battery</dt>
<dd>{{ reading.battery if reading.battery is not none else "n/a" }}%</dd>
</div>
</dl>
{% else %}
<p class="empty">Waiting for the first reading.</p>
{% endif %}
</article>
{% endfor %}
</section>
<section class="panel">
<div class="panel-heading">
<div>
<h2>Day graph</h2>
<p class="muted">Temperature readings from midnight to now.</p>
</div>
</div>
<div class="chart-wrap">
<canvas id="temperatureChart" height="360"></canvas>
</div>
<script id="chart-data" type="application/json">{{ chart_json|safe }}</script>
<script src="{{ url_for('static', filename='chart.js') }}"></script>
</section>
{% else %}
<section class="empty-state">
<h2>No devices yet</h2>
<p>Start the collector and it will populate device names from the SwitchBot API automatically.</p>
</section>
{% endif %}
{% endblock %}
+72
View File
@@ -0,0 +1,72 @@
{% extends "base.html" %}
{% block title %}Reports - SwitchBot Temps{% endblock %}
{% block content %}
<section class="hero">
<div>
<p class="eyebrow">Reports</p>
<h1>Build a temperature report</h1>
<p class="muted">Choose a date range and export the summary as CSV.</p>
</div>
</section>
<form class="report-form" method="get" action="/reports">
<label>
Start
<input type="date" name="start" value="{{ start }}">
</label>
<label>
End
<input type="date" name="end" value="{{ end }}">
</label>
<label>
Device
<select name="device_id">
<option value="">All devices</option>
{% for device in devices %}
<option value="{{ device.id }}" {% if device.id == device_id %}selected{% endif %}>{{ device.name }}</option>
{% endfor %}
</select>
</label>
<button class="button" type="submit">Run report</button>
<a class="button secondary" href="/reports.csv?start={{ start }}&end={{ end }}&device_id={{ device_id }}">Download CSV</a>
</form>
<section class="panel">
<div class="table-wrap">
<table>
<thead>
<tr>
<th>Device</th>
<th>Samples</th>
<th>Low temp</th>
<th>High temp</th>
<th>Avg temp</th>
<th>Low RH</th>
<th>High RH</th>
<th>Avg RH</th>
</tr>
</thead>
<tbody>
{% for row in rows %}
<tr>
<td>{{ row.device_name }}</td>
<td>{{ row.samples }}</td>
<td>{{ row.low_temp if row.low_temp is not none else "n/a" }}&deg;C</td>
<td>{{ row.high_temp if row.high_temp is not none else "n/a" }}&deg;C</td>
<td>{{ row.avg_temp if row.avg_temp is not none else "n/a" }}&deg;C</td>
<td>{{ row.low_humidity if row.low_humidity is not none else "n/a" }}%</td>
<td>{{ row.high_humidity if row.high_humidity is not none else "n/a" }}%</td>
<td>{{ row.avg_humidity if row.avg_humidity is not none else "n/a" }}%</td>
</tr>
{% else %}
<tr>
<td colspan="8" class="empty">No readings found for this range.</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</section>
{% endblock %}
+255
View File
@@ -0,0 +1,255 @@
from __future__ import annotations
import csv
import io
import json
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo
from flask import Flask, Response, render_template, request
from sqlalchemy import Select, func, select
from sqlalchemy.orm import Session, joinedload
from app.config import config
from app.db import SessionLocal, init_db
from app.models import Device, Reading
SENSOR_DEVICE_TYPES = {"WoIOSensor"}
def create_app() -> Flask:
app = Flask(__name__)
app.secret_key = config.flask_secret_key
init_db()
@app.get("/")
def dashboard() -> str:
tz = ZoneInfo(config.app_timezone)
today = datetime.now(tz).date()
start_utc, end_utc = local_date_range_to_utc(today, today, tz)
with SessionLocal() as session:
devices = list(
session.scalars(
select(Device)
.where(Device.device_type.in_(SENSOR_DEVICE_TYPES))
.order_by(Device.name)
)
)
readings = load_readings(session, start_utc, end_utc)
latest = latest_by_device(session)
return render_template(
"dashboard.html",
devices=devices,
latest=latest,
stats=stats_by_device(readings),
chart_json=json.dumps(chart_payload(readings, tz)),
timezone=config.app_timezone,
collect_interval_seconds=config.collect_interval_seconds,
)
@app.get("/reports")
def reports() -> str:
tz = ZoneInfo(config.app_timezone)
start_text = request.args.get("start") or datetime.now(tz).date().isoformat()
end_text = request.args.get("end") or start_text
device_id = request.args.get("device_id") or ""
start_date = date.fromisoformat(start_text)
end_date = date.fromisoformat(end_text)
start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz)
with SessionLocal() as session:
devices = list(
session.scalars(
select(Device)
.where(Device.device_type.in_(SENSOR_DEVICE_TYPES))
.order_by(Device.name)
)
)
rows = report_rows(session, start_utc, end_utc, device_id)
return render_template(
"reports.html",
devices=devices,
rows=rows,
start=start_text,
end=end_text,
device_id=device_id,
)
@app.get("/reports.csv")
def reports_csv() -> Response:
tz = ZoneInfo(config.app_timezone)
start_date = date.fromisoformat(request.args.get("start") or datetime.now(tz).date().isoformat())
end_date = date.fromisoformat(request.args.get("end") or start_date.isoformat())
device_id = request.args.get("device_id") or ""
start_utc, end_utc = local_date_range_to_utc(start_date, end_date, tz)
with SessionLocal() as session:
rows = report_rows(session, start_utc, end_utc, device_id)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(
[
"device",
"samples",
"low_temp",
"high_temp",
"avg_temp",
"low_humidity",
"high_humidity",
"avg_humidity",
]
)
for row in rows:
writer.writerow(
[
row["device_name"],
row["samples"],
row["low_temp"],
row["high_temp"],
row["avg_temp"],
row["low_humidity"],
row["high_humidity"],
row["avg_humidity"],
]
)
return Response(
output.getvalue(),
mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=switchbot-report.csv"},
)
return app
def local_date_range_to_utc(start: date, end: date, tz: ZoneInfo) -> tuple[datetime, datetime]:
local_start = datetime.combine(start, time.min, tzinfo=tz)
local_end = datetime.combine(end + timedelta(days=1), time.min, tzinfo=tz)
return (
local_start.astimezone(timezone.utc).replace(tzinfo=None),
local_end.astimezone(timezone.utc).replace(tzinfo=None),
)
def load_readings(session: Session, start_utc: datetime, end_utc: datetime) -> list[Reading]:
stmt = (
select(Reading)
.options(joinedload(Reading.device))
.join(Reading.device)
.where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc)
.order_by(Reading.recorded_at)
)
return list(session.scalars(stmt))
def latest_by_device(session: Session) -> dict[str, Reading]:
subquery = (
select(Reading.device_id, func.max(Reading.recorded_at).label("latest_at"))
.group_by(Reading.device_id)
.subquery()
)
rows = session.scalars(
select(Reading)
.join(subquery, (Reading.device_id == subquery.c.device_id) & (Reading.recorded_at == subquery.c.latest_at))
.order_by(Reading.device_id)
)
return {reading.device_id: reading for reading in rows}
def stats_by_device(readings: list[Reading]) -> dict[str, dict[str, float | int]]:
stats: dict[str, dict[str, float | int]] = {}
for reading in readings:
if reading.temperature is None:
continue
item = stats.setdefault(
reading.device_id,
{
"low": reading.temperature,
"high": reading.temperature,
"samples": 0,
},
)
item["low"] = min(float(item["low"]), reading.temperature)
item["high"] = max(float(item["high"]), reading.temperature)
item["samples"] = int(item["samples"]) + 1
return stats
def chart_payload(readings: list[Reading], tz: ZoneInfo) -> list[dict[str, object]]:
series: dict[str, dict[str, object]] = {}
for reading in readings:
if reading.temperature is None:
continue
item = series.setdefault(
reading.device_id,
{
"deviceId": reading.device_id,
"name": reading.device.name,
"points": [],
},
)
local_time = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
item["points"].append(
{
"time": local_time.isoformat(timespec="minutes"),
"temperature": reading.temperature,
"humidity": reading.humidity,
}
)
return list(series.values())
def report_rows(
session: Session,
start_utc: datetime,
end_utc: datetime,
device_id: str = "",
) -> list[dict[str, object]]:
stmt: Select[tuple] = (
select(
Device.name,
func.count(Reading.id),
func.min(Reading.temperature),
func.max(Reading.temperature),
func.avg(Reading.temperature),
func.min(Reading.humidity),
func.max(Reading.humidity),
func.avg(Reading.humidity),
)
.join(Reading.device)
.where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc)
.group_by(Device.id, Device.name)
.order_by(Device.name)
)
if device_id:
stmt = stmt.where(Device.id == device_id)
rows = []
for row in session.execute(stmt):
rows.append(
{
"device_name": row[0],
"samples": row[1],
"low_temp": rounded(row[2]),
"high_temp": rounded(row[3]),
"avg_temp": rounded(row[4]),
"low_humidity": rounded(row[5]),
"high_humidity": rounded(row[6]),
"avg_humidity": rounded(row[7]),
}
)
return rows
def rounded(value: float | None) -> float | None:
if value is None:
return None
return round(float(value), 1)
app = create_app()
+48
View File
@@ -0,0 +1,48 @@
services:
db:
image: mysql:8.4
environment:
MYSQL_DATABASE: ${MYSQL_DATABASE:-switchbot}
MYSQL_USER: ${MYSQL_USER:-switchbot}
MYSQL_PASSWORD: ${MYSQL_PASSWORD:-switchbot_password}
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-root_password}
volumes:
- mysql_data:/var/lib/mysql
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 10
web:
build: .
command: ["gunicorn", "--bind", "0.0.0.0:8000", "app.web:app"]
environment:
DATABASE_URL: ${DATABASE_URL:-mysql+pymysql://${MYSQL_USER:-switchbot}:${MYSQL_PASSWORD:-switchbot_password}@db:3306/${MYSQL_DATABASE:-switchbot}}
SWITCHBOT_TOKEN: ${SWITCHBOT_TOKEN:-}
SWITCHBOT_SECRET: ${SWITCHBOT_SECRET:-}
COLLECT_INTERVAL_SECONDS: ${COLLECT_INTERVAL_SECONDS:-900}
APP_TIMEZONE: ${APP_TIMEZONE:-Europe/London}
FLASK_SECRET_KEY: ${FLASK_SECRET_KEY:-dev-only-secret}
ports:
- "8000:8000"
depends_on:
db:
condition: service_healthy
collector:
build: .
command: ["python", "-m", "app.collector"]
environment:
DATABASE_URL: ${DATABASE_URL:-mysql+pymysql://${MYSQL_USER:-switchbot}:${MYSQL_PASSWORD:-switchbot_password}@db:3306/${MYSQL_DATABASE:-switchbot}}
SWITCHBOT_TOKEN: ${SWITCHBOT_TOKEN:-}
SWITCHBOT_SECRET: ${SWITCHBOT_SECRET:-}
COLLECT_INTERVAL_SECONDS: ${COLLECT_INTERVAL_SECONDS:-900}
APP_TIMEZONE: ${APP_TIMEZONE:-Europe/London}
depends_on:
db:
condition: service_healthy
restart: unless-stopped
volumes:
mysql_data:
+7
View File
@@ -0,0 +1,7 @@
Flask==3.0.3
cryptography==43.0.0
gunicorn==22.0.0
PyMySQL==1.1.1
python-dotenv==1.0.1
SQLAlchemy==2.0.32
tzdata>=2024.1
+129
View File
@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Small SwitchBot API v1.1 proof-of-concept.
By default this lists devices from:
GET https://api.switch-bot.com/v1.1/devices
Credentials are read from environment variables so the token and secret do not
need to be committed into source files:
SWITCHBOT_TOKEN=... SWITCHBOT_SECRET=... python3 switchbot_poc.py
"""
from __future__ import annotations
import argparse
import base64
import hashlib
import hmac
import json
import os
import sys
import time
import urllib.error
import urllib.request
import uuid
BASE_URL = "https://api.switch-bot.com/v1.1"
def build_headers(token: str, secret: str) -> dict[str, str]:
"""Build SwitchBot v1.1 authentication headers."""
timestamp_ms = str(int(time.time() * 1000))
nonce = str(uuid.uuid4())
message = f"{token}{timestamp_ms}{nonce}".encode("utf-8")
digest = hmac.new(secret.encode("utf-8"), message, hashlib.sha256).digest()
signature = base64.b64encode(digest).decode("utf-8")
return {
"Authorization": token,
"Content-Type": "application/json; charset=utf8",
"sign": signature,
"t": timestamp_ms,
"nonce": nonce,
}
def switchbot_get(path: str, token: str, secret: str) -> dict:
"""Perform a signed GET request and parse the JSON response."""
url = f"{BASE_URL}{path}"
request = urllib.request.Request(
url,
headers=build_headers(token, secret),
method="GET",
)
try:
with urllib.request.urlopen(request, timeout=15) as response:
body = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
details = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code} from SwitchBot: {details}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"Could not reach SwitchBot API: {exc.reason}") from exc
return json.loads(body)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Query the SwitchBot API v1.1 using signed requests."
)
parser.add_argument(
"--endpoint",
default="/devices",
help="API path to query, for example /devices, /scenes, or /devices/<id>/status.",
)
parser.add_argument(
"--token",
default=os.environ.get("SWITCHBOT_TOKEN"),
help="SwitchBot token. Defaults to SWITCHBOT_TOKEN.",
)
parser.add_argument(
"--secret",
default=os.environ.get("SWITCHBOT_SECRET"),
help="SwitchBot secret. Defaults to SWITCHBOT_SECRET.",
)
parser.add_argument(
"--diagnose",
action="store_true",
help="Print a short hint when /devices returns an empty inventory.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
if not args.token or not args.secret:
print(
"Missing credentials. Set SWITCHBOT_TOKEN and SWITCHBOT_SECRET, "
"or pass --token and --secret.",
file=sys.stderr,
)
return 2
endpoint = args.endpoint if args.endpoint.startswith("/") else f"/{args.endpoint}"
try:
payload = switchbot_get(endpoint, args.token, args.secret)
except (RuntimeError, json.JSONDecodeError) as exc:
print(f"Request failed: {exc}", file=sys.stderr)
return 1
print(json.dumps(payload, indent=2, sort_keys=True))
if args.diagnose and endpoint == "/devices":
device_list = payload.get("body", {}).get("deviceList", [])
remote_list = payload.get("body", {}).get("infraredRemoteList", [])
if payload.get("statusCode") == 100 and not device_list and not remote_list:
print(
"\nDiagnostic: authentication succeeded, but this SwitchBot "
"account has no Cloud API-visible devices. Check that the "
"token belongs to the same app account/home as your devices, "
"that devices are added to the SwitchBot app, and that "
"Bluetooth-only devices are reachable via a SwitchBot Hub.",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
raise SystemExit(main())