Initial commit

This commit is contained in:
2025-11-10 06:15:34 -05:00
parent b1172a3490
commit 4eccfe2ce8
19 changed files with 2309 additions and 0 deletions

30
.env.example Normal file
View File

@@ -0,0 +1,30 @@
# MQTT Configuration
MQTT_HOST=ikarus.egfh.internal
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
MQTT_TOPIC=weather/+
MQTT_CLIENT_ID=wxconnect
# Database Configuration
DB_TYPE=oracle # oracle, mssql, postgresql, mysql
DB_HOST=localhost
DB_PORT=1521
DB_NAME=xe
DB_USERNAME=weather
DB_PASSWORD=password
DB_SCHEMA=weather
# Oracle specific (if using Oracle)
ORACLE_SID=xe
# MS SQL specific (if using MS SQL)
MSSQL_DRIVER=ODBC Driver 17 for SQL Server
# Logging
LOG_LEVEL=INFO
# Application Settings
RECONNECT_INTERVAL=30
BATCH_SIZE=10
BATCH_TIMEOUT=60

14
.gitignore vendored Normal file
View File

@@ -0,0 +1,14 @@
.env
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
*.so
.coverage
.pytest_cache/
.vscode/
.idea/
*.log
logs/
data/

60
Dockerfile Normal file
View File

