396 lines
14 KiB
Python
396 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
from datetime import date, datetime, timezone
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
from reportlab.lib import colors
|
|
from reportlab.lib.enums import TA_CENTER
|
|
from reportlab.lib.pagesizes import A4, landscape
|
|
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
|
|
from reportlab.lib.units import mm
|
|
from reportlab.platypus import (
|
|
Image as RLImage,
|
|
PageBreak,
|
|
Paragraph,
|
|
SimpleDocTemplate,
|
|
Spacer,
|
|
Table,
|
|
TableStyle,
|
|
)
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from app.models import Device, Reading
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class DailySummary:
|
|
day: date
|
|
high: float
|
|
low: float
|
|
average: float
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ReportDevice:
|
|
name: str
|
|
readings: list[Reading]
|
|
summary: list[DailySummary]
|
|
|
|
|
|
def build_temperature_pdf(
|
|
session: Session,
|
|
output_dir: Path,
|
|
chart_dir: Path,
|
|
start_utc: datetime,
|
|
end_utc: datetime,
|
|
start_date: date,
|
|
end_date: date,
|
|
tz: ZoneInfo,
|
|
device_id: str = "",
|
|
) -> Path:
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
chart_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
devices = load_report_devices(session, start_utc, end_utc, tz, device_id)
|
|
generated_at = datetime.now(tz)
|
|
filename = f"temperature-report-{start_date:%Y%m%d}-{end_date:%Y%m%d}"
|
|
if device_id and len(devices) == 1:
|
|
filename += f"-{safe_filename(devices[0].name)}"
|
|
pdf_path = output_dir / f"{filename}.pdf"
|
|
|
|
styles = getSampleStyleSheet()
|
|
title_style = ParagraphStyle(
|
|
"RecordTitle",
|
|
parent=styles["Title"],
|
|
fontName="Helvetica-Bold",
|
|
fontSize=20,
|
|
leading=24,
|
|
textColor=colors.HexColor("#1F4E78"),
|
|
alignment=TA_CENTER,
|
|
spaceAfter=5,
|
|
)
|
|
subtitle_style = ParagraphStyle(
|
|
"Subtitle",
|
|
parent=styles["Normal"],
|
|
fontName="Helvetica",
|
|
fontSize=9,
|
|
leading=11,
|
|
textColor=colors.HexColor("#475467"),
|
|
alignment=TA_CENTER,
|
|
spaceAfter=8,
|
|
)
|
|
section_style = ParagraphStyle(
|
|
"Section",
|
|
parent=styles["Heading2"],
|
|
fontName="Helvetica-Bold",
|
|
fontSize=14,
|
|
leading=17,
|
|
textColor=colors.HexColor("#1F2933"),
|
|
spaceBefore=2,
|
|
spaceAfter=5,
|
|
)
|
|
|
|
period_label = format_period(start_date, end_date)
|
|
doc = SimpleDocTemplate(
|
|
str(pdf_path),
|
|
pagesize=landscape(A4),
|
|
leftMargin=12 * mm,
|
|
rightMargin=12 * mm,
|
|
topMargin=10 * mm,
|
|
bottomMargin=12 * mm,
|
|
title=f"Temperature Report - {period_label}",
|
|
)
|
|
story = []
|
|
|
|
if not devices:
|
|
story.extend(
|
|
[
|
|
Paragraph(f"Temperature Records - {period_label}", title_style),
|
|
Paragraph("No readings found for this period.", subtitle_style),
|
|
]
|
|
)
|
|
else:
|
|
first_device = True
|
|
for device in devices:
|
|
if not first_device:
|
|
story.append(PageBreak())
|
|
first_device = False
|
|
|
|
chart_path = chart_dir / f"{safe_filename(device.name)}-{start_date:%Y%m%d}-{end_date:%Y%m%d}.png"
|
|
draw_chart(device.summary, f"{device.name} daily temperature trend", chart_path)
|
|
|
|
story.append(Paragraph(f"Temperature Records - {period_label}", title_style))
|
|
story.append(
|
|
Paragraph(
|
|
f"Daily high, minimum, and average temperature summaries. Generated {generated_at:%Y-%m-%d %H:%M %Z}.",
|
|
subtitle_style,
|
|
)
|
|
)
|
|
story.append(Paragraph(device.name, section_style))
|
|
chart = RLImage(str(chart_path), width=141 * mm, height=41.5 * mm)
|
|
top_row = Table(
|
|
[[make_stat_cards(device, period_label, tz), chart]],
|
|
colWidths=[143 * mm, 142 * mm],
|
|
)
|
|
top_row.setStyle(
|
|
TableStyle(
|
|
[
|
|
("VALIGN", (0, 0), (-1, -1), "TOP"),
|
|
("LEFTPADDING", (0, 0), (-1, -1), 0),
|
|
("RIGHTPADDING", (0, 0), (-1, -1), 0),
|
|
]
|
|
)
|
|
)
|
|
story.append(top_row)
|
|
story.append(Spacer(1, 4 * mm))
|
|
story.append(make_daily_table(device.summary))
|
|
|
|
doc.build(story, onFirstPage=footer, onLaterPages=footer)
|
|
return pdf_path
|
|
|
|
|
|
def load_report_devices(
|
|
session: Session,
|
|
start_utc: datetime,
|
|
end_utc: datetime,
|
|
tz: ZoneInfo,
|
|
device_id: str = "",
|
|
) -> list[ReportDevice]:
|
|
stmt = (
|
|
select(Reading)
|
|
.options(joinedload(Reading.device))
|
|
.join(Reading.device)
|
|
.where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc)
|
|
.order_by(Device.name, Reading.recorded_at)
|
|
)
|
|
if device_id:
|
|
stmt = stmt.where(Device.id == device_id)
|
|
|
|
grouped: dict[str, list[Reading]] = defaultdict(list)
|
|
names: dict[str, str] = {}
|
|
for reading in session.scalars(stmt):
|
|
if reading.device.device_type != "WoIOSensor":
|
|
continue
|
|
grouped[reading.device_id].append(reading)
|
|
names[reading.device_id] = reading.device.name
|
|
|
|
report_devices = []
|
|
for item_device_id in sorted(grouped.keys(), key=lambda value: names[value].lower()):
|
|
readings = grouped[item_device_id]
|
|
summary = daily_summary(readings, tz)
|
|
if summary:
|
|
report_devices.append(ReportDevice(names[item_device_id], readings, summary))
|
|
return report_devices
|
|
|
|
|
|
def daily_summary(readings: list[Reading], tz: ZoneInfo) -> list[DailySummary]:
|
|
temperatures_by_day: dict[date, list[float]] = defaultdict(list)
|
|
for reading in readings:
|
|
if reading.temperature is None:
|
|
continue
|
|
local_time = reading.recorded_at.replace(tzinfo=timezone.utc).astimezone(tz)
|
|
temperatures_by_day[local_time.date()].append(float(reading.temperature))
|
|
|
|
summaries = []
|
|
for day in sorted(temperatures_by_day):
|
|
values = temperatures_by_day[day]
|
|
summaries.append(DailySummary(day, max(values), min(values), sum(values) / len(values)))
|
|
return summaries
|
|
|
|
|
|
def font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
|
|
candidates = [
|
|
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf" if bold else "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
|
|
"C:/Windows/Fonts/arialbd.ttf" if bold else "C:/Windows/Fonts/arial.ttf",
|
|
"C:/Windows/Fonts/calibrib.ttf" if bold else "C:/Windows/Fonts/calibri.ttf",
|
|
]
|
|
for candidate in candidates:
|
|
try:
|
|
return ImageFont.truetype(candidate, size)
|
|
except OSError:
|
|
continue
|
|
return ImageFont.load_default()
|
|
|
|
|
|
def draw_chart(summary: list[DailySummary], title: str, output_path: Path) -> None:
|
|
width, height = 1050, 310
|
|
left, top, right, bottom = 64, 31, 28, 56
|
|
plot_w = width - left - right
|
|
plot_h = height - top - bottom
|
|
image = Image.new("RGB", (width, height), "white")
|
|
draw = ImageDraw.Draw(image)
|
|
|
|
series = [
|
|
("High", [row.high for row in summary], (21, 101, 150)),
|
|
("Min", [row.low for row in summary], (221, 107, 45)),
|
|
("Average", [row.average for row in summary], (29, 111, 54)),
|
|
]
|
|
values = [value for _, values, _ in series for value in values]
|
|
y_min = max(0, int(min(values) // 5 * 5))
|
|
y_max = int((max(values) + 4.999) // 5 * 5)
|
|
if y_max == y_min:
|
|
y_max += 5
|
|
|
|
title_font = font(22, True)
|
|
label_font = font(15)
|
|
small_font = font(12)
|
|
draw.text((width // 2, 8), title, fill=(30, 41, 51), font=title_font, anchor="ma")
|
|
|
|
def x_pos(idx: int) -> float:
|
|
if len(summary) == 1:
|
|
return left + plot_w / 2
|
|
return left + idx * plot_w / (len(summary) - 1)
|
|
|
|
def y_pos(value: float) -> float:
|
|
return top + (y_max - value) * plot_h / (y_max - y_min)
|
|
|
|
for tick in range(y_min, y_max + 1, 5):
|
|
y = y_pos(tick)
|
|
draw.line((left, y, width - right, y), fill=(215, 221, 226), width=1)
|
|
draw.text((left - 12, y), f"{tick}", fill=(55, 65, 81), font=small_font, anchor="rm")
|
|
|
|
draw.line((left, top, left, top + plot_h), fill=(99, 110, 123), width=2)
|
|
draw.line((left, top + plot_h, width - right, top + plot_h), fill=(99, 110, 123), width=2)
|
|
|
|
tick_indexes = list(range(0, len(summary), max(1, len(summary) // 7)))
|
|
if len(summary) - 1 not in tick_indexes:
|
|
tick_indexes.append(len(summary) - 1)
|
|
for idx in tick_indexes:
|
|
x = x_pos(idx)
|
|
draw.line((x, top, x, top + plot_h), fill=(235, 238, 241), width=1)
|
|
draw.text((x, top + plot_h + 11), summary[idx].day.strftime("%d %b"), fill=(55, 65, 81), font=small_font, anchor="ma")
|
|
|
|
for name, values, color in series:
|
|
points = [(x_pos(idx), y_pos(value)) for idx, value in enumerate(values)]
|
|
if len(points) > 1:
|
|
draw.line(points, fill=color, width=4)
|
|
for x, y in points:
|
|
draw.ellipse((x - 3, y - 3, x + 3, y + 3), fill=color)
|
|
|
|
legend_x = left + 330
|
|
legend_y = height - 24
|
|
for name, _, color in series:
|
|
draw.line((legend_x, legend_y, legend_x + 28, legend_y), fill=color, width=4)
|
|
draw.text((legend_x + 36, legend_y), f"{name} C", fill=(30, 41, 51), font=label_font, anchor="lm")
|
|
legend_x += 160
|
|
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
image.save(output_path)
|
|
|
|
|
|
def make_stat_cards(device: ReportDevice, period_label: str, tz: ZoneInfo) -> Table:
|
|
readings = device.readings
|
|
summary = device.summary
|
|
temperatures = [float(reading.temperature) for reading in readings if reading.temperature is not None]
|
|
humidity_values = [float(reading.humidity) for reading in readings if reading.humidity is not None]
|
|
start = readings[0].recorded_at.replace(tzinfo=timezone.utc).astimezone(tz).strftime("%d %b %Y %H:%M")
|
|
end = readings[-1].recorded_at.replace(tzinfo=timezone.utc).astimezone(tz).strftime("%d %b %Y %H:%M")
|
|
stats = [
|
|
["Device", device.name, "Period", period_label],
|
|
["Readings", f"{len(readings):,}", "Days", f"{len(summary)}"],
|
|
["First", start, "Last", end],
|
|
["Highest", f"{max(temperatures):.1f} C", "Lowest", f"{min(temperatures):.1f} C"],
|
|
["Average", f"{sum(temperatures) / len(temperatures):.1f} C", "Avg RH", format_average(humidity_values, "%")],
|
|
]
|
|
table = Table(stats, colWidths=[22 * mm, 47 * mm, 20 * mm, 52 * mm])
|
|
table.setStyle(
|
|
TableStyle(
|
|
[
|
|
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#D9EAF7")),
|
|
("TEXTCOLOR", (0, 0), (-1, -1), colors.HexColor("#1F2933")),
|
|
("FONTNAME", (0, 0), (-1, -1), "Helvetica"),
|
|
("FONTNAME", (0, 0), (0, -1), "Helvetica-Bold"),
|
|
("FONTNAME", (2, 0), (2, -1), "Helvetica-Bold"),
|
|
("FONTSIZE", (0, 0), (-1, -1), 7.5),
|
|
("GRID", (0, 0), (-1, -1), 0.35, colors.HexColor("#C9D7E5")),
|
|
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
|
|
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F6FAFD")]),
|
|
]
|
|
)
|
|
)
|
|
return table
|
|
|
|
|
|
def make_daily_table(summary: list[DailySummary]) -> Table:
|
|
data = daily_table_data(summary)
|
|
if len(data) <= 18:
|
|
return style_daily_table(Table(data, colWidths=[38 * mm, 28 * mm, 28 * mm, 32 * mm], repeatRows=1))
|
|
|
|
headers, rows = data[0], data[1:]
|
|
split_at = (len(rows) + 1) // 2
|
|
left = [headers] + rows[:split_at]
|
|
right = [headers] + rows[split_at:]
|
|
left_table = style_daily_table(Table(left, colWidths=[35 * mm, 25 * mm, 25 * mm, 29 * mm], repeatRows=1))
|
|
right_table = style_daily_table(Table(right, colWidths=[35 * mm, 25 * mm, 25 * mm, 29 * mm], repeatRows=1))
|
|
wrapper = Table([[left_table, right_table]], colWidths=[124 * mm, 124 * mm])
|
|
wrapper.setStyle(
|
|
TableStyle(
|
|
[
|
|
("VALIGN", (0, 0), (-1, -1), "TOP"),
|
|
("LEFTPADDING", (0, 0), (-1, -1), 0),
|
|
("RIGHTPADDING", (0, 0), (-1, -1), 0),
|
|
]
|
|
)
|
|
)
|
|
return wrapper
|
|
|
|
|
|
def daily_table_data(summary: list[DailySummary]) -> list[list[str]]:
|
|
data = [["Date", "High C", "Min C", "Average C"]]
|
|
for row in summary:
|
|
data.append([row.day.isoformat(), f"{row.high:.1f}", f"{row.low:.1f}", f"{row.average:.1f}"])
|
|
return data
|
|
|
|
|
|
def style_daily_table(table: Table) -> Table:
|
|
table.setStyle(
|
|
TableStyle(
|
|
[
|
|
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#4472C4")),
|
|
("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
|
|
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
|
|
("FONTNAME", (0, 1), (-1, -1), "Helvetica"),
|
|
("FONTSIZE", (0, 0), (-1, -1), 7.6),
|
|
("ALIGN", (1, 1), (-1, -1), "RIGHT"),
|
|
("ALIGN", (0, 0), (-1, 0), "CENTER"),
|
|
("GRID", (0, 0), (-1, -1), 0.3, colors.HexColor("#E0E7EF")),
|
|
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#D7F0FA")]),
|
|
("BOTTOMPADDING", (0, 0), (-1, -1), 2),
|
|
("TOPPADDING", (0, 0), (-1, -1), 2),
|
|
]
|
|
)
|
|
)
|
|
return table
|
|
|
|
|
|
def footer(canvas, doc) -> None:
|
|
canvas.saveState()
|
|
canvas.setFont("Helvetica", 8)
|
|
canvas.setFillColor(colors.HexColor("#667085"))
|
|
canvas.drawRightString(285 * mm, 9 * mm, f"Page {doc.page}")
|
|
canvas.restoreState()
|
|
|
|
|
|
def format_period(start_date: date, end_date: date) -> str:
|
|
if start_date == end_date:
|
|
return start_date.strftime("%d %B %Y")
|
|
if start_date.year == end_date.year and start_date.month == end_date.month:
|
|
return f"{start_date:%d} to {end_date:%d %B %Y}"
|
|
return f"{start_date:%d %B %Y} to {end_date:%d %B %Y}"
|
|
|
|
|
|
def format_average(values: list[float], suffix: str) -> str:
|
|
if not values:
|
|
return "n/a"
|
|
return f"{sum(values) / len(values):.1f}{suffix}"
|
|
|
|
|
|
def safe_filename(value: str) -> str:
|
|
safe = "".join(char.lower() if char.isalnum() else "-" for char in value)
|
|
return "-".join(part for part in safe.split("-") if part) or "device"
|