diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..33f294a --- /dev/null +++ b/.env.example @@ -0,0 +1,30 @@ +# MQTT Configuration +MQTT_HOST=ikarus.egfh.internal +MQTT_PORT=1883 +MQTT_USERNAME= +MQTT_PASSWORD= +MQTT_TOPIC=weather/+ +MQTT_CLIENT_ID=wxconnect + +# Database Configuration +DB_TYPE=oracle # oracle, mssql, postgresql, mysql +DB_HOST=localhost +DB_PORT=1521 +DB_NAME=xe +DB_USERNAME=weather +DB_PASSWORD=password +DB_SCHEMA=weather + +# Oracle specific (if using Oracle) +ORACLE_SID=xe + +# MS SQL specific (if using MS SQL) +MSSQL_DRIVER=ODBC Driver 17 for SQL Server + +# Logging +LOG_LEVEL=INFO + +# Application Settings +RECONNECT_INTERVAL=30 +BATCH_SIZE=10 +BATCH_TIMEOUT=60 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..84c6e80 --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +.env +__pycache__/ +*.pyc +*.pyo +*.pyd +.Python +*.so +.coverage +.pytest_cache/ +.vscode/ +.idea/ +*.log +logs/ +data/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4f7336e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,60 @@ +FROM python:3.11-slim + +# Set environment variables +ENV PYTHONPATH=/app/src +ENV PYTHONUNBUFFERED=1 + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + g++ \ + unixodbc-dev \ + curl \ + unzip \ + libaio-dev \ + && rm -rf /var/lib/apt/lists/* + +# Install Oracle Instant Client for cx_Oracle +RUN mkdir -p /opt/oracle && \ + cd /opt/oracle && \ + curl -o instantclient-basic-linux.x64-21.9.0.0.0dbru.zip \ + https://download.oracle.com/otn_software/linux/instantclient/219000/instantclient-basic-linux.x64-21.9.0.0.0dbru.zip && \ + unzip instantclient-basic-linux.x64-21.9.0.0.0dbru.zip && \ + rm instantclient-basic-linux.x64-21.9.0.0.0dbru.zip && \ + cd instantclient_21_9 && \ + mkdir -p lib && \ + ln -s ../libclntsh.so.21.1 lib/libclntsh.so && \ + ln -s ../libnnz21.so lib/libnnz21.so && \ + ln -s 
../libociicus.so lib/libociicus.so && \ + echo /opt/oracle/instantclient_21_9 > /etc/ld.so.conf.d/oracle-instantclient.conf && \ + ldconfig + +# Set Oracle environment variables +ENV ORACLE_HOME=/opt/oracle/instantclient_21_9 +ENV LD_LIBRARY_PATH=/opt/oracle/instantclient_21_9:$LD_LIBRARY_PATH +ENV PATH=/opt/oracle/instantclient_21_9:$PATH + +# Create app directory +WORKDIR /app + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY src/ ./src/ + +# Create logs directory +RUN mkdir -p /app/logs + +# Create non-root user +RUN groupadd -r wxconnect && useradd -r -g wxconnect wxconnect +RUN chown -R wxconnect:wxconnect /app +USER wxconnect + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ + CMD python -c "import sys; sys.path.insert(0, '/app/src'); from wxconnect.config import Config; c = Config(); print('OK' if c.validate() else 'FAIL')" || exit 1 + +# Entry point +CMD ["python", "-m", "wxconnect.main"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7207a82 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,102 @@ +version: '3.8' + +services: + wxconnect: + build: . 
+ container_name: wxconnect + restart: unless-stopped + environment: + # MQTT Configuration + MQTT_HOST: ikarus.egfh.internal + MQTT_PORT: 1883 + MQTT_TOPIC: weather/loop + MQTT_CLIENT_ID: wxconnect-docker + + # Database Configuration - Oracle on pve-ora19c-1 + DB_TYPE: oracle + DB_HOST: pve-ora19c-1 + DB_PORT: 1521 + DB_NAME: shed.pattinson.org + DB_USERNAME: C##WEATHER + DB_PASSWORD: weather123 + DB_SCHEMA: C##WEATHER + + # Oracle specific + ORACLE_SID: shed.pattinson.org + + # Logging + LOG_LEVEL: DEBUG + + # Application Settings + RECONNECT_INTERVAL: 30 + BATCH_SIZE: 1 + BATCH_TIMEOUT: 60 + + volumes: + - ./logs:/app/logs + - ./src:/app/src + - ./.env:/app/.env:ro # Mount environment file if it exists + + networks: + - wxconnect-network + + # Remove dependency on local Oracle container since using external Oracle + # depends_on: + # - oracle-db + + # Example Oracle database service (commented out - using external Oracle) + # oracle-db: + # image: gvenzl/oracle-xe:21-slim + # container_name: oracle-wxconnect + # restart: unless-stopped + # environment: + # ORACLE_PASSWORD: password + # APP_USER: weather + # APP_USER_PASSWORD: password + # ports: + # - "1521:1521" + # volumes: + # - oracle_data:/opt/oracle/oradata + # networks: + # - wxconnect-network + + # Example PostgreSQL database service (alternative to Oracle) + # postgres-db: + # image: postgres:15 + # container_name: postgres-wxconnect + # restart: unless-stopped + # environment: + # POSTGRES_DB: weather + # POSTGRES_USER: weather + # POSTGRES_PASSWORD: password + # ports: + # - "5432:5432" + # volumes: + # - postgres_data:/var/lib/postgresql/data + # networks: + # - wxconnect-network + + # Example MS SQL Server database service (alternative) + # mssql-db: + # image: mcr.microsoft.com/mssql/server:2022-latest + # container_name: mssql-wxconnect + # restart: unless-stopped + # environment: + # SA_PASSWORD: "YourStrong@Passw0rd" + # ACCEPT_EULA: "Y" + # MSSQL_DB: weather + # ports: + # - "1433:1433" + # 
volumes: + # - mssql_data:/var/opt/mssql + # networks: + # - wxconnect-network + +volumes: + oracle_data: + # postgres_data: + # mssql_data: + +networks: + wxconnect-network: + driver: bridge \ No newline at end of file diff --git a/oracle-setup/01_create_user_common.sql b/oracle-setup/01_create_user_common.sql new file mode 100644 index 0000000..925ee2d --- /dev/null +++ b/oracle-setup/01_create_user_common.sql @@ -0,0 +1,45 @@ +-- Oracle Common User Setup for Weather Connect +-- Run this as SYSTEM or SYSDBA user +-- This creates a common user that works across all containers + +-- 1. Check current environment +COLUMN is_cdb FORMAT A10 +SELECT DECODE(cdb, 'YES', 'CDB', 'Regular') as is_cdb FROM v$database; +SHOW CON_NAME; + +-- 2. Create common user (prefixed with C##) +CREATE USER C##weather IDENTIFIED BY weather123 +DEFAULT TABLESPACE USERS +TEMPORARY TABLESPACE TEMP +QUOTA UNLIMITED ON USERS +CONTAINER=ALL; + +-- 3. Grant necessary privileges +GRANT CONNECT TO C##weather CONTAINER=ALL; +GRANT RESOURCE TO C##weather CONTAINER=ALL; +GRANT CREATE SESSION TO C##weather CONTAINER=ALL; +GRANT CREATE TABLE TO C##weather CONTAINER=ALL; +GRANT CREATE SEQUENCE TO C##weather CONTAINER=ALL; +GRANT CREATE VIEW TO C##weather CONTAINER=ALL; +-- Note: CREATE INDEX privilege is included in RESOURCE role +-- GRANT CREATE INDEX TO C##weather CONTAINER=ALL; -- This is redundant with RESOURCE + +-- Optional: Grant additional privileges if needed +-- GRANT CREATE PROCEDURE TO C##weather CONTAINER=ALL; +-- GRANT CREATE TRIGGER TO C##weather CONTAINER=ALL; + +COMMIT; + +-- 4. Verify user creation +SELECT username, common, default_tablespace, temporary_tablespace, account_status +FROM dba_users +WHERE username = 'C##WEATHER'; + +-- 5. Display granted privileges +SELECT grantee, privilege +FROM dba_sys_privs +WHERE grantee = 'C##WEATHER' +ORDER BY privilege; + +-- 6. 
Test connection as common user +-- CONNECT C##weather/weather123@pve-ora19c-1:1521/SHED; \ No newline at end of file diff --git a/oracle-setup/01_create_user_pdb.sql b/oracle-setup/01_create_user_pdb.sql new file mode 100644 index 0000000..ea566d6 --- /dev/null +++ b/oracle-setup/01_create_user_pdb.sql @@ -0,0 +1,52 @@ +-- Oracle CDB/PDB Setup for Weather Connect +-- Run this as SYSTEM or SYSDBA user + +-- 1. Check the current environment +COLUMN is_cdb FORMAT A10 +COLUMN con_name FORMAT A20 +SELECT DECODE(cdb, 'YES', 'CDB', 'Regular') as is_cdb FROM v$database; +SHOW CON_NAME; + +-- 2. Show available PDBs (if in CDB) +SELECT name, open_mode FROM v$pdbs; + +-- 3. Switch the session to the target PDB (SHDPDB1 below; substitute your own PDB name) +-- The name must match one of the PDBs listed by the v$pdbs query above: +ALTER SESSION SET CONTAINER = SHDPDB1; +SHOW CON_NAME; + +-- 4. Now create the regular user in the PDB +CREATE USER weather IDENTIFIED BY weather123 +DEFAULT TABLESPACE USERS +TEMPORARY TABLESPACE TEMP +QUOTA UNLIMITED ON USERS; + +-- 5. Grant necessary privileges +GRANT CONNECT TO weather; +GRANT RESOURCE TO weather; +GRANT CREATE SESSION TO weather; +GRANT CREATE TABLE TO weather; +GRANT CREATE SEQUENCE TO weather; +GRANT CREATE VIEW TO weather; +-- Note: CREATE INDEX privilege is included in RESOURCE role +-- GRANT CREATE INDEX TO weather; -- This is redundant with RESOURCE + +-- Optional: Grant additional privileges if needed +-- GRANT CREATE PROCEDURE TO weather; +-- GRANT CREATE TRIGGER TO weather; + +COMMIT; + +-- 6. Verify user creation +SELECT username, default_tablespace, temporary_tablespace, account_status +FROM dba_users +WHERE username = 'WEATHER'; + +-- 7. Display granted privileges +SELECT grantee, privilege +FROM dba_sys_privs +WHERE grantee = 'WEATHER' +ORDER BY privilege; + +-- 8. 
@dataclass
class MQTTConfig:
    """MQTT connection configuration.

    Plain value container; ``Config.__init__`` fills it from the ``MQTT_*``
    environment variables.
    """
    host: str  # broker hostname (MQTT_HOST)
    port: int  # broker TCP port (MQTT_PORT)
    username: Optional[str]  # MQTT_USERNAME; None when the variable is unset
    password: Optional[str]  # MQTT_PASSWORD; None when the variable is unset
    topic: str  # subscription topic or filter (MQTT_TOPIC, e.g. "weather/+")
    client_id: str  # client identifier (MQTT_CLIENT_ID)