@@ -0,0 +1,60 @@
FROM python:3.11-slim
# Set environment variables
ENV PYTHONPATH=/app/src
ENV PYTHONUNBUFFERED=1
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
unixodbc-dev \
curl \
unzip \
libaio-dev \
&& rm -rf /var/lib/apt/lists/*
# Install Oracle Instant Client for cx_Oracle
RUN mkdir -p /opt/oracle && \
cd /opt/oracle && \
curl -o instantclient-basic-linux.x64-21.9.0.0.0dbru.zip \
https://download.oracle.com/otn_software/linux/instantclient/219000/instantclient-basic-linux.x64-21.9.0.0.0dbru.zip && \
unzip instantclient-basic-linux.x64-21.9.0.0.0dbru.zip && \
rm instantclient-basic-linux.x64-21.9.0.0.0dbru.zip && \
cd instantclient_21_9 && \
mkdir -p lib && \
ln -s ../libclntsh.so.21.1 lib/libclntsh.so && \
ln -s ../libnnz21.so lib/libnnz21.so && \
ln -s ../libociicus.so lib/libociicus.so && \
echo /opt/oracle/instantclient_21_9 > /etc/ld.so.conf.d/oracle-instantclient.conf && \
ldconfig
# Set Oracle environment variables
ENV ORACLE_HOME=/opt/oracle/instantclient_21_9
ENV LD_LIBRARY_PATH=/opt/oracle/instantclient_21_9:$LD_LIBRARY_PATH
ENV PATH=/opt/oracle/instantclient_21_9:$PATH
# Create app directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY src/ ./src/
# Create logs directory
RUN mkdir -p /app/logs
# Create non-root user
RUN groupadd -r wxconnect && useradd -r -g wxconnect wxconnect
RUN chown -R wxconnect:wxconnect /app
USER wxconnect
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD python -c "import sys; sys.path.insert(0, '/app/src'); from wxconnect.config import Config; c = Config(); print('OK' if c.validate() else 'FAIL')" || exit 1
# Entry point
CMD ["python", "-m", "wxconnect.main"]

102
docker-compose.yml Normal file
View File

@@ -0,0 +1,102 @@
version: '3.8'
services:
wxconnect:
build: .
container_name: wxconnect
restart: unless-stopped
environment:
# MQTT Configuration
MQTT_HOST: ikarus.egfh.internal
MQTT_PORT: 1883
MQTT_TOPIC: weather/loop
MQTT_CLIENT_ID: wxconnect-docker
# Database Configuration - Oracle on pve-ora19c-1
DB_TYPE: oracle
DB_HOST: pve-ora19c-1
DB_PORT: 1521
DB_NAME: shed.pattinson.org
DB_USERNAME: C##WEATHER
DB_PASSWORD: weather123
DB_SCHEMA: C##WEATHER
# Oracle specific
ORACLE_SID: shed.pattinson.org
# Logging
LOG_LEVEL: DEBUG
# Application Settings
RECONNECT_INTERVAL: 30
BATCH_SIZE: 1
BATCH_TIMEOUT: 60
volumes:
- ./logs:/app/logs
- ./src:/app/src
- ./.env:/app/.env:ro # Mount environment file if it exists
networks:
- wxconnect-network
# Remove dependency on local Oracle container since using external Oracle
# depends_on:
# - oracle-db
# Example Oracle database service (commented out - using external Oracle)
# oracle-db:
# image: gvenzl/oracle-xe:21-slim
# container_name: oracle-wxconnect
# restart: unless-stopped
# environment:
# ORACLE_PASSWORD: password
# APP_USER: weather
# APP_USER_PASSWORD: password
# ports:
# - "1521:1521"
# volumes:
# - oracle_data:/opt/oracle/oradata
# networks:
# - wxconnect-network
# Example PostgreSQL database service (alternative to Oracle)
# postgres-db:
# image: postgres:15
# container_name: postgres-wxconnect
# restart: unless-stopped
# environment:
# POSTGRES_DB: weather
# POSTGRES_USER: weather
# POSTGRES_PASSWORD: password
# ports:
# - "5432:5432"
# volumes:
# - postgres_data:/var/lib/postgresql/data
# networks:
# - wxconnect-network
# Example MS SQL Server database service (alternative)
# mssql-db:
# image: mcr.microsoft.com/mssql/server:2022-latest
# container_name: mssql-wxconnect
# restart: unless-stopped
# environment:
# SA_PASSWORD: "YourStrong@Passw0rd"
# ACCEPT_EULA: "Y"
# MSSQL_DB: weather
# ports:
# - "1433:1433"
# volumes:
# - mssql_data:/var/opt/mssql
# networks:
# - wxconnect-network
volumes:
oracle_data:
# postgres_data:
# mssql_data:
networks:
wxconnect-network:
driver: bridge

View File

@@ -0,0 +1,45 @@
-- Oracle Common User Setup for Weather Connect
-- Run this as SYSTEM or SYSDBA user
-- This creates a common user that works across all containers
-- 1. Check current environment
COLUMN is_cdb FORMAT A10
SELECT DECODE(cdb, 'YES', 'CDB', 'Regular') as is_cdb FROM v$database;
SHOW CON_NAME;
-- 2. Create common user (prefixed with C##)
CREATE USER C##weather IDENTIFIED BY weather123
DEFAULT TABLESPACE USERS
TEMPORARY TABLESPACE TEMP
QUOTA UNLIMITED ON USERS
CONTAINER=ALL;
-- 3. Grant necessary privileges
GRANT CONNECT TO C##weather CONTAINER=ALL;
GRANT RESOURCE TO C##weather CONTAINER=ALL;
GRANT CREATE SESSION TO C##weather CONTAINER=ALL;
GRANT CREATE TABLE TO C##weather CONTAINER=ALL;
GRANT CREATE SEQUENCE TO C##weather CONTAINER=ALL;
GRANT CREATE VIEW TO C##weather CONTAINER=ALL;
-- Note: CREATE INDEX privilege is included in RESOURCE role
-- GRANT CREATE INDEX TO C##weather CONTAINER=ALL; -- This is redundant with RESOURCE
-- Optional: Grant additional privileges if needed
-- GRANT CREATE PROCEDURE TO C##weather CONTAINER=ALL;
-- GRANT CREATE TRIGGER TO C##weather CONTAINER=ALL;
COMMIT;
-- 4. Verify user creation
SELECT username, common, default_tablespace, temporary_tablespace, account_status
FROM dba_users
WHERE username = 'C##WEATHER';
-- 5. Display granted privileges
SELECT grantee, privilege
FROM dba_sys_privs
WHERE grantee = 'C##WEATHER'
ORDER BY privilege;
-- 6. Test connection as common user
-- CONNECT C##weather/weather123@pve-ora19c-1:1521/SHED;

View File

@@ -0,0 +1,52 @@
-- Oracle CDB/PDB Setup for Weather Connect
-- Run this as SYSTEM or SYSDBA user
-- 1. Check the current environment
COLUMN is_cdb FORMAT A10
COLUMN con_name FORMAT A20
SELECT DECODE(cdb, 'YES', 'CDB', 'Regular') as is_cdb FROM v$database;
SHOW CON_NAME;
-- 2. Show available PDBs (if in CDB)
SELECT name, open_mode FROM v$pdbs;
-- 3. Connect to the SHED PDB (assuming SHED is your PDB name)
-- If SHED is the PDB name, connect to it:
ALTER SESSION SET CONTAINER = SHDPDB1;
SHOW CON_NAME;
-- 4. Now create the regular user in the PDB
CREATE USER weather IDENTIFIED BY weather123
DEFAULT TABLESPACE USERS
TEMPORARY TABLESPACE TEMP
QUOTA UNLIMITED ON USERS;
-- 5. Grant necessary privileges
GRANT CONNECT TO weather;
GRANT RESOURCE TO weather;
GRANT CREATE SESSION TO weather;
GRANT CREATE TABLE TO weather;
GRANT CREATE SEQUENCE TO weather;
GRANT CREATE VIEW TO weather;
-- Note: CREATE INDEX privilege is included in RESOURCE role
-- GRANT CREATE INDEX TO weather; -- This is redundant with RESOURCE
-- Optional: Grant additional privileges if needed
-- GRANT CREATE PROCEDURE TO weather;
-- GRANT CREATE TRIGGER TO weather;
COMMIT;
-- 6. Verify user creation
SELECT username, default_tablespace, temporary_tablespace, account_status
FROM dba_users
WHERE username = 'WEATHER';
-- 7. Display granted privileges
SELECT grantee, privilege
FROM dba_sys_privs
WHERE grantee = 'WEATHER'
ORDER BY privilege;
-- 8. Test connection as weather user
-- CONNECT weather/weather123@pve-ora19c-1:1521/SHDPDB1;

10
requirements.txt Normal file
View File

@@ -0,0 +1,10 @@
paho-mqtt==1.6.1
oracledb==2.0.1
pyodbc==4.0.39
psycopg2-binary==2.9.7
PyMySQL==1.1.0
sqlalchemy==2.0.23
python-dotenv==1.0.0
pydantic==2.5.0
PyYAML==6.0.1
structlog==23.2.0

View File

@@ -0,0 +1,5 @@
"""Weather Connect - MQTT to Database Bridge"""
__version__ = "1.0.0"
__author__ = "Your Name"
__description__ = "A Docker-based Python application that subscribes to MQTT weather data and inserts it into various databases."

86
src/wxconnect/config.py Normal file
View File

@@ -0,0 +1,86 @@
"""Configuration management for Weather Connect."""
import os
from typing import Optional
from dataclasses import dataclass
from dotenv import load_dotenv
@dataclass
class MQTTConfig:
"""MQTT connection configuration."""
host: str
port: int
username: Optional[str]
password: Optional[str]
topic: str
client_id: str
@dataclass
class DatabaseConfig:
"""Database connection configuration."""
db_type: str
host: str
port: int
name: str
username: str
password: str
schema: Optional[str] = None
oracle_sid: Optional[str] = None
mssql_driver: Optional[str] = None
@dataclass
class AppConfig:
"""Application configuration."""
log_level: str
reconnect_interval: int
batch_size: int
batch_timeout: int
class Config:
"""Main configuration class."""
def __init__(self, env_file: str = ".env"):
"""Initialize configuration from environment variables."""
load_dotenv(env_file)
self.mqtt = MQTTConfig(
host=os.getenv("MQTT_HOST", "localhost"),
port=int(os.getenv("MQTT_PORT", "1883")),
username=os.getenv("MQTT_USERNAME"),
password=os.getenv("MQTT_PASSWORD"),
topic=os.getenv("MQTT_TOPIC", "weather/+"),
client_id=os.getenv("MQTT_CLIENT_ID", "wxconnect")
)
self.database = DatabaseConfig(
db_type=os.getenv("DB_TYPE", "oracle"),
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", "1521")),
name=os.getenv("DB_NAME", "xe"),
username=os.getenv("DB_USERNAME", "weather"),
password=os.getenv("DB_PASSWORD", "password"),
schema=os.getenv("DB_SCHEMA"),
oracle_sid=os.getenv("ORACLE_SID"),
mssql_driver=os.getenv("MSSQL_DRIVER", "ODBC Driver 17 for SQL Server")
)
self.app = AppConfig(
log_level=os.getenv("LOG_LEVEL", "INFO"),
reconnect_interval=int(os.getenv("RECONNECT_INTERVAL", "30")),
batch_size=int(os.getenv("BATCH_SIZE", "10")),
batch_timeout=int(os.getenv("BATCH_TIMEOUT", "60"))
)
def validate(self) -> bool:
"""Validate configuration."""
if not self.mqtt.host:
return False
if not self.database.host:
return False
if self.database.db_type not in ["oracle", "mssql", "postgresql", "mysql"]:
return False
return True

View File

@@ -0,0 +1,417 @@
"""Data processing logic for weather data."""
import json
import re
from datetime import datetime
from typing import Any, Dict, Optional
import structlog
from .database.base import WeatherData
logger = structlog.get_logger(__name__)
class WeatherDataProcessor:
"""Processes MQTT weather data and converts it to database format."""
def __init__(self):
"""Initialize the weather data processor."""
self.field_mappings = {
# Common field name variations
'temp': 'temperature',
'temperature_c': 'temperature',
'temperature_f': 'temperature',
'temp_c': 'temperature',
'temp_f': 'temperature',
'outTemp_C': 'temperature',
'humid': 'humidity',
'humidity_pct': 'humidity',
'rh': 'humidity',
'press': 'pressure',
'pressure_hpa': 'pressure',
'pressure_mbar': 'pressure',
'barometric_pressure': 'pressure',
'wind_speed_ms': 'wind_speed',
'wind_speed_kmh': 'wind_speed',
'wind_speed_mph': 'wind_speed',
'wind_speed_knot': 'wind_speed',
'windSpeed_knot': 'wind_speed',
'windspeed': 'wind_speed',
'wind_dir': 'wind_direction',
'wind_dir': 'wind_direction',
'windDir': 'wind_direction',
'wind_direction_deg': 'wind_direction',
'winddirection': 'wind_direction',
'rain': 'rainfall',
'rainfall_mm': 'rainfall',
'precipitation': 'rainfall',
'precip': 'rainfall',
'rain_cm': 'rainfall',
'hourRain_cm': 'rainfall',
'rain24_cm': 'rainfall',
'dayRain_cm': 'rainfall',
'rainRate_cm_per_hour': 'rainfall',
'maxSolarRad_Wpm2': 'solar_radiation',
# Note: heatingTemp_hertz, heatingVoltage_hertz, and hail_hertz are frequency readings, not temperature
}
def process_message(self, topic: str, data: Any) -> Optional[WeatherData]:
"""Process an MQTT message and convert to WeatherData.
Args:
topic: MQTT topic
data: Message payload (dict or string)
Returns:
WeatherData object or None if processing fails
"""
try:
# Convert data to dict if it's a string
if isinstance(data, str):
try:
data = json.loads(data)
except json.JSONDecodeError:
logger.warning("Failed to parse string data as JSON", topic=topic, data=data[:100])
# Create minimal record with raw data
return WeatherData(
timestamp=datetime.now(),
topic=topic,
raw_data=data
)
if not isinstance(data, dict):
logger.warning("Data is not a dictionary", topic=topic, data_type=type(data))
return WeatherData(
timestamp=datetime.now(),
topic=topic,
raw_data=str(data)
)
# Extract timestamp
timestamp = self._extract_timestamp(data)
# Extract location from topic or data
location = self._extract_location(topic, data)
# Extract weather fields
weather_fields = self._extract_weather_fields(data)
# Create WeatherData object
weather_data = WeatherData(
timestamp=timestamp,
topic=topic,
location=location,
temperature=weather_fields.get('temperature'),
humidity=weather_fields.get('humidity'),
pressure=weather_fields.get('pressure'),
wind_speed=weather_fields.get('wind_speed'),
wind_direction=weather_fields.get('wind_direction'),
rainfall=weather_fields.get('rainfall'),
raw_data=json.dumps(data) if isinstance(data, dict) else str(data)
)
logger.debug("Processed weather data", topic=topic, location=location, fields=len(weather_fields))
return weather_data
except Exception as e:
logger.error("Failed to process weather data", topic=topic, error=str(e))
return None
def _extract_timestamp(self, data: Dict[str, Any]) -> datetime:
"""Extract timestamp from data or use current time.
Args:
data: Message data
Returns:
Datetime object
"""
timestamp_fields = ['timestamp', 'time', 'datetime', 'ts', 'date_time']
for field in timestamp_fields:
if field in data:
try:
value = data[field]
if isinstance(value, str):
# Try different timestamp formats
formats = [
'%Y-%m-%dT%H:%M:%S.%fZ',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%d %H:%M:%S.%f',
'%Y-%m-%d %H:%M:%S',
'%Y/%m/%d %H:%M:%S',
'%d/%m/%Y %H:%M:%S',
]
for fmt in formats:
try:
return datetime.strptime(value, fmt)
except ValueError:
continue
elif isinstance(value, (int, float)):
# Unix timestamp
return datetime.fromtimestamp(value)
except Exception as e:
logger.debug("Failed to parse timestamp field", field=field, value=value, error=str(e))
continue
# Default to current time
return datetime.now()
def _extract_location(self, topic: str, data: Dict[str, Any]) -> Optional[str]:
"""Extract location from topic or data.
Args:
topic: MQTT topic
data: Message data
Returns:
Location string or None
"""
# Check data fields first
location_fields = ['location', 'station', 'site', 'sensor_id', 'device_id', 'name']
for field in location_fields:
if field in data and data[field]:
return str(data[field])
# Extract from topic (e.g., weather/garden/temp -> garden)
topic_parts = topic.split('/')
if len(topic_parts) >= 2:
# Skip 'weather' and take next part as location
for part in topic_parts[1:]:
if part and part not in ['temp', 'humidity', 'pressure', 'wind', 'rain']:
return part
return None
def _extract_weather_fields(self, data: Dict[str, Any]) -> Dict[str, Optional[float]]:
"""Extract weather measurement fields from data.
Args:
data: Message data
Returns:
Dictionary of weather fields
"""
weather_fields = {
'temperature': None,
'humidity': None,
'pressure': None,
'wind_speed': None,
'wind_direction': None,
'rainfall': None,
'solar_radiation': None
}
# Direct field mapping
for field_name, canonical_name in self.field_mappings.items():
if field_name in data:
value = self._convert_to_float(data[field_name])
if value is not None:
weather_fields[canonical_name] = value
# Check for exact matches
for canonical_name in weather_fields.keys():
if canonical_name in data:
value = self._convert_to_float(data[canonical_name])
if value is not None:
weather_fields[canonical_name] = value
# Special handling for temperature units
if weather_fields['temperature'] is not None:
temp_unit = self._detect_temperature_unit(data)
if temp_unit == 'F':
# Convert Fahrenheit to Celsius
weather_fields['temperature'] = (weather_fields['temperature'] - 32) * 5/9
# Special handling for pressure units
if weather_fields['pressure'] is not None:
pressure_unit = self._detect_pressure_unit(data)
if pressure_unit == 'inHg':
# Convert inches of mercury to hPa
weather_fields['pressure'] = weather_fields['pressure'] * 33.8639
elif pressure_unit == 'mmHg':
# Convert mmHg to hPa
weather_fields['pressure'] = weather_fields['pressure'] * 1.33322
# Special handling for wind speed units
if weather_fields['wind_speed'] is not None:
wind_unit = self._detect_wind_speed_unit(data)
if wind_unit == 'knot':
# Convert knots to m/s (1 knot = 0.514444 m/s)
weather_fields['wind_speed'] = weather_fields['wind_speed'] * 0.514444
elif wind_unit == 'mph':
# Convert mph to m/s
weather_fields['wind_speed'] = weather_fields['wind_speed'] * 0.44704
elif wind_unit == 'kmh':
# Convert km/h to m/s
weather_fields['wind_speed'] = weather_fields['wind_speed'] * 0.277778
# Validate and clamp values to database constraints
weather_fields = self._validate_field_ranges(weather_fields, data)
return weather_fields
def _convert_to_float(self, value: Any) -> Optional[float]:
"""Convert value to float if possible.
Args:
value: Value to convert
Returns:
Float value or None
"""
if value is None:
return None
try:
if isinstance(value, (int, float)):
return float(value)
elif isinstance(value, str):
# Remove common non-numeric suffixes
cleaned = re.sub(r'%\s]*[CFcf°]?$', '', value.strip())
cleaned = re.sub(r'%\s]*$', '', cleaned)
return float(cleaned)
else:
return float(value)
except (ValueError, TypeError):
return None
def _detect_temperature_unit(self, data: Dict[str, Any]) -> str:
"""Detect temperature unit from data.
Args:
data: Message data
Returns:
'C' for Celsius, 'F' for Fahrenheit
"""
# Check for unit fields
if 'temp_unit' in data:
unit = str(data['temp_unit']).upper()
if 'F' in unit:
return 'F'
# Check field names for unit indicators
for key in data.keys():
key_lower = key.lower()
if 'temp' in key_lower:
if '_f' in key_lower or 'fahrenheit' in key_lower:
return 'F'
elif '_c' in key_lower or 'celsius' in key_lower:
return 'C'
# Default to Celsius
return 'C'
def _detect_pressure_unit(self, data: Dict[str, Any]) -> str:
"""Detect pressure unit from data.
Args:
data: Message data
Returns:
'hPa', 'inHg', or 'mmHg'
"""
# Check for unit fields
if 'pressure_unit' in data:
unit = str(data['pressure_unit']).lower()
if 'inhg' in unit or 'inch' in unit:
return 'inHg'
elif 'mmhg' in unit:
return 'mmHg'
# Check field names for unit indicators
for key in data.keys():
key_lower = key.lower()
if 'pressure' in key_lower or 'press' in key_lower:
if 'inhg' in key_lower or 'inch' in key_lower:
return 'inHg'
elif 'mmhg' in key_lower:
return 'mmHg'
elif 'hpa' in key_lower or 'mbar' in key_lower:
return 'hPa'
# Default to hPa
return 'hPa'
def _detect_wind_speed_unit(self, data: Dict[str, Any]) -> str:
"""Detect wind speed unit from data.
Args:
data: Message data
Returns:
'ms', 'kmh', 'mph', or 'knot'
"""
# Check for unit fields
if 'wind_speed_unit' in data:
unit = str(data['wind_speed_unit']).lower()
if 'knot' in unit or 'kt' in unit:
return 'knot'
elif 'mph' in unit:
return 'mph'
elif 'kmh' in unit or 'km/h' in unit:
return 'kmh'
# Check field names for unit indicators
for key in data.keys():
key_lower = key.lower()
if 'wind' in key_lower and 'speed' in key_lower:
if 'knot' in key_lower or '_kt' in key_lower:
return 'knot'
elif 'mph' in key_lower:
return 'mph'
elif 'kmh' in key_lower or 'km_h' in key_lower:
return 'kmh'
elif 'ms' in key_lower or 'm_s' in key_lower:
return 'ms'
# Default to m/s
return 'ms'
def _validate_field_ranges(self, fields: Dict[str, Optional[float]], raw_data: Dict[str, Any]) -> Dict[str, Optional[float]]:
"""Validate and clamp field values to database constraints.
Args:
fields: Weather fields dictionary
raw_data: Original MQTT message data
Returns:
Validated fields dictionary
"""
# Database column constraints (precision, scale)
constraints = {
'temperature': {'max': 999.99, 'min': -999.99},
'humidity': {'max': 100.00, 'min': 0.00},
'pressure': {'max': 9999.99, 'min': 0.00},
'wind_speed': {'max': 999.99, 'min': 0.00},
'wind_direction': {'max': 360.0, 'min': 0.0},
'rainfall': {'max': 9999.99, 'min': 0.00}
}
validated_fields = {}
for field_name, value in fields.items():
if value is None:
validated_fields[field_name] = None
continue
constraint = constraints.get(field_name)
if constraint:
# Check if value is out of range
if value > constraint['max'] or value < constraint['min']:
logger.warning(f"Clamping {field_name} value {value} to range [{constraint['min']}, {constraint['max']}]")
if value > constraint['max']:
validated_fields[field_name] = constraint['max']
else:
validated_fields[field_name] = constraint['min']
else:
validated_fields[field_name] = value
else:
validated_fields[field_name] = value
return validated_fields

View File

@@ -0,0 +1,15 @@
"""Database abstraction layer for Weather Connect."""
from .base import DatabaseInterface
from .oracle_db import OracleDatabase
from .mssql_db import MSSQLDatabase
from .postgresql_db import PostgreSQLDatabase
from .mysql_db import MySQLDatabase
__all__ = [
"DatabaseInterface",
"OracleDatabase",
"MSSQLDatabase",
"PostgreSQLDatabase",
"MySQLDatabase"
]

View File

@@ -0,0 +1,121 @@
"""Base database interface and common functionality."""
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import structlog
logger = structlog.get_logger(__name__)
@dataclass
class WeatherData:
"""Weather data structure for database storage."""
timestamp: datetime
topic: str
location: Optional[str] = None
temperature: Optional[float] = None
humidity: Optional[float] = None
pressure: Optional[float] = None
wind_speed: Optional[float] = None
wind_direction: Optional[float] = None
rainfall: Optional[float] = None
raw_data: Optional[str] = None
class DatabaseInterface(ABC):
"""Abstract base class for database implementations."""
def __init__(self, config: Dict[str, Any]):
"""Initialize database connection.
Args:
config: Database configuration dictionary
"""
self.config = config
self.connection = None
self.schema = config.get('schema')
@abstractmethod
def connect(self) -> bool:
"""Establish database connection.
Returns:
True if connection successful, False otherwise
"""
pass
@abstractmethod
def disconnect(self):
"""Close database connection."""
pass
@abstractmethod
def create_tables(self) -> bool:
"""Create necessary database tables.
Returns:
True if tables created successfully, False otherwise
"""
pass
@abstractmethod
def insert_weather_data(self, data: WeatherData) -> bool:
"""Insert weather data into the database.
Args:
data: Weather data to insert
Returns:
True if insertion successful, False otherwise
"""
pass
@abstractmethod
def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
"""Insert multiple weather data records into the database.
Args:
data_list: List of weather data to insert
Returns:
True if all insertions successful, False otherwise
"""
pass
@abstractmethod
def is_connected(self) -> bool:
"""Check if database connection is active.
Returns:
True if connected, False otherwise
"""
pass
def get_table_name(self, base_name: str) -> str:
"""Get fully qualified table name with schema if specified.
Args:
base_name: Base table name
Returns:
Fully qualified table name
"""
if self.schema:
return f"{self.schema}.{base_name}"
return base_name
def log_error(self, operation: str, error: Exception):
"""Log database errors consistently.
Args:
operation: The operation that failed
error: The exception that occurred
"""
logger.error(
"Database operation failed",
operation=operation,
error=str(error),
db_type=self.__class__.__name__
)

View File

@@ -0,0 +1,47 @@
"""Database factory for creating database instances."""
from typing import Dict, Any
from .base import DatabaseInterface
from .oracle_db import OracleDatabase
from .mssql_db import MSSQLDatabase
from .postgresql_db import PostgreSQLDatabase
from .mysql_db import MySQLDatabase
import structlog
logger = structlog.get_logger(__name__)
class DatabaseFactory:
"""Factory class for creating database instances."""
@staticmethod
def create_database(db_type: str, config: Dict[str, Any]) -> DatabaseInterface:
"""Create a database instance based on the type.
Args:
db_type: Type of database (oracle, mssql, postgresql, mysql)
config: Database configuration
Returns:
Database instance
Raises:
ValueError: If database type is not supported
"""
db_type = db_type.lower()
if db_type == "oracle":
return OracleDatabase(config)
elif db_type == "mssql":
return MSSQLDatabase(config)
elif db_type == "postgresql":
return PostgreSQLDatabase(config)
elif db_type == "mysql":
return MySQLDatabase(config)
else:
raise ValueError(f"Unsupported database type: {db_type}")
@staticmethod
def get_supported_databases() -> list:
"""Get list of supported database types."""
return ["oracle", "mssql", "postgresql", "mysql"]

View File

@@ -0,0 +1,202 @@
"""Microsoft SQL Server database implementation."""
import pyodbc
from typing import Dict, List, Any
import structlog
from .base import DatabaseInterface, WeatherData
logger = structlog.get_logger(__name__)
class MSSQLDatabase(DatabaseInterface):
"""Microsoft SQL Server database implementation."""
def __init__(self, config: Dict[str, Any]):
"""Initialize MS SQL database connection."""
super().__init__(config)
def connect(self) -> bool:
"""Establish MS SQL database connection."""
try:
driver = self.config.get('mssql_driver', 'ODBC Driver 17 for SQL Server')
connection_string = (
f"DRIVER={{{driver}}};"
f"SERVER={self.config['host']},{self.config['port']};"
f"DATABASE={self.config['name']};"
f"UID={self.config['username']};"
f"PWD={self.config['password']};"
"TrustServerCertificate=yes;"
)
logger.info("Connecting to MS SQL database", host=self.config['host'], port=self.config['port'])
self.connection = pyodbc.connect(connection_string)
self.connection.autocommit = False
# Test the connection
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
logger.info("Successfully connected to MS SQL database")
return True
except Exception as e:
self.log_error("connect", e)
return False
def disconnect(self):
"""Close MS SQL database connection."""
if self.connection:
try:
self.connection.close()
logger.info("Disconnected from MS SQL database")
except Exception as e:
self.log_error("disconnect", e)
finally:
self.connection = None
def create_tables(self) -> bool:
"""Create MS SQL database tables."""
if not self.is_connected():
return False
try:
cursor = self.connection.cursor()
# Create weather_data table
table_name = self.get_table_name("weather_data")
create_sql = f"""
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='{table_name.split('.')[-1]}' AND xtype='U')
CREATE TABLE {table_name} (
id BIGINT IDENTITY(1,1) PRIMARY KEY,
timestamp DATETIME2(6) NOT NULL,
topic NVARCHAR(255) NOT NULL,
location NVARCHAR(100),
temperature DECIMAL(5,2),
humidity DECIMAL(5,2),
pressure DECIMAL(7,2),
wind_speed DECIMAL(5,2),
wind_direction DECIMAL(5,2),
rainfall DECIMAL(6,2),
raw_data NVARCHAR(MAX),
created_at DATETIME2(6) DEFAULT GETDATE()
)
"""
cursor.execute(create_sql)
# Create index on timestamp and topic
index_name = f"idx_{table_name.replace('.', '_')}_ts_topic"
index_sql = f"""
IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = '{index_name}')
CREATE INDEX {index_name} ON {table_name} (timestamp, topic)
"""
cursor.execute(index_sql)
self.connection.commit()
cursor.close()
logger.info("Successfully created MS SQL tables", table=table_name)
return True
except Exception as e:
self.log_error("create_tables", e)
return False
def insert_weather_data(self, data: WeatherData) -> bool:
"""Insert weather data into MS SQL database."""
if not self.is_connected():
return False
try:
cursor = self.connection.cursor()
table_name = self.get_table_name("weather_data")
insert_sql = f"""
INSERT INTO {table_name}
(timestamp, topic, location, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall, raw_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
cursor.execute(insert_sql, (
data.timestamp,
data.topic,
data.location,
data.temperature,
data.humidity,
data.pressure,
data.wind_speed,
data.wind_direction,
data.rainfall,
data.raw_data
))
self.connection.commit()
cursor.close()
logger.debug("Successfully inserted weather data into MS SQL", topic=data.topic)
return True
except Exception as e:
self.log_error("insert_weather_data", e)
return False
def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
"""Insert multiple weather data records into MS SQL database."""
if not self.is_connected() or not data_list:
return False
try:
cursor = self.connection.cursor()
table_name = self.get_table_name("weather_data")
insert_sql = f"""
INSERT INTO {table_name}
(timestamp, topic, location, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall, raw_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
# Prepare data for batch insert
batch_data = [
(
data.timestamp,
data.topic,
data.location,
data.temperature,
data.humidity,
data.pressure,
data.wind_speed,
data.wind_direction,
data.rainfall,
data.raw_data
)
for data in data_list
]
cursor.executemany(insert_sql, batch_data)
self.connection.commit()
cursor.close()
logger.info("Successfully inserted weather data batch into MS SQL", count=len(data_list))
return True
except Exception as e:
self.log_error("insert_weather_data_batch", e)
return False
def is_connected(self) -> bool:
"""Check if MS SQL database connection is active."""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
return True
except:
return False

View File

@@ -0,0 +1,189 @@
"""MySQL database implementation."""
import pymysql
from typing import Dict, List, Any
import structlog
from .base import DatabaseInterface, WeatherData
logger = structlog.get_logger(__name__)
class MySQLDatabase(DatabaseInterface):
"""MySQL database implementation."""
def __init__(self, config: Dict[str, Any]):
"""Initialize MySQL database connection."""
super().__init__(config)
def connect(self) -> bool:
"""Establish MySQL database connection."""
try:
logger.info("Connecting to MySQL database", host=self.config['host'], port=self.config['port'])
self.connection = pymysql.connect(
host=self.config['host'],
port=self.config['port'],
database=self.config['name'],
user=self.config['username'],
password=self.config['password'],
charset='utf8mb4',
autocommit=False
)
# Test the connection
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
logger.info("Successfully connected to MySQL database")
return True
except Exception as e:
self.log_error("connect", e)
return False
def disconnect(self):
"""Close MySQL database connection."""
if self.connection:
try:
self.connection.close()
logger.info("Disconnected from MySQL database")
except Exception as e:
self.log_error("disconnect", e)
finally:
self.connection = None
def create_tables(self) -> bool:
"""Create MySQL database tables."""
if not self.is_connected():
return False
try:
cursor = self.connection.cursor()
# Create weather_data table
table_name = self.get_table_name("weather_data")
create_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
timestamp DATETIME(6) NOT NULL,
topic VARCHAR(255) NOT NULL,
location VARCHAR(100),
temperature DECIMAL(5,2),
humidity DECIMAL(5,2),
pressure DECIMAL(7,2),
wind_speed DECIMAL(5,2),
wind_direction DECIMAL(5,2),
rainfall DECIMAL(6,2),
raw_data TEXT,
created_at TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
INDEX idx_timestamp_topic (timestamp, topic)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
cursor.execute(create_sql)
self.connection.commit()
cursor.close()
logger.info("Successfully created MySQL tables", table=table_name)
return True
except Exception as e:
self.log_error("create_tables", e)
return False
def insert_weather_data(self, data: WeatherData) -> bool:
"""Insert weather data into MySQL database."""
if not self.is_connected():
return False
try:
cursor = self.connection.cursor()
table_name = self.get_table_name("weather_data")
insert_sql = f"""
INSERT INTO {table_name}
(timestamp, topic, location, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall, raw_data)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_sql, (
data.timestamp,
data.topic,
data.location,
data.temperature,
data.humidity,
data.pressure,
data.wind_speed,
data.wind_direction,
data.rainfall,
data.raw_data
))
self.connection.commit()
cursor.close()
logger.debug("Successfully inserted weather data into MySQL", topic=data.topic)
return True
except Exception as e:
self.log_error("insert_weather_data", e)
return False
def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
"""Insert multiple weather data records into MySQL database."""
if not self.is_connected() or not data_list:
return False
try:
cursor = self.connection.cursor()
table_name = self.get_table_name("weather_data")
insert_sql = f"""
INSERT INTO {table_name}
(timestamp, topic, location, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall, raw_data)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# Prepare data for batch insert
batch_data = [
(
data.timestamp,
data.topic,
data.location,
data.temperature,
data.humidity,
data.pressure,
data.wind_speed,
data.wind_direction,
data.rainfall,
data.raw_data
)
for data in data_list
]
cursor.executemany(insert_sql, batch_data)
self.connection.commit()
cursor.close()
logger.info("Successfully inserted weather data batch into MySQL", count=len(data_list))
return True
except Exception as e:
self.log_error("insert_weather_data_batch", e)
return False
def is_connected(self) -> bool:
"""Check if MySQL database connection is active."""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
return True
except:
return False

View File

@@ -0,0 +1,252 @@
"""Oracle database implementation."""
import oracledb
from typing import Dict, List, Any
import structlog
from .base import DatabaseInterface, WeatherData
logger = structlog.get_logger(__name__)
class OracleDatabase(DatabaseInterface):
"""Oracle database implementation."""
def __init__(self, config: Dict[str, Any]):
"""Initialize Oracle database connection."""
super().__init__(config)
self.dsn = None
def get_table_name(self, base_name: str) -> str:
"""Get table name for Oracle (no schema prefix needed when connected as owner)."""
return base_name
def connect(self) -> bool:
"""Establish Oracle database connection."""
try:
# Use the working connection string we validated
connect_string = f"{self.config['host']}:{self.config['port']}/{self.config['name']}"
logger.info("Attempting Oracle database connection",
host=self.config['host'],
port=self.config['port'])
logger.info("Connecting to Oracle", connect_string=connect_string)
self.connection = oracledb.connect(
user=self.config['username'],
password=self.config['password'],
dsn=connect_string
)
# Test the connection
cursor = self.connection.cursor()
cursor.execute("SELECT 1 FROM DUAL")
result = cursor.fetchone()
cursor.close()
logger.info("Successfully connected to Oracle database")
return True
except Exception as e:
self.log_error("connect", e)
return False
def disconnect(self):
"""Close Oracle database connection."""
if self.connection:
try:
self.connection.close()
logger.info("Disconnected from Oracle database")
except Exception as e:
self.log_error("disconnect", e)
finally:
self.connection = None
def create_tables(self) -> bool:
"""Create Oracle database tables."""
if not self.is_connected():
return False
try:
cursor = self.connection.cursor()
# Create sequence for weather_data id
seq_sql = "CREATE SEQUENCE weather_data_seq START WITH 1 INCREMENT BY 1"
cursor.execute(seq_sql)
# Create weather_data table
create_sql = """
CREATE TABLE weather_data (
id NUMBER PRIMARY KEY,
timestamp TIMESTAMP(6) NOT NULL,
topic VARCHAR2(255) NOT NULL,
location VARCHAR2(100),
temperature NUMBER(5,2),
humidity NUMBER(5,2),
pressure NUMBER(7,2),
wind_speed NUMBER(5,2),
wind_direction NUMBER(5,2),
rainfall NUMBER(6,2),
created_at TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_sql)
# Create raw_data table
raw_create_sql = """
CREATE TABLE raw_data (
weather_data_id NUMBER PRIMARY KEY,
raw_data CLOB,
CONSTRAINT fk_raw_data_weather_data
FOREIGN KEY (weather_data_id) REFERENCES weather_data (id) ON DELETE CASCADE,
CONSTRAINT chk_raw_data_json CHECK (raw_data IS JSON)
)
"""
cursor.execute(raw_create_sql)
# Create index on timestamp and topic
index_sql = "CREATE INDEX idx_weather_data_ts_topic ON weather_data (timestamp, topic)"
cursor.execute(index_sql)
self.connection.commit()
cursor.close()
logger.info("Successfully created Oracle tables")
return True
except oracledb.DatabaseError as e:
error, = e.args
if error.code == 955: # Table/sequence already exists
logger.info("Oracle tables already exist")
return True
else:
self.log_error("create_tables", e)
return False
except Exception as e:
self.log_error("create_tables", e)
return False
def insert_weather_data(self, data: WeatherData) -> bool:
"""Insert weather data into Oracle database."""
if not self.is_connected():
return False
try:
cursor = self.connection.cursor()
table_name = "weather_data"
seq_name = "weather_data_seq"
raw_table_name = "raw_data"
# Insert into weather_data first
insert_sql = f"""
INSERT INTO {table_name}
(id, timestamp, topic, location, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall)
VALUES ({seq_name}.NEXTVAL, :timestamp, :topic, :location, :temperature,
:humidity, :pressure, :wind_speed, :wind_direction, :rainfall)
"""
cursor.execute(insert_sql, {
'timestamp': data.timestamp,
'topic': data.topic,
'location': data.location,
'temperature': data.temperature,
'humidity': data.humidity,
'pressure': data.pressure,
'wind_speed': data.wind_speed,
'wind_direction': data.wind_direction,
'rainfall': data.rainfall
})
# Get the inserted ID
cursor.execute(f"SELECT {seq_name}.CURRVAL FROM DUAL")
weather_id = cursor.fetchone()[0]
# Insert into raw_data if raw_data is provided
if data.raw_data is not None:
raw_insert_sql = f"""
INSERT INTO {raw_table_name} (weather_data_id, raw_data)
VALUES (:1, :2)
"""
cursor.execute(raw_insert_sql, (weather_id, data.raw_data))
self.connection.commit()
cursor.close()
logger.debug("Successfully inserted weather data into Oracle", topic=data.topic)
return True
except Exception as e:
self.log_error("insert_weather_data", e)
return False
def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
"""Insert multiple weather data records into Oracle database."""
if not self.is_connected() or not data_list:
return False
try:
cursor = self.connection.cursor()
table_name = "weather_data"
seq_name = "weather_data_seq"
raw_table_name = "raw_data"
weather_insert_sql = f"""
INSERT INTO {table_name}
(id, timestamp, topic, location, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall)
VALUES ({seq_name}.NEXTVAL, :timestamp, :topic, :location, :temperature,
:humidity, :pressure, :wind_speed, :wind_direction, :rainfall)
"""
raw_insert_sql = f"""
INSERT INTO {raw_table_name} (weather_data_id, raw_data)
VALUES (:id, :raw_data)
"""
for data in data_list:
cursor.execute(weather_insert_sql, {
'timestamp': data.timestamp,
'topic': data.topic,
'location': data.location,
'temperature': data.temperature,
'humidity': data.humidity,
'pressure': data.pressure,
'wind_speed': data.wind_speed,
'wind_direction': data.wind_direction,
'rainfall': data.rainfall
})
# Get the inserted ID
cursor.execute(f"SELECT {seq_name}.CURRVAL FROM DUAL")
weather_id = cursor.fetchone()[0]
if data.raw_data is not None:
cursor.execute(raw_insert_sql, {'id': weather_id, 'raw_data': data.raw_data})
self.connection.commit()
cursor.close()
logger.info("Successfully inserted weather data batch into Oracle", count=len(data_list))
return True
except Exception as e:
self.log_error("insert_weather_data_batch", e)
return False
def is_connected(self) -> bool:
"""Check if Oracle database connection is active."""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
cursor.execute("SELECT 1 FROM DUAL")
cursor.close()
return True
except:
return False

View File

@@ -0,0 +1,194 @@
"""PostgreSQL database implementation."""
import psycopg2
from typing import Dict, List, Any
import structlog
from .base import DatabaseInterface, WeatherData
logger = structlog.get_logger(__name__)
class PostgreSQLDatabase(DatabaseInterface):
"""PostgreSQL database implementation."""
def __init__(self, config: Dict[str, Any]):
"""Initialize PostgreSQL database connection."""
super().__init__(config)
def connect(self) -> bool:
"""Establish PostgreSQL database connection."""
try:
logger.info("Connecting to PostgreSQL database", host=self.config['host'], port=self.config['port'])
self.connection = psycopg2.connect(
host=self.config['host'],
port=self.config['port'],
database=self.config['name'],
user=self.config['username'],
password=self.config['password']
)
self.connection.autocommit = False
# Test the connection
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
logger.info("Successfully connected to PostgreSQL database")
return True
except Exception as e:
self.log_error("connect", e)
return False
def disconnect(self):
"""Close PostgreSQL database connection."""
if self.connection:
try:
self.connection.close()
logger.info("Disconnected from PostgreSQL database")
except Exception as e:
self.log_error("disconnect", e)
finally:
self.connection = None
def create_tables(self) -> bool:
"""Create PostgreSQL database tables."""
if not self.is_connected():
return False
try:
cursor = self.connection.cursor()
# Create weather_data table
table_name = self.get_table_name("weather_data")
create_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMP(6) NOT NULL,
topic VARCHAR(255) NOT NULL,
location VARCHAR(100),
temperature DECIMAL(5,2),
humidity DECIMAL(5,2),
pressure DECIMAL(7,2),
wind_speed DECIMAL(5,2),
wind_direction DECIMAL(5,2),
rainfall DECIMAL(6,2),
raw_data TEXT,
created_at TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_sql)
# Create index on timestamp and topic
index_name = f"idx_{table_name.replace('.', '_')}_ts_topic"
index_sql = f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} (timestamp, topic)"
cursor.execute(index_sql)
self.connection.commit()
cursor.close()
logger.info("Successfully created PostgreSQL tables", table=table_name)
return True
except Exception as e:
self.log_error("create_tables", e)
return False
def insert_weather_data(self, data: WeatherData) -> bool:
"""Insert weather data into PostgreSQL database."""
if not self.is_connected():
return False
try:
cursor = self.connection.cursor()
table_name = self.get_table_name("weather_data")
insert_sql = f"""
INSERT INTO {table_name}
(timestamp, topic, location, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall, raw_data)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_sql, (
data.timestamp,
data.topic,
data.location,
data.temperature,
data.humidity,
data.pressure,
data.wind_speed,
data.wind_direction,
data.rainfall,
data.raw_data
))
self.connection.commit()
cursor.close()
logger.debug("Successfully inserted weather data into PostgreSQL", topic=data.topic)
return True
except Exception as e:
self.log_error("insert_weather_data", e)
return False
def insert_weather_data_batch(self, data_list: List[WeatherData]) -> bool:
"""Insert multiple weather data records into PostgreSQL database."""
if not self.is_connected() or not data_list:
return False
try:
cursor = self.connection.cursor()
table_name = self.get_table_name("weather_data")
insert_sql = f"""
INSERT INTO {table_name}
(timestamp, topic, location, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall, raw_data)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# Prepare data for batch insert
batch_data = [
(
data.timestamp,
data.topic,
data.location,
data.temperature,
data.humidity,
data.pressure,
data.wind_speed,
data.wind_direction,
data.rainfall,
data.raw_data
)
for data in data_list
]
cursor.executemany(insert_sql, batch_data)
self.connection.commit()
cursor.close()
logger.info("Successfully inserted weather data batch into PostgreSQL", count=len(data_list))
return True
except Exception as e:
self.log_error("insert_weather_data_batch", e)
return False
def is_connected(self) -> bool:
"""Check if PostgreSQL database connection is active."""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
return True
except:
return False

286
src/wxconnect/main.py Normal file
View File

@@ -0,0 +1,286 @@
"""Main application entry point."""
import sys
import signal
import time
from queue import Queue, Empty
from threading import Thread, Event
import structlog
from .config import Config
from .mqtt_client import MQTTClient
from .data_processor import WeatherDataProcessor
from .database.factory import DatabaseFactory
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
logger = structlog.get_logger(__name__)
class WeatherConnectApp:
"""Main application class for Weather Connect."""
def __init__(self):
"""Initialize the application."""
self.config = None
self.mqtt_client = None
self.database = None
self.data_processor = WeatherDataProcessor()
self.message_queue = Queue()
self.shutdown_event = Event()
self.database_thread = None
def load_config(self, config_file: str = ".env") -> bool:
"""Load application configuration.
Args:
config_file: Path to configuration file
Returns:
True if configuration loaded successfully
"""
try:
self.config = Config(config_file)
if not self.config.validate():
logger.error("Configuration validation failed")
return False
# Set log level
import logging
log_level = getattr(logging, self.config.app.log_level.upper(), logging.INFO)
logging.basicConfig(level=log_level)
logger.info("Configuration loaded successfully",
mqtt_host=self.config.mqtt.host,
db_type=self.config.database.db_type)
return True
except Exception as e:
logger.error("Failed to load configuration", error=str(e))
return False
def setup_database(self):
"""Initialize database connection with retry logic."""
logger.info("Setting up database connection")
max_retries = 10
retry_delay = 30 # 30 seconds between retries
for attempt in range(max_retries):
try:
import dataclasses
config_dict = dataclasses.asdict(self.config.database)
self.db = DatabaseFactory.create_database(
self.config.database.db_type,
config_dict
)
if self.db.connect():
logger.info("Database connection established")
self.db.create_tables()
logger.info("Database tables ready")
self.database = self.db
return True
else:
raise Exception("Failed to connect to database")
except Exception as e:
logger.warning(f"Database connection attempt {attempt + 1}/{max_retries} failed",
error=str(e))
if attempt < max_retries - 1:
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
logger.error("All database connection attempts exhausted")
return False
def initialize_mqtt(self) -> bool:
"""Initialize MQTT client.
Returns:
True if MQTT client initialized successfully
"""
try:
self.mqtt_client = MQTTClient(
self.config.mqtt,
self._on_mqtt_message
)
logger.info("MQTT client initialized successfully")
return True
except Exception as e:
logger.error("Failed to initialize MQTT client", error=str(e))
return False
def _on_mqtt_message(self, topic: str, data: any):
"""Handle incoming MQTT messages.
Args:
topic: MQTT topic
data: Message data
"""
try:
# Put message in queue for processing
self.message_queue.put((topic, data), block=False)
except Exception as e:
logger.error("Failed to queue MQTT message", topic=topic, error=str(e))
def _database_worker(self):
"""Database worker thread for processing messages."""
batch_data = []
last_batch_time = time.time()
logger.info("Database worker started")
while not self.shutdown_event.is_set():
try:
# Try to get a message from the queue
try:
topic, data = self.message_queue.get(timeout=1.0)
except Empty:
# Check if we should flush the batch due to timeout
if batch_data and (time.time() - last_batch_time) > self.config.app.batch_timeout:
self._flush_batch(batch_data)
batch_data = []
last_batch_time = time.time()
continue
# Process the message
weather_data = self.data_processor.process_message(topic, data)
if weather_data:
batch_data.append(weather_data)
# Check if batch is full
if len(batch_data) >= self.config.app.batch_size:
self._flush_batch(batch_data)
batch_data = []
last_batch_time = time.time()
self.message_queue.task_done()
except Exception as e:
logger.error("Error in database worker", error=str(e))
time.sleep(1)
# Flush remaining data on shutdown
if batch_data:
self._flush_batch(batch_data)
logger.info("Database worker stopped")
def _flush_batch(self, batch_data):
"""Flush batch data to database.
Args:
batch_data: List of WeatherData objects
"""
if not batch_data or not self.database:
return
try:
if len(batch_data) == 1:
success = self.database.insert_weather_data(batch_data[0])
else:
success = self.database.insert_weather_data_batch(batch_data)
if success:
logger.info("Successfully inserted weather data batch", count=len(batch_data))
else:
logger.error("Failed to insert weather data batch", count=len(batch_data))
except Exception as e:
logger.error("Error flushing batch to database", error=str(e), count=len(batch_data))
def run(self):
"""Run the main application."""
logger.info("Starting Weather Connect application")
# Set up signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
try:
# Load configuration
if not self.load_config():
return 1
# Initialize database
if not self.setup_database():
return 1
# Initialize MQTT
if not self.initialize_mqtt():
return 1
# Start database worker thread
self.database_thread = Thread(target=self._database_worker, daemon=True)
self.database_thread.start()
# Start MQTT client (blocking)
logger.info("Starting MQTT client loop")
self.mqtt_client.start_loop(self.config.app.reconnect_interval)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
except Exception as e:
logger.error("Application error", error=str(e))
return 1
finally:
self.shutdown()
logger.info("Weather Connect application stopped")
return 0
def _signal_handler(self, signum, frame):
"""Handle shutdown signals.
Args:
signum: Signal number
frame: Current stack frame
"""
logger.info("Received shutdown signal", signal=signum)
self.shutdown()
def shutdown(self):
"""Shutdown the application gracefully."""
logger.info("Shutting down application")
# Set shutdown event
self.shutdown_event.set()
# Stop MQTT client
if self.mqtt_client:
self.mqtt_client.stop()
# Wait for database thread to finish
if self.database_thread and self.database_thread.is_alive():
logger.info("Waiting for database worker to finish")
self.database_thread.join(timeout=5)
# Close database connection
if self.database:
self.database.disconnect()
def main():
"""Main entry point."""
app = WeatherConnectApp()
return app.run()
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,182 @@
"""MQTT client for weather data subscription."""
import json
import time
from typing import Callable, Any, Optional
import paho.mqtt.client as mqtt
import structlog
from .config import MQTTConfig
logger = structlog.get_logger(__name__)
class MQTTClient:
"""MQTT client for subscribing to weather data."""
def __init__(self, config: MQTTConfig, message_callback: Callable[[str, Any], None]):
"""Initialize MQTT client.
Args:
config: MQTT configuration
message_callback: Callback function for processing messages
"""
self.config = config
self.message_callback = message_callback
self.client = mqtt.Client(client_id=config.client_id)
self.connected = False
self.running = False
# Set up callbacks
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_subscribe = self._on_subscribe
self.client.on_log = self._on_log
# Set credentials if provided
if config.username and config.password:
self.client.username_pw_set(config.username, config.password)
def _on_connect(self, client, userdata, flags, rc):
"""Callback for when the client connects to the broker."""
if rc == 0:
self.connected = True
logger.info("Connected to MQTT broker", host=self.config.host, port=self.config.port)
# Subscribe to the weather topic
result = client.subscribe(self.config.topic)
if result[0] == mqtt.MQTT_ERR_SUCCESS:
logger.info("Subscribed to topic", topic=self.config.topic)
else:
logger.error("Failed to subscribe to topic", topic=self.config.topic, error=result[0])
else:
self.connected = False
logger.error("Failed to connect to MQTT broker", rc=rc, host=self.config.host)
def _on_disconnect(self, client, userdata, rc):
"""Callback for when the client disconnects from the broker."""
self.connected = False
if rc != 0:
logger.warning("Unexpected disconnection from MQTT broker", rc=rc)
else:
logger.info("Disconnected from MQTT broker")
def _on_subscribe(self, client, userdata, mid, granted_qos):
"""Callback for successful subscription."""
logger.info("*** SUBSCRIPTION ACKNOWLEDGED ***", mid=mid, granted_qos=granted_qos)
def _on_message(self, client, userdata, msg):
"""Callback for when a message is received."""
try:
topic = msg.topic
payload = msg.payload.decode('utf-8')
logger.info("*** RECEIVED MQTT MESSAGE ***", topic=topic, payload_size=len(payload))
# Try to parse as JSON, fall back to string if it fails
try:
data = json.loads(payload)
except json.JSONDecodeError:
logger.warning("Failed to parse message as JSON, treating as string", topic=topic)
data = payload
# Call the message callback
self.message_callback(topic, data)
except Exception as e:
logger.error("Error processing MQTT message", error=str(e), topic=msg.topic)
def _on_log(self, client, userdata, level, buf):
"""Callback for MQTT client logging."""
if level == mqtt.MQTT_LOG_ERR:
logger.error("MQTT client error", message=buf)
elif level == mqtt.MQTT_LOG_WARNING:
logger.warning("MQTT client warning", message=buf)
else:
logger.debug("MQTT client log", level=level, message=buf)
def connect(self) -> bool:
"""Connect to the MQTT broker.
Returns:
True if connection successful, False otherwise
"""
try:
logger.info("Connecting to MQTT broker", host=self.config.host, port=self.config.port)
result = self.client.connect(self.config.host, self.config.port, 60)
if result == mqtt.MQTT_ERR_SUCCESS:
return True
else:
logger.error("Failed to connect to MQTT broker", error=result)
return False
except Exception as e:
logger.error("Exception while connecting to MQTT broker", error=str(e))
return False
def disconnect(self):
"""Disconnect from the MQTT broker."""
if self.connected:
logger.info("Disconnecting from MQTT broker")
self.client.disconnect()
def stop(self):
"""Stop the MQTT client loop."""
logger.info("Stopping MQTT client")
self.running = False
if self.connected:
self.client.disconnect()
# Stop the MQTT loop
try:
self.client.loop_stop()
except:
pass # Ignore errors if loop is already stopped
def start_loop(self, reconnect_interval: int = 30):
"""Start the MQTT client loop with reconnection logic.
Args:
reconnect_interval: Seconds to wait before attempting reconnection
"""
import threading
def mqtt_loop():
"""Run the MQTT loop in a separate thread."""
self.running = True
while self.running:
try:
if not self.connected:
if self.connect():
# Wait a bit for connection to establish
time.sleep(1)
else:
logger.warning("Failed to connect, retrying in {} seconds".format(reconnect_interval))
time.sleep(reconnect_interval)
continue
# Run the MQTT loop (blocking)
logger.info("Starting MQTT loop_forever")
self.client.loop_forever()
# If we get here, the loop exited
if self.running: # Only log if we're still supposed to be running
logger.warning("MQTT loop exited, attempting to reconnect in {} seconds".format(reconnect_interval))
time.sleep(reconnect_interval)
except Exception as e:
if self.running: # Only log if we're still supposed to be running
logger.error("Unexpected error in MQTT loop", error=str(e))
time.sleep(reconnect_interval)
# Start the MQTT loop in a separate thread
self.mqtt_thread = threading.Thread(target=mqtt_loop, daemon=True)
self.mqtt_thread.start()
# Keep the main thread alive until stopped
while self.running:
time.sleep(0.1)
def is_connected(self) -> bool:
"""Check if the client is connected to the broker."""
return self.connected