power: add support for gcode_macro Klipper devices

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-02-02 15:42:41 -05:00
parent aca7cb78b9
commit 0b13e2da44
1 changed files with 112 additions and 49 deletions

View File

@ -32,7 +32,6 @@ if TYPE_CHECKING:
from .machine import Machine from .machine import Machine
from . import klippy_apis from . import klippy_apis
from .mqtt import MQTTClient from .mqtt import MQTTClient
from .template import TemplateFactory
from .template import JinjaTemplate from .template import JinjaTemplate
APIComp = klippy_apis.KlippyAPI APIComp = klippy_apis.KlippyAPI
@ -516,32 +515,40 @@ class GpioDevice(PowerDevice):
self.timer_handle = None self.timer_handle = None
class KlipperDevice(PowerDevice): class KlipperDevice(PowerDevice):
def __init__(self, def __init__(self, config: ConfigHelper) -> None:
config: ConfigHelper, super().__init__(config)
initial_val: Optional[int] = None if self.off_when_shutdown:
) -> None:
if config.getboolean('off_when_shutdown', None) is not None:
raise config.error( raise config.error(
"Option 'off_when_shutdown' in section " "Option 'off_when_shutdown' in section "
f"[{config.get_name()}] is unsupported for 'klipper_device'") f"[{config.get_name()}] is unsupported for 'klipper_device'")
if config.getboolean('klipper_restart', None) is not None: if self.klipper_restart:
raise config.error( raise config.error(
"Option 'klipper_restart' in section " "Option 'restart_klipper_when_powered' in section "
f"[{config.get_name()}] is unsupported for 'klipper_device'") f"[{config.get_name()}] is unsupported for 'klipper_device'")
super().__init__(config) if (
self.off_when_shutdown = False self.bound_service is not None and
self.klipper_restart = False self.bound_service.startswith("klipper")
self.timer: Optional[float] = config.getfloat('timer', None) ):
if self.timer is not None and self.timer < 0.000001: # Klipper devices cannot be bound to an instance of klipper or
# klipper_mcu
raise config.error( raise config.error(
f"Option 'timer' in section [{config.get_name()}] must " f"Option 'bound_service' cannot be set to {self.bound_service}"
"be above 0.0") f" for 'klipper_device' [{config.get_name()}]")
self.is_shutdown: bool = False
self.update_fut: Optional[asyncio.Future] = None
self.request_mutex = asyncio.Lock()
self.timer: Optional[float] = config.getfloat(
'timer', None, above=0.000001)
self.timer_handle: Optional[asyncio.TimerHandle] = None self.timer_handle: Optional[asyncio.TimerHandle] = None
self.object_name = config.get('object_name', '') self.object_name = config.get('object_name')
if not self.object_name.startswith("output_pin "): obj_parts = self.object_name.split()
self.gc_cmd = f"SET_PIN PIN={obj_parts[-1]} "
if obj_parts[0] == "gcode_macro":
self.gc_cmd = obj_parts[-1]
elif obj_parts[0] != "output_pin":
raise config.error( raise config.error(
"Currently only Klipper 'output_pin' objects supported for " "Klipper object must be either 'output_pin' or 'gcode_macro' "
f"'object_name' in section [{config.get_name()}]") f"for option 'object_name' in section [{config.get_name()}]")
self.server.register_event_handler( self.server.register_event_handler(
"server:status_update", self._status_update) "server:status_update", self._status_update)
@ -553,53 +560,104 @@ class KlipperDevice(PowerDevice):
def _status_update(self, data: Dict[str, Any]) -> None: def _status_update(self, data: Dict[str, Any]) -> None:
self._set_state_from_data(data) self._set_state_from_data(data)
def get_device_info(self) -> Dict[str, Any]:
dev_info = super().get_device_info()
dev_info['is_shutdown'] = self.is_shutdown
return dev_info
async def _handle_ready(self) -> None: async def _handle_ready(self) -> None:
kapis: APIComp = self.server.lookup_component('klippy_apis') kapis: APIComp = self.server.lookup_component('klippy_apis')
sub: Dict[str, Optional[List[str]]] = {self.object_name: None} sub: Dict[str, Optional[List[str]]] = {self.object_name: None}
try: data = await kapis.subscribe_objects(sub, None)
data = await kapis.subscribe_objects(sub) if not self._validate_data(data):
self.state == "error"
else:
assert data is not None
self._set_state_from_data(data) self._set_state_from_data(data)
except self.server.error as e:
logging.info(f"Error subscribing to {self.object_name}")
async def _handle_disconnect(self) -> None: async def _handle_disconnect(self) -> None:
self.is_shutdown = False
self._set_state("init") self._set_state("init")
self._reset_timer()
def process_klippy_shutdown(self) -> None: def process_klippy_shutdown(self) -> None:
self._set_state("init") self.is_shutdown = True
self._set_state("error")
self._reset_timer()
def refresh_status(self) -> None: async def refresh_status(self) -> None:
pass if self.is_shutdown or self.state in ["on", "off", "init"]:
return
async with self.request_mutex:
kapis: APIComp = self.server.lookup_component('klippy_apis')
req: Dict[str, Optional[List[str]]] = {self.object_name: None}
data: Optional[Dict[str, Any]]
data = await kapis.query_objects(req, None)
if not self._validate_data(data):
self.state = "error"
else:
assert data is not None
self._set_state_from_data(data)
async def set_power(self, state) -> None: async def set_power(self, state: str) -> None:
if self.timer_handle is not None: if self.is_shutdown:
self.timer_handle.cancel() raise self.server.error(
self.timer_handle = None f"Power Device {self.name}: Cannot set power for device "
f"when Klipper is shutdown")
async with self.request_mutex:
self._reset_timer()
eventloop = self.server.get_event_loop()
self.update_fut = eventloop.create_future()
try: try:
kapis: APIComp = self.server.lookup_component('klippy_apis') kapis: APIComp = self.server.lookup_component('klippy_apis')
object_name = self.object_name[len("output_pin "):] value = "1" if state == "on" else "0"
object_name_value = "1" if state == "on" else "0" await kapis.run_gcode(f"{self.gc_cmd} VALUE={value}")
await kapis.run_gcode( await asyncio.wait_for(self.update_fut, 1.)
f"SET_PIN PIN={object_name} VALUE={object_name_value}") except TimeoutError:
self.state = "error"
raise self.server.error(
f"Power device {self.name}: Timeout "
"waiting for device state update")
except Exception: except Exception:
self.state = "error" self.state = "error"
msg = f"Error Toggling Device Power: {self.name}" msg = f"Error Toggling Device Power: {self.name}"
logging.exception(msg) logging.exception(msg)
raise self.server.error(msg) from None raise self.server.error(msg) from None
self.state = state finally:
self.update_fut = None
self._check_timer() self._check_timer()
def _set_state_from_data(self, data) -> None: def _validate_data(self, data: Optional[Dict[str, Any]]) -> bool:
if data is None:
logging.info("Error querying klipper object: "
f"{self.object_name}")
elif self.object_name not in data:
logging.info(
f"[power]: Invalid Klipper Device {self.object_name}, "
f"no response returned from subscription.")
elif 'value' not in data[self.object_name]:
logging.info(
f"[power]: Invalid Klipper Device {self.object_name}, "
f"response does not contain a 'value' parameter")
else:
return True
return False
def _set_state_from_data(self, data: Dict[str, Any]) -> None:
if self.object_name not in data: if self.object_name not in data:
return return
is_on = data.get(self.object_name, {}).get('value', 0.0) > 0.0 value = data[self.object_name].get('value')
state = "on" if is_on else "off" if value is not None:
state = "on" if value else "off"
self._set_state(state) self._set_state(state)
if self.update_fut is not None:
self.update_fut.set_result(state)
def _set_state(self, state) -> None: def _set_state(self, state: str) -> None:
in_event = self.update_fut is not None
last_state = self.state last_state = self.state
self.state = state self.state = state
if last_state != state: if last_state != state and not in_event:
self.notify_power_changed() self.notify_power_changed()
def _check_timer(self): def _check_timer(self):
@ -609,6 +667,11 @@ class KlipperDevice(PowerDevice):
self.timer_handle = event_loop.delay_callback( self.timer_handle = event_loop.delay_callback(
self.timer, power.set_device_power, self.name, "off") self.timer, power.set_device_power, self.name, "off")
def _reset_timer(self):
if self.timer_handle is not None:
self.timer_handle.cancel()
self.timer_handle = None
def close(self) -> None: def close(self) -> None:
if self.timer_handle is not None: if self.timer_handle is not None:
self.timer_handle.cancel() self.timer_handle.cancel()