power: register "set_device_power" remote method

This allows device power to be toggled from a klipper gcode macro.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-11-01 11:44:54 -05:00
parent 6464bbfc3c
commit 59d27e6829
1 changed files with 38 additions and 27 deletions

View File

@ -24,6 +24,8 @@ class PrinterPower:
self.server.register_endpoint( self.server.register_endpoint(
"/machine/gpio_power/off", ['POST'], "/machine/gpio_power/off", ['POST'],
self._handle_power_request) self._handle_power_request)
self.server.register_remote_method(
"set_device_power", self.set_device_power)
self.current_dev = None self.current_dev = None
self.devices = {} self.devices = {}
@ -61,42 +63,37 @@ class PrinterPower:
return "no_devices" return "no_devices"
result = {} result = {}
req = path.split("/")[-1]
for dev in args: for dev in args:
if dev not in self.devices: if path.startswith("/machine/gpio_power/"):
res = await self._power_dev(dev, req)
if res:
result[dev] = self.devices[dev]["status"]
else:
result[dev] = "device_not_found" result[dev] = "device_not_found"
continue else:
raise self.server.error("Unsupported power request")
return result
async def _power_dev(self, dev, req):
if dev not in self.devices:
return False
await GPIO.verify_pin(self.devices[dev]["pin"], await GPIO.verify_pin(self.devices[dev]["pin"],
self.devices[dev]["active_low"]) self.devices[dev]["active_low"])
if path == "/machine/gpio_power/on": if req in ["on", "off"]:
await self._power_dev(dev, "on") val = 1 if req == "on" else 0
elif path == "/machine/gpio_power/off": GPIO.set_pin_value(self.devices[dev]["pin"], val)
await self._power_dev(dev, "off") self.server.send_event("gpio_power:power_changed", {
elif path != "/machine/gpio_power/status": "device": dev,
"status": req
})
elif req != "status":
raise self.server.error("Unsupported power request") raise self.server.error("Unsupported power request")
self.devices[dev]["status"] = GPIO.is_pin_on( self.devices[dev]["status"] = GPIO.is_pin_on(
self.devices[dev]["pin"]) self.devices[dev]["pin"])
return True
result[dev] = self.devices[dev]["status"]
return result
async def _power_dev(self, dev, status):
if dev not in self.devices:
return
if (status == "on"):
GPIO.set_pin_value(self.devices[dev]["pin"], 1)
self.server.send_event("gpio_power:power_changed", {
"device": dev,
"status": "on"
})
elif (status == "off"):
GPIO.set_pin_value(self.devices[dev]["pin"], 0)
self.server.send_event("gpio_power:power_changed", {
"device": dev,
"status": "off"
})
async def initialize_devices(self, devices): async def initialize_devices(self, devices):
for name, device in devices.items(): for name, device in devices.items():
@ -112,6 +109,20 @@ class PrinterPower:
continue continue
self.devices[name] = device self.devices[name] = device
def set_device_power(self, device, state):
status = None
if isinstance(state, bool):
status = "on" if state else "off"
elif isinstance(state, str):
status = state.lower()
if status in ["true", "false"]:
status = "on" if status == "true" else "off"
if status not in ["on", "off"]:
logging.info(f"Invalid state received: {state}")
return
ioloop = IOLoop.current()
ioloop.spawn_callback(self._power_dev, device, status)
class GPIO: class GPIO:
gpio_root = "/sys/class/gpio" gpio_root = "/sys/class/gpio"