@dataclass
class DatabaseConfig:
    """Database connection configuration.

    Plain value container; ``Config.__init__`` fills it from the ``DB_*``,
    ``ORACLE_SID`` and ``MSSQL_DRIVER`` environment variables.
    """
    db_type: str  # backend selector; Config.validate accepts oracle, mssql, postgresql, mysql
    host: str  # database server hostname (DB_HOST)
    port: int  # database server port (DB_PORT)
    name: str  # database / service name (DB_NAME)
    username: str  # login user (DB_USERNAME)
    password: str  # login password (DB_PASSWORD)
    schema: Optional[str] = None  # optional schema name (DB_SCHEMA)
    oracle_sid: Optional[str] = None  # Oracle-only setting (ORACLE_SID)
    mssql_driver: Optional[str] = None  # MS SQL-only ODBC driver name (MSSQL_DRIVER)


@dataclass
class AppConfig:
    """Application configuration.

    Plain value container; ``Config.__init__`` fills it from ``LOG_LEVEL``,
    ``RECONNECT_INTERVAL``, ``BATCH_SIZE`` and ``BATCH_TIMEOUT``.
    """
    log_level: str  # logging level name, e.g. "INFO" or "DEBUG"
    reconnect_interval: int  # NOTE(review): presumably seconds between reconnect attempts - confirm with the consumer
    batch_size: int  # NOTE(review): presumably records per database batch - confirm with the consumer
    batch_timeout: int  # NOTE(review): presumably seconds before a partial batch is flushed - confirm
