power: add Shelly device support

Signed-off-by: Alex Zellner <alexander.zellner@googlemali.com>
This commit is contained in:
zellneralex 2021-02-07 11:02:09 +01:00 committed by Arksine
parent dd81beab47
commit 0fcae5b6e4
2 changed files with 83 additions and 0 deletions

View File

@ -192,6 +192,17 @@ output_id:
# If your single-relay Tasmota device switches on/off successfully, # If your single-relay Tasmota device switches on/off successfully,
# but fails to report its state, ensure that 'SetOption26' is set in # but fails to report its state, ensure that 'SetOption26' is set in
# Tasmota. # Tasmota.
address:
user:
password:
output_id:
# The above options are used for "shelly" devices. The
# address should be a valid ip or hostname for the Shelly device.
# Provide a user and password if configured in Shelly (default is empty).
# If password is set but user is empty the default user "admin" will be used
# Provided an output_id (relay id) if the Shelly device supports
# more than one (default is 0).
``` ```
Below are some potential examples: Below are some potential examples:
@ -222,6 +233,12 @@ address: 192.168.1.123
type: tasmota type: tasmota
address: 192.168.1.124 address: 192.168.1.124
password: password1 password: password1
[power shelly_plug]
type: shelly
address: 192.168.1.125
user: user2
password: password2
``` ```
It is possible to toggle device power from the Klippy host, this can be done It is possible to toggle device power from the Klippy host, this can be done

View File

@ -34,6 +34,8 @@ class PrinterPower:
dev = TPLinkSmartPlug(cfg) dev = TPLinkSmartPlug(cfg)
elif dev_type == "tasmota": elif dev_type == "tasmota":
dev = Tasmota(cfg) dev = Tasmota(cfg)
elif dev_type == "shelly":
dev = Shelly(cfg)
else: else:
raise config.error(f"Unsupported Device Type: {dev_type}") raise config.error(f"Unsupported Device Type: {dev_type}")
self.devices[dev.get_name()] = dev self.devices[dev.get_name()] = dev
@ -455,6 +457,70 @@ class Tasmota(PowerDevice):
raise self.server.error(msg) from None raise self.server.error(msg) from None
self.state = state self.state = state
class Shelly(PowerDevice):
def __init__(self, config):
super().__init__(config)
self.server = config.get_server()
self.addr = config.get("address")
self.output_id = config.getint("output_id", 0)
self.user = config.get("user", "admin")
self.password = config.get("password", "")
async def _send_shelly_command(self, command):
if command in ["on", "off"]:
out_cmd = f"relay/{self.output_id}?turn={command}"
elif command == "info":
out_cmd = f"relay/{self.output_id}"
else:
raise self.server.error(f"Invalid shelly command: {command}")
if self.password != "":
out_pwd = f"{self.user}:{self.password}@"
else:
out_pwd = f""
url = f"http://{out_pwd}{self.addr}/{out_cmd}"
data = ""
http_client = AsyncHTTPClient()
try:
response = await http_client.fetch(url)
data = json_decode(response.body)
except Exception:
msg = f"Error sending shelly command: {command}"
logging.exception(msg)
raise self.server.error(msg)
return data
async def initialize(self):
await self.refresh_status()
def get_device_info(self):
return {
**super().get_device_info(),
'type': "shelly"
}
async def refresh_status(self):
try:
res = await self._send_shelly_command("info")
state = res[f"ison"]
except Exception:
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = "on" if state else "off"
async def set_power(self, state):
try:
res = await self._send_shelly_command(state)
state = res[f"ison"]
except Exception:
self.state = "error"
msg = f"Error Setting Device Status: {self.name} to {state}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = "on" if state else "off"
# The power plugin has multiple configuration sections # The power plugin has multiple configuration sections
def load_plugin_multi(config): def load_plugin_multi(config):
return PrinterPower(config) return PrinterPower(config)