webhooks: Improve type checking of api requests

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2020-08-14 15:18:27 -04:00
parent 7ec2ec30e3
commit fa83b1319c
1 changed files with 31 additions and 29 deletions

View File

@ -27,10 +27,6 @@ def byteify(data, ignore_dicts=False):
for k, v in data.items()}
return data
def json_loads_byteified(data):
return byteify(
json.loads(data, object_hook=byteify), True)
class WebRequestError(homing.CommandError):
def __init__(self, message,):
Exception.__init__(self, message)
@ -47,28 +43,39 @@ class WebRequest:
error = WebRequestError
def __init__(self, client_conn, request):
self.client_conn = client_conn
base_request = json_loads_byteified(request)
base_request = json.loads(request, object_hook=byteify)
if type(base_request) != dict:
raise ValueError("Not a top-level dictionary")
self.id = base_request.get('id', None)
self.method = base_request['method']
self.method = base_request.get('method')
self.params = base_request.get('params', {})
if type(self.method) != str or type(self.params) != dict:
raise ValueError("Invalid request type")
self.response = None
self.is_error = False
def get_client_connection(self):
return self.client_conn
def get(self, item, default=Sentinel):
if item not in self.params:
if default == Sentinel:
raise WebRequestError("Invalid Argument [%s]" % item)
return default
return self.params[item]
def get(self, item, default=Sentinel, types=None):
value = self.params.get(item, default)
if value is Sentinel:
raise WebRequestError("Missing Argument [%s]" % (item,))
if types is not None and type(value) not in types:
raise WebRequestError("Invalid Argument Type [%s]" % (item,))
return value
def get_int(self, item):
return int(self.get(item))
def get_str(self, item, default=Sentinel):
return self.get(item, default, types=(str,))
def get_float(self, item):
return float(self.get(item))
def get_int(self, item, default=Sentinel):
return self.get(item, default, types=(int,))
def get_float(self, item, default=Sentinel):
return float(self.get(item, default, types=(int, float)))
def get_dict(self, item, default=Sentinel):
return self.get(item, default, types=(dict,))
def get_method(self):
return self.method
@ -196,13 +203,10 @@ class ClientConnection:
requests[0] = self.partial_data + requests[0]
self.partial_data = requests.pop()
for req in requests:
logging.debug(
"webhooks: Request received: %s" % (req))
try:
web_request = WebRequest(self, req)
except Exception:
logging.exception(
"webhooks: Error decoding Server Request %s"
logging.exception("webhooks: Error decoding Server Request %s"
% (req))
continue
self.reactor.register_callback(
@ -320,7 +324,7 @@ class GCodeHelper:
def _handle_help(self, web_request):
web_request.send(self.gcode.get_command_help())
def _handle_script(self, web_request):
self.gcode.run_script(web_request.get('script'))
self.gcode.run_script(web_request.get_str('script'))
def _handle_restart(self, web_request):
self.gcode.run_script('restart')
def _handle_firmware_restart(self, web_request):
@ -335,7 +339,7 @@ class GCodeHelper:
cconn.send(tmp)
def _handle_subscribe_output(self, web_request):
cconn = web_request.get_client_connection()
template = web_request.get('response_template', {})
template = web_request.get_dict('response_template', {})
self.clients[cconn] = template
if not self.is_output_registered:
self.gcode.register_output_handler(self._output_callback)
@ -406,20 +410,18 @@ class QueryStatusHelper:
return reactor.NEVER
return eventtime + SUBSCRIPTION_REFRESH_TIME
def _handle_query(self, web_request, is_subscribe=False):
objects = web_request.get('objects')
objects = web_request.get_dict('objects')
# Validate subscription format
if type(objects) != type({}):
raise web_request.error("Invalid argument")
for k, v in objects.items():
if type(k) != type("") or type(v) not in [type([]), type(None)]:
if type(k) != str or (v is not None and type(v) != list):
raise web_request.error("Invalid argument")
if v is not None:
for ri in v:
if type(ri) != type(""):
if type(ri) != str:
raise web_request.error("Invalid argument")
# Add to pending queries
cconn = web_request.get_client_connection()
template = web_request.get('response_template', {})
template = web_request.get_dict('response_template', {})
if is_subscribe and cconn in self.clients:
del self.clients[cconn]
reactor = self.printer.get_reactor()