class WeatherDataProcessor:
    """Processes MQTT weather data and converts it to database format.

    Payload keys are normalised to canonical measurement names through
    ``field_mappings``. Temperature, pressure and wind speed are converted to
    Celsius, hPa and m/s when a unit hint is present, and every measurement
    is clamped to the limits in ``_FIELD_CONSTRAINTS``.
    """

    # Payload keys that may carry the observation time, in lookup order.
    _TIMESTAMP_FIELDS = ('timestamp', 'time', 'datetime', 'ts', 'date_time')

    # strptime patterns tried in order for string timestamps.
    _TIMESTAMP_FORMATS = (
        '%Y-%m-%dT%H:%M:%S.%fZ',
        '%Y-%m-%dT%H:%M:%SZ',
        '%Y-%m-%d %H:%M:%S.%f',
        '%Y-%m-%d %H:%M:%S',
        '%Y/%m/%d %H:%M:%S',
        '%d/%m/%Y %H:%M:%S',
    )

    # Payload keys that may name the station, in lookup order.
    _LOCATION_FIELDS = ('location', 'station', 'site', 'sensor_id', 'device_id', 'name')

    # Topic segments that name a measurement and therefore are not a location.
    _MEASUREMENT_TOPIC_PARTS = ('temp', 'humidity', 'pressure', 'wind', 'rain')

    # Multipliers that turn the detected unit into hPa / m/s.
    _PRESSURE_TO_HPA = {'inHg': 33.8639, 'mmHg': 1.33322}
    _WIND_SPEED_TO_MS = {'knot': 0.514444, 'mph': 0.44704, 'kmh': 0.277778}

    # Inclusive limits applied by _validate_field_ranges.
    _FIELD_CONSTRAINTS = {
        'temperature': {'max': 999.99, 'min': -999.99},
        'humidity': {'max': 100.00, 'min': 0.00},
        'pressure': {'max': 9999.99, 'min': 0.00},
        'wind_speed': {'max': 999.99, 'min': 0.00},
        'wind_direction': {'max': 360.0, 'min': 0.0},
        'rainfall': {'max': 9999.99, 'min': 0.00},
    }

    def __init__(self):
        """Initialize the weather data processor."""
        # Insertion order matters: when several aliases of the same canonical
        # field are present in one payload, the alias listed last wins.
        # NOTE(review): the *_cm rain keys are stored without unit conversion
        # and hourly/daily/24h/rate readings all collapse into 'rainfall'.
        self.field_mappings = {
            # Common field name variations
            'temp': 'temperature',
            'temperature_c': 'temperature',
            'temperature_f': 'temperature',
            'temp_c': 'temperature',
            'temp_f': 'temperature',
            'outTemp_C': 'temperature',
            'humid': 'humidity',
            'humidity_pct': 'humidity',
            'rh': 'humidity',
            'press': 'pressure',
            'pressure_hpa': 'pressure',
            'pressure_mbar': 'pressure',
            'barometric_pressure': 'pressure',
            'wind_speed_ms': 'wind_speed',
            'wind_speed_kmh': 'wind_speed',
            'wind_speed_mph': 'wind_speed',
            'wind_speed_knot': 'wind_speed',
            'windSpeed_knot': 'wind_speed',
            'windspeed': 'wind_speed',
            'wind_dir': 'wind_direction',
            'windDir': 'wind_direction',
            'wind_direction_deg': 'wind_direction',
            'winddirection': 'wind_direction',
            'rain': 'rainfall',
            'rainfall_mm': 'rainfall',
            'precipitation': 'rainfall',
            'precip': 'rainfall',
            'rain_cm': 'rainfall',
            'hourRain_cm': 'rainfall',
            'rain24_cm': 'rainfall',
            'dayRain_cm': 'rainfall',
            'rainRate_cm_per_hour': 'rainfall',
            'maxSolarRad_Wpm2': 'solar_radiation',
            # Note: heatingTemp_hertz, heatingVoltage_hertz, and hail_hertz are frequency readings, not temperature
        }

    def process_message(self, topic: str, data: Any) -> Optional["WeatherData"]:
        """Process an MQTT message and convert to WeatherData.

        A string payload is parsed as JSON first. Payloads that are not valid
        JSON, or that do not decode to a dict, still produce a minimal record
        carrying only the current time, the topic and the raw text.

        Args:
            topic: MQTT topic
            data: Message payload (dict or string)

        Returns:
            WeatherData object or None if processing fails
        """
        try:
            if isinstance(data, str):
                try:
                    data = json.loads(data)
                except json.JSONDecodeError:
                    logger.warning("Failed to parse string data as JSON", topic=topic, data=data[:100])
                    # Keep the unparsed text so nothing is lost.
                    return WeatherData(
                        timestamp=datetime.now(),
                        topic=topic,
                        raw_data=data
                    )

            if not isinstance(data, dict):
                logger.warning("Data is not a dictionary", topic=topic, data_type=type(data))
                return WeatherData(
                    timestamp=datetime.now(),
                    topic=topic,
                    raw_data=str(data)
                )

            timestamp = self._extract_timestamp(data)
            location = self._extract_location(topic, data)
            weather_fields = self._extract_weather_fields(data)

            # NOTE(review): 'solar_radiation' is extracted but not passed on;
            # it only survives inside raw_data.
            weather_data = WeatherData(
                timestamp=timestamp,
                topic=topic,
                location=location,
                temperature=weather_fields.get('temperature'),
                humidity=weather_fields.get('humidity'),
                pressure=weather_fields.get('pressure'),
                wind_speed=weather_fields.get('wind_speed'),
                wind_direction=weather_fields.get('wind_direction'),
                rainfall=weather_fields.get('rainfall'),
                raw_data=json.dumps(data)  # data is known to be a dict here
            )

            logger.debug("Processed weather data", topic=topic, location=location, fields=len(weather_fields))
            return weather_data

        except Exception as e:
            # Per-message boundary: one bad payload must not stop the consumer.
            logger.error("Failed to process weather data", topic=topic, error=str(e))
            return None

    def _extract_timestamp(self, data: Dict[str, Any]) -> datetime:
        """Extract timestamp from data or use current time.

        The first candidate field that parses wins. Strings are matched
        against ``_TIMESTAMP_FORMATS``; numbers are read as Unix timestamps.
        All results are naive datetimes.

        Args:
            data: Message data

        Returns:
            Datetime object
        """
        for field in self._TIMESTAMP_FIELDS:
            if field not in data:
                continue

            value = data[field]
            try:
                if isinstance(value, str):
                    for fmt in self._TIMESTAMP_FORMATS:
                        try:
                            return datetime.strptime(value, fmt)
                        except ValueError:
                            continue
                # bool is a subclass of int; True/False are not epochs.
                elif isinstance(value, (int, float)) and not isinstance(value, bool):
                    return datetime.fromtimestamp(value)
            except (ValueError, OverflowError, OSError) as e:
                # fromtimestamp rejects NaN and out-of-range values.
                logger.debug("Failed to parse timestamp field", field=field, value=value, error=str(e))

        # Default to current time
        return datetime.now()

    def _extract_location(self, topic: str, data: Dict[str, Any]) -> Optional[str]:
        """Extract location from topic or data.

        Args:
            topic: MQTT topic
            data: Message data

        Returns:
            Location string or None
        """
        # An explicit, non-empty payload field takes precedence over the topic.
        for field in self._LOCATION_FIELDS:
            if data.get(field):
                return str(data[field])

        # Otherwise use the first topic segment after the root that is not a
        # measurement name (e.g. weather/garden/temp -> garden).
        for part in topic.split('/')[1:]:
            if part and part not in self._MEASUREMENT_TOPIC_PARTS:
                return part

        return None

    def _extract_weather_fields(self, data: Dict[str, Any]) -> Dict[str, Optional[float]]:
        """Extract weather measurement fields from data.

        Alias keys are applied first, then canonical keys (which therefore
        override aliases), then unit conversion, then range clamping.

        Args:
            data: Message data

        Returns:
            Dictionary of weather fields
        """
        weather_fields = {
            'temperature': None,
            'humidity': None,
            'pressure': None,
            'wind_speed': None,
            'wind_direction': None,
            'rainfall': None,
            'solar_radiation': None
        }

        # Alias keys, in field_mappings order.
        for field_name, canonical_name in self.field_mappings.items():
            if field_name in data:
                value = self._convert_to_float(data[field_name])
                if value is not None:
                    weather_fields[canonical_name] = value

        # Canonical keys override any alias.
        for canonical_name in weather_fields:
            if canonical_name in data:
                value = self._convert_to_float(data[canonical_name])
                if value is not None:
                    weather_fields[canonical_name] = value

        # Fahrenheit -> Celsius
        if weather_fields['temperature'] is not None:
            if self._detect_temperature_unit(data) == 'F':
                weather_fields['temperature'] = (weather_fields['temperature'] - 32) * 5/9

        # inHg / mmHg -> hPa
        if weather_fields['pressure'] is not None:
            factor = self._PRESSURE_TO_HPA.get(self._detect_pressure_unit(data))
            if factor is not None:
                weather_fields['pressure'] = weather_fields['pressure'] * factor

        # knot / mph / km/h -> m/s
        if weather_fields['wind_speed'] is not None:
            factor = self._WIND_SPEED_TO_MS.get(self._detect_wind_speed_unit(data))
            if factor is not None:
                weather_fields['wind_speed'] = weather_fields['wind_speed'] * factor

        # Validate and clamp values to database constraints
        return self._validate_field_ranges(weather_fields, data)

    def _convert_to_float(self, value: Any) -> Optional[float]:
        """Convert value to float if possible.

        Strings may carry a trailing unit marker such as ``°C``, ``F`` or
        ``%``. Booleans and non-finite numbers (NaN, +/-inf) are rejected:
        NaN compares false against every bound, so it would slip past range
        clamping.

        Args:
            value: Value to convert

        Returns:
            Float value or None
        """
        import math  # local import: keeps this block self-contained

        if value is None or isinstance(value, bool):
            return None

        try:
            if isinstance(value, str):
                # Remove common non-numeric suffixes
                cleaned = re.sub(r'[°%\s]*[CFcf°]?$', '', value.strip())
                cleaned = re.sub(r'[°%\s]*$', '', cleaned)
                number = float(cleaned)
            else:
                number = float(value)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: an int too large to be represented as a float.
            return None

        return number if math.isfinite(number) else None

    def _detect_temperature_unit(self, data: Dict[str, Any]) -> str:
        """Detect temperature unit from data.

        An explicit ``temp_unit`` containing "F" wins; otherwise the first
        key containing "temp" with a ``_f``/``fahrenheit`` or
        ``_c``/``celsius`` marker decides.

        Args:
            data: Message data

        Returns:
            'C' for Celsius, 'F' for Fahrenheit
        """
        if 'temp_unit' in data and 'F' in str(data['temp_unit']).upper():
            return 'F'

        for key in data:
            key_lower = key.lower()
            if 'temp' not in key_lower:
                continue
            if '_f' in key_lower or 'fahrenheit' in key_lower:
                return 'F'
            if '_c' in key_lower or 'celsius' in key_lower:
                return 'C'

        # Default to Celsius
        return 'C'

    def _detect_pressure_unit(self, data: Dict[str, Any]) -> str:
        """Detect pressure unit from data.

        An explicit ``pressure_unit`` is checked first, then unit markers
        embedded in pressure-related key names.

        Args:
            data: Message data

        Returns:
            'hPa', 'inHg', or 'mmHg'
        """
        if 'pressure_unit' in data:
            unit = str(data['pressure_unit']).lower()
            if 'inhg' in unit or 'inch' in unit:
                return 'inHg'
            if 'mmhg' in unit:
                return 'mmHg'

        for key in data:
            key_lower = key.lower()
            if 'pressure' not in key_lower and 'press' not in key_lower:
                continue
            if 'inhg' in key_lower or 'inch' in key_lower:
                return 'inHg'
            if 'mmhg' in key_lower:
                return 'mmHg'
            if 'hpa' in key_lower or 'mbar' in key_lower:
                return 'hPa'

        # Default to hPa
        return 'hPa'

    def _detect_wind_speed_unit(self, data: Dict[str, Any]) -> str:
        """Detect wind speed unit from data.

        An explicit ``wind_speed_unit`` is checked first, then unit markers
        embedded in key names that contain both "wind" and "speed".

        Args:
            data: Message data

        Returns:
            'ms', 'kmh', 'mph', or 'knot'
        """
        if 'wind_speed_unit' in data:
            unit = str(data['wind_speed_unit']).lower()
            if 'knot' in unit or 'kt' in unit:
                return 'knot'
            if 'mph' in unit:
                return 'mph'
            if 'kmh' in unit or 'km/h' in unit:
                return 'kmh'

        for key in data:
            key_lower = key.lower()
            if 'wind' not in key_lower or 'speed' not in key_lower:
                continue
            if 'knot' in key_lower or '_kt' in key_lower:
                return 'knot'
            if 'mph' in key_lower:
                return 'mph'
            if 'kmh' in key_lower or 'km_h' in key_lower:
                return 'kmh'
            if 'ms' in key_lower or 'm_s' in key_lower:
                return 'ms'

        # Default to m/s
        return 'ms'

    def _validate_field_ranges(self, fields: Dict[str, Optional[float]], raw_data: Dict[str, Any]) -> Dict[str, Optional[float]]:
        """Validate and clamp field values to database constraints.

        Fields without an entry in ``_FIELD_CONSTRAINTS`` and ``None`` values
        pass through unchanged. Every clamped value is logged as a warning.

        Args:
            fields: Weather fields dictionary
            raw_data: Original MQTT message data (currently unused)

        Returns:
            Validated fields dictionary
        """
        validated_fields = {}

        for field_name, value in fields.items():
            constraint = self._FIELD_CONSTRAINTS.get(field_name)
            if value is None or constraint is None:
                validated_fields[field_name] = value
                continue

            if value > constraint['max'] or value < constraint['min']:
                logger.warning(f"Clamping {field_name} value {value} to range [{constraint['min']}, {constraint['max']}]")

            validated_fields[field_name] = min(max(value, constraint['min']), constraint['max'])

        return validated_fields
