app: allow transport registration

This allows eligible components to register themselves as API transports.  By default the WebsocketManager is registered.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-06-23 14:28:55 -04:00
parent a7801db6e7
commit 40f21b10cd
4 changed files with 83 additions and 53 deletions

View File

@ -42,10 +42,12 @@ if TYPE_CHECKING:
from tornado.httpserver import HTTPServer from tornado.httpserver import HTTPServer
from moonraker import Server from moonraker import Server
from confighelper import ConfigHelper from confighelper import ConfigHelper
from websockets import APITransport
from components.file_manager import FileManager from components.file_manager import FileManager
import components.authorization import components.authorization
MessageDelgate = Optional[tornado.httputil.HTTPMessageDelegate] MessageDelgate = Optional[tornado.httputil.HTTPMessageDelegate]
AuthComp = Optional[components.authorization.Authorization] AuthComp = Optional[components.authorization.Authorization]
APICallback = Callable[[WebRequest], Coroutine]
# These endpoints are reserved for klippy/server communication only and are # These endpoints are reserved for klippy/server communication only and are
# not exposed via http or the websocket # not exposed via http or the websocket
@ -59,6 +61,7 @@ MAX_BODY_SIZE = 50 * 1024 * 1024
EXCLUDED_ARGS = ["_", "token", "access_token", "connection_id"] EXCLUDED_ARGS = ["_", "token", "access_token", "connection_id"]
AUTHORIZED_EXTS = [".png"] AUTHORIZED_EXTS = [".png"]
DEFAULT_KLIPPY_LOG_PATH = "/tmp/klippy.log" DEFAULT_KLIPPY_LOG_PATH = "/tmp/klippy.log"
ALL_TRANSPORTS = ["http", "websocket", "mqtt"]
class MutableRouter(tornado.web.ReversibleRuleRouter): class MutableRouter(tornado.web.ReversibleRuleRouter):
def __init__(self, application: MoonrakerApp) -> None: def __init__(self, application: MoonrakerApp) -> None:
@ -104,15 +107,19 @@ class APIDefinition:
def __init__(self, def __init__(self,
endpoint: str, endpoint: str,
http_uri: str, http_uri: str,
ws_methods: List[str], jrpc_methods: List[str],
request_methods: Union[str, List[str]], request_methods: Union[str, List[str]],
transports: List[str],
callback: Optional[APICallback],
need_object_parser: bool): need_object_parser: bool):
self.endpoint = endpoint self.endpoint = endpoint
self.uri = http_uri self.uri = http_uri
self.ws_methods = ws_methods self.jrpc_methods = jrpc_methods
if not isinstance(request_methods, list): if not isinstance(request_methods, list):
request_methods = [request_methods] request_methods = [request_methods]
self.request_methods = request_methods self.request_methods = request_methods
self.supported_transports = transports
self.callback = callback
self.need_object_parser = need_object_parser self.need_object_parser = need_object_parser
class MoonrakerApp: class MoonrakerApp:
@ -133,6 +140,9 @@ class MoonrakerApp:
# Set Up Websocket and Authorization Managers # Set Up Websocket and Authorization Managers
self.wsm = WebsocketManager(self.server) self.wsm = WebsocketManager(self.server)
self.api_transports: Dict[str, APITransport] = {
"websocket": self.wsm
}
mimetypes.add_type('text/plain', '.log') mimetypes.add_type('text/plain', '.log')
mimetypes.add_type('text/plain', '.gcode') mimetypes.add_type('text/plain', '.gcode')
@ -229,6 +239,13 @@ class MoonrakerApp:
await self.secure_server.close_all_connections() await self.secure_server.close_all_connections()
await self.wsm.close() await self.wsm.close()
def register_api_transport(self,
name: str,
transport: APITransport
) -> Dict[str, APIDefinition]:
self.api_transports[name] = transport
return self.api_cache
def register_remote_handler(self, endpoint: str) -> None: def register_remote_handler(self, endpoint: str) -> None:
if endpoint in RESERVED_ENDPOINTS: if endpoint in RESERVED_ENDPOINTS:
return return
@ -237,10 +254,8 @@ class MoonrakerApp:
# reserved handler or already registered # reserved handler or already registered
return return
logging.info( logging.info(
f"Registering remote endpoint - " f"Registering HTTP endpoint: "
f"HTTP: ({' '.join(api_def.request_methods)}) {api_def.uri}; " f"({' '.join(api_def.request_methods)}) {api_def.uri}")
f"Websocket: {', '.join(api_def.ws_methods)}")
self.wsm.register_remote_handler(api_def)
params: Dict[str, Any] = {} params: Dict[str, Any] = {}
params['methods'] = api_def.request_methods params['methods'] = api_def.request_methods
params['callback'] = api_def.endpoint params['callback'] = api_def.endpoint
@ -248,32 +263,34 @@ class MoonrakerApp:
self.mutable_router.add_handler( self.mutable_router.add_handler(
api_def.uri, DynamicRequestHandler, params) api_def.uri, DynamicRequestHandler, params)
self.registered_base_handlers.append(api_def.uri) self.registered_base_handlers.append(api_def.uri)
for name, transport in self.api_transports.items():
transport.register_api_handler(api_def)
def register_local_handler(self, def register_local_handler(self,
uri: str, uri: str,
request_methods: List[str], request_methods: List[str],
callback: Callable[[WebRequest], Coroutine], callback: APICallback,
protocol: List[str] = ["http", "websocket"], transports: List[str] = ALL_TRANSPORTS,
wrap_result: bool = True wrap_result: bool = True
) -> None: ) -> None:
if uri in self.registered_base_handlers: if uri in self.registered_base_handlers:
return return
api_def = self._create_api_definition( api_def = self._create_api_definition(
uri, request_methods, is_remote=False) uri, request_methods, callback, transports=transports)
msg = "Registering local endpoint" if "http" in transports:
if "http" in protocol: logging.info(
msg += f" - HTTP: ({' '.join(request_methods)}) {uri}" f"Registering HTTP Endpoint: "
f"({' '.join(request_methods)}) {uri}")
params: dict[str, Any] = {} params: dict[str, Any] = {}
params['methods'] = request_methods params['methods'] = request_methods
params['callback'] = callback params['callback'] = callback
params['wrap_result'] = wrap_result params['wrap_result'] = wrap_result
params['is_remote'] = False params['is_remote'] = False
self.mutable_router.add_handler(uri, DynamicRequestHandler, params) self.mutable_router.add_handler(uri, DynamicRequestHandler, params)
self.registered_base_handlers.append(uri) self.registered_base_handlers.append(uri)
if "websocket" in protocol: for name, transport in self.api_transports.items():
msg += f" - Websocket: {', '.join(api_def.ws_methods)}" if name in transports:
self.wsm.register_local_handler(api_def, callback) transport.register_api_handler(api_def)
logging.info(msg)
def register_static_file_handler(self, def register_static_file_handler(self,
pattern: str, pattern: str,
@ -301,17 +318,19 @@ class MoonrakerApp:
{'max_upload_size': self.max_upload_size}) {'max_upload_size': self.max_upload_size})
def remove_handler(self, endpoint: str) -> None: def remove_handler(self, endpoint: str) -> None:
api_def = self.api_cache.get(endpoint) api_def = self.api_cache.pop(endpoint, None)
if api_def is not None: if api_def is not None:
self.mutable_router.remove_handler(api_def.uri) self.mutable_router.remove_handler(api_def.uri)
for ws_method in api_def.ws_methods: for name, transport in self.api_transports.items():
self.wsm.remove_handler(ws_method) transport.remove_api_handler(api_def)
def _create_api_definition(self, def _create_api_definition(self,
endpoint: str, endpoint: str,
request_methods: List[str] = [], request_methods: List[str] = [],
is_remote=True callback: Optional[APICallback] = None,
transports: List[str] = ALL_TRANSPORTS
) -> APIDefinition: ) -> APIDefinition:
is_remote = callback is None
if endpoint in self.api_cache: if endpoint in self.api_cache:
return self.api_cache[endpoint] return self.api_cache[endpoint]
if endpoint[0] == '/': if endpoint[0] == '/':
@ -320,28 +339,29 @@ class MoonrakerApp:
uri = "/printer/" + endpoint uri = "/printer/" + endpoint
else: else:
uri = "/server/" + endpoint uri = "/server/" + endpoint
ws_methods = [] jrpc_methods = []
if is_remote: if is_remote:
# Remote requests accept both GET and POST requests. These # Remote requests accept both GET and POST requests. These
# requests execute the same callback, thus they resolve to # requests execute the same callback, thus they resolve to
# only a single websocket method. # only a single websocket method.
ws_methods.append(uri[1:].replace('/', '.')) jrpc_methods.append(uri[1:].replace('/', '.'))
request_methods = ['GET', 'POST'] request_methods = ['GET', 'POST']
else: else:
name_parts = uri[1:].split('/') name_parts = uri[1:].split('/')
if len(request_methods) > 1: if len(request_methods) > 1:
for req_mthd in request_methods: for req_mthd in request_methods:
func_name = req_mthd.lower() + "_" + name_parts[-1] func_name = req_mthd.lower() + "_" + name_parts[-1]
ws_methods.append(".".join(name_parts[:-1] + [func_name])) jrpc_methods.append(".".join(
name_parts[:-1] + [func_name]))
else: else:
ws_methods.append(".".join(name_parts)) jrpc_methods.append(".".join(name_parts))
if not is_remote and len(request_methods) != len(ws_methods): if not is_remote and len(request_methods) != len(jrpc_methods):
raise self.server.error( raise self.server.error(
"Invalid API definition. Number of websocket methods must " "Invalid API definition. Number of websocket methods must "
"match the number of request methods") "match the number of request methods")
need_object_parser = endpoint.startswith("objects/") need_object_parser = endpoint.startswith("objects/")
api_def = APIDefinition(endpoint, uri, ws_methods, api_def = APIDefinition(endpoint, uri, jrpc_methods, request_methods,
request_methods, need_object_parser) transports, callback, need_object_parser)
self.api_cache[endpoint] = api_def self.api_cache[endpoint] = api_def
return api_def return api_def

