power: add support for smartthings switch
power: add support for smartthings switch Signed-off-by: Mitch Gallman <mitchell.gallman@gmail.com>
This commit is contained in:
parent
b60153c919
commit
d01b8a9763
|
@ -51,7 +51,8 @@ class PrinterPower:
|
|||
"homeassistant": HomeAssistant,
|
||||
"loxonev1": Loxonev1,
|
||||
"rf": RFDevice,
|
||||
"mqtt": MQTTDevice
|
||||
"mqtt": MQTTDevice,
|
||||
"smartthings": SmartThings
|
||||
}
|
||||
|
||||
for section in prefix_sections:
|
||||
|
@ -946,6 +947,60 @@ class Shelly(HTTPDevice):
|
|||
return "on" if state and timer_remaining == 0 else "off"
|
||||
|
||||
|
||||
class SmartThings(HTTPDevice):
|
||||
def __init__(self, config: ConfigHelper) -> None:
|
||||
super().__init__(config, default_port=443, default_protocol="https")
|
||||
self.device: str = config.get("device", "")
|
||||
self.token: str = config.get("token", "")
|
||||
|
||||
async def _send_smartthings_command(self,
|
||||
command: str
|
||||
) -> Dict[str, Any]:
|
||||
|
||||
if (command == "on" or command == "off"):
|
||||
method = "POST"
|
||||
url = (f"{self.protocol}://{self.addr}"
|
||||
f"/v1/devices/{self.device}/commands")
|
||||
body = (f'[{{"component":"main", '
|
||||
'"capability":"switch", '
|
||||
f'"command": "{command}"}}]')
|
||||
elif command == "info":
|
||||
method = "GET"
|
||||
url = (f"{self.protocol}://{self.addr}/v1/devices/{self.device}/"
|
||||
"components/main/capabilities/switch/status")
|
||||
else:
|
||||
raise self.server.error(
|
||||
f"Invalid SmartThings command: {command}")
|
||||
|
||||
headers = {
|
||||
'Authorization': f'Bearer {self.token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
try:
|
||||
if method == "POST":
|
||||
response = await self.client.fetch(
|
||||
url, method=method, headers=headers, body=body)
|
||||
elif method == "GET":
|
||||
response = await self.client.fetch(
|
||||
url, method=method, headers=headers)
|
||||
data: Dict[str, Any] = json_decode(response.body)
|
||||
except Exception:
|
||||
msg = f"Error sending SmartThings command: {command}"
|
||||
logging.exception(msg)
|
||||
raise self.server.error(msg)
|
||||
return data
|
||||
|
||||
async def _send_status_request(self) -> str:
|
||||
res = await self._send_smartthings_command("info")
|
||||
return res["switch"]["value"].lower()
|
||||
|
||||
async def _send_power_request(self, state: str) -> str:
|
||||
res = await self._send_smartthings_command(state)
|
||||
acknowledgment = res["results"][0]["status"].lower()
|
||||
return state if acknowledgment == "accepted" else "error"
|
||||
|
||||
|
||||
class HomeSeer(HTTPDevice):
|
||||
def __init__(self, config: ConfigHelper) -> None:
|
||||
super().__init__(config, default_user="admin", default_password="")
|
||||
|
|
Loading…
Reference in New Issue