@dataclass
class WeatherData:
    """A single weather observation in the shape the database layer stores.

    Only ``timestamp`` and ``topic`` are mandatory; every measurement, the
    location and the raw payload text default to ``None``.
    """
    timestamp: datetime
    topic: str
    location: Optional[str] = None
    temperature: Optional[float] = None
    humidity: Optional[float] = None
    pressure: Optional[float] = None
    wind_speed: Optional[float] = None
    wind_direction: Optional[float] = None
    rainfall: Optional[float] = None
    raw_data: Optional[str] = None


class DatabaseInterface(ABC):
    """Contract that every concrete database backend implements.

    Subclasses supply connection handling and inserts; this base class keeps
    the configuration and offers shared helpers for table naming and error
    logging.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration; no connection is opened here.

        Args:
            config: Database configuration dictionary
        """
        self.config = config
        self.connection = None  # set by the subclass in connect()
        self.schema = config.get('schema')

    @abstractmethod
    def connect(self) -> bool:
        """Open the database connection.

        Returns:
            True if connection successful, False otherwise
        """

    @abstractmethod
    def disconnect(self):
        """Close the database connection."""

    @abstractmethod
    def create_tables(self) -> bool:
        """Create the tables the application needs.

        Returns:
            True if tables created successfully, False otherwise
        """

    @abstractmethod
    def insert_weather_data(self, data: WeatherData) -> bool:
        """Store one weather record.

        Args:
            data: Weather data to insert

        Returns:
            True if insertion successful, False otherwise
        """

    @abstractmethod
    def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
        """Store several weather records in one operation.

        Args:
            data_list: List of weather data to insert

        Returns:
            True if all insertions successful, False otherwise
        """

    @abstractmethod
    def is_connected(self) -> bool:
        """Report whether the connection is currently usable.

        Returns:
            True if connected, False otherwise
        """

    def get_table_name(self, base_name: str) -> str:
        """Qualify a table name with the configured schema, if there is one.

        Args:
            base_name: Base table name

        Returns:
            ``schema.base_name`` when a schema is set, else ``base_name``
        """
        return f"{self.schema}.{base_name}" if self.schema else base_name

    def log_error(self, operation: str, error: Exception):
        """Emit one structured error entry for a failed database operation.

        Args:
            operation: The operation that failed
            error: The exception that occurred
        """
        backend = self.__class__.__name__
        logger.error(
            "Database operation failed",
            operation=operation,
            error=str(error),
            db_type=backend
        )
