# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import absolute_import, division, print_function, unicode_literals import logging import octoprint.plugin import octoprint.plugin.core import glob import os import time import sys from octoprint.server import NO_CONTENT from octoprint.util import is_hidden_path from octoprint.util import get_formatted_size from . import util, cfgUtils, logger from octoprint.util.comm import parse_firmware_line from octoprint.access.permissions import Permissions, ADMIN_GROUP from .modules import KlipperLogAnalyzer from octoprint.server.util.flask import restricted_access import flask from flask_babel import gettext try: import configparser except ImportError: import ConfigParser as configparser if sys.version_info[0] < 3: import StringIO MAX_UPLOAD_SIZE = 5 * 1024 * 1024 # 5Mb class KlipperPlugin( octoprint.plugin.StartupPlugin, octoprint.plugin.TemplatePlugin, octoprint.plugin.SettingsPlugin, octoprint.plugin.AssetPlugin, octoprint.plugin.SimpleApiPlugin, octoprint.plugin.EventHandlerPlugin, octoprint.plugin.BlueprintPlugin): _parsing_response = False _parsing_check_response = True _message = "" def __init__(self): self._logger = logging.getLogger("octoprint.plugins.klipper") self._octoklipper_logger = logging.getLogger("octoprint.plugins.klipper.debug") # -- Startup Plugin def on_startup(self, host, port): from octoprint.logging.handlers import CleaningTimedRotatingFileHandler octoklipper_logging_handler = CleaningTimedRotatingFileHandler( self._settings.get_plugin_logfile_path(postfix="debug"), when="D", backupCount=3) octoklipper_logging_handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")) octoklipper_logging_handler.setLevel(logging.DEBUG) self._octoklipper_logger.addHandler(octoklipper_logging_handler) self._octoklipper_logger.setLevel( logging.DEBUG if self._settings.get_boolean(["debug_logging"]) else logging.INFO) self._octoklipper_logger.propagate = False def on_after_startup(self): klipper_port = self._settings.get(["connection", "port"]) additional_ports = self._settings.global_get( ["serial", "additionalPorts"]) if klipper_port not in additional_ports: additional_ports.append(klipper_port) self._settings.global_set( ["serial", "additionalPorts"], additional_ports) self._settings.save() logger.log_info( self, "Added klipper serial port {} to list of additional ports.".format(klipper_port) ) # -- Settings Plugin def get_additional_permissions(self, *args, **kwargs): return [ { "key": "CONFIG", "name": "Config Klipper", "description": gettext("Allows to config klipper"), "default_groups": [ADMIN_GROUP], "dangerous": True, "roles": ["admin"] }, { "key": "MACRO", "name": "Use Klipper Macros", "description": gettext("Allows to use klipper macros"), "default_groups": [ADMIN_GROUP], "dangerous": True, "roles": ["admin"] }, ] def get_settings_defaults(self): return dict( connection=dict( port="/tmp/printer", replace_connection_panel=True ), macros=[dict( name="E-Stop", macro="M112", sidebar=True, tab=True )], probe=dict( height=0, lift=5, speed_xy=1500, speed_z=500, points=[dict( name="point-1", x=0, y=0 )] ), configuration=dict( debug_logging=False, configpath="~/", old_config="", logpath="/tmp/klippy.log", reload_command="RESTART", shortStatus_navbar=True, shortStatus_sidebar=True, parse_check=False, fontsize=9 ) ) def on_settings_load(self): data = octoprint.plugin.SettingsPlugin.on_settings_load(self) configpath = os.path.expanduser( self._settings.get(["configuration", "configpath"]) ) standardconfigfile = os.path.join(configpath, "printer.cfg") data["config"] = cfgUtils.get_cfg(self, standardconfigfile) util.send_message(self, "reload", "config", "", data["config"]) # send the configdata to frontend to update ace editor return data def on_settings_save(self, data): if "config" in data: if cfgUtils.save_cfg(self, data["config"], "printer.cfg"): #load the reload command from changed data if it is not existing load the saved setting if util.key_exist(data, "configuration", "reload_command"): reload_command = os.path.expanduser( data["configuration"]["reload_command"] ) else: reload_command = self._settings.get(["configuration", "reload_command"]) if reload_command != "manually": # Restart klippy to reload config self._printer.commands(reload_command) logger.log_info(self, "Restarting Klipper.") # we dont want to write the klipper conf to the octoprint settings else: # save not sure. saving to the octoprintconfig: self._settings.set(["configuration", "temp_config"], data) data.pop("config", None) # save the rest of changed settings into config.yaml of octoprint old_debug_logging = self._settings.get_boolean(["configuration", "debug_logging"]) octoprint.plugin.SettingsPlugin.on_settings_save(self, data) new_debug_logging = self._settings.get_boolean(["configuration", "debug_logging"]) if old_debug_logging != new_debug_logging: if new_debug_logging: self._octoklipper_logger.setLevel(logging.DEBUG) else: self._octoklipper_logger.setLevel(logging.INFO) def get_settings_restricted_paths(self): return dict( admin=[ ["connection", "port"], ["configuration", "configpath"], ["configuration", "replace_connection_panel"] ], user=[ ["macros"], ["probe"] ] ) def get_settings_version(self): return 4 #migrate Settings def on_settings_migrate(self, target, current): settings = self._settings if current is None: self.migrate_old_settings(settings) if current is not None and current < 3: self.migrate_configuration( settings, "shortStatus_navbar", "navbar", ) if current is not None and current < 4: self.migrate_configuration( settings, "old_config", "temp_config", ) setting_path = settings.get(["configuration", "configpath"]) if setting_path.find("printer.cfg")!=-1: new_setting_path = setting_path.replace("printer.cfg","") logger.log_info(self, "migrate setting for 'configuration/configpath': " + setting_path + " -> " + new_setting_path) settings.set(["configuration", "configpath"], new_setting_path) def migrate_old_settings(self, settings): self.migrate_settings(settings, "serialport", "connection", "port") self.migrate_settings(settings, "replace_connection_panel", "connection", "replace_connection_panel") self.migrate_settings(settings, "probeHeight", "probe", "height") self.migrate_settings(settings, "probeLift", "probe", "lift") self.migrate_settings(settings, "probeSpeedXy", "probe", "speed_xy") self.migrate_settings(settings, "probeSpeedZ", "probe", "speed_z") self.migrate_settings(settings, "configPath", "configpath") if settings.has(["probePoints"]): points = settings.get(["probePoints"]) points_new = [dict(name="", x=int(p["x"]), y=int(p["y"]), z=0) for p in points] settings.set(["probe", "points"], points_new) settings.remove(["probePoints"]) def migrate_settings(self, settings, old, new, new2="") -> None: if settings.has([old]): if new2 != "": logger.log_info(self, "migrate setting for '" + old + "' -> '" + new + "/" + new2 + "'") settings.set([new, new2], settings.get([old])) else: logger.log_info(self, "migrate setting for '" + old + "' -> '" + new + "'") settings.set([new], settings.get([old])) settings.remove([old]) def migrate_settings_configuration(self, settings, new, old): if settings.has(["configuration", old]): logger.log_info(self, "migrate setting for 'configuration/" + old + "' -> 'configuration/" + new + "'") settings.set(["configuration", new], settings.get(["configuration", old])) settings.remove(["configuration", old]) # -- Template Plugin def get_template_configs(self): return [ dict(type="navbar", custom_bindings=True), dict(type="settings", custom_bindings=True), dict( type="generic", name="Assisted Bed Leveling", template="klipper_leveling_dialog.jinja2", custom_bindings=True ), dict( type="generic", name="PID Tuning", template="klipper_pid_tuning_dialog.jinja2", custom_bindings=True ), dict( type="generic", name="Coordinate Offset", template="klipper_offset_dialog.jinja2", custom_bindings=True ), dict( type="tab", name="Klipper", template="klipper_tab_main.jinja2", suffix="_main", custom_bindings=True ), dict(type="sidebar", custom_bindings=True, icon="rocket", replaces="connection" if self._settings.get_boolean( ["connection", "replace_connection_panel"]) else "" ), dict( type="generic", name="Performance Graph", template="klipper_graph_dialog.jinja2", custom_bindings=True ), dict( type="generic", name="Config Backups", template="klipper_backups_dialog.jinja2", custom_bindings=True ), dict( type="generic", name="Config Editor", template="klipper_editor.jinja2", custom_bindings=True ), dict( type="generic", name="Macro Dialog", template="klipper_param_macro_dialog.jinja2", custom_bindings=True ) ] def get_template_vars(self): return { "max_upload_size": MAX_UPLOAD_SIZE, "max_upload_size_str": get_formatted_size(MAX_UPLOAD_SIZE), } # -- Asset Plugin def get_assets(self): return dict( js=["js/klipper.js", "js/klipper_settings.js", "js/klipper_leveling.js", "js/klipper_pid_tuning.js", "js/klipper_offset.js", "js/klipper_param_macro.js", "js/klipper_graph.js", "js/klipper_backup.js", "js/klipper_editor.js" ], clientjs=["clientjs/klipper.js"], css=["css/klipper.css"] ) # -- Event Handler Plugin def on_event(self, event, payload): if event == "UserLoggedIn": util.update_status(self, "info", "Klipper: Standby") if event == "Connecting": util.update_status(self, "info", "Klipper: Connecting ...") elif event == "Connected": util.update_status(self, "info", "Klipper: Connected to host") logger.log_info( self, "Connected to host via {} @{}bps".format(payload["port"], payload["baudrate"])) elif event == "Disconnected": util.update_status(self, "info", "Klipper: Disconnected from host") elif event == "Error": util.update_status(self, "error", "Klipper: Error") logger.log_error(self, payload["error"]) # -- GCODE Hook def on_parse_gcode(self, comm, line, *args, **kwargs): if "FIRMWARE_VERSION" in line: printerInfo = parse_firmware_line(line) if "FIRMWARE_VERSION" in printerInfo: logger.log_info(self, "Firmware version: {}".format( printerInfo["FIRMWARE_VERSION"])) elif "// probe" in line or "// Failed to verify BLTouch" in line: msg = line.strip('/') logger.log_info(self, msg) self.write_parsing_response_buffer() elif "//" in line: # add lines with // to a buffer self._message = self._message + line.strip('/') if not self._parsing_response: util.update_status(self, "info", self._message) self._parsing_response = True elif "!!" in line: msg = line.strip('!') util.update_status(self, "error", msg) logger.log_error(self, msg) self.write_parsing_response_buffer() else: self.write_parsing_response_buffer() return line def write_parsing_response_buffer(self): # write buffer with // lines after a gcode response without // if self._parsing_response: self._parsing_response = False logger.log_info(self, self._message) self._message = "" def get_api_commands(self): return dict( listLogFiles=[], getStats=["logFile"] ) def on_api_command(self, command, data): if command == "listLogFiles": files = [] logpath = os.path.expanduser( self._settings.get(["configuration", "logpath"]) ) if util.file_exist(self, logpath): for f in glob.glob(self._settings.get(["configuration", "logpath"]) + "*"): filesize = os.path.getsize(f) filemdate = time.strftime("%d.%m.%Y %H:%M",time.localtime(os.path.getctime(f))) files.append(dict( name=os.path.basename(f) + " (" + filemdate + ")", file=f, size=filesize )) return flask.jsonify(data=files) elif command == "getStats": if "logFile" in data: log_analyzer = KlipperLogAnalyzer.KlipperLogAnalyzer( data["logFile"]) return flask.jsonify(log_analyzer.analyze()) def is_blueprint_protected(self): return False def route_hook(self, server_routes, *args, **kwargs): from octoprint.server.util.tornado import LargeResponseHandler, path_validation_factory from octoprint.util import is_hidden_path configpath = os.path.expanduser( self._settings.get(["configuration", "configpath"]) ) bak_path = os.path.join(self.get_plugin_data_folder(), "configs", "") return [ (r"/download/configs/(.*)", LargeResponseHandler, dict(path=configpath, as_attachment=True, path_validation=path_validation_factory(lambda path: not is_hidden_path(path), status_code=404))), (r"/download/backup/(.*)", LargeResponseHandler, dict(path=bak_path, as_attachment=True, path_validation=path_validation_factory(lambda path: not is_hidden_path(path), status_code=404))) ] # API for Backups # Get Content of a Backupconfig @octoprint.plugin.BlueprintPlugin.route("/backup/", methods=["GET"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def get_backup(self, filename): data_folder = self.get_plugin_data_folder() full_path = os.path.realpath(os.path.join(data_folder, "configs", filename)) response = cfgUtils.get_cfg(self, full_path) return flask.jsonify(response = response) # Delete a Backupconfig @octoprint.plugin.BlueprintPlugin.route("/backup/", methods=["DELETE"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def delete_backup(self, filename): data_folder = self.get_plugin_data_folder() full_path = os.path.realpath(os.path.join(data_folder, "configs", filename)) if ( full_path.startswith(data_folder) and os.path.exists(full_path) and not is_hidden_path(full_path) ): try: os.remove(full_path) except Exception: self._octoklipper_logger.exception("Could not delete {}".format(filename)) raise return NO_CONTENT # Get a list of all backuped configfiles @octoprint.plugin.BlueprintPlugin.route("/backup/list", methods=["GET"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def list_backups(self): files = cfgUtils.list_cfg_files(self, "backup") return flask.jsonify(files = files) # restore a backuped configfile @octoprint.plugin.BlueprintPlugin.route("/backup/restore/", methods=["GET"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def restore_backup(self, filename): configpath = os.path.expanduser( self._settings.get(["configuration", "configpath"]) ) data_folder = self.get_plugin_data_folder() backupfile = os.path.realpath(os.path.join(data_folder, "configs", filename)) return flask.jsonify(restored = cfgUtils.copy_cfg(self, backupfile, configpath)) # API for Configs # Get Content of a Configfile @octoprint.plugin.BlueprintPlugin.route("/config/", methods=["GET"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def get_config(self, filename): cfg_path = os.path.expanduser( self._settings.get(["configuration", "configpath"]) ) full_path = os.path.realpath(os.path.join(cfg_path, filename)) response = cfgUtils.get_cfg(self, full_path) return flask.jsonify(response = response) # Delete a Configfile @octoprint.plugin.BlueprintPlugin.route("/config/", methods=["DELETE"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def delete_config(self, filename): cfg_path = os.path.expanduser( self._settings.get(["configuration", "configpath"]) ) full_path = os.path.realpath(os.path.join(cfg_path, filename)) if ( full_path.startswith(cfg_path) and os.path.exists(full_path) and not is_hidden_path(full_path) ): try: os.remove(full_path) except Exception: self._octoklipper_logger.exception("Could not delete {}".format(filename)) raise return NO_CONTENT # Get a list of all configfiles @octoprint.plugin.BlueprintPlugin.route("/config/list", methods=["GET"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def list_configs(self): files = cfgUtils.list_cfg_files(self, "") return flask.jsonify(files = files, max_upload_size = MAX_UPLOAD_SIZE) # check syntax of a given data @octoprint.plugin.BlueprintPlugin.route("/config/check", methods=["POST"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def check_config(self): data = flask.request.json data_to_check = data.get("DataToCheck", []) response = cfgUtils.check_cfg(self, data_to_check) return flask.jsonify(is_syntax_ok = response) # save a configfile @octoprint.plugin.BlueprintPlugin.route("/config/save", methods=["POST"]) @restricted_access @Permissions.PLUGIN_KLIPPER_CONFIG.require(403) def save_config(self): data = flask.request.json filename = data.get("filename", []) if filename == []: flask.abort( 400, description="Invalid request, the filename is not set", ) Filecontent = data.get("DataToSave", []) saved = cfgUtils.save_cfg(self, Filecontent, filename) if saved == True: util.send_message(self, "reload", "configlist", "", "") return flask.jsonify(saved = saved) # APIs end def get_update_information(self): return dict( klipper=dict( displayName=self._plugin_name, displayVersion=self._plugin_version, type="github_release", current=self._plugin_version, user="thelastWallE", repo="OctoprintKlipperPlugin", pip="https://github.com/thelastWallE/OctoprintKlipperPlugin/archive/{target_version}.zip", stable_branch=dict( name="Stable", branch="master", comittish=["master"] ), prerelease_branches=[ dict( name="Release Candidate", branch="rc", comittish=["rc", "master"] )] ) ) __plugin_name__ = "OctoKlipper" __plugin_pythoncompat__ = ">=2.7,<4" __plugin_settings_overlay__ = { 'system': { 'actions': [{ 'action': 'octoklipper_restart', 'command': 'sudo service klipper restart', 'name': 'Restart Klipper', 'confirm': '

' + gettext("You are about to restart Klipper!") + '
' + gettext("This will stop ongoing prints!") + '


Command = "sudo service klipper restart"' }] } } def __plugin_load__(): global __plugin_implementation__ global __plugin_hooks__ __plugin_implementation__ = KlipperPlugin() __plugin_hooks__ = { "octoprint.server.http.routes": __plugin_implementation__.route_hook, "octoprint.access.permissions": __plugin_implementation__.get_additional_permissions, "octoprint.comm.protocol.gcode.received": __plugin_implementation__.on_parse_gcode, "octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information }