#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Standalone Meteostick reader script
Based on the meteostick.py weewx driver

This script opens a Meteostick device, initializes it, and continuously
prints the raw messages received from Davis weather stations.
"""

import math
import optparse
import string
import sys
import time

try:
    import serial
except ImportError:
    # Do not terminate the interpreter at import time; main() reports the
    # missing dependency and exits with status 1, and Meteostick.open()
    # raises if it is called without pyserial.
    serial = None

PYSERIAL_MISSING_MSG = \
    "Error: pyserial module is required. Install with: pip install pyserial"


# Simple CRC16 implementation for standalone use
def crc16(data):
    """Return the 16-bit CRC (polynomial 0x1021, initial value 0) of data.

    data may be an iterable of 1-character strings (as produced by chr())
    or an iterable of integers in 0..255 such as bytes or bytearray.
    """
    crc = 0
    for byte in data:
        value = byte if isinstance(byte, int) else ord(byte)
        crc ^= value << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = (crc << 1) ^ 0x1021
            else:
                crc <<= 1
            crc &= 0xFFFF
    return crc


# Simple logging functions
def logdbg(msg):
    """Print a debug message to stdout."""
    print(f"DEBUG: {msg}")


def loginf(msg):
    """Print an informational message to stdout."""
    print(f"INFO: {msg}")


def logerr(msg):
    """Print an error message to stdout."""
    print(f"ERROR: {msg}")


# Constants from meteostick.py
DEBUG_SERIAL = 0
DEBUG_RAIN = 0
DEBUG_PARSE = 0

MPH_TO_MPS = 1609.34 / 3600.0  # meter/mile * hour/second

DEFAULT_SOIL_TEMP = 24  # C

RAW = 0  # indices of table with raw values
POT = 1  # indices of table with potentials

# Lookup tables
SM_MAP = {RAW: (99.2, 140.1, 218.7, 226.9, 266.8, 391.7,
                475.6, 538.2, 596.1, 673.7, 720.1),
          POT: (0.0, 1.0, 9.0, 10.0, 15.0, 35.0,
                55.0, 75.0, 100.0, 150.0, 200.0)}

LW_MAP = {RAW: (857.0, 864.0, 895.0, 911.0, 940.0, 952.0, 991.0, 1013.0),
          POT: (15.0, 14.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0)}


def dbg_serial(verbosity, msg):
    """Log msg when DEBUG_SERIAL is at least verbosity."""
    if DEBUG_SERIAL >= verbosity:
        logdbg(msg)


def dbg_parse(verbosity, msg):
    """Log msg when DEBUG_PARSE is at least verbosity."""
    if DEBUG_PARSE >= verbosity:
        logdbg(msg)


def _fmt(data):
    """Return the characters of data as space-separated hex codes."""
    if not data:
        return ''
    return ' '.join(['%02x' % int(ord(x)) for x in data])


def calculate_thermistor_temp(temp_raw):
    """Calculate thermistor temperature using Davis formulas.

    Returns the temperature computed from temp_raw, or DEFAULT_SOIL_TEMP
    when the value cannot be computed (temp_raw of 0, a non-positive
    resistance, or a zero denominator).
    """
    # Convert temp_raw to a resistance (R) in kiloOhms
    a = 18.81099
    b = 0.0009988027
    # Steinhart-Hart parameters
    s1 = 0.002783573
    s2 = 0.0002509406
    r = None
    try:
        r = a / (1.0 / temp_raw - b) / 1000  # k ohms
        thermistor_temp = 1 / (s1 + s2 * math.log(r)) - 273
    except (ValueError, ZeroDivisionError) as e:
        # temp_raw == 0 divides by zero; a negative r makes math.log fail
        logerr('thermistor_temp failed for temp_raw %s r (k ohm) %s '
               'error: %s' % (temp_raw, r, e))
        return DEFAULT_SOIL_TEMP
    dbg_parse(3, 'r (k ohm) %s temp_raw %s thermistor_temp %s' %
              (r, temp_raw, thermistor_temp))
    return thermistor_temp


def lookup_potential(sensor_name, norm_fact, sensor_raw, sensor_temp, lookup):
    """Look up potential based upon a normalized raw value.

    sensor_raw is first normalized to DEFAULT_SOIL_TEMP using norm_fact and
    sensor_temp.  The normalized value is then linearly interpolated
    between the neighbouring entries of lookup[RAW] / lookup[POT]; values
    outside the table are clamped to the first or last potential.
    """
    # normalize raw value for standard temperature (DEFAULT_SOIL_TEMP)
    sensor_raw_norm = sensor_raw * \
        (1 + norm_fact * (sensor_temp - DEFAULT_SOIL_TEMP))

    numcols = len(lookup[RAW])
    if sensor_raw_norm >= lookup[RAW][numcols - 1]:
        potential = lookup[POT][numcols - 1]  # preset potential to last value
        dbg_parse(3, "%s: temp=%s fact=%s raw=%s norm=%s potential=%s >= RAW=%s"
                  % (sensor_name, sensor_temp, norm_fact, sensor_raw,
                     sensor_raw_norm, potential, lookup[RAW][numcols - 1]))
    else:
        potential = lookup[POT][0]  # preset potential to first value
        # lookup sensor_raw_norm value in table
        for x in range(0, numcols):
            if sensor_raw_norm < lookup[RAW][x]:
                if x == 0:
                    # 'pre zero' phase; potential = first value
                    dbg_parse(3, "%s: temp=%s fact=%s raw=%s norm=%s potential=%s < RAW=%s"
                              % (sensor_name, sensor_temp, norm_fact,
                                 sensor_raw, sensor_raw_norm, potential,
                                 lookup[RAW][0]))
                    break
                else:
                    # determine the potential value
                    potential_per_raw = \
                        (lookup[POT][x] - lookup[POT][x - 1]) / \
                        (lookup[RAW][x] - lookup[RAW][x - 1])
                    potential_offset = \
                        (sensor_raw_norm - lookup[RAW][x - 1]) * \
                        potential_per_raw
                    potential = lookup[POT][x - 1] + potential_offset
                    dbg_parse(3, "%s: temp=%s fact=%s raw=%s norm=%s potential=%s RAW=%s to %s POT=%s to %s "
                              % (sensor_name, sensor_temp, norm_fact,
                                 sensor_raw, sensor_raw_norm, potential,
                                 lookup[RAW][x - 1], lookup[RAW][x],
                                 lookup[POT][x - 1], lookup[POT][x]))
                    break
    return potential


# Simplified Fahrenheit to Celsius conversion
def FtoC(temp_f):
    """Convert Fahrenheit to Celsius"""
    return (temp_f - 32.0) * 5.0 / 9.0


class Meteostick(object):
    """Serial interface to a Meteostick plus a parser for its output lines."""

    DEFAULT_PORT = '/dev/ttyUSB0'
    DEFAULT_BAUDRATE = 115200
    DEFAULT_FREQUENCY = 'EU'
    DEFAULT_RF_SENSITIVITY = 90
    MAX_RF_SENSITIVITY = 125
    RAW_CHANNEL = 0  # unused channel for the receiver stats in raw format

    def __init__(self, **cfg):
        """Read the configuration from keyword arguments.

        Recognized keys: port, baudrate, transceiver_frequency,
        rf_sensitivity, iss_channel, anemometer_channel, leaf_soil_channel,
        temp_hum_1_channel, temp_hum_2_channel.

        Raises ValueError for an unknown frequency or an RF sensitivity
        whose absolute value exceeds MAX_RF_SENSITIVITY.
        """
        self.port = cfg.get('port', self.DEFAULT_PORT)
        loginf('using serial port %s' % self.port)
        self.baudrate = cfg.get('baudrate', self.DEFAULT_BAUDRATE)
        loginf('using baudrate %s' % self.baudrate)

        freq = cfg.get('transceiver_frequency', self.DEFAULT_FREQUENCY)
        if freq not in ['EU', 'US', 'AU']:
            raise ValueError("invalid frequency %s" % freq)
        self.frequency = freq
        loginf('using frequency %s' % self.frequency)

        rfs = int(cfg.get('rf_sensitivity', self.DEFAULT_RF_SENSITIVITY))
        absrfs = abs(rfs)
        if absrfs > self.MAX_RF_SENSITIVITY:
            raise ValueError("invalid RF sensitivity %s" % rfs)
        self.rfs = absrfs
        self.rf_threshold = absrfs * 2
        loginf('using rf sensitivity %s (-%s dB)' % (rfs, absrfs))

        channels = dict()
        channels['iss'] = int(cfg.get('iss_channel', 1))
        channels['anemometer'] = int(cfg.get('anemometer_channel', 0))
        channels['leaf_soil'] = int(cfg.get('leaf_soil_channel', 0))
        channels['temp_hum_1'] = int(cfg.get('temp_hum_1_channel', 0))
        channels['temp_hum_2'] = int(cfg.get('temp_hum_2_channel', 0))
        # without a separate anemometer the wind data comes from the iss
        if channels['anemometer'] == 0:
            channels['wind_channel'] = channels['iss']
        else:
            channels['wind_channel'] = channels['anemometer']
        self.channels = channels
        loginf('using iss_channel %s' % channels['iss'])
        loginf('using anemometer_channel %s' % channels['anemometer'])
        loginf('using leaf_soil_channel %s' % channels['leaf_soil'])
        loginf('using temp_hum_1_channel %s' % channels['temp_hum_1'])
        loginf('using temp_hum_2_channel %s' % channels['temp_hum_2'])

        self.transmitters = self.ch_to_xmit(
            channels['iss'], channels['anemometer'], channels['leaf_soil'],
            channels['temp_hum_1'], channels['temp_hum_2'])
        loginf('using transmitters %02x' % self.transmitters)

        self.timeout = 3  # seconds
        self.serial_port = None

    @staticmethod
    def ch_to_xmit(iss_channel, anemometer_channel, leaf_soil_channel,
                   temp_hum_1_channel, temp_hum_2_channel):
        """Return a bitmask with bit (channel - 1) set per used channel.

        The iss channel is always included; the other channels are
        included when non-zero.  Bits are OR-ed so that two sensors
        configured on the same channel do not carry into another bit.
        """
        transmitters = 1 << (iss_channel - 1)
        for channel in (anemometer_channel, leaf_soil_channel,
                        temp_hum_1_channel, temp_hum_2_channel):
            if channel != 0:
                transmitters |= 1 << (channel - 1)
        return transmitters

    @staticmethod
    def _check_crc(msg, chksum):
        """Raise ValueError when crc16(msg) differs from chksum."""
        crc_result = crc16(msg)
        if crc_result != chksum:
            logerr('CRC result is 0x%04x, should be 0x%04x' %
                   (crc_result, chksum))
            raise ValueError("CRC error")

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, _, value, traceback):
        self.close()

    def open(self):
        """Open the serial port.

        Raises ImportError when the pyserial module is not available.
        """
        if serial is None:
            raise ImportError(PYSERIAL_MISSING_MSG)
        dbg_serial(1, "open serial port %s" % self.port)
        self.serial_port = serial.Serial(self.port, self.baudrate,
                                         timeout=self.timeout)

    def close(self):
        """Close the serial port if it is open."""
        if self.serial_port is not None:
            dbg_serial(1, "close serial port %s" % self.port)
            self.serial_port.close()
            self.serial_port = None

    def get_readings(self):
        """Read one line from the device and return it stripped.

        Undecodable bytes are replaced rather than raising, so that line
        noise reaches parse_readings(), which rejects unprintable input.
        """
        buf = self.serial_port.readline().decode('utf-8', errors='replace')
        if len(buf) > 0:
            dbg_serial(2, "station said: %s" %
                       ' '.join(["%0.2X" % ord(c) for c in buf]))
        return buf.strip()

    def get_readings_with_retry(self, max_tries=5, retry_wait=10):
        """Call get_readings(), retrying on serial errors.

        Sleeps retry_wait seconds after each failed attempt and raises
        Exception once max_tries attempts have failed.
        """
        for ntries in range(0, max_tries):
            try:
                return self.get_readings()
            except serial.serialutil.SerialException as e:
                loginf("Failed attempt %d of %d to get readings: %s" %
                       (ntries + 1, max_tries, e))
                time.sleep(retry_wait)
        else:
            msg = "Max retries (%d) exceeded for readings" % max_tries
            logerr(msg)
            raise Exception(msg)

    def reset(self, max_wait=30):
        """Reset the device, leaving it in a state that we can talk to it.

        Returns the printable characters received before the '?' prompt.
        Raises Exception when no '?' is seen within max_wait seconds.
        """
        loginf("establish communication with the meteostick")

        # flush any previous data in the input buffer
        self.serial_port.flushInput()

        # Send a reset command
        self.serial_port.write(b'r\n')
        # Wait until we see the ? character
        start_ts = time.time()
        ready = False
        response = ''
        while not ready:
            time.sleep(0.1)
            while self.serial_port.inWaiting() > 0:
                # a single byte >= 0x80 is never valid utf-8 on its own;
                # replace it instead of raising UnicodeDecodeError
                c = self.serial_port.read(1).decode('utf-8', errors='replace')
                if c == '?':
                    ready = True
                elif c in string.printable:
                    response += c
            # only time out while the prompt has not been seen
            if not ready and time.time() - start_ts > max_wait:
                raise Exception("No 'ready' response from meteostick after %s seconds" % max_wait)
        loginf("reset: %s" % response.split('\n')[0])
        dbg_serial(2, "full response to reset: %s" % response)
        # Discard any serial input from the device
        time.sleep(0.2)
        self.serial_port.flushInput()
        return response

    def configure(self):
        """Configure the device to send data continuously."""
        loginf("configure meteostick to logger mode")

        # Show default settings (they might change with a new firmware version)
        self.send_command('?')

        # Set RF threshold
        self.send_command('x' + str(self.rf_threshold))

        # Listen to configured transmitters
        self.send_command('t' + str(self.transmitters))

        # Filter transmissions from anything other than configured transmitters
        self.send_command('f1')

        # Listen to configured repeaters
        self.send_command('r1')  # repeater 1

        # Set device to produce 10-bytes raw data
        command = 'o3'
        self.send_command(command)

        # Set the frequency. Valid frequencies are US, EU and AU
        command = 'm0'  # default to US
        if self.frequency == 'AU':
            command = 'm2'
        elif self.frequency == 'EU':
            command = 'm1'
        self.send_command(command)

        # From now on the device will produce lines with received data

    def send_command(self, cmd):
        """Send cmd terminated by a carriage return and log the reply."""
        cmd2 = (cmd + "\r").encode('utf-8')
        self.serial_port.write(cmd2)
        time.sleep(0.2)
        response = self.serial_port.read(
            self.serial_port.inWaiting()).decode('utf-8', errors='replace')
        dbg_serial(1, "cmd: '%s': %s" % (cmd, response))
        self.serial_port.flushInput()

    @staticmethod
    def get_parts(raw):
        """Split raw on single spaces; raise ValueError for < 2 parts."""
        raw_str = str(raw)
        dbg_parse(1, "readings: %s" % raw_str)
        parts = raw.split(' ')
        dbg_parse(3, "parts: %s (%s)" % (parts, len(parts)))
        if len(parts) < 2:
            raise ValueError("not enough parts in '%s'" % raw)
        return parts

    def parse_readings(self, raw, rain_per_tip):
        """Parse one line from the device into a dict.

        Returns an empty dict for empty input, for input containing
        unprintable characters, and when parse_raw() raises ValueError.
        """
        data = dict()
        if not raw:
            return data

        if not all(c in string.printable for c in raw):
            logerr("unprintable characters in readings: %s" % _fmt(raw))
            return data

        try:
            data = self.parse_raw(raw,
                                  self.channels['iss'],
                                  self.channels['anemometer'],
                                  self.channels['leaf_soil'],
                                  self.channels['temp_hum_1'],
                                  self.channels['temp_hum_2'],
                                  rain_per_tip)
        except ValueError as e:
            logerr("parse failed for '%s': %s" % (raw, e))
        return data

    @staticmethod
    def parse_raw(raw, iss_ch, wind_ch, ls_ch, th1_ch, th2_ch, rain_per_tip):
        """Parse a raw line ('B', 'I' or '#' record) into a dict.

        Raises ValueError when the line has too few parts, contains
        fields that are not valid numbers, or fails the CRC check.
        ls_ch and rain_per_tip are accepted but not used by this
        simplified reader.
        """
        data = dict()
        parts = Meteostick.get_parts(raw)
        n = len(parts)
        if parts[0] == 'B':
            # message example:
            # B 29530 338141 366 101094 60 37
            data['channel'] = Meteostick.RAW_CHANNEL  # rf_signal data will not be used
            data['rf_signal'] = 0  # not available
            data['rf_missed'] = 0  # not available
            if n >= 6:
                data['temp_in'] = float(parts[3]) / 10.0  # C
                data['pressure'] = float(parts[4]) / 100.0  # hPa
                if n > 7:
                    # only with custom receiver
                    data['humidity_in'] = float(parts[7])
            else:
                logerr("B: not enough parts (%s) in '%s'" % (n, raw))
        elif parts[0] == 'I':
            # parts[2..11] hold the 10 packet bytes (hex), parts[13] the rf
            # signal and parts[14] the time since the last message.
            # A truncated line must fail as ValueError (which
            # parse_readings handles), not as IndexError.
            if n < 15:
                raise ValueError("I: not enough parts (%s) in '%s'" % (n, raw))
            pkt = bytearray([int(p, base=16) for p in parts[2:12]])

            # perform crc-check
            if pkt[8] == 0xFF and pkt[9] == 0xFF:
                # message received from davis equipment
                # Calculate crc with bytes 0-7, result must be equal to 0
                chksum = 0
                crc_input = pkt[0:8]
            else:
                # message received via repeater
                # Calculate crc with bytes 0-5 and 8-9, result must be equal
                # to bytes 6-7
                chksum = (pkt[6] << 8) + pkt[7]
                crc_input = pkt[0:6] + pkt[8:10]
            Meteostick._check_crc(crc_input, chksum)

            data['channel'] = (pkt[0] & 0x7) + 1
            battery_low = (pkt[0] >> 3) & 0x1
            data['rf_signal'] = int(parts[13])
            time_since_last = int(parts[14])
            # the cyclus time varies from 2.5 to 3 seconds for channels 1 to 8
            # simplifiy calculation with max cyclus time of 3.0 seconds
            data['rf_missed'] = (time_since_last // 2500000) - 1
            if data['rf_missed'] > 0:
                dbg_parse(3, "channel %s missed %s" %
                          (data['channel'], data['rf_missed']))

            if data['channel'] == iss_ch or data['channel'] == wind_ch \
                    or data['channel'] == th1_ch or data['channel'] == th2_ch:
                if data['channel'] == iss_ch:
                    data['bat_iss'] = battery_low
                elif data['channel'] == wind_ch:
                    data['bat_anemometer'] = battery_low
                elif data['channel'] == th1_ch:
                    data['bat_th_1'] = battery_low
                else:
                    data['bat_th_2'] = battery_low

                # Wind data processing
                wind_speed_raw = pkt[1]
                wind_dir_raw = pkt[2]
                if not (wind_speed_raw == 0 and wind_dir_raw == 0):
                    dbg_parse(3, "wind_speed_raw=%03x wind_dir_raw=0x%03x" %
                              (wind_speed_raw, wind_dir_raw))

                    # Vantage Pro and Pro2
                    if wind_dir_raw == 0:
                        wind_dir_pro = 5.0
                    elif wind_dir_raw == 255:
                        wind_dir_pro = 355.0
                    else:
                        wind_dir_pro = 9.0 + (wind_dir_raw - 1) * 342.0 / 253.0

                    # Vantage Vue
                    wind_dir_vue = wind_dir_raw * 1.40625 + 0.3

                    data['wind_speed_raw'] = wind_speed_raw
                    data['wind_dir'] = wind_dir_pro
                    data['wind_speed'] = wind_speed_raw * MPH_TO_MPS
                    dbg_parse(3, "WS=%s WD=%s WS_raw=%s WD_raw=%s WD_pro=%s WD_vue=%s" %
                              (data['wind_speed'], data['wind_dir'],
                               wind_speed_raw,
                               wind_dir_raw if wind_dir_raw <= 180 else 360 - wind_dir_raw,
                               wind_dir_pro, wind_dir_vue))

                # data from both iss sensors and extra sensors on
                # Anemometer Transport Kit
                message_type = (pkt[0] >> 4 & 0xF)

                # Process different message types (simplified for reader)
                if message_type == 8:
                    # outside temperature
                    temp_raw = (pkt[3] << 4) + (pkt[4] >> 4)  # 12-bits temp value
                    if temp_raw != 0xFFC and temp_raw != 0xFF8:
                        if pkt[4] & 0x8:
                            # digital temp sensor - value is twos-complement
                            if pkt[3] & 0x80 != 0:
                                temp_f = -(temp_raw ^ 0xFFF) / 10.0
                            else:
                                temp_f = temp_raw / 10.0
                            temp_c = FtoC(temp_f)
                            dbg_parse(3, "digital temp_raw=0x%03x temp_f=%s temp_c=%s"
                                      % (temp_raw, temp_f, temp_c))
                        else:
                            # analog sensor (thermistor)
                            temp_raw = temp_raw // 4  # 10-bits temp value
                            temp_c = calculate_thermistor_temp(temp_raw)
                            dbg_parse(3, "thermistor temp_raw=0x%03x temp_c=%s"
                                      % (temp_raw, temp_c))
                        if data['channel'] == th1_ch:
                            data['temp_1'] = temp_c
                        elif data['channel'] == th2_ch:
                            data['temp_2'] = temp_c
                        elif data['channel'] == wind_ch:
                            data['temp_3'] = temp_c
                        else:
                            data['temperature'] = temp_c
                elif message_type == 0xA:
                    # outside humidity
                    humidity_raw = ((pkt[4] >> 4) << 8) + pkt[3]
                    if humidity_raw != 0:
                        if pkt[4] & 0x08 == 0x8:
                            # digital sensor
                            humidity = humidity_raw / 10.0
                        else:
                            # analog sensor (pkt[4] & 0x0f == 0x5)
                            humidity = humidity_raw * -0.301 + 710.23
                        if data['channel'] == th1_ch:
                            data['humid_1'] = humidity
                        elif data['channel'] == th2_ch:
                            data['humid_2'] = humidity
                        elif data['channel'] == wind_ch:
                            loginf("Warning: humidity sensor of Anemometer Transmitter Kit not in sensor map: %s" % humidity)
                        else:
                            data['humidity'] = humidity
                        dbg_parse(3, "humidity_raw=0x%03x value=%s" %
                                  (humidity_raw, humidity))
                elif message_type == 0xE:
                    # rain
                    rain_count_raw = pkt[3]
                    if rain_count_raw != 0x80:
                        rain_count = rain_count_raw & 0x7F  # skip high bit
                        data['rain_count'] = rain_count
                        dbg_parse(3, "rain_count_raw=0x%02x value=%s" %
                                  (rain_count_raw, rain_count))
        elif parts[0] == '#':
            loginf("%s" % raw)
        else:
            logerr("unknown sensor identifier '%s' in %s" % (parts[0], raw))
        return data


def main():
    """Parse the command line, then print device messages until Ctrl+C.

    Returns 0 on a normal shutdown and 1 when pyserial is missing or when
    setting up the device fails.
    """
    if serial is None:
        print(PYSERIAL_MISSING_MSG)
        return 1

    usage = """%prog [options] [--help]