+ + Args: + db_type: Type of database (oracle, mssql, postgresql, mysql) + config: Database configuration + + Returns: + Database instance + + Raises: + ValueError: If database type is not supported + """ + db_type = db_type.lower() + + if db_type == "oracle": + return OracleDatabase(config) + elif db_type == "mssql": + return MSSQLDatabase(config) + elif db_type == "postgresql": + return PostgreSQLDatabase(config) + elif db_type == "mysql": + return MySQLDatabase(config) + else: + raise ValueError(f"Unsupported database type: {db_type}") + + @staticmethod + def get_supported_databases() -> list: + """Get list of supported database types.""" + return ["oracle", "mssql", "postgresql", "mysql"] \ No newline at end of file diff --git a/src/wxconnect/database/mssql_db.py b/src/wxconnect/database/mssql_db.py new file mode 100644 index 0000000..f87b41a --- /dev/null +++ b/src/wxconnect/database/mssql_db.py @@ -0,0 +1,202 @@ +"""Microsoft SQL Server database implementation.""" + +import pyodbc +from typing import Dict, List, Any +import structlog +from .base import DatabaseInterface, WeatherData + +logger = structlog.get_logger(__name__) + + +class MSSQLDatabase(DatabaseInterface): + """Microsoft SQL Server database implementation.""" + + def __init__(self, config: Dict[str, Any]): + """Initialize MS SQL database connection.""" + super().__init__(config) + + def connect(self) -> bool: + """Establish MS SQL database connection.""" + try: + driver = self.config.get('mssql_driver', 'ODBC Driver 17 for SQL Server') + + connection_string = ( + f"DRIVER={{{driver}}};" + f"SERVER={self.config['host']},{self.config['port']};" + f"DATABASE={self.config['name']};" + f"UID={self.config['username']};" + f"PWD={self.config['password']};" + "TrustServerCertificate=yes;" + ) + + logger.info("Connecting to MS SQL database", host=self.config['host'], port=self.config['port']) + + self.connection = pyodbc.connect(connection_string) + self.connection.autocommit = False + + # Test the 
connection + cursor = self.connection.cursor() + cursor.execute("SELECT 1") + cursor.close() + + logger.info("Successfully connected to MS SQL database") + return True + + except Exception as e: + self.log_error("connect", e) + return False + + def disconnect(self): + """Close MS SQL database connection.""" + if self.connection: + try: + self.connection.close() + logger.info("Disconnected from MS SQL database") + except Exception as e: + self.log_error("disconnect", e) + finally: + self.connection = None + + def create_tables(self) -> bool: + """Create MS SQL database tables.""" + if not self.is_connected(): + return False + + try: + cursor = self.connection.cursor() + + # Create weather_data table + table_name = self.get_table_name("weather_data") + create_sql = f""" + IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='{table_name.split('.')[-1]}' AND xtype='U') + CREATE TABLE {table_name} ( + id BIGINT IDENTITY(1,1) PRIMARY KEY, + timestamp DATETIME2(6) NOT NULL, + topic NVARCHAR(255) NOT NULL, + location NVARCHAR(100), + temperature DECIMAL(5,2), + humidity DECIMAL(5,2), + pressure DECIMAL(7,2), + wind_speed DECIMAL(5,2), + wind_direction DECIMAL(5,2), + rainfall DECIMAL(6,2), + raw_data NVARCHAR(MAX), + created_at DATETIME2(6) DEFAULT GETDATE() + ) + """ + + cursor.execute(create_sql) + + # Create index on timestamp and topic + index_name = f"idx_{table_name.replace('.', '_')}_ts_topic" + index_sql = f""" + IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = '{index_name}') + CREATE INDEX {index_name} ON {table_name} (timestamp, topic) + """ + cursor.execute(index_sql) + + self.connection.commit() + cursor.close() + + logger.info("Successfully created MS SQL tables", table=table_name) + return True + + except Exception as e: + self.log_error("create_tables", e) + return False + + def insert_weather_data(self, data: WeatherData) -> bool: + """Insert weather data into MS SQL database.""" + if not self.is_connected(): + return False + + try: + cursor = 
def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
    """Insert several weather records into MS SQL in one transaction.

    Args:
        data_list: Records to insert; each must expose the ten column
            attributes read below.

    Returns:
        True when every row was inserted and committed. False when there
        is no live connection, ``data_list`` is empty, or the insert
        failed. On failure the transaction is rolled back so no partial
        batch stays pending on the connection.
    """
    if not self.is_connected() or not data_list:
        return False

    try:
        table_name = self.get_table_name("weather_data")
        insert_sql = f"""
            INSERT INTO {table_name}
            (timestamp, topic, location, temperature, humidity, pressure,
             wind_speed, wind_direction, rainfall, raw_data)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        # One parameter tuple per record, in column order.
        batch_data = [
            (
                data.timestamp,
                data.topic,
                data.location,
                data.temperature,
                data.humidity,
                data.pressure,
                data.wind_speed,
                data.wind_direction,
                data.rainfall,
                data.raw_data
            )
            for data in data_list
        ]

        cursor = self.connection.cursor()
        try:
            cursor.executemany(insert_sql, batch_data)
        finally:
            # Release the cursor on success and on failure alike.
            cursor.close()
        self.connection.commit()

        logger.info("Successfully inserted weather data batch into MS SQL", count=len(data_list))
        return True

    except Exception as e:
        self.log_error("insert_weather_data_batch", e)
        # autocommit is off: discard whatever part of the batch was
        # executed so a later commit cannot persist it.
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            self.log_error("insert_weather_data_batch", rollback_error)
        return False