View File

@ -168,28 +168,28 @@ class Authorization:
self.permitted_paths.add("/access/refresh_jwt") self.permitted_paths.add("/access/refresh_jwt")
self.server.register_endpoint( self.server.register_endpoint(
"/access/login", ['POST'], self._handle_login, "/access/login", ['POST'], self._handle_login,
protocol=['http']) transports=['http'])
self.server.register_endpoint( self.server.register_endpoint(
"/access/logout", ['POST'], self._handle_logout, "/access/logout", ['POST'], self._handle_logout,
protocol=['http']) transports=['http'])
self.server.register_endpoint( self.server.register_endpoint(
"/access/refresh_jwt", ['POST'], self._handle_refresh_jwt, "/access/refresh_jwt", ['POST'], self._handle_refresh_jwt,
protocol=['http']) transports=['http'])
self.server.register_endpoint( self.server.register_endpoint(
"/access/user", ['GET', 'POST', 'DELETE'], "/access/user", ['GET', 'POST', 'DELETE'],
self._handle_user_request, protocol=['http']) self._handle_user_request, transports=['http'])
self.server.register_endpoint( self.server.register_endpoint(
"/access/users/list", ['GET'], self._handle_list_request, "/access/users/list", ['GET'], self._handle_list_request,
protocol=['http']) transports=['http'])
self.server.register_endpoint( self.server.register_endpoint(
"/access/user/password", ['POST'], self._handle_password_reset, "/access/user/password", ['POST'], self._handle_password_reset,
protocol=['http']) transports=['http'])
self.server.register_endpoint( self.server.register_endpoint(
"/access/api_key", ['GET', 'POST'], "/access/api_key", ['GET', 'POST'],
self._handle_apikey_request, protocol=['http']) self._handle_apikey_request, transports=['http'])
self.server.register_endpoint( self.server.register_endpoint(
"/access/oneshot_token", ['GET'], "/access/oneshot_token", ['GET'],
self._handle_oneshot_request, protocol=['http']) self._handle_oneshot_request, transports=['http'])
self.server.register_notification("authorization:user_created") self.server.register_notification("authorization:user_created")
self.server.register_notification("authorization:user_deleted") self.server.register_notification("authorization:user_deleted")

