power: implement an HTTPDevice base class

This allows for some additional code reuse that is duplicated amongst several HTTP Device implementations.
This commit is contained in:
Arksine 2021-05-10 07:22:57 -04:00
parent 8e6bb34d1b
commit 724c29c4a7
1 changed files with 110 additions and 192 deletions

View File

@ -199,6 +199,7 @@ class PowerDevice:
raise config.error(f"Invalid Section Name: {config.get_name()}") raise config.error(f"Invalid Section Name: {config.get_name()}")
self.server = config.get_server() self.server = config.get_server()
self.name = name_parts[1] self.name = name_parts[1]
self.type = config.get('type')
self.state = "init" self.state = "init"
self.locked_while_printing = config.getboolean( self.locked_while_printing = config.getboolean(
'locked_while_printing', False) 'locked_while_printing', False)
@ -218,7 +219,8 @@ class PowerDevice:
return { return {
'device': self.name, 'device': self.name,
'status': self.state, 'status': self.state,
'locked_while_printing': self.locked_while_printing 'locked_while_printing': self.locked_while_printing,
'type': self.type
} }
def get_locked_while_printing(self): def get_locked_while_printing(self):
@ -231,6 +233,58 @@ class PowerDevice:
ioloop.call_later(self.restart_delay, klippy_apis.do_restart, ioloop.call_later(self.restart_delay, klippy_apis.do_restart,
"FIRMWARE_RESTART") "FIRMWARE_RESTART")
class HTTPDevice(PowerDevice):
def __init__(self, config, default_port=None,
default_user=None, default_password=None):
super().__init__(config)
self.client = AsyncHTTPClient()
self.addr = config.get("address")
self.port = config.getint("port", default_port)
self.user = config.get("user", default_user)
self.password = config.get("password", default_password)
async def initialize(self):
await self.refresh_status()
async def _send_http_command(self, url, command):
try:
response = await self.client.fetch(url)
data = json_decode(response.body)
except Exception:
msg = f"Error sending '{self.type}' command: {command}"
logging.exception(msg)
raise self.server.error(msg)
return data
async def _send_power_request(self, state):
raise NotImplementedError(
"_send_power_request must be implemented by children")
async def _send_status_request(self):
raise NotImplementedError(
"_send_status_request must be implemented by children")
async def refresh_status(self):
try:
state = await self._send_status_request()
except Exception:
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
async def set_power(self, state):
try:
state = await self._send_power_request(state)
except Exception:
self.state = "error"
msg = f"Error Setting Device Status: {self.name} to {state}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
class GpioChipFactory: class GpioChipFactory:
def __init__(self): def __init__(self):
self.chips = {} self.chips = {}
@ -293,12 +347,6 @@ class GpioDevice(PowerDevice):
def initialize(self): def initialize(self):
self.set_power("on" if self.initial_state else "off") self.set_power("on" if self.initial_state else "off")
def get_device_info(self):
return {
**super().get_device_info(),
'type': "gpio"
}
def refresh_status(self): def refresh_status(self):
try: try:
val = self.line.get_value() val = self.line.get_value()
@ -399,12 +447,6 @@ class TPLinkSmartPlug(PowerDevice):
async def initialize(self): async def initialize(self):
await self.refresh_status() await self.refresh_status()
def get_device_info(self):
return {
**super().get_device_info(),
'type': "tplink_smartplug"
}
async def refresh_status(self): async def refresh_status(self):
try: try:
res = await self._send_tplink_command("info") res = await self._send_tplink_command("info")
@ -435,12 +477,10 @@ class TPLinkSmartPlug(PowerDevice):
self.state = state self.state = state
class Tasmota(PowerDevice): class Tasmota(HTTPDevice):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config, default_password="")
self.addr = config.get("address")
self.output_id = config.getint("output_id", 1) self.output_id = config.getint("output_id", 1)
self.password = config.get("password", "")
self.timer = config.get("timer", "") self.timer = config.get("timer", "")
async def _send_tasmota_command(self, command, password=None): async def _send_tasmota_command(self, command, password=None):
@ -455,28 +495,9 @@ class Tasmota(PowerDevice):
url = f"http://{self.addr}/cm?user=admin&password=" \ url = f"http://{self.addr}/cm?user=admin&password=" \
f"{self.password}&cmnd={out_cmd}" f"{self.password}&cmnd={out_cmd}"
data = "" return await self._send_http_command(url, command)
http_client = AsyncHTTPClient()
try:
response = await http_client.fetch(url)
data = json_decode(response.body)
except Exception:
msg = f"Error sending tplink command: {command}"
logging.exception(msg)
raise self.server.error(msg)
return data
async def initialize(self): async def _send_status_request(self):
await self.refresh_status()
def get_device_info(self):
return {
**super().get_device_info(),
'type': "tasmota"
}
async def refresh_status(self):
try:
res = await self._send_tasmota_command("info") res = await self._send_tasmota_command("info")
try: try:
state = res[f"POWER{self.output_id}"].lower() state = res[f"POWER{self.output_id}"].lower()
@ -485,15 +506,9 @@ class Tasmota(PowerDevice):
state = res[f"POWER"].lower() state = res[f"POWER"].lower()
else: else:
raise KeyError(e) raise KeyError(e)
except Exception: return state
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
async def set_power(self, state): async def _send_power_request(self, state):
try:
res = await self._send_tasmota_command(state) res = await self._send_tasmota_command(state)
if self.timer == "" or state != "off": if self.timer == "" or state != "off":
try: try:
@ -503,21 +518,13 @@ class Tasmota(PowerDevice):
state = res[f"POWER"].lower() state = res[f"POWER"].lower()
else: else:
raise KeyError(e) raise KeyError(e)
except Exception: return state
self.state = "error"
msg = f"Error Setting Device Status: {self.name} to {state}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
class Shelly(PowerDevice): class Shelly(HTTPDevice):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config, default_user="admin", default_password="")
self.addr = config.get("address")
self.output_id = config.getint("output_id", 0) self.output_id = config.getint("output_id", 0)
self.user = config.get("user", "admin")
self.password = config.get("password", "")
self.timer = config.get("timer", "") self.timer = config.get("timer", "")
async def _send_shelly_command(self, command): async def _send_shelly_command(self, command):
@ -537,113 +544,49 @@ class Shelly(PowerDevice):
else: else:
out_pwd = f"" out_pwd = f""
url = f"http://{out_pwd}{self.addr}/{out_cmd}" url = f"http://{out_pwd}{self.addr}/{out_cmd}"
data = "" return await self._send_http_command(url, command)
http_client = AsyncHTTPClient()
try:
response = await http_client.fetch(url)
data = json_decode(response.body)
except Exception:
msg = f"Error sending shelly command: {command}"
logging.exception(msg)
raise self.server.error(msg)
return data
async def initialize(self): async def _send_status_request(self):
await self.refresh_status()
def get_device_info(self):
return {
**super().get_device_info(),
'type': "shelly"
}
async def refresh_status(self):
try:
res = await self._send_shelly_command("info") res = await self._send_shelly_command("info")
state = res[f"ison"] state = res[f"ison"]
timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0 timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0
except Exception: return "on" if state and timer_remaining == 0 else "off"
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = "on" if state and timer_remaining == 0 else "off"
async def set_power(self, state): async def _send_power_request(self, state):
try:
res = await self._send_shelly_command(state) res = await self._send_shelly_command(state)
state = res[f"ison"] state = res[f"ison"]
timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0 timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0
except Exception: return "on" if state and timer_remaining == 0 else "off"
self.state = "error"
msg = f"Error Setting Device Status: {self.name} to {state}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = "on" if state and timer_remaining == 0 else "off"
class HomeSeer(PowerDevice):
class HomeSeer(HTTPDevice):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config, default_user="admin", default_password="")
self.addr = config.get("address")
self.device = config.getint("device") self.device = config.getint("device")
self.user = config.get("user", "admin")
self.password = config.get("password", "")
async def _send_homeseer(self, request, additional=""): async def _send_homeseer(self, request, additional=""):
url = (f"http://{self.user}:{self.password}@{self.addr}" url = (f"http://{self.user}:{self.password}@{self.addr}"
f"/JSON?user={self.user}&pass={self.password}" f"/JSON?user={self.user}&pass={self.password}"
f"&request={request}&ref={self.device}&{additional}") f"&request={request}&ref={self.device}&{additional}")
data = "" return await self._send_http_command(url, request)
http_client = AsyncHTTPClient()
try:
response = await http_client.fetch(url)
data = json_decode(response.body)
except Exception:
msg = f"Error sending HomeSeer command: {request}"
logging.exception(msg)
raise self.server.error(msg)
return data
async def initialize(self): async def _send_status_request(self):
await self.refresh_status()
def get_device_info(self):
return {
**super().get_device_info(),
'type': "homeseer"
}
async def refresh_status(self):
try:
res = await self._send_homeseer("getstatus") res = await self._send_homeseer("getstatus")
state = res[f"Devices"][0]["status"].lower() return res[f"Devices"][0]["status"].lower()
except Exception:
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
async def set_power(self, state): async def _send_power_request(self, state):
try:
if state == "on": if state == "on":
state_hs = "On" state_hs = "On"
elif state == "off": elif state == "off":
state_hs = "Off" state_hs = "Off"
res = await self._send_homeseer("controldevicebylabel", res = await self._send_homeseer("controldevicebylabel",
f"label={state_hs}") f"label={state_hs}")
except Exception: return state
self.state = "error"
msg = f"Error Setting Device Status: {self.name} to {state}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
class HomeAssistant(PowerDevice):
class HomeAssistant(HTTPDevice):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config, default_port=8123)
self.addr = config.get("address")
self.port = config.getint("port", 8123)
self.device = config.get("device") self.device = config.get("device")
self.token = config.get("token") self.token = config.get("token")
@ -667,15 +610,12 @@ class HomeAssistant(PowerDevice):
'Authorization': f'Bearer {self.token}', 'Authorization': f'Bearer {self.token}',
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
data = ""
http_client = AsyncHTTPClient()
try: try:
if (method == "POST"): if (method == "POST"):
response = await http_client.fetch( response = await self.client.fetch(
url, method="POST", body=json.dumps(body), headers=headers) url, method="POST", body=json.dumps(body), headers=headers)
data = json_decode(response.body)
else: else:
response = await http_client.fetch( response = await self.client.fetch(
url, method="GET", headers=headers) url, method="GET", headers=headers)
data = json_decode(response.body) data = json_decode(response.body)
except Exception: except Exception:
@ -684,36 +624,14 @@ class HomeAssistant(PowerDevice):
raise self.server.error(msg) raise self.server.error(msg)
return data return data
async def initialize(self): async def _send_status_request(self):
await self.refresh_status()
def get_device_info(self):
return {
**super().get_device_info(),
'type': "homeassistant"
}
async def refresh_status(self):
try:
res = await self._send_homeassistant_command("info") res = await self._send_homeassistant_command("info")
state = res[f"state"] return res[f"state"]
except Exception:
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
async def set_power(self, state): async def _send_power_request(self, state):
try:
res = await self._send_homeassistant_command(state) res = await self._send_homeassistant_command(state)
state = res[0][f"state"] return res[0][f"state"]
except Exception:
self.state = "error"
msg = f"Error Setting Device Status: {self.name} to {state}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
# The power component has multiple configuration sections # The power component has multiple configuration sections
def load_component_multi(config): def load_component_multi(config):