Read and display messages from a Meteostick device."""

    parser = optparse.OptionParser(usage=usage)
    parser.add_option('--port', dest='port', metavar='PORT',
                      help='serial port to which the meteostick is connected',
                      default=Meteostick.DEFAULT_PORT)
    parser.add_option('--baud', dest='baud', metavar='BAUDRATE', type=int,
                      help='serial port baud rate',
                      default=Meteostick.DEFAULT_BAUDRATE)
    parser.add_option('--freq', dest='freq', metavar='FREQUENCY',
                      help='comm frequency: US (915MHz), EU (868MHz), or AU (915MHz)',
                      default=Meteostick.DEFAULT_FREQUENCY)
    parser.add_option('--rfs', dest='rfs', metavar='RF_SENSITIVITY', type=int,
                      help='RF sensitivity (0-125, default 90)',
                      default=Meteostick.DEFAULT_RF_SENSITIVITY)
    parser.add_option('--iss-channel', dest='c_iss', metavar='ISS_CHANNEL',
                      type=int, help='channel for ISS (1-8)', default=1)
    parser.add_option('--anemometer-channel', dest='c_anem',
                      metavar='ANEM_CHANNEL', type=int,
                      help='channel for anemometer (0=none, 1-8)', default=0)
    parser.add_option('--leaf-soil-channel', dest='c_ls',
                      metavar='LS_CHANNEL', type=int,
                      help='channel for leaf-soil station (0=none, 1-8)',
                      default=0)
    parser.add_option('--th1-channel', dest='c_th1', metavar='TH1_CHANNEL',
                      type=int, help='channel for T/H sensor 1 (0=none, 1-8)',
                      default=0)
    parser.add_option('--th2-channel', dest='c_th2', metavar='TH2_CHANNEL',
                      type=int, help='channel for T/H sensor 2 (0=none, 1-8)',
                      default=0)
    parser.add_option('--rain-bucket', dest='bucket', metavar='BUCKET_TYPE',
                      type=int,
                      help='rain bucket type: 0=0.01in, 1=0.2mm (default 1)',
                      default=1)

    (opts, args) = parser.parse_args()

    print("Meteostick Reader")
    print("================")
    print(f"Port: {opts.port}")
    print(f"Baudrate: {opts.baud}")
    print(f"Frequency: {opts.freq}")
    print(f"RF Sensitivity: {opts.rfs}")
    print(f"ISS Channel: {opts.c_iss}")
    print(f"Anemometer Channel: {opts.c_anem}")
    print(f"Leaf/Soil Channel: {opts.c_ls}")
    print(f"T/H 1 Channel: {opts.c_th1}")
    print(f"T/H 2 Channel: {opts.c_th2}")
    print(f"Rain bucket type: {opts.bucket}")
    print()

    # Calculate rain per tip based on bucket type
    rain_per_tip = 0.254 if opts.bucket == 0 else 0.2  # mm

    # Bound before the try so the finally clause can always use them, even
    # when constructing or opening the station fails.
    station = None
    message_count = 0

    try:
        # Create and configure the Meteostick
        station = Meteostick(
            port=opts.port,
            baudrate=opts.baud,
            transceiver_frequency=opts.freq,
            rf_sensitivity=opts.rfs,
            iss_channel=opts.c_iss,
            anemometer_channel=opts.c_anem,
            leaf_soil_channel=opts.c_ls,
            temp_hum_1_channel=opts.c_th1,
            temp_hum_2_channel=opts.c_th2
        )

        print("Opening connection...")
        station.open()

        print("Resetting device...")
        reset_info = station.reset()
        print(f"Reset response: {reset_info.strip()}")

        print("Configuring device...")
        station.configure()

        print("\nListening for messages (Press Ctrl+C to exit)...")
        print("=" * 60)

        while True:
            try:
                # Get raw reading
                raw_reading = station.get_readings()
                if raw_reading:
                    message_count += 1
                    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
                    print(f"[{timestamp}] Message #{message_count}: {raw_reading}")

                    # Parse the reading for additional info
                    try:
                        parsed_data = station.parse_readings(raw_reading,
                                                             rain_per_tip)
                        if parsed_data:
                            print(f" Parsed data: {parsed_data}")
                    except Exception as e:
                        print(f" Parse error: {e}")
                    print()
                else:
                    # No data received, small delay
                    time.sleep(0.1)
            except KeyboardInterrupt:
                print("\nShutting down...")
                break
            except Exception as e:
                print(f"Error reading data: {e}")
                time.sleep(1)

    except Exception as e:
        print(f"Error: {e}")
        return 1
    finally:
        if station is not None:
            try:
                station.close()
                print("Connection closed.")
            except Exception as e:
                # best effort: report the failure but still print the total
                print(f"Error closing connection: {e}")
        print(f"Total messages received: {message_count}")

    return 0


if __name__ == '__main__':
    sys.exit(main())