View File

@ -82,7 +82,7 @@ class FileManager:
"/server/files/copy", ['POST'], self._handle_file_move_copy) "/server/files/copy", ['POST'], self._handle_file_move_copy)
self.server.register_endpoint( self.server.register_endpoint(
"/server/files/delete_file", ['DELETE'], self._handle_file_delete, "/server/files/delete_file", ['DELETE'], self._handle_file_delete,
protocol=["websocket"]) transports=["websocket"])
# register client notificaitons # register client notificaitons
self.server.register_notification("file_manager:filelist_changed") self.server.register_notification("file_manager:filelist_changed")
# Register APIs to handle file uploads # Register APIs to handle file uploads

View File

@ -258,7 +258,14 @@ class JsonRPC:
'id': req_id 'id': req_id
} }
class WebsocketManager: class APITransport:
def register_api_handler(self, api_def: APIDefinition) -> None:
raise NotImplementedError
def remove_api_handler(self, api_def: APIDefinition) -> None:
raise NotImplementedError
class WebsocketManager(APITransport):
def __init__(self, server: Server) -> None: def __init__(self, server: Server) -> None:
self.server = server self.server = server
self.websockets: Dict[int, WebSocket] = {} self.websockets: Dict[int, WebSocket] = {}
@ -279,23 +286,26 @@ class WebsocketManager:
self.server.register_event_handler( self.server.register_event_handler(
event_name, notify_handler) event_name, notify_handler)
def register_local_handler(self, def register_api_handler(self, api_def: APIDefinition) -> None:
api_def: APIDefinition, if api_def.callback is None:
callback: Callable[[WebRequest], Coroutine] # Remote API, uses RPC to reach out to Klippy
) -> None: ws_method = api_def.jrpc_methods[0]
for ws_method, req_method in \ rpc_cb = self._generate_callback(api_def.endpoint)
zip(api_def.ws_methods, api_def.request_methods):
rpc_cb = self._generate_local_callback(
api_def.endpoint, req_method, callback)
self.rpc.register_method(ws_method, rpc_cb) self.rpc.register_method(ws_method, rpc_cb)
else:
# Local API, uses local callback
for ws_method, req_method in \
zip(api_def.jrpc_methods, api_def.request_methods):
rpc_cb = self._generate_local_callback(
api_def.endpoint, req_method, api_def.callback)
self.rpc.register_method(ws_method, rpc_cb)
logging.info(
"Registering Websocket JSON-RPC methods: "
f"{', '.join(api_def.jrpc_methods)}")
def register_remote_handler(self, api_def: APIDefinition) -> None: def remove_api_handler(self, api_def: APIDefinition) -> None:
ws_method = api_def.ws_methods[0] for jrpc_method in api_def.jrpc_methods:
rpc_cb = self._generate_callback(api_def.endpoint) self.rpc.remove_method(jrpc_method)
self.rpc.register_method(ws_method, rpc_cb)
def remove_handler(self, ws_method: str) -> None:
self.rpc.remove_method(ws_method)
def _generate_callback(self, endpoint: str) -> RPCCallback: def _generate_callback(self, endpoint: str) -> RPCCallback:
async def func(ws: WebSocket, **kwargs) -> Any: async def func(ws: WebSocket, **kwargs) -> Any: