Files
2026-06-18 21:07:34 +01:00

396 lines
14 KiB
Python

from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from datetime import date, datetime, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
from PIL import Image, ImageDraw, ImageFont
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER
from reportlab.lib.pagesizes import A4, landscape
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.platypus import (
Image as RLImage,
PageBreak,
Paragraph,
SimpleDocTemplate,
Spacer,
Table,
TableStyle,
)
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload
from app.models import Device, Reading
@dataclass(frozen=True)
class DailySummary:
    """Temperature statistics for a single calendar day.

    Instances are immutable (``frozen=True``); ``daily_summary`` builds one
    per day that has at least one temperature reading.
    """

    # Calendar day the statistics belong to.
    day: date
    # Highest temperature seen on that day.
    high: float
    # Lowest temperature seen on that day.
    low: float
    # Arithmetic mean of that day's temperatures.
    average: float
@dataclass(frozen=True)
class ReportDevice:
    """Everything the report needs to render one device's page.

    The dataclass is frozen, so the fields cannot be rebound after creation.
    """

    # Display name, used for headings, the stat card and file names.
    name: str
    # Readings for this device that fall inside the report window.
    readings: list[Reading]
    # Per-day temperature statistics derived from ``readings``.
    summary: list[DailySummary]
def _report_paragraph_styles() -> tuple[ParagraphStyle, ParagraphStyle, ParagraphStyle]:
    """Return the (title, subtitle, section) paragraph styles used by the report."""
    styles = getSampleStyleSheet()
    title_style = ParagraphStyle(
        "RecordTitle",
        parent=styles["Title"],
        fontName="Helvetica-Bold",
        fontSize=20,
        leading=24,
        textColor=colors.HexColor("#1F4E78"),
        alignment=TA_CENTER,
        spaceAfter=5,
    )
    subtitle_style = ParagraphStyle(
        "Subtitle",
        parent=styles["Normal"],
        fontName="Helvetica",
        fontSize=9,
        leading=11,
        textColor=colors.HexColor("#475467"),
        alignment=TA_CENTER,
        spaceAfter=8,
    )
    section_style = ParagraphStyle(
        "Section",
        parent=styles["Heading2"],
        fontName="Helvetica-Bold",
        fontSize=14,
        leading=17,
        textColor=colors.HexColor("#1F2933"),
        spaceBefore=2,
        spaceAfter=5,
    )
    return title_style, subtitle_style, section_style