def create_tables(self) -> bool:
    """Make sure the MySQL ``weather_data`` table exists.

    The table is created with ``CREATE TABLE IF NOT EXISTS`` and carries
    an index on ``(timestamp, topic)``.

    Returns:
        True after the DDL was executed and committed; False when the
        connection is not usable or the statement raised (the error is
        passed to ``self.log_error``).
    """
    created = False

    if self.is_connected():
        try:
            db_cursor = self.connection.cursor()

            # Schema-qualified name as resolved by the base class.
            target = self.get_table_name("weather_data")
            ddl = f"""
                CREATE TABLE IF NOT EXISTS {target} (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    timestamp DATETIME(6) NOT NULL,
                    topic VARCHAR(255) NOT NULL,
                    location VARCHAR(100),
                    temperature DECIMAL(5,2),
                    humidity DECIMAL(5,2),
                    pressure DECIMAL(7,2),
                    wind_speed DECIMAL(5,2),
                    wind_direction DECIMAL(5,2),
                    rainfall DECIMAL(6,2),
                    raw_data TEXT,
                    created_at TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
                    INDEX idx_timestamp_topic (timestamp, topic)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """

            db_cursor.execute(ddl)
            self.connection.commit()
            db_cursor.close()

            logger.info("Successfully created MySQL tables", table=target)
            created = True
        except Exception as exc:
            self.log_error("create_tables", exc)

    return created
def is_connected(self) -> bool:
    """Report whether the MySQL connection answers a trivial query.

    Returns:
        False when no connection object is set or the ``SELECT 1`` probe
        raises; True when the probe succeeds.

    Only ``Exception`` subclasses are treated as "not connected", so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate instead of
    being silently converted into False.
    """
    if not self.connection:
        return False

    try:
        cursor = self.connection.cursor()
        try:
            cursor.execute("SELECT 1")
        finally:
            # Close the probe cursor even when the query fails.
            cursor.close()
        return True
    except Exception:
        return False
def create_tables(self) -> bool:
    """Create the Oracle sequence, tables and index, skipping existing ones.

    Oracle has no ``IF NOT EXISTS`` for these objects, so each DDL
    statement is executed on its own and ORA-00955 ("name is already used
    by an existing object") is tolerated per statement. A pre-existing
    sequence therefore no longer prevents the tables or the index from
    being created.

    Returns:
        True when every object exists afterwards (created now or already
        present); False when there is no live connection or a statement
        failed with any other error (reported through ``self.log_error``).
    """
    if not self.is_connected():
        return False

    # Order matters: raw_data references weather_data, and the index
    # needs the table it is built on.
    ddl_statements = (
        "CREATE SEQUENCE weather_data_seq START WITH 1 INCREMENT BY 1",
        """
            CREATE TABLE weather_data (
                id NUMBER PRIMARY KEY,
                timestamp TIMESTAMP(6) NOT NULL,
                topic VARCHAR2(255) NOT NULL,
                location VARCHAR2(100),
                temperature NUMBER(5,2),
                humidity NUMBER(5,2),
                pressure NUMBER(7,2),
                wind_speed NUMBER(5,2),
                wind_direction NUMBER(5,2),
                rainfall NUMBER(6,2),
                created_at TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP
            )
        """,
        """
            CREATE TABLE raw_data (
                weather_data_id NUMBER PRIMARY KEY,
                raw_data CLOB,
                CONSTRAINT fk_raw_data_weather_data
                    FOREIGN KEY (weather_data_id) REFERENCES weather_data (id) ON DELETE CASCADE,
                CONSTRAINT chk_raw_data_json CHECK (raw_data IS JSON)
            )
        """,
        "CREATE INDEX idx_weather_data_ts_topic ON weather_data (timestamp, topic)",
    )

    try:
        created = 0
        cursor = self.connection.cursor()
        try:
            for statement in ddl_statements:
                try:
                    cursor.execute(statement)
                    created += 1
                except oracledb.DatabaseError as e:
                    error = e.args[0] if e.args else None
                    # 955 = object already exists; anything else is real.
                    if getattr(error, "code", None) != 955:
                        raise
        finally:
            cursor.close()

        self.connection.commit()

        if created:
            logger.info("Successfully created Oracle tables")
        else:
            logger.info("Oracle tables already exist")
        return True

    except Exception as e:
        self.log_error("create_tables", e)
        return False
