power: resolve url encoding issues

Dynamic parts of the url must be encoded individually to
guarantee correct url generation.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-11 07:33:28 -05:00
parent 0de712e822
commit 55ebe120c5
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 68 additions and 67 deletions

View File

@ -11,6 +11,7 @@ import struct
import socket
import asyncio
import time
from urllib.parse import quote, urlencode
# Annotation imports
from typing import (
@ -464,12 +465,9 @@ class HTTPDevice(PowerDevice):
self.notify_power_changed()
return
async def _send_http_command(self,
url: str,
command: str,
retries: int = 3
) -> Dict[str, Any]:
url = self.client.escape_url(url)
async def _send_http_command(
self, url: str, command: str, retries: int = 3
) -> Dict[str, Any]:
response = await self.client.get(
url, request_timeout=20., attempts=retries,
retry_pause_time=1., enable_cache=False)
@ -546,7 +544,7 @@ class GpioDevice(PowerDevice):
self.state = state
self._check_timer()
def _check_timer(self):
def _check_timer(self) -> None:
if self.state == "on" and self.timer is not None:
event_loop = self.server.get_event_loop()
power: PrinterPower = self.server.lookup_component("power")
@ -711,14 +709,14 @@ class KlipperDevice(PowerDevice):
if last_state not in [state, "init"] and not in_event:
self.notify_power_changed()
def _check_timer(self):
def _check_timer(self) -> None:
if self.state == "on" and self.timer is not None:
event_loop = self.server.get_event_loop()
power: PrinterPower = self.server.lookup_component("power")
self.timer_handle = event_loop.delay_callback(
self.timer, power.set_device_power, self.name, "off")
def _reset_timer(self):
def _reset_timer(self) -> None:
if self.timer_handle is not None:
self.timer_handle.cancel()
self.timer_handle = None
@ -940,14 +938,11 @@ class TPLinkSmartPlug(PowerDevice):
class Tasmota(HTTPDevice):
def __init__(self, config: ConfigHelper) -> None:
super().__init__(config, default_password="")
super().__init__(config, default_user="admin", default_password="")
self.output_id = config.getint("output_id", 1)
self.timer = config.get("timer", "")
async def _send_tasmota_command(self,
command: str,
password: Optional[str] = None
) -> Dict[str, Any]:
async def _send_tasmota_command(self, command: str) -> Dict[str, Any]:
if command in ["on", "off"]:
out_cmd = f"Power{self.output_id} {command}"
if self.timer != "" and command == "off":
@ -956,9 +951,12 @@ class Tasmota(HTTPDevice):
out_cmd = f"Power{self.output_id}"
else:
raise self.server.error(f"Invalid tasmota command: {command}")
url = f"http://{self.addr}/cm?user=admin&password=" \
f"{self.password}&cmnd={out_cmd}"
query = urlencode({
"user": self.user,
"password": self.password,
"cmnd": out_cmd
})
url = f"{self.protocol}://{quote(self.addr)}/cm?{query}"
return await self._send_http_command(url, command)
async def _send_status_request(self) -> str:
@ -992,22 +990,21 @@ class Shelly(HTTPDevice):
self.timer = config.get("timer", "")
async def _send_shelly_command(self, command: str) -> Dict[str, Any]:
if command == "on":
out_cmd = f"relay/{self.output_id}?turn={command}"
elif command == "off":
if self.timer != "":
out_cmd = f"relay/{self.output_id}?turn=on&timer={self.timer}"
else:
out_cmd = f"relay/{self.output_id}?turn={command}"
elif command == "info":
out_cmd = f"relay/{self.output_id}"
else:
query_args: Dict[str, Any] = {}
out_cmd = f"relay/{self.output_id}"
if command in ["on", "off"]:
query_args["turn"] = command
if command == "off" and self.timer != "":
query_args["turn"] = "on"
query_args["timer"] = self.timer
elif command != "info":
raise self.server.error(f"Invalid shelly command: {command}")
if self.password != "":
out_pwd = f"{self.user}:{self.password}@"
out_pwd = f"{quote(self.user)}:{quote(self.password)}@"
else:
out_pwd = f""
url = f"http://{out_pwd}{self.addr}/{out_cmd}"
query = urlencode(query_args)
url = f"{self.protocol}://{out_pwd}{quote(self.addr)}/{out_cmd}?{query}"
return await self._send_http_command(url, command)
async def _send_status_request(self) -> str:
@ -1029,14 +1026,14 @@ class SmartThings(HTTPDevice):
self.device: str = config.get("device", "")
self.token: str = config.gettemplate("token").render()
async def _send_smartthings_command(self,
command: str
) -> Dict[str, Any]:
async def _send_smartthings_command(self, command: str) -> Dict[str, Any]:
body: Optional[List[Dict[str, Any]]] = None
if (command == "on" or command == "off"):
method = "POST"
url = (f"{self.protocol}://{self.addr}"
f"/v1/devices/{self.device}/commands")
url = (
f"{self.protocol}://{quote(self.addr)}"
f"/v1/devices/{quote(self.device)}/commands"
)
body = [
{
"component": "main",
@ -1046,8 +1043,11 @@ class SmartThings(HTTPDevice):
]
elif command == "info":
method = "GET"
url = (f"{self.protocol}://{self.addr}/v1/devices/{self.device}/"
"components/main/capabilities/switch/status")
url = (
f"{self.protocol}://{quote(self.addr)}/v1/devices/"
f"{quote(self.device)}/components/main/capabilities/"
"switch/status"
)
else:
raise self.server.error(
f"Invalid SmartThings command: {command}")
@ -1055,7 +1055,6 @@ class SmartThings(HTTPDevice):
headers = {
'Authorization': f'Bearer {self.token}'
}
url = self.client.escape_url(url)
response = await self.client.request(
method, url, body=body, headers=headers,
attempts=3, enable_cache=False
@ -1080,13 +1079,22 @@ class HomeSeer(HTTPDevice):
super().__init__(config, default_user="admin", default_password="")
self.device = config.getint("device")
async def _send_homeseer(self,
request: str,
additional: str = ""
) -> Dict[str, Any]:
url = (f"http://{self.user}:{self.password}@{self.addr}"
f"/JSON?user={self.user}&pass={self.password}"
f"&request={request}&ref={self.device}&{additional}")
async def _send_homeseer(
self, request: str, state: str = ""
) -> Dict[str, Any]:
query_args = {
"user": self.user,
"pass": self.password,
"request": request,
"ref": self.device,
}
if state:
query_args["label"] = state
query = urlencode(query_args)
url = (
f"{self.protocol}://{quote(self.user)}:{quote(self.password)}@"
f"{quote(self.addr)}/JSON?{query}"
)
return await self._send_http_command(url, request)
async def _send_status_request(self) -> str:
@ -1094,12 +1102,9 @@ class HomeSeer(HTTPDevice):
return res[f"Devices"][0]["status"].lower()
async def _send_power_request(self, state: str) -> str:
if state == "on":
state_hs = "On"
elif state == "off":
state_hs = "Off"
res = await self._send_homeseer("controldevicebylabel",
f"label={state_hs}")
res = await self._send_homeseer(
"controldevicebylabel", state.capitalize()
)
return state
@ -1111,26 +1116,23 @@ class HomeAssistant(HTTPDevice):
self.domain: str = config.get("domain", "switch")
self.status_delay: float = config.getfloat("status_delay", 1.)
async def _send_homeassistant_command(self,
command: str
) -> Dict[str, Any]:
async def _send_homeassistant_command(self, command: str) -> Dict[str, Any]:
body: Optional[Dict[str, Any]] = None
if command in ["on", "off"]:
out_cmd = f"api/services/{self.domain}/turn_{command}"
out_cmd = f"api/services/{quote(self.domain)}/turn_{command}"
body = {"entity_id": self.device}
method = "POST"
elif command == "info":
out_cmd = f"api/states/{self.device}"
out_cmd = f"api/states/{quote(self.device)}"
method = "GET"
else:
raise self.server.error(
f"Invalid homeassistant command: {command}")
url = f"{self.protocol}://{self.addr}:{self.port}/{out_cmd}"
url = f"{self.protocol}://{quote(self.addr)}:{self.port}/{out_cmd}"
headers = {
'Authorization': f'Bearer {self.token}'
}
data: Dict[str, Any] = {}
url = self.client.escape_url(url)
response = await self.client.request(
method, url, body=body, headers=headers,
attempts=3, enable_cache=False
@ -1159,16 +1161,16 @@ class Loxonev1(HTTPDevice):
async def _send_loxonev1_command(self, command: str) -> Dict[str, Any]:
if command in ["on", "off"]:
out_cmd = f"jdev/sps/io/{self.output_id}/{command}"
out_cmd = f"jdev/sps/io/{quote(self.output_id)}/{command}"
elif command == "info":
out_cmd = f"jdev/sps/io/{self.output_id}"
out_cmd = f"jdev/sps/io/{quote(self.output_id)}"
else:
raise self.server.error(f"Invalid loxonev1 command: {command}")
if self.password != "":
out_pwd = f"{self.user}:{self.password}@"
out_pwd = f"{quote(self.user)}:{quote(self.password)}@"
else:
out_pwd = f""
url = f"http://{out_pwd}{self.addr}/{out_cmd}"
url = f"http://{out_pwd}{quote(self.addr)}/{out_cmd}"
return await self._send_http_command(url, command)
async def _send_status_request(self) -> str:
@ -1374,10 +1376,10 @@ class HueDevice(HTTPDevice):
async def _send_power_request(self, state: str) -> str:
new_state = True if state == "on" else False
url = (
f"http://{self.addr}/api/{self.user}/{self.device_type}s"
f"/{self.device_id}/{self.state_key}"
f"{self.protocol}://{quote(self.addr)}/api/{quote(self.user)}"
f"/{self.device_type}s/{quote(self.device_id)}"
f"/{quote(self.state_key)}"
)
url = self.client.escape_url(url)
ret = await self.client.request("PUT", url, body={"on": new_state})
resp = cast(List[Dict[str, Dict[str, Any]]], ret.json())
state_url = (
@ -1390,10 +1392,9 @@ class HueDevice(HTTPDevice):
async def _send_status_request(self) -> str:
url = (
f"http://{self.addr}/api/{self.user}/{self.device_type}s"
f"/{self.device_id}"
f"{self.protocol}://{quote(self.addr)}/api/{quote(self.user)}"
f"/{self.device_type}s/{quote(self.device_id)}"
)
url = self.client.escape_url(url)
ret = await self.client.request("GET", url)
resp = cast(Dict[str, Dict[str, Any]], ret.json())
return "on" if resp["state"][self.on_state] else "off"