Files
mt-vet-temps/app/collector.py
T
2026-06-01 21:02:36 +01:00

117 lines
3.8 KiB
Python

from __future__ import annotations
import logging
import time
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import select
from app.config import config
from app.db import init_db, session_scope
from app.models import Device, Reading
from app.switchbot import SwitchBotClient, SwitchBotError
# Configure root logging once, at import time: INFO level with a
# "timestamp level message" line layout.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# Module-scoped logger shared by every function in this file.
logger = logging.getLogger(__name__)

# SwitchBot ``deviceType`` values that collect_once() polls for readings;
# devices of any other type are skipped.
SENSOR_DEVICE_TYPES = {"WoIOSensor"}
def upsert_device(device_data: dict[str, Any]) -> None:
    """Create or refresh the ``Device`` row for one SwitchBot device entry.

    Args:
        device_data: One device payload. ``"deviceId"`` is required; the keys
            ``"deviceName"``, ``"deviceType"``, ``"enableCloudService"`` and
            ``"hubDeviceId"`` are read when present.

    Raises:
        KeyError: If ``device_data`` has no ``"deviceId"`` key.
    """
    device_id = device_data["deviceId"]
    # Fall back to the id when the payload carries no "deviceName" key.
    display_name = device_data.get("deviceName", device_id)

    with session_scope() as session:
        record = session.get(Device, device_id)
        if record is None:
            # First sighting: create the row, then fill it in below exactly
            # like an existing one.
            record = Device(id=device_id, name=display_name, device_type="")
            session.add(record)

        record.name = display_name
        record.device_type = device_data.get("deviceType", "")
        record.enable_cloud_service = bool(device_data.get("enableCloudService"))
        record.hub_device_id = device_data.get("hubDeviceId")
        # Current UTC time, stored without tzinfo (naive datetime).
        utc_now = datetime.now(timezone.utc)
        record.last_seen_at = utc_now.replace(tzinfo=None)
def sync_devices(client: SwitchBotClient) -> list[Device]:
    """Refresh stored devices from SwitchBot and return all stored devices.

    Every entry returned by ``client.devices()`` is passed to
    ``upsert_device``; afterwards all ``Device`` rows are loaded, ordered by
    ``Device.name``.

    Args:
        client: SwitchBot client whose ``devices()`` result is synced.

    Returns:
        All ``Device`` rows, sorted by name.
    """
    logger.info("Syncing SwitchBot device list")
    payloads = client.devices()
    logger.info("SwitchBot returned %s devices", len(payloads))

    for payload in payloads:
        upsert_device(payload)

    by_name = select(Device).order_by(Device.name)
    with session_scope() as session:
        # Materialise the result while the session is still open.
        # NOTE(review): callers read attributes of these objects after the
        # session scope has exited -- confirm session_scope keeps them loaded.
        rows = session.scalars(by_name)
        return list(rows)
def save_reading(device: Device, status: dict[str, Any]) -> None:
    """Persist one ``Reading`` for ``device`` built from a status payload.

    Nothing is stored when ``status`` contains neither a ``"temperature"``
    nor a ``"humidity"`` key; the skip is logged at INFO level.

    Args:
        device: Device the reading belongs to (its ``id`` and ``name`` are used).
        status: Status payload; ``"temperature"``, ``"humidity"``,
            ``"battery"`` and ``"version"`` are read when present.
    """
    has_measurement = "temperature" in status or "humidity" in status
    if not has_measurement:
        logger.info("Skipping %s (%s): status has no temperature/humidity", device.name, device.id)
        return

    # Current UTC time, stored without tzinfo (naive datetime).
    recorded_at = datetime.now(timezone.utc).replace(tzinfo=None)
    reading = Reading(
        device_id=device.id,
        recorded_at=recorded_at,
        temperature=status.get("temperature"),
        humidity=status.get("humidity"),
        battery=status.get("battery"),
        version=status.get("version"),
    )

    with session_scope() as session:
        session.add(reading)

    # NOTE(review): emitted after the session scope exits -- confirm this
    # matches the intended placement relative to the ``with`` block.
    logger.info(
        "Recorded %s: temp=%s humidity=%s battery=%s",
        device.name,
        reading.temperature,
        reading.humidity,
        reading.battery,
    )
def collect_once(client: SwitchBotClient) -> None:
    """Run one collection cycle: sync devices, then store a reading per sensor.

    A device is skipped (with an INFO log) when its ``device_type`` is not in
    ``SENSOR_DEVICE_TYPES`` or when ``enable_cloud_service`` is falsy. A
    ``SwitchBotError`` raised by ``client.status`` is logged as a warning and
    the cycle moves on to the next device.

    Args:
        client: SwitchBot client used for the device list and status calls.
    """
    logger.info("Starting collection cycle")

    for device in sync_devices(client):
        is_sensor = device.device_type in SENSOR_DEVICE_TYPES
        if not is_sensor:
            logger.info("Skipping %s (%s): %s is not a sensor", device.name, device.id, device.device_type)
            continue

        if not device.enable_cloud_service:
            logger.info("Skipping %s (%s): cloud service disabled", device.name, device.id)
            continue

        try:
            status = client.status(device.id)
        except SwitchBotError as exc:
            # One unreadable device must not abort the rest of the cycle.
            logger.warning("Could not read %s (%s): %s", device.name, device.id, exc)
        else:
            save_reading(device, status)

    logger.info("Collection cycle complete")
def main() -> int:
    """Run the collector loop until interrupted.

    Validates that both SwitchBot credentials are configured, initialises the
    database, then repeatedly calls ``collect_once``. The interval between
    cycle starts is ``config.collect_interval_seconds`` with a floor of 60
    seconds; the pause after a cycle is never shorter than 1 second.

    Returns:
        2 when ``config.switchbot_token`` or ``config.switchbot_secret`` is
        missing; 0 when the loop is stopped by a keyboard interrupt.
    """
    if not (config.switchbot_token and config.switchbot_secret):
        logger.error("SWITCHBOT_TOKEN and SWITCHBOT_SECRET are required")
        return 2

    init_db()
    client = SwitchBotClient(config.switchbot_token, config.switchbot_secret)
    interval = max(60, config.collect_interval_seconds)
    logger.info("Collector started with %s second interval", interval)

    try:
        while True:
            started = time.monotonic()
            try:
                collect_once(client)
            except Exception:
                # Top-level boundary: log the failure with traceback and keep
                # the collector alive for the next cycle.
                logger.exception("Collector cycle failed")
            # Subtract the time the cycle took so cycles start roughly every
            # `interval` seconds, but always pause for at least one second.
            elapsed = time.monotonic() - started
            time.sleep(max(1, interval - elapsed))
    except KeyboardInterrupt:
        # Ctrl-C during a cycle or the sleep: exit cleanly instead of dumping
        # a traceback, so the declared int return value is actually produced.
        logger.info("Collector stopped by keyboard interrupt")
        return 0
if __name__ == "__main__":
    # Script entry point: run the collector and use main()'s return value as
    # the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)