def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
    """Insert several weather records into Oracle as one transaction.

    Each record becomes a row in ``weather_data`` (id taken from
    ``weather_data_seq``) and, when ``raw_data`` is not None, a matching
    row in ``raw_data`` keyed by that id.

    Args:
        data_list: Records to insert.

    Returns:
        True when all rows were inserted and committed. False when there
        is no live connection, ``data_list`` is empty, or any statement
        failed. On failure the whole batch is rolled back, so rows
        inserted before the error cannot be committed by a later call.
    """
    if not self.is_connected() or not data_list:
        return False

    try:
        table_name = "weather_data"
        seq_name = "weather_data_seq"
        raw_table_name = "raw_data"

        weather_insert_sql = f"""
            INSERT INTO {table_name}
            (id, timestamp, topic, location, temperature, humidity, pressure,
             wind_speed, wind_direction, rainfall)
            VALUES ({seq_name}.NEXTVAL, :timestamp, :topic, :location, :temperature,
                    :humidity, :pressure, :wind_speed, :wind_direction, :rainfall)
        """

        raw_insert_sql = f"""
            INSERT INTO {raw_table_name} (weather_data_id, raw_data)
            VALUES (:id, :raw_data)
        """

        cursor = self.connection.cursor()
        try:
            for data in data_list:
                cursor.execute(weather_insert_sql, {
                    'timestamp': data.timestamp,
                    'topic': data.topic,
                    'location': data.location,
                    'temperature': data.temperature,
                    'humidity': data.humidity,
                    'pressure': data.pressure,
                    'wind_speed': data.wind_speed,
                    'wind_direction': data.wind_direction,
                    'rainfall': data.rainfall
                })

                # CURRVAL is session-scoped, so it is the id just used.
                cursor.execute(f"SELECT {seq_name}.CURRVAL FROM DUAL")
                weather_id = cursor.fetchone()[0]

                if data.raw_data is not None:
                    cursor.execute(raw_insert_sql, {'id': weather_id, 'raw_data': data.raw_data})
        finally:
            cursor.close()

        self.connection.commit()

        logger.info("Successfully inserted weather data batch into Oracle", count=len(data_list))
        return True

    except Exception as e:
        self.log_error("insert_weather_data_batch", e)
        # Drop the rows inserted before the failure; otherwise the next
        # successful commit would persist a batch reported as failed.
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            self.log_error("insert_weather_data_batch", rollback_error)
        return False
def create_tables(self) -> bool:
    """Create the PostgreSQL ``weather_data`` table and its index if missing.

    Returns:
        True when both statements ran and were committed; False when
        there is no live connection or a statement failed.

    A failed statement leaves a PostgreSQL transaction in the aborted
    state, in which every further command (including the ``SELECT 1``
    probe used by ``is_connected``) is rejected. The failure path
    therefore rolls back so the connection stays usable.
    """
    if not self.is_connected():
        return False

    try:
        table_name = self.get_table_name("weather_data")
        create_sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id BIGSERIAL PRIMARY KEY,
                timestamp TIMESTAMP(6) NOT NULL,
                topic VARCHAR(255) NOT NULL,
                location VARCHAR(100),
                temperature DECIMAL(5,2),
                humidity DECIMAL(5,2),
                pressure DECIMAL(7,2),
                wind_speed DECIMAL(5,2),
                wind_direction DECIMAL(5,2),
                rainfall DECIMAL(6,2),
                raw_data TEXT,
                created_at TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP
            )
        """

        # Index names cannot contain the schema dot, so flatten it.
        index_name = f"idx_{table_name.replace('.', '_')}_ts_topic"
        index_sql = f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} (timestamp, topic)"

        cursor = self.connection.cursor()
        try:
            cursor.execute(create_sql)
            cursor.execute(index_sql)
        finally:
            cursor.close()

        self.connection.commit()

        logger.info("Successfully created PostgreSQL tables", table=table_name)
        return True

    except Exception as e:
        self.log_error("create_tables", e)
        # Leave the aborted transaction so later statements can run.
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            self.log_error("create_tables", rollback_error)
        return False
def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
    """Insert several weather records into PostgreSQL in one transaction.

    Args:
        data_list: Records to insert; each must expose the ten column
            attributes read below.

    Returns:
        True when every row was inserted and committed. False when there
        is no live connection, ``data_list`` is empty, or the insert
        failed.

    On failure the transaction is rolled back. Without that, PostgreSQL
    keeps the transaction in the aborted state and rejects every later
    statement on this connection, including the ``is_connected`` probe.
    """
    if not self.is_connected() or not data_list:
        return False

    try:
        table_name = self.get_table_name("weather_data")
        insert_sql = f"""
            INSERT INTO {table_name}
            (timestamp, topic, location, temperature, humidity, pressure,
             wind_speed, wind_direction, rainfall, raw_data)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        # One parameter tuple per record, in column order.
        batch_data = [
            (
                data.timestamp,
                data.topic,
                data.location,
                data.temperature,
                data.humidity,
                data.pressure,
                data.wind_speed,
                data.wind_direction,
                data.rainfall,
                data.raw_data
            )
            for data in data_list
        ]

        cursor = self.connection.cursor()
        try:
            cursor.executemany(insert_sql, batch_data)
        finally:
            cursor.close()

        self.connection.commit()

        logger.info("Successfully inserted weather data batch into PostgreSQL", count=len(data_list))
        return True

    except Exception as e:
        self.log_error("insert_weather_data_batch", e)
        # Reset the aborted transaction so the connection stays usable.
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            self.log_error("insert_weather_data_batch", rollback_error)
        return False
def __init__(self):
    """Set up the application's collaborators in their idle state.

    Nothing is connected here: configuration, MQTT client, database and
    the worker thread are filled in later by the ``run`` start-up steps.
    """
    # Populated during start-up; None means "not initialised yet".
    self.config = self.mqtt_client = self.database = None

    # Turns raw MQTT payloads into records for the database layer.
    self.data_processor = WeatherDataProcessor()

    # Hand-off between the MQTT callback and the database worker.
    self.message_queue = Queue()

    # Set once shutdown has been requested.
    self.shutdown_event = Event()

    self.database_thread = None
def setup_database(self):
    """Connect to the configured database, retrying on failure.

    Up to ten attempts are made, 30 seconds apart. Each attempt builds a
    backend through ``DatabaseFactory``, connects, and asks it to create
    its tables.

    Returns:
        True once a connection is established (``self.database`` and
        ``self.db`` then reference the backend); False when all attempts
        failed or shutdown was requested while waiting to retry.
    """
    import dataclasses

    logger.info("Setting up database connection")

    max_retries = 10
    retry_delay = 30  # seconds between retries

    for attempt in range(max_retries):
        try:
            config_dict = dataclasses.asdict(self.config.database)
            self.db = DatabaseFactory.create_database(
                self.config.database.db_type,
                config_dict
            )
            if not self.db.connect():
                raise Exception("Failed to connect to database")

            logger.info("Database connection established")

            # Do not claim the tables are ready when creation failed; keep
            # going so a pre-provisioned schema still works.
            if self.db.create_tables():
                logger.info("Database tables ready")
            else:
                logger.warning("Database table creation failed; continuing with existing schema")

            self.database = self.db
            return True
        except Exception as e:
            logger.warning(f"Database connection attempt {attempt + 1}/{max_retries} failed",
                           error=str(e))
            if attempt >= max_retries - 1:
                logger.error("All database connection attempts exhausted")
                return False

            logger.info(f"Retrying in {retry_delay} seconds...")
            # Sleep in short slices so a shutdown request is honoured
            # promptly instead of after the full retry delay.
            deadline = time.monotonic() + retry_delay
            while True:
                if self.shutdown_event.is_set():
                    logger.info("Shutdown requested, aborting database setup")
                    return False
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(1.0, remaining))

    return False
