"""Build landscape A4 PDF temperature reports with daily summaries and trend charts."""

from __future__ import annotations

import math
from collections import defaultdict
from dataclasses import dataclass
from datetime import date, datetime, timezone
from pathlib import Path
from xml.sax.saxutils import escape
from zoneinfo import ZoneInfo

from PIL import Image, ImageDraw, ImageFont
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER
from reportlab.lib.pagesizes import A4, landscape
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.platypus import (
    Image as RLImage,
    PageBreak,
    Paragraph,
    SimpleDocTemplate,
    Spacer,
    Table,
    TableStyle,
)
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload

from app.models import Device, Reading


@dataclass(frozen=True)
class DailySummary:
    """Temperature statistics for a single local calendar day."""

    day: date  # local calendar day the readings fall on
    high: float  # maximum temperature recorded that day
    low: float  # minimum temperature recorded that day
    average: float  # arithmetic mean of that day's temperatures


@dataclass(frozen=True)
class ReportDevice:
    """One device's readings plus the per-day summary derived from them."""

    name: str  # display name of the device
    readings: list[Reading]  # readings in the order returned by the query
    summary: list[DailySummary]  # one entry per day, sorted by day


def _to_local(moment: datetime, tz: ZoneInfo) -> datetime:
    """Convert *moment* to *tz*, treating a naive value as UTC.

    An already-aware value is converted as-is; blindly re-tagging it as UTC
    would shift any timestamp that carries a different offset.
    """
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(tz)


def _build_styles() -> tuple[ParagraphStyle, ParagraphStyle, ParagraphStyle]:
    """Return the (title, subtitle, section) paragraph styles used by the report."""
    base = getSampleStyleSheet()
    title_style = ParagraphStyle(
        "RecordTitle",
        parent=base["Title"],
        fontName="Helvetica-Bold",
        fontSize=20,
        leading=24,
        textColor=colors.HexColor("#1F4E78"),
        alignment=TA_CENTER,
        spaceAfter=5,
    )
    subtitle_style = ParagraphStyle(
        "Subtitle",
        parent=base["Normal"],
        fontName="Helvetica",
        fontSize=9,
        leading=11,
        textColor=colors.HexColor("#475467"),
        alignment=TA_CENTER,
        spaceAfter=8,
    )
    section_style = ParagraphStyle(
        "Section",
        parent=base["Heading2"],
        fontName="Helvetica-Bold",
        fontSize=14,
        leading=17,
        textColor=colors.HexColor("#1F2933"),
        spaceBefore=2,
        spaceAfter=5,
    )
    return title_style, subtitle_style, section_style


def _unique_chart_path(chart_dir: Path, stem: str, taken: set[str]) -> Path:
    """Return a PNG path under *chart_dir* whose stem is not yet in *taken*.

    The first device to claim a stem keeps it unchanged; later devices whose
    names sanitize to the same stem get a ``-2``, ``-3``, ... suffix. *taken*
    is updated in place.
    """
    candidate = stem
    counter = 2
    while candidate in taken:
        candidate = f"{stem}-{counter}"
        counter += 1
    taken.add(candidate)
    return chart_dir / f"{candidate}.png"


