sensor: minor refactoring

Remove dataclasses, refactor for consistency with
other components.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-12-20 18:17:25 -05:00
parent 8d59c424d7
commit 4b1a3b8792
1 changed files with 20 additions and 50 deletions

View File

@ -10,7 +10,6 @@ from __future__ import annotations
import logging import logging
from collections import defaultdict, deque from collections import defaultdict, deque
from dataclasses import dataclass, replace
from functools import partial from functools import partial
from ..common import RequestType from ..common import RequestType
@ -35,15 +34,6 @@ if TYPE_CHECKING:
SENSOR_UPDATE_TIME = 1.0 SENSOR_UPDATE_TIME = 1.0
SENSOR_EVENT_NAME = "sensors:sensor_update" SENSOR_EVENT_NAME = "sensors:sensor_update"
@dataclass(frozen=True)
class SensorConfiguration:
id: str
name: str
type: str
source: str = ""
def _set_result( def _set_result(
name: str, value: Union[int, float], store: Dict[str, Union[int, float]] name: str, value: Union[int, float], store: Dict[str, Union[int, float]]
) -> None: ) -> None:
@ -53,24 +43,16 @@ def _set_result(
store[name] = value store[name] = value
@dataclass(frozen=True)
class Sensor:
config: SensorConfiguration
values: Dict[str, Deque[Union[int, float]]]
class BaseSensor: class BaseSensor:
def __init__(self, name: str, cfg: ConfigHelper, store_size: int = 1200) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = cfg.get_server() self.server = config.get_server()
self.error_state: Optional[str] = None self.error_state: Optional[str] = None
self.id = config.get_name().split(maxsplit=1)[-1]
self.config = SensorConfiguration( self.type = config.get("type")
id=name, self.name = config.get("name", self.id)
type=cfg.get("type"),
name=cfg.get("name", name),
)
self.last_measurements: Dict[str, Union[int, float]] = {} self.last_measurements: Dict[str, Union[int, float]] = {}
self.last_value: Dict[str, Union[int, float]] = {} self.last_value: Dict[str, Union[int, float]] = {}
store_size = config.getint("sensor_store_size", 1200)
self.values: DefaultDict[str, Deque[Union[int, float]]] = defaultdict( self.values: DefaultDict[str, Deque[Union[int, float]]] = defaultdict(
lambda: deque(maxlen=store_size) lambda: deque(maxlen=store_size)
) )
@ -89,14 +71,14 @@ class BaseSensor:
""" """
Sensor initialization executed on Moonraker startup. Sensor initialization executed on Moonraker startup.
""" """
logging.info("Registered sensor '%s'", self.config.name) logging.info("Registered sensor '%s'", self.name)
return True return True
def get_sensor_info(self) -> Dict[str, Any]: def get_sensor_info(self) -> Dict[str, Any]:
return { return {
"id": self.config.id, "id": self.id,
"friendly_name": self.config.name, "friendly_name": self.name,
"type": self.config.type, "type": self.type,
"values": self.last_measurements, "values": self.last_measurements,
} }
@ -104,22 +86,19 @@ class BaseSensor:
return {key: list(values) for key, values in self.values.items()} return {key: list(values) for key, values in self.values.items()}
def get_name(self) -> str: def get_name(self) -> str:
return self.config.name return self.name
def close(self) -> None: def close(self) -> None:
pass pass
class MQTTSensor(BaseSensor): class MQTTSensor(BaseSensor):
def __init__(self, name: str, cfg: ConfigHelper, store_size: int = 1200): def __init__(self, config: ConfigHelper) -> None:
super().__init__(name=name, cfg=cfg) super().__init__(config=config)
self.mqtt: MQTTClient = self.server.load_component(cfg, "mqtt") self.mqtt: MQTTClient = self.server.load_component(config, "mqtt")
self.state_topic: str = config.get("state_topic")
self.state_topic: str = cfg.get("state_topic") self.state_response = config.gettemplate("state_response_template")
self.state_response = cfg.gettemplate("state_response_template") self.qos: Optional[int] = config.getint("qos", None, minval=0, maxval=2)
self.config = replace(self.config, source=self.state_topic)
self.qos: Optional[int] = cfg.getint("qos", None, minval=0, maxval=2)
self.server.register_event_handler( self.server.register_event_handler(
"mqtt:disconnected", self._on_mqtt_disconnected "mqtt:disconnected", self._on_mqtt_disconnected
) )
@ -141,7 +120,7 @@ class MQTTSensor(BaseSensor):
self.last_measurements = measurements self.last_measurements = measurements
logging.debug( logging.debug(
"Received updated sensor value for %s: %s", "Received updated sensor value for %s: %s",
self.config.name, self.name,
self.last_measurements, self.last_measurements,
) )
@ -169,8 +148,6 @@ class Sensors:
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.store_size = config.getint("sensor_store_size", 1200)
prefix_sections = config.get_prefix_sections("sensor")
self.sensors: Dict[str, BaseSensor] = {} self.sensors: Dict[str, BaseSensor] = {}
# Register timer to update sensor values in store # Register timer to update sensor values in store
@ -197,18 +174,15 @@ class Sensors:
# Register notifications # Register notifications
self.server.register_notification(SENSOR_EVENT_NAME) self.server.register_notification(SENSOR_EVENT_NAME)
prefix_sections = config.get_prefix_sections("sensor ")
for section in prefix_sections: for section in prefix_sections:
cfg = config[section] cfg = config[section]
try: try:
try: try:
_, name = cfg.get_name().split(maxsplit=1) _, name = cfg.get_name().split(maxsplit=1)
except ValueError: except ValueError:
raise cfg.error(f"Invalid section name: {cfg.get_name()}") raise cfg.error(f"Invalid section name: {cfg.get_name()}")
logging.info(f"Configuring sensor: {name}") logging.info(f"Configuring sensor: {name}")
sensor_type: str = cfg.get("type") sensor_type: str = cfg.get("type")
sensor_class: Optional[Type[BaseSensor]] = self.__sensor_types.get( sensor_class: Optional[Type[BaseSensor]] = self.__sensor_types.get(
sensor_type.upper(), None sensor_type.upper(), None
@ -216,11 +190,7 @@ class Sensors:
if sensor_class is None: if sensor_class is None:
raise config.error(f"Unsupported sensor type: {sensor_type}") raise config.error(f"Unsupported sensor type: {sensor_type}")
self.sensors[name] = sensor_class( self.sensors[name] = sensor_class(cfg)
name=name,
cfg=cfg,
store_size=self.store_size,
)
except Exception as e: except Exception as e:
# Ensures that configuration errors are shown to the user # Ensures that configuration errors are shown to the user
self.server.add_warning( self.server.add_warning(