def _database_worker(self):
    """Drain the message queue and write records to the database in batches.

    Runs until ``self.shutdown_event`` is set. Each queued ``(topic,
    data)`` pair is converted by ``self.data_processor``; resulting
    records are buffered and flushed when the buffer reaches
    ``config.app.batch_size`` or its oldest record has waited longer than
    ``config.app.batch_timeout`` seconds. Whatever is still buffered at
    shutdown is flushed once more before the worker exits.
    """
    batch_data = []
    last_batch_time = time.time()

    logger.info("Database worker started")

    while not self.shutdown_event.is_set():
        try:
            try:
                topic, data = self.message_queue.get(timeout=1.0)
            except Empty:
                pass  # nothing new; still evaluate the timeout below
            else:
                try:
                    weather_data = self.data_processor.process_message(topic, data)
                    if weather_data:
                        if not batch_data:
                            # The timeout measures the age of the oldest
                            # buffered record, not the time since the
                            # previous flush.
                            last_batch_time = time.time()
                        batch_data.append(weather_data)
                finally:
                    # Always balance get(), even when processing raised,
                    # so Queue.join() can never hang.
                    self.message_queue.task_done()

            # Checked on every iteration so a steady trickle of messages
            # cannot postpone the timeout flush indefinitely.
            batch_full = len(batch_data) >= self.config.app.batch_size
            batch_stale = (time.time() - last_batch_time) > self.config.app.batch_timeout
            if batch_data and (batch_full or batch_stale):
                self._flush_batch(batch_data)
                batch_data = []
                last_batch_time = time.time()

        except Exception as e:
            logger.error("Error in database worker", error=str(e))
            time.sleep(1)

    # Flush remaining data on shutdown
    if batch_data:
        self._flush_batch(batch_data)

    logger.info("Database worker stopped")
def run(self):
    """Start the application and block until it stops.

    Installs SIGINT/SIGTERM handlers, runs the start-up steps
    (configuration, database, MQTT), launches the database worker thread
    and then enters the blocking MQTT loop. ``shutdown`` always runs on
    the way out.

    Returns:
        0 after a normal stop or keyboard interrupt, 1 when a start-up
        step fails or an unexpected error occurs.
    """
    logger.info("Starting Weather Connect application")

    # Route both termination signals to the same handler.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, self._signal_handler)

    try:
        # Start-up steps run in order; the first failure aborts.
        startup_steps = (self.load_config, self.setup_database, self.initialize_mqtt)
        if not all(step() for step in startup_steps):
            return 1

        worker = Thread(target=self._database_worker, daemon=True)
        self.database_thread = worker
        worker.start()

        logger.info("Starting MQTT client loop")
        # Blocks until the MQTT client is stopped.
        self.mqtt_client.start_loop(self.config.app.reconnect_interval)

    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt")
    except Exception as exc:
        logger.error("Application error", error=str(exc))
        return 1
    finally:
        self.shutdown()

    logger.info("Weather Connect application stopped")
    return 0
def shutdown(self):
    """Stop all components in order: flag, MQTT, worker thread, database.

    The shutdown event is set first so the database worker leaves its
    loop; the worker gets up to five seconds to finish before the
    database connection is closed.
    """
    logger.info("Shutting down application")

    # Tell the worker loop to exit.
    self.shutdown_event.set()

    client = self.mqtt_client
    if client:
        client.stop()

    worker = self.database_thread
    if worker and worker.is_alive():
        logger.info("Waiting for database worker to finish")
        worker.join(timeout=5)

    database = self.database
    if database:
        database.disconnect()
def _on_connect(self, client, userdata, flags, rc):
    """Handle the broker's reply to a connection attempt.

    A result code of 0 marks the client as connected and subscribes to
    the configured topic; any other code marks it as disconnected and
    logs the failure.
    """
    if rc != 0:
        self.connected = False
        logger.error("Failed to connect to MQTT broker", rc=rc, host=self.config.host)
        return

    self.connected = True
    logger.info("Connected to MQTT broker", host=self.config.host, port=self.config.port)

    # Subscribing here lets the subscription follow each new connection.
    outcome = client.subscribe(self.config.topic)
    status = outcome[0]
    if status == mqtt.MQTT_ERR_SUCCESS:
        logger.info("Subscribed to topic", topic=self.config.topic)
    else:
        logger.error("Failed to subscribe to topic", topic=self.config.topic, error=outcome[0])
def start_loop(self, reconnect_interval: int = 30):
    """Run the MQTT network loop with reconnection; block until stopped.

    A daemon thread connects to the broker and runs ``loop_forever``,
    reconnecting after failures. The calling thread waits until
    ``self.running`` is cleared.

    Args:
        reconnect_interval: Seconds to wait before attempting reconnection
    """
    import threading

    def mqtt_loop():
        """Run the MQTT loop in a separate thread."""
        while self.running:
            try:
                if not self.connected:
                    if self.connect():
                        # Wait a bit for connection to establish
                        time.sleep(1)
                    else:
                        logger.warning("Failed to connect, retrying in {} seconds".format(reconnect_interval))
                        time.sleep(reconnect_interval)
                        continue

                # Run the MQTT loop (blocking)
                logger.info("Starting MQTT loop_forever")
                self.client.loop_forever()

                # If we get here, the loop exited
                if self.running:  # Only log if we're still supposed to be running
                    logger.warning("MQTT loop exited, attempting to reconnect in {} seconds".format(reconnect_interval))
                    time.sleep(reconnect_interval)

            except Exception as e:
                if self.running:  # Only log if we're still supposed to be running
                    logger.error("Unexpected error in MQTT loop", error=str(e))
                    time.sleep(reconnect_interval)

    # Mark the client as running BEFORE the thread starts. If the flag
    # were set inside the thread, the wait loop below could observe
    # False first and return immediately, shutting the application down
    # right after start-up.
    self.running = True

    # Start the MQTT loop in a separate thread
    self.mqtt_thread = threading.Thread(target=mqtt_loop, daemon=True)
    self.mqtt_thread.start()

    # Keep the calling thread alive until stopped
    while self.running:
        time.sleep(0.1)