def build_temperature_pdf(
    session: Session,
    output_dir: Path,
    chart_dir: Path,
    start_utc: datetime,
    end_utc: datetime,
    start_date: date,
    end_date: date,
    tz: ZoneInfo,
    device_id: str = "",
) -> Path:
    """Render the temperature report PDF and return its path.

    Each device with temperature data gets its own page holding a stats card,
    a trend chart and a daily table. When nothing matches, the PDF contains a
    single "no readings" notice.

    Args:
        session: SQLAlchemy session used to load readings.
        output_dir: Directory for the PDF; created if missing.
        chart_dir: Directory for the chart PNGs; created if missing.
        start_utc: Inclusive lower bound applied to ``Reading.recorded_at``.
        end_utc: Exclusive upper bound applied to ``Reading.recorded_at``.
        start_date: First day of the period, used for labels and file names.
        end_date: Last day of the period, used for labels and file names.
        tz: Time zone used for daily grouping and displayed timestamps.
        device_id: Optional device filter; empty means all devices.

    Returns:
        Path of the generated PDF file.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    chart_dir.mkdir(parents=True, exist_ok=True)

    devices = load_report_devices(session, start_utc, end_utc, tz, device_id)
    generated_at = datetime.now(tz)

    date_span = f"{start_date:%Y%m%d}-{end_date:%Y%m%d}"
    filename = f"temperature-report-{date_span}"
    if device_id and len(devices) == 1:
        filename += f"-{safe_filename(devices[0].name)}"
    pdf_path = output_dir / f"{filename}.pdf"

    title_style, subtitle_style, section_style = _build_styles()
    period_label = format_period(start_date, end_date)
    doc = SimpleDocTemplate(
        str(pdf_path),
        pagesize=landscape(A4),
        leftMargin=12 * mm,
        rightMargin=12 * mm,
        topMargin=10 * mm,
        bottomMargin=12 * mm,
        title=f"Temperature Report - {period_label}",
    )

    title_text = f"Temperature Records - {period_label}"
    story: list = []
    if not devices:
        story.extend(
            [
                Paragraph(title_text, title_style),
                Paragraph("No readings found for this period.", subtitle_style),
            ]
        )
    else:
        subtitle_text = (
            "Daily high, minimum, and average temperature summaries. "
            f"Generated {generated_at:%Y-%m-%d %H:%M %Z}."
        )
        # Chart images are read from disk when the document is built, so two
        # devices must never share a chart file or both pages would show the
        # chart that was written last.
        taken_stems: set[str] = set()
        for index, device in enumerate(devices):
            if index:
                story.append(PageBreak())

            chart_path = _unique_chart_path(
                chart_dir, f"{safe_filename(device.name)}-{date_span}", taken_stems
            )
            draw_chart(device.summary, f"{device.name} daily temperature trend", chart_path)

            story.append(Paragraph(title_text, title_style))
            story.append(Paragraph(subtitle_text, subtitle_style))
            # Paragraph text is parsed as markup; escape the free-form device
            # name so characters such as "&" or "<" render literally.
            story.append(Paragraph(escape(device.name), section_style))

            chart = RLImage(str(chart_path), width=141 * mm, height=41.5 * mm)
            top_row = Table(
                [[make_stat_cards(device, period_label, tz), chart]],
                colWidths=[143 * mm, 142 * mm],
            )
            top_row.setStyle(
                TableStyle(
                    [
                        ("VALIGN", (0, 0), (-1, -1), "TOP"),
                        ("LEFTPADDING", (0, 0), (-1, -1), 0),
                        ("RIGHTPADDING", (0, 0), (-1, -1), 0),
                    ]
                )
            )
            story.append(top_row)
            story.append(Spacer(1, 4 * mm))
            story.append(make_daily_table(device.summary))

    doc.build(story, onFirstPage=footer, onLaterPages=footer)
    return pdf_path


def load_report_devices(
    session: Session,
    start_utc: datetime,
    end_utc: datetime,
    tz: ZoneInfo,
    device_id: str = "",
) -> list[ReportDevice]:
    """Load readings in ``[start_utc, end_utc)`` grouped per device.

    Only devices whose ``device_type`` is ``"WoIOSensor"`` are kept, and
    devices without any temperature value in the window are dropped.

    Args:
        session: SQLAlchemy session to query.
        start_utc: Inclusive lower bound on ``Reading.recorded_at``.
        end_utc: Exclusive upper bound on ``Reading.recorded_at``.
        tz: Time zone used to bucket readings into days.
        device_id: Optional device filter; empty means all devices.

    Returns:
        Report devices sorted case-insensitively by name.
    """
    stmt = (
        select(Reading)
        .options(joinedload(Reading.device))
        .join(Reading.device)
        .where(Reading.recorded_at >= start_utc, Reading.recorded_at < end_utc)
        .order_by(Device.name, Reading.recorded_at)
    )
    if device_id:
        stmt = stmt.where(Device.id == device_id)

    grouped: dict[str, list[Reading]] = defaultdict(list)
    names: dict[str, str] = {}
    for reading in session.scalars(stmt):
        if reading.device.device_type != "WoIOSensor":
            continue
        grouped[reading.device_id].append(reading)
        names[reading.device_id] = reading.device.name

    report_devices = []
    for item_device_id in sorted(grouped, key=lambda value: names[value].lower()):
        readings = grouped[item_device_id]
        summary = daily_summary(readings, tz)
        if summary:
            report_devices.append(ReportDevice(names[item_device_id], readings, summary))
    return report_devices


def daily_summary(readings: list[Reading], tz: ZoneInfo) -> list[DailySummary]:
    """Summarize temperatures per local calendar day.

    Readings without a temperature are ignored. Naive ``recorded_at`` values
    are interpreted as UTC before conversion to *tz*.

    Returns:
        One ``DailySummary`` per day that has data, sorted by day.
    """
    temperatures_by_day: dict[date, list[float]] = defaultdict(list)
    for reading in readings:
        if reading.temperature is None:
            continue
        local_day = _to_local(reading.recorded_at, tz).date()
        temperatures_by_day[local_day].append(float(reading.temperature))

    summaries = []
    for day in sorted(temperatures_by_day):
        values = temperatures_by_day[day]
        summaries.append(DailySummary(day, max(values), min(values), sum(values) / len(values)))
    return summaries


def font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
    """Return the first loadable TrueType font at *size*.

    Tries DejaVu Sans, then Arial, then Calibri at fixed Linux/Windows paths,
    and falls back to Pillow's built-in default font when none can be opened.
    """
    candidates = [
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
        if bold
        else "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "C:/Windows/Fonts/arialbd.ttf" if bold else "C:/Windows/Fonts/arial.ttf",
        "C:/Windows/Fonts/calibrib.ttf" if bold else "C:/Windows/Fonts/calibri.ttf",
    ]
    for candidate in candidates:
        try:
            return ImageFont.truetype(candidate, size)
        except OSError:
            # Font file missing or unreadable on this machine; try the next one.
            continue
    return ImageFont.load_default()


def draw_chart(summary: list[DailySummary], title: str, output_path: Path) -> None:
    """Draw the high / min / average line chart and save it to *output_path*.

    Args:
        summary: Daily summaries to plot, in display order; must be non-empty.
        title: Text centred at the top of the chart.
        output_path: Destination image file; parent directories are created.

    Raises:
        ValueError: If *summary* is empty.
    """
    if not summary:
        raise ValueError("draw_chart requires at least one daily summary")

    width, height = 1050, 310
    left, top, right, bottom = 64, 31, 28, 56
    plot_w = width - left - right
    plot_h = height - top - bottom
    image = Image.new("RGB", (width, height), "white")
    draw = ImageDraw.Draw(image)

    series = [
        ("High", [row.high for row in summary], (21, 101, 150)),
        ("Min", [row.low for row in summary], (221, 107, 45)),
        ("Average", [row.average for row in summary], (29, 111, 54)),
    ]
    all_values = [value for _, points, _ in series for value in points]
    # Snap the axis to multiples of 5 around the data. The floor must follow
    # the data below zero, otherwise sub-zero readings are drawn outside the
    # plot area on top of the axis labels and legend.
    y_min = math.floor(min(all_values) / 5) * 5
    y_max = math.ceil(max(all_values) / 5) * 5
    if y_max == y_min:
        y_max += 5

    title_font = font(22, True)
    label_font = font(15)
    small_font = font(12)
    draw.text((width // 2, 8), title, fill=(30, 41, 51), font=title_font, anchor="ma")

    def x_pos(idx: int) -> float:
        # A single day is centred; otherwise days are spread edge to edge.
        if len(summary) == 1:
            return left + plot_w / 2
        return left + idx * plot_w / (len(summary) - 1)

    def y_pos(value: float) -> float:
        return top + (y_max - value) * plot_h / (y_max - y_min)

    # Horizontal grid lines with their value labels.
    for tick in range(y_min, y_max + 1, 5):
        y = y_pos(tick)
        draw.line((left, y, width - right, y), fill=(215, 221, 226), width=1)
        draw.text((left - 12, y), f"{tick}", fill=(55, 65, 81), font=small_font, anchor="rm")

    # Axes.
    draw.line((left, top, left, top + plot_h), fill=(99, 110, 123), width=2)
    draw.line((left, top + plot_h, width - right, top + plot_h), fill=(99, 110, 123), width=2)

    # Roughly seven evenly spaced date labels, always including the last day.
    tick_indexes = list(range(0, len(summary), max(1, len(summary) // 7)))
    if len(summary) - 1 not in tick_indexes:
        tick_indexes.append(len(summary) - 1)
    for idx in tick_indexes:
        x = x_pos(idx)
        draw.line((x, top, x, top + plot_h), fill=(235, 238, 241), width=1)
        draw.text(
            (x, top + plot_h + 11),
            summary[idx].day.strftime("%d %b"),
            fill=(55, 65, 81),
            font=small_font,
            anchor="ma",
        )

    for _, points_values, color in series:
        points = [(x_pos(idx), y_pos(value)) for idx, value in enumerate(points_values)]
        if len(points) > 1:
            draw.line(points, fill=color, width=4)
        for x, y in points:
            draw.ellipse((x - 3, y - 3, x + 3, y + 3), fill=color)

    legend_x = left + 330
    legend_y = height - 24
    for name, _, color in series:
        draw.line((legend_x, legend_y, legend_x + 28, legend_y), fill=color, width=4)
        draw.text((legend_x + 36, legend_y), f"{name} C", fill=(30, 41, 51), font=label_font, anchor="lm")
        legend_x += 160

    output_path.parent.mkdir(parents=True, exist_ok=True)
    image.save(output_path)


def make_stat_cards(device: ReportDevice, period_label: str, tz: ZoneInfo) -> Table:
    """Build the key/value statistics table shown beside the chart.

    Values that cannot be computed because the device has no readings or no
    temperature/humidity values are shown as ``"n/a"``.

    Args:
        device: Device whose readings and summary are described.
        period_label: Human-readable period shown in the "Period" cell.
        tz: Time zone used for the first/last reading timestamps.
    """
    readings = device.readings
    summary = device.summary
    temperatures = [float(reading.temperature) for reading in readings if reading.temperature is not None]
    humidity_values = [float(reading.humidity) for reading in readings if reading.humidity is not None]

    stamp_format = "%d %b %Y %H:%M"
    if readings:
        start = _to_local(readings[0].recorded_at, tz).strftime(stamp_format)
        end = _to_local(readings[-1].recorded_at, tz).strftime(stamp_format)
    else:
        start = end = "n/a"
    highest = f"{max(temperatures):.1f} C" if temperatures else "n/a"
    lowest = f"{min(temperatures):.1f} C" if temperatures else "n/a"

    stats = [
        ["Device", device.name, "Period", period_label],
        ["Readings", f"{len(readings):,}", "Days", f"{len(summary)}"],
        ["First", start, "Last", end],
        ["Highest", highest, "Lowest", lowest],
        ["Average", format_average(temperatures, " C"), "Avg RH", format_average(humidity_values, "%")],
    ]
    table = Table(stats, colWidths=[22 * mm, 47 * mm, 20 * mm, 52 * mm])
    table.setStyle(
        TableStyle(
            [
                ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#D9EAF7")),
                ("TEXTCOLOR", (0, 0), (-1, -1), colors.HexColor("#1F2933")),
                ("FONTNAME", (0, 0), (-1, -1), "Helvetica"),
                ("FONTNAME", (0, 0), (0, -1), "Helvetica-Bold"),
                ("FONTNAME", (2, 0), (2, -1), "Helvetica-Bold"),
                ("FONTSIZE", (0, 0), (-1, -1), 7.5),
                ("GRID", (0, 0), (-1, -1), 0.35, colors.HexColor("#C9D7E5")),
                ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
                ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F6FAFD")]),
            ]
        )
    )
    return table


def make_daily_table(summary: list[DailySummary]) -> Table:
    """Build the per-day table, split into two side-by-side halves when long.

    Up to 17 days (18 rows including the header) fit in one table. Longer
    summaries are split so the left half holds the extra row when the count
    is odd, and both halves repeat the header.
    """
    data = daily_table_data(summary)
    if len(data) <= 18:
        return style_daily_table(Table(data, colWidths=[38 * mm, 28 * mm, 28 * mm, 32 * mm], repeatRows=1))

    headers, rows = data[0], data[1:]
    split_at = (len(rows) + 1) // 2
    half_widths = [35 * mm, 25 * mm, 25 * mm, 29 * mm]
    left_table = style_daily_table(Table([headers] + rows[:split_at], colWidths=half_widths, repeatRows=1))
    right_table = style_daily_table(Table([headers] + rows[split_at:], colWidths=half_widths, repeatRows=1))

    wrapper = Table([[left_table, right_table]], colWidths=[124 * mm, 124 * mm])
    wrapper.setStyle(
        TableStyle(
            [
                ("VALIGN", (0, 0), (-1, -1), "TOP"),
                ("LEFTPADDING", (0, 0), (-1, -1), 0),
                ("RIGHTPADDING", (0, 0), (-1, -1), 0),
            ]
        )
    )
    return wrapper


def daily_table_data(summary: list[DailySummary]) -> list[list[str]]:
    """Return the header row followed by one formatted row per day."""
    header = ["Date", "High C", "Min C", "Average C"]
    body = [
        [row.day.isoformat(), f"{row.high:.1f}", f"{row.low:.1f}", f"{row.average:.1f}"]
        for row in summary
    ]
    return [header, *body]


def style_daily_table(table: Table) -> Table:
    """Apply the daily-table look to *table* in place and return it."""
    table.setStyle(
        TableStyle(
            [
                ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#4472C4")),
                ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
                ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
                ("FONTNAME", (0, 1), (-1, -1), "Helvetica"),
                ("FONTSIZE", (0, 0), (-1, -1), 7.6),
                ("ALIGN", (1, 1), (-1, -1), "RIGHT"),
                ("ALIGN", (0, 0), (-1, 0), "CENTER"),
                ("GRID", (0, 0), (-1, -1), 0.3, colors.HexColor("#E0E7EF")),
                ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#D7F0FA")]),
                ("BOTTOMPADDING", (0, 0), (-1, -1), 2),
                ("TOPPADDING", (0, 0), (-1, -1), 2),
            ]
        )
    )
    return table


def footer(canvas, doc) -> None:
    """Draw the "Page N" footer, right-aligned to the document's right margin."""
    canvas.saveState()
    canvas.setFont("Helvetica", 8)
    canvas.setFillColor(colors.HexColor("#667085"))
    # Derive the anchor from the template rather than hard-coding it; for
    # landscape A4 with a 12 mm right margin this is 285 mm.
    right_edge = doc.pagesize[0] - doc.rightMargin
    canvas.drawRightString(right_edge, 9 * mm, f"Page {doc.page}")
    canvas.restoreState()


def format_period(start_date: date, end_date: date) -> str:
    """Return a readable label for the period, collapsing repeated parts.

    A single day gives ``"05 March 2024"``, a range within one month gives
    ``"05 to 09 March 2024"``, and anything else spells out both dates.
    """
    if start_date == end_date:
        return start_date.strftime("%d %B %Y")
    if start_date.year == end_date.year and start_date.month == end_date.month:
        return f"{start_date:%d} to {end_date:%d %B %Y}"
    return f"{start_date:%d %B %Y} to {end_date:%d %B %Y}"


def format_average(values: list[float], suffix: str) -> str:
    """Return the mean of *values* to one decimal plus *suffix*, or ``"n/a"`` if empty."""
    if not values:
        return "n/a"
    return f"{sum(values) / len(values):.1f}{suffix}"


def safe_filename(value: str) -> str:
    """Return a lowercase, hyphen-separated slug of *value* for use in file names.

    Every non-alphanumeric character becomes a separator and runs of
    separators collapse to one hyphen. Returns ``"device"`` when nothing
    alphanumeric remains.
    """
    safe = "".join(char.lower() if char.isalnum() else "-" for char in value)
    return "-".join(part for part in safe.split("-") if part) or "device"