def build_temperature_pdf(
    session: Session,
    output_dir: Path,
    chart_dir: Path,
    start_utc: datetime,
    end_utc: datetime,
    start_date: date,
    end_date: date,
    tz: ZoneInfo,
    device_id: str = "",
) -> Path:
    """Render the temperature report for a period as a landscape A4 PDF.

    Each device returned by ``load_report_devices`` gets its own page with a
    stat card, a trend chart (written as a PNG into ``chart_dir``) and a table
    of daily values. When no device has data, a single "no readings" page is
    produced instead.

    Args:
        session: Database session used to load the readings.
        output_dir: Directory the PDF is written to; created if missing.
        chart_dir: Directory the chart PNGs are written to; created if missing.
        start_utc: Inclusive lower bound passed to the readings query.
        end_utc: Exclusive upper bound passed to the readings query.
        start_date: First day of the period, used for labels and file names.
        end_date: Last day of the period, used for labels and file names.
        tz: Time zone used for the "generated" stamp and per-day grouping.
        device_id: Optional device filter; empty means all devices.

    Returns:
        Path of the generated PDF.
    """
    # Local import keeps this block self-contained; the module does not
    # import ``escape`` at file level.
    from xml.sax.saxutils import escape

    output_dir.mkdir(parents=True, exist_ok=True)
    chart_dir.mkdir(parents=True, exist_ok=True)
    devices = load_report_devices(session, start_utc, end_utc, tz, device_id)
    generated_at = datetime.now(tz)

    date_span = f"{start_date:%Y%m%d}-{end_date:%Y%m%d}"
    filename = f"temperature-report-{date_span}"
    if device_id and len(devices) == 1:
        filename += f"-{safe_filename(devices[0].name)}"
    pdf_path = output_dir / f"{filename}.pdf"

    title_style, subtitle_style, section_style = _report_paragraph_styles()
    period_label = format_period(start_date, end_date)
    doc = SimpleDocTemplate(
        str(pdf_path),
        pagesize=landscape(A4),
        leftMargin=12 * mm,
        rightMargin=12 * mm,
        topMargin=10 * mm,
        bottomMargin=12 * mm,
        title=f"Temperature Report - {period_label}",
    )

    story = []
    if not devices:
        story.extend(
            [
                Paragraph(f"Temperature Records - {period_label}", title_style),
                Paragraph("No readings found for this period.", subtitle_style),
            ]
        )

    used_chart_stems: set[str] = set()
    for index, device in enumerate(devices):
        if index:
            story.append(PageBreak())

        # Distinct devices can sanitise to the same file name (e.g. "Fridge 1"
        # and "fridge-1"). The PNGs are only read when the document is built,
        # so a clash would make both pages show the last chart written.
        chart_stem = f"{safe_filename(device.name)}-{date_span}"
        if chart_stem in used_chart_stems:
            chart_stem = f"{chart_stem}-{index + 1}"
        used_chart_stems.add(chart_stem)
        chart_path = chart_dir / f"{chart_stem}.png"
        draw_chart(device.summary, f"{device.name} daily temperature trend", chart_path)

        story.append(Paragraph(f"Temperature Records - {period_label}", title_style))
        story.append(
            Paragraph(
                f"Daily high, minimum, and average temperature summaries. Generated {generated_at:%Y-%m-%d %H:%M %Z}.",
                subtitle_style,
            )
        )
        # Paragraph text is parsed as XML-style markup, so characters such as
        # "<" and "&" in a device name must be escaped to render literally.
        story.append(Paragraph(escape(device.name), section_style))

        chart = RLImage(str(chart_path), width=141 * mm, height=41.5 * mm)
        top_row = Table(
            [[make_stat_cards(device, period_label, tz), chart]],
            colWidths=[143 * mm, 142 * mm],
        )
        top_row.setStyle(
            TableStyle(
                [
                    ("VALIGN", (0, 0), (-1, -1), "TOP"),
                    ("LEFTPADDING", (0, 0), (-1, -1), 0),
                    ("RIGHTPADDING", (0, 0), (-1, -1), 0),
                ]
            )
        )
        story.append(top_row)
        story.append(Spacer(1, 4 * mm))
        story.append(make_daily_table(device.summary))

    doc.build(story, onFirstPage=footer, onLaterPages=footer)
    return pdf_path
def load_report_devices(
    session: Session,
    start_utc: datetime,
    end_utc: datetime,
    tz: ZoneInfo,
    device_id: str = "",
) -> list[ReportDevice]:
    """Load readings for the window and group them into per-device report data.

    Readings with ``start_utc <= recorded_at < end_utc`` are fetched together
    with their device. Only devices whose ``device_type`` is ``"WoIOSensor"``
    are kept, and devices without any daily summary are dropped.

    Args:
        session: Database session used to run the query.
        start_utc: Inclusive lower bound on ``Reading.recorded_at``.
        end_utc: Exclusive upper bound on ``Reading.recorded_at``.
        tz: Time zone handed to ``daily_summary`` for per-day grouping.
        device_id: Optional device filter; empty means all devices.

    Returns:
        Report devices ordered case-insensitively by device name.
    """
    query = (
        select(Reading)
        .options(joinedload(Reading.device))
        .join(Reading.device)
        .where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc)
        .order_by(Device.name, Reading.recorded_at)
    )
    if device_id:
        query = query.where(Device.id == device_id)

    readings_by_device: dict[str, list[Reading]] = defaultdict(list)
    name_by_device: dict[str, str] = {}
    for row in session.scalars(query):
        owner = row.device
        if owner.device_type == "WoIOSensor":
            readings_by_device[row.device_id].append(row)
            name_by_device[row.device_id] = owner.name

    # Group by id (names may repeat) but present devices in name order.
    ordered_ids = sorted(readings_by_device, key=lambda key: name_by_device[key].lower())

    result = []
    for key in ordered_ids:
        device_readings = readings_by_device[key]
        per_day = daily_summary(device_readings, tz)
        if per_day:
            result.append(ReportDevice(name_by_device[key], device_readings, per_day))
    return result
def daily_summary(readings: list[Reading], tz: ZoneInfo) -> list[DailySummary]:
    """Aggregate readings into per-day high, low and average temperatures.

    Readings without a temperature are skipped. Each remaining reading is
    assigned to the calendar day its timestamp falls on in ``tz``.

    Args:
        readings: Readings to aggregate; order does not matter.
        tz: Time zone that defines the day boundaries.

    Returns:
        One ``DailySummary`` per day with data, in ascending date order.
        Empty when no reading has a temperature.
    """
    temperatures_by_day: dict[date, list[float]] = defaultdict(list)
    for reading in readings:
        if reading.temperature is None:
            continue
        recorded_at = reading.recorded_at
        if recorded_at.utcoffset() is None:
            # Naive timestamps are treated as UTC. Aware ones are converted
            # as they are; relabelling them would shift the instant.
            recorded_at = recorded_at.replace(tzinfo=timezone.utc)
        local_day = recorded_at.astimezone(tz).date()
        temperatures_by_day[local_day].append(float(reading.temperature))

    summaries = []
    for day in sorted(temperatures_by_day):
        values = temperatures_by_day[day]
        summaries.append(DailySummary(day, max(values), min(values), sum(values) / len(values)))
    return summaries
def font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
    """Return a TrueType font at ``size``, or Pillow's default font.

    The known font files are tried in order; any that cannot be opened
    (``OSError``) is skipped. If none loads, ``ImageFont.load_default()`` is
    returned.
    """
    # (regular, bold) file pairs, tried in this order.
    font_files = (
        (
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        ),
        ("C:/Windows/Fonts/arial.ttf", "C:/Windows/Fonts/arialbd.ttf"),
        ("C:/Windows/Fonts/calibri.ttf", "C:/Windows/Fonts/calibrib.ttf"),
    )
    for regular_path, bold_path in font_files:
        chosen = bold_path if bold else regular_path
        try:
            return ImageFont.truetype(chosen, size)
        except OSError:
            pass
    return ImageFont.load_default()
def draw_chart(summary: list[DailySummary], title: str, output_path: Path) -> None:
    """Draw the daily high/min/average temperature lines and save them as an image.

    The y axis is snapped outwards to multiples of 5 around the data, so
    sub-zero temperatures stay inside the plot area.

    Args:
        summary: Daily statistics to plot, in the order they should appear.
        title: Text drawn centred at the top of the chart.
        output_path: Destination file; parent directories are created.

    Raises:
        ValueError: If ``summary`` is empty.
    """
    if not summary:
        # min()/max() below would raise the same type with a vaguer message.
        raise ValueError("draw_chart requires at least one daily summary")

    width, height = 1050, 310
    left, top, right, bottom = 64, 31, 28, 56
    plot_w = width - left - right
    plot_h = height - top - bottom
    image = Image.new("RGB", (width, height), "white")
    draw = ImageDraw.Draw(image)
    series = [
        ("High", [row.high for row in summary], (21, 101, 150)),
        ("Min", [row.low for row in summary], (221, 107, 45)),
        ("Average", [row.average for row in summary], (29, 111, 54)),
    ]

    all_values = [value for _, series_values, _ in series for value in series_values]
    # Floor/ceil to the nearest multiple of 5. The floor is not clamped at
    # zero: clamping would push negative temperatures below the x axis.
    y_min = int(min(all_values) // 5 * 5)
    y_max = int(-(-max(all_values) // 5) * 5)
    if y_max == y_min:
        # Flat data exactly on a gridline: widen so y_pos never divides by zero.
        y_max += 5

    title_font = font(22, True)
    label_font = font(15)
    small_font = font(12)
    draw.text((width // 2, 8), title, fill=(30, 41, 51), font=title_font, anchor="ma")

    def x_pos(idx: int) -> float:
        # A single day is centred; otherwise days are spread edge to edge.
        if len(summary) == 1:
            return left + plot_w / 2
        return left + idx * plot_w / (len(summary) - 1)

    def y_pos(value: float) -> float:
        return top + (y_max - value) * plot_h / (y_max - y_min)

    # Horizontal gridlines with their labels.
    for tick in range(y_min, y_max + 1, 5):
        y = y_pos(tick)
        draw.line((left, y, width - right, y), fill=(215, 221, 226), width=1)
        draw.text((left - 12, y), f"{tick}", fill=(55, 65, 81), font=small_font, anchor="rm")

    # Axes.
    draw.line((left, top, left, top + plot_h), fill=(99, 110, 123), width=2)
    draw.line((left, top + plot_h, width - right, top + plot_h), fill=(99, 110, 123), width=2)

    # Date ticks at a fixed stride, always including the last day.
    tick_indexes = list(range(0, len(summary), max(1, len(summary) // 7)))
    if len(summary) - 1 not in tick_indexes:
        tick_indexes.append(len(summary) - 1)
    for idx in tick_indexes:
        x = x_pos(idx)
        draw.line((x, top, x, top + plot_h), fill=(235, 238, 241), width=1)
        draw.text((x, top + plot_h + 11), summary[idx].day.strftime("%d %b"), fill=(55, 65, 81), font=small_font, anchor="ma")

    # Data lines with a marker on every point.
    for _, series_values, color in series:
        points = [(x_pos(idx), y_pos(value)) for idx, value in enumerate(series_values)]
        if len(points) > 1:
            draw.line(points, fill=color, width=4)
        for x, y in points:
            draw.ellipse((x - 3, y - 3, x + 3, y + 3), fill=color)

    # Legend.
    legend_x = left + 330
    legend_y = height - 24
    for name, _, color in series:
        draw.line((legend_x, legend_y, legend_x + 28, legend_y), fill=color, width=4)
        draw.text((legend_x + 36, legend_y), f"{name} C", fill=(30, 41, 51), font=label_font, anchor="lm")
        legend_x += 160

    output_path.parent.mkdir(parents=True, exist_ok=True)
    image.save(output_path)
def make_stat_cards(device: ReportDevice, period_label: str, tz: ZoneInfo) -> Table:
    """Build the key/value stat card table for one device.

    The card shows the device name, period, reading and day counts, the
    first and last reading times in ``tz``, and the highest, lowest and
    average temperature plus average humidity. Any figure that has no
    underlying data is shown as ``"n/a"``.

    Args:
        device: Report data for the device; the first and last entries of
            ``device.readings`` supply the "First" and "Last" times.
        period_label: Human-readable period text shown in the card.
        tz: Time zone used to display the first and last reading times.

    Returns:
        A styled four-column table of label/value pairs.
    """
    readings = device.readings
    summary = device.summary
    temperatures = [float(reading.temperature) for reading in readings if reading.temperature is not None]
    humidity_values = [float(reading.humidity) for reading in readings if reading.humidity is not None]

    def local_stamp(moment: datetime) -> str:
        if moment.utcoffset() is None:
            # Naive timestamps are treated as UTC. Aware ones are converted
            # as they are; relabelling them would shift the instant.
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.astimezone(tz).strftime("%d %b %Y %H:%M")

    start = local_stamp(readings[0].recorded_at) if readings else "n/a"
    end = local_stamp(readings[-1].recorded_at) if readings else "n/a"
    highest = f"{max(temperatures):.1f} C" if temperatures else "n/a"
    lowest = f"{min(temperatures):.1f} C" if temperatures else "n/a"

    stats = [
        ["Device", device.name, "Period", period_label],
        ["Readings", f"{len(readings):,}", "Days", f"{len(summary)}"],
        ["First", start, "Last", end],
        ["Highest", highest, "Lowest", lowest],
        # Both averages go through format_average so an empty list gives "n/a".
        ["Average", format_average(temperatures, " C"), "Avg RH", format_average(humidity_values, "%")],
    ]
    table = Table(stats, colWidths=[22 * mm, 47 * mm, 20 * mm, 52 * mm])
    table.setStyle(
        TableStyle(
            [
                ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#D9EAF7")),
                ("TEXTCOLOR", (0, 0), (-1, -1), colors.HexColor("#1F2933")),
                ("FONTNAME", (0, 0), (-1, -1), "Helvetica"),
                # Label columns (0 and 2) are bold.
                ("FONTNAME", (0, 0), (0, -1), "Helvetica-Bold"),
                ("FONTNAME", (2, 0), (2, -1), "Helvetica-Bold"),
                ("FONTSIZE", (0, 0), (-1, -1), 7.5),
                ("GRID", (0, 0), (-1, -1), 0.35, colors.HexColor("#C9D7E5")),
                ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
                ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F6FAFD")]),
            ]
        )
    )
    return table
def make_daily_table(summary: list[DailySummary]) -> Table:
    """Build the daily values table, using two columns for long periods.

    With at most 18 rows (header included) a single table is returned.
    Longer data is split into two side-by-side tables, each with its own
    header row; the left half takes the extra row when the count is odd.
    """
    data = daily_table_data(summary)
    if len(data) > 18:
        header, body = data[0], data[1:]
        half = (len(body) + 1) // 2
        narrow_widths = (35 * mm, 25 * mm, 25 * mm, 29 * mm)
        halves = [
            style_daily_table(Table([header] + chunk, colWidths=list(narrow_widths), repeatRows=1))
            for chunk in (body[:half], body[half:])
        ]
        side_by_side = Table([halves], colWidths=[124 * mm, 124 * mm])
        layout = TableStyle(
            [
                ("VALIGN", (0, 0), (-1, -1), "TOP"),
                ("LEFTPADDING", (0, 0), (-1, -1), 0),
                ("RIGHTPADDING", (0, 0), (-1, -1), 0),
            ]
        )
        side_by_side.setStyle(layout)
        return side_by_side

    single = Table(data, colWidths=[38 * mm, 28 * mm, 28 * mm, 32 * mm], repeatRows=1)
    return style_daily_table(single)
def daily_table_data(summary: list[DailySummary]) -> list[list[str]]:
    """Return the table rows for the daily summaries, header row first.

    Dates are rendered in ISO format and temperatures with one decimal place.
    """
    header = ["Date", "High C", "Min C", "Average C"]
    body = [
        [
            entry.day.isoformat(),
            format(entry.high, ".1f"),
            format(entry.low, ".1f"),
            format(entry.average, ".1f"),
        ]
        for entry in summary
    ]
    return [header, *body]
def style_daily_table(table: Table) -> Table:
    """Apply the daily-table look to ``table`` and return the same object.

    The header row is white bold text on blue; body rows alternate between
    two backgrounds and right-align every column except the first.
    """
    header_row = ((0, 0), (-1, 0))
    body_rows = ((0, 1), (-1, -1))
    everything = ((0, 0), (-1, -1))
    commands = [
        ("BACKGROUND", *header_row, colors.HexColor("#4472C4")),
        ("TEXTCOLOR", *header_row, colors.white),
        ("FONTNAME", *header_row, "Helvetica-Bold"),
        ("FONTNAME", *body_rows, "Helvetica"),
        ("FONTSIZE", *everything, 7.6),
        ("ALIGN", (1, 1), (-1, -1), "RIGHT"),
        ("ALIGN", *header_row, "CENTER"),
        ("GRID", *everything, 0.3, colors.HexColor("#E0E7EF")),
        ("ROWBACKGROUNDS", *body_rows, [colors.white, colors.HexColor("#D7F0FA")]),
        ("BOTTOMPADDING", *everything, 2),
        ("TOPPADDING", *everything, 2),
    ]
    table.setStyle(TableStyle(commands))
    return table
def footer(canvas, doc) -> None:
    """Draw the right-aligned "Page N" label near the bottom of the page.

    The canvas state is saved before and restored after drawing, so the font
    and fill colour set here do not affect the rest of the page.
    """
    page_label = f"Page {doc.page}"
    text_colour = colors.HexColor("#667085")
    canvas.saveState()
    canvas.setFont("Helvetica", 8)
    canvas.setFillColor(text_colour)
    canvas.drawRightString(285 * mm, 9 * mm, page_label)
    canvas.restoreState()
def format_period(start_date: date, end_date: date) -> str:
    """Return a human-readable label for the period.

    A single day reads "05 January 2026"; a range within one month reads
    "05 to 20 January 2026"; any other range spells out both dates in full.
    """
    full_format = "%d %B %Y"
    if start_date == end_date:
        return start_date.strftime(full_format)

    same_month = (start_date.year, start_date.month) == (end_date.year, end_date.month)
    # Within one month only the start day number is needed.
    start_text = start_date.strftime("%d" if same_month else full_format)
    return f"{start_text} to {end_date.strftime(full_format)}"
def format_average(values: list[float], suffix: str) -> str:
    """Return the mean of ``values`` to one decimal place followed by ``suffix``.

    An empty list yields ``"n/a"``.
    """
    if values:
        mean = sum(values) / len(values)
        return f"{mean:.1f}{suffix}"
    return "n/a"
def safe_filename(value: str) -> str:
    """Turn ``value`` into a lowercase, dash-separated file-name fragment.

    Every non-alphanumeric character becomes a dash, runs of dashes collapse
    to one, and leading/trailing dashes are dropped. If nothing is left the
    result is ``"device"``.
    """
    dashed = "".join(map(lambda ch: ch.lower() if ch.isalnum() else "-", value))
    tokens = [token for token in dashed.split("-") if token]
    if not tokens:
        return "device"
    return "-".join(tokens)