From 4a57dba5863d53001d1cbd7c9e03d7026aec0b66 Mon Sep 17 00:00:00 2001 From: Arksine Date: Thu, 13 Aug 2020 20:45:03 -0400 Subject: [PATCH] moonraker: update protocol for data received from klippy Signed-off-by: Eric Callahan --- moonraker/app.py | 6 +- moonraker/moonraker.py | 173 ++++++++++++++++++++---------- moonraker/plugins/file_manager.py | 53 ++++----- 3 files changed, 146 insertions(+), 86 deletions(-) diff --git a/moonraker/app.py b/moonraker/app.py index 1e6e4e3..952cf02 100644 --- a/moonraker/app.py +++ b/moonraker/app.py @@ -21,7 +21,7 @@ MAX_UPLOAD_SIZE = 200 * 1024 * 1024 # These endpoints are reserved for klippy/server communication only and are # not exposed via http or the websocket RESERVED_ENDPOINTS = [ - "list_endpoints", "moonraker/check_available" + "list_endpoints", "gcode/subscribe_output" ] @@ -34,8 +34,10 @@ def _status_parser(request): for v in vals: if v: parsed += v.decode().split(',') + if parsed == []: + parsed = None args[key] = parsed - return args + return {'objects': args} # Built-in Query String Parser def _default_parser(request): diff --git a/moonraker/moonraker.py b/moonraker/moonraker.py index 59db882..c729b51 100644 --- a/moonraker/moonraker.py +++ b/moonraker/moonraker.py @@ -22,7 +22,7 @@ from utils import ServerError, MoonrakerLoggingHandler INIT_MS = 1000 CORE_PLUGINS = [ - 'file_manager', 'gcode_apis', 'machine', + 'file_manager', 'klippy_apis', 'machine', 'temperature_store', 'shell_command'] class Sentinel: @@ -43,6 +43,16 @@ class Server: 'klippy_uds_address', "/tmp/klippy_uds") self.klippy_iostream = None self.is_klippy_ready = False + self.gc_response_registered = False + self.klippy_state = "disconnected" + + # XXX - currently moonraker maintains a superset of all + # subscriptions, the results of which are forwarded to all + # connected websockets. A better implementation would open a + # unique unix domain socket for each websocket client and + # allow Klipper to forward only those subscriptions back to + # correct client. + self.all_subscriptions = {} # Server/IOLoop self.server_running = False @@ -58,10 +68,6 @@ class Server: # they do not return a response to Klippy after execution self.pending_requests = {} self.remote_methods = {} - self.register_remote_method( - 'set_klippy_shutdown', self._set_klippy_shutdown) - self.register_remote_method( - 'response', self._handle_klippy_response) self.register_remote_method( 'process_gcode_response', self._process_gcode_response) self.register_remote_method( @@ -69,6 +75,7 @@ class Server: # Plugin initialization self.plugins = {} + self.klippy_apis = self.load_plugin(config, 'klippy_apis') self._load_plugins(config) def start(self): @@ -172,19 +179,40 @@ class Server: continue try: decoded_cmd = json.loads(data[:-1]) - method = decoded_cmd.get('method') - params = decoded_cmd.get('params', {}) - cb = self.remote_methods.get(method) - if cb is not None: - cb(**params) + method = decoded_cmd.get('method', None) + if method is not None: + # This is a remote method called from klippy + cb = self.remote_methods.get(method, None) + if cb is not None: + params = decoded_cmd.get('params', {}) + cb(**params) + else: + logging.info(f"Unknown method received: {method}") + continue + # This is a response to a request, process + req_id = decoded_cmd.get('id', None) + request = self.pending_requests.pop(req_id, None) + if request is None: + logging.info( + f"No request matching request ID: {req_id}, " + f"response: {decoded_cmd}") + continue + if 'result' in decoded_cmd: + result = decoded_cmd['result'] + if not result: + result = "ok" else: - logging.info(f"Unknown command received: {data.decode()}") + err = decoded_cmd.get('error', "Malformed Klippy Response") + result = ServerError(err, 400) + request.notify(result) except Exception: logging.exception( f"Error processing Klippy Host Response: {data.decode()}") def _handle_stream_closed(self): self.is_klippy_ready = False + self.gc_response_registered = False + self.klippy_state = "disconnected" self.klippy_iostream = None self.init_cb.stop() for request in self.pending_requests.values(): @@ -216,30 +244,31 @@ class Server: self.init_cb.stop() async def _request_endpoints(self): - try: - result = await self.make_request("list_endpoints", {}) - except ServerError: + result = await self.klippy_apis.list_endpoints(default=None) + if result is None: return - endpoints = result.get('hooks', {}) - static_paths = result.get('static_paths', {}) + endpoints = result.get('endpoints', {}) for ep in endpoints: self.moonraker_app.register_remote_handler(ep) - mutable_paths = {sp['resource_id']: sp['file_path'] - for sp in static_paths} - file_manager = self.lookup_plugin('file_manager') - file_manager.update_mutable_paths(mutable_paths) + # Subscribe to Gcode Output + if "gcode/subscribe_output" in endpoints and \ + not self.gc_response_registered: + try: + await self.klippy_apis.subscribe_gcode_output() + except ServerError as e: + logging.info( + f"{e}\nUnable to register gcode output subscription") + return + self.gc_response_registered = True async def _check_available_objects(self): - try: - result = await self.make_request("objects/list", {}) - except ServerError as e: + result = await self.klippy_apis.get_object_list(default=None) + if result is None: logging.info( - f"{e}\nUnable to retreive Klipper Object List") + f"Unable to retreive Klipper Object List") return - missing_objs = [] - for obj in ["virtual_sdcard", "display_status", "pause_resume"]: - if obj not in result: - missing_objs.append(obj) + req_objs = set(["virtual_sdcard", "display_status", "pause_resume"]) + missing_objs = req_objs - set(result) if missing_objs: err_str = ", ".join([f"[{o}]" for o in missing_objs]) logging.info( @@ -249,32 +278,51 @@ class Server: async def _check_ready(self): try: - result = await self.make_request("info", {}) + result = await self.klippy_apis.get_klippy_info() except ServerError as e: logging.info( f"{e}\nKlippy info request error. This indicates that\n" f"Klippy may have experienced an error during startup.\n" f"Please check klippy.log for more information") return - is_ready = result.get("is_ready", False) + # Update filemanager fixed paths + fixed_paths = {k: result[k] for k in + ['klipper_path', 'python_path', + 'log_file', 'config_file']} + file_manager = self.lookup_plugin('file_manager') + file_manager.update_fixed_paths(fixed_paths) + is_ready = result.get('state', "") == "ready" if is_ready: - self._set_klippy_ready() + await self._set_klippy_ready() else: - msg = result.get("message", "Klippy Not Ready") + msg = result.get('state_message', "Klippy Not Ready") logging.info("\n" + msg) - - def _handle_klippy_response(self, request_id, response): - req = self.pending_requests.pop(request_id, None) - if req is not None: - if isinstance(response, dict) and 'error' in response: - response = ServerError(response['message'], 400) - req.notify(response) - else: - logging.info(f"No request matching response: {response}") - - def _set_klippy_ready(self): + async def _set_klippy_ready(self): logging.info("Klippy ready") + # Update SD Card Path + result = await self.klippy_apis.query_objects( + {'configfile': None}, default=None) + if result is None: + logging.info(f"Unable to set SD Card path") + else: + config = result.get('configfile', {}).get('config', {}) + vsd_config = config.get('virtual_sdcard', {}) + vsd_path = vsd_config.get('path', None) + if vsd_path is not None: + file_manager = self.lookup_plugin('file_manager') + file_manager.register_directory( + 'gcodes', vsd_path, can_delete=True) + else: + logging.info( + "Configuration for [virtual_sdcard] not found," + " unable to set SD Card path") + # Register "webhooks" subscription + try: + await self.klippy_apis.subscribe_objects({'webhooks': None}) + except ServerError as e: + logging.info("Unable to subscribe to webhooks object") + self.klippy_state = "ready" self.is_klippy_ready = True self.send_event("server:klippy_state_changed", "ready") @@ -286,10 +334,36 @@ class Server: def _process_gcode_response(self, response): self.send_event("server:gcode_response", response) - def _process_status_update(self, status): + def _process_status_update(self, eventtime, status): + if 'webhooks' in status: + # XXX - process other states (startup, ready, error, etc)? + state = status['webhooks'].get('state', None) + if state is not None: + if state == "shutdown": + self._set_klippy_shutdown() + self.klippy_state = state self.send_event("server:status_update", status) async def make_request(self, rpc_method, params): + # XXX - This adds the "response_template" to a subscription + # request and tracks all subscriptions so that each + # client gets what its requesting. In the future we should + # track subscriptions per client and send clients only + # the data they are asking for. + if rpc_method == "objects/subscribe": + for obj, items in params.get('objects', {}).items(): + if obj in self.all_subscriptions: + pi = self.all_subscriptions[obj] + if items is None or pi is None: + self.all_subscriptions[obj] = None + else: + uitems = list(set(pi) | set(items)) + self.all_subscriptions[obj] = uitems + else: + self.all_subscriptions[obj] = items + params['objects'] = dict(self.all_subscriptions) + params['response_template'] = {'method': "process_status_update"} + base_request = BaseRequest(rpc_method, params) self.pending_requests[base_request.id] = base_request self.ioloop.spawn_callback( @@ -297,7 +371,7 @@ class Server: result = await base_request.wait() return result - async def _kill_server(self): + async def _stop_server(self): # XXX - Currently this function is not used. # Should I expose functionality to shutdown # or restart the server, or simply remove this? @@ -309,20 +383,11 @@ class Server: if self.klippy_iostream is not None and \ not self.klippy_iostream.closed(): self.klippy_iostream.close() - self.close_server_sock() if self.server_running: self.server_running = False await self.moonraker_app.close() self.ioloop.stop() - def close_server_sock(self): - try: - self.remove_server_sock() - self.klippy_server_sock.close() - # XXX - remove server sock file (or use abstract?) - except Exception: - logging.exception("Error Closing Server Socket") - # Basic WebRequest class, easily converted to dict for json encoding class BaseRequest: def __init__(self, rpc_method, params): diff --git a/moonraker/plugins/file_manager.py b/moonraker/plugins/file_manager.py index b40787e..ff1b8f0 100644 --- a/moonraker/plugins/file_manager.py +++ b/moonraker/plugins/file_manager.py @@ -24,7 +24,7 @@ class FileManager: self.file_lists = {} self.gcode_metadata = {} self.metadata_lock = Lock() - self.mutable_path_args = {} + self.fixed_path_args = {} # Register file management endpoints self.server.register_endpoint( @@ -49,40 +49,32 @@ class FileManager: # Register Klippy Configuration Path config_path = config.get('config_path', None) if config_path is not None: - ret = self._register_directory( + ret = self.register_directory( 'config', config_path, can_delete=True) if not ret: raise config.error( "Option 'config_path' is not a valid directory") - def update_mutable_paths(self, paths): - # Update paths from Klippy. The sd_path can potentially change - # location on restart. - if paths == self.mutable_path_args: - # No change in mutable paths + def update_fixed_paths(self, paths): + if paths == self.fixed_path_args: + # No change in fixed paths return - self.mutable_path_args = dict(paths) + self.fixed_path_args = dict(paths) str_paths = "\n".join([f"{k}: {v}" for k, v in paths.items()]) - logging.debug(f"\nUpdating Mutable Paths:\n{str_paths}") + logging.debug(f"\nUpdating Fixed Paths:\n{str_paths}") - # Register directories - sd = paths.pop('sd_path', None) - self._register_directory("gcodes", sd, can_delete=True) # Register path for example configs - klipper_path = paths.pop('klipper_path', None) + klipper_path = paths.get('klipper_path', None) if klipper_path is not None: example_cfg_path = os.path.join(klipper_path, "config") - self._register_directory("config_examples", example_cfg_path) - paths.pop('klippy_env', None) - paths.pop('printer.cfg', None) + self.register_directory("config_examples", example_cfg_path) - # register remaining static files - for pattern, path in paths.items(): - if path is not None: - path = os.path.normpath(os.path.expanduser(path)) - self.server.register_static_file_handler(pattern, path) + # Register log path + log_file = paths.get('log_file') + log_path = os.path.normpath(os.path.expanduser(log_file)) + self.server.register_static_file_handler("klippy.log", log_path) - def _register_directory(self, base, path, can_delete=False): + def register_directory(self, base, path, can_delete=False): op_check_cb = None if base == 'gcodes': op_check_cb = self._handle_operation_check @@ -93,7 +85,9 @@ class FileManager: if not os.path.isdir(path) or not path.startswith(home) or \ path == home: logging.info( - f"Supplied path ({path}) for ({base}) not valid") + f"\nSupplied path ({path}) for ({base}) not valid. Please\n" + "check that the path exists and is a subfolder in the HOME\n" + "directory. Note that the path may not BE the home directory.") return False if path != self.file_paths.get(base, ""): self.file_paths[base] = path @@ -109,8 +103,8 @@ class FileManager: def get_sd_directory(self): return self.file_paths.get('gcodes', "") - def get_mutable_path_args(self): - return dict(self.mutable_path_args) + def get_fixed_path_args(self): + return dict(self.fixed_path_args) async def _handle_filelist_request(self, path, method, args): root = args.get('root', "gcodes") @@ -167,8 +161,8 @@ class FileManager: async def _handle_operation_check(self, requested_path): # Get virtual_sdcard status - result = await self.server.make_request( - "objects/status", {'print_stats': []}) + klippy_apis = self.server.lookup_plugin('klippy_apis') + result = await klippy_apis.query_objects({'print_stats': None}) pstats = result.get('print_stats', {}) loaded_file = pstats.get('filename', "") state = pstats.get('state', "") @@ -369,10 +363,9 @@ class FileManager: self._write_file(upload) if start_print: # Make a Klippy Request to "Start Print" - gcode_apis = self.server.lookup_plugin('gcode_apis') + klippy_apis = self.server.lookup_plugin('klippy_apis') try: - await gcode_apis.gcode_start_print( - request.path, 'POST', {'filename': upload['filename']}) + await klippy_apis.start_print(upload['filename']) except self.server.error: # Attempt to start print failed start_print = False