OctoprintKlipperPlugin/octoprint_klipper/__init__.py

379 lines
12 KiB
Python
Raw Normal View History

# <Octoprint Klipper Plugin>
2018-08-19 14:49:24 +03:00
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime
2018-01-23 17:01:58 +03:00
import logging
import octoprint.plugin
import octoprint.plugin.core
import glob
import os
import sys
2018-02-08 18:38:48 +03:00
from octoprint.util.comm import parse_firmware_line
from octoprint.access.permissions import Permissions, ADMIN_GROUP, USER_GROUP
from .modules import KlipperLogAnalyzer
2018-08-09 03:16:07 +03:00
import flask
from flask_babel import gettext
2018-01-23 17:01:58 +03:00
class KlipperPlugin(
octoprint.plugin.StartupPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.SettingsPlugin,
2018-02-08 18:38:48 +03:00
octoprint.plugin.AssetPlugin,
octoprint.plugin.SimpleApiPlugin,
2018-02-08 18:38:48 +03:00
octoprint.plugin.EventHandlerPlugin):
2018-08-09 10:25:05 +03:00
_parsing_response = False
_message = ""
2018-02-08 18:38:48 +03:00
2018-08-09 03:16:07 +03:00
#-- Startup Plugin
2020-08-30 01:25:20 +03:00
2018-01-23 17:01:58 +03:00
def on_after_startup(self):
2018-08-09 10:25:05 +03:00
klipper_port = self._settings.get(["connection", "port"])
additional_ports = self._settings.global_get(["serial", "additionalPorts"])
2018-08-09 10:25:05 +03:00
if klipper_port not in additional_ports:
additional_ports.append(klipper_port)
self._settings.global_set(["serial", "additionalPorts"], additional_ports)
self._settings.save()
2018-08-09 10:25:05 +03:00
self._logger.info("Added klipper serial port {} to list of additional ports.".format(klipper_port))
2018-02-02 14:39:06 +03:00
#-- Settings Plugin
2018-08-09 03:16:07 +03:00
def get_additional_permissions(self, *args, **kwargs):
return [
dict(key="CONFIG",
name="Config Klipper",
description=gettext("Allows to config klipper"),
default_groups=[ADMIN_GROUP],
dangerous=True,
roles=["admin"]
)
]
2018-01-23 17:01:58 +03:00
def get_settings_defaults(self):
return dict(
2018-08-09 10:25:05 +03:00
connection = dict(
port="/tmp/printer",
replace_connection_panel=True
),
macros = [dict(
name="E-Stop",
macro="M112",
sidebar=True,
tab=True
)],
probe = dict(
height=0,
lift=5,
speed_xy=1500,
speed_z=500,
points=[dict(
name="point-1",
x=0,
y=0
)]
),
configuration = dict(
configpath="~/printer.cfg",
logpath="/tmp/klippy.log",
2021-02-10 01:45:17 +03:00
reload_command="RESTART",
navbar=True
2018-08-09 10:25:05 +03:00
)
2018-08-09 03:16:07 +03:00
)
2020-08-30 01:25:20 +03:00
2018-08-09 03:16:07 +03:00
def on_settings_load(self):
data = octoprint.plugin.SettingsPlugin.on_settings_load(self)
2020-08-30 01:25:20 +03:00
filepath = os.path.expanduser(
self._settings.get(["configuration", "configpath"])
)
try:
f = open(filepath, "r")
2018-08-09 03:16:07 +03:00
data["config"] = f.read()
f.close()
except IOError:
self._logger.error(
"Error: Klipper config file not found at: {}".format(filepath)
2018-08-09 03:16:07 +03:00
)
return data
def on_settings_save(self, data):
if "config" in data:
try:
filepath = os.path.expanduser(
self._settings.get(["configuration", "configpath"])
)
if sys.version_info[0] < 3:
data["config"] = data["config"].encode('utf-8')
2018-08-15 02:34:06 +03:00
f = open(filepath, "w")
2018-08-09 03:16:07 +03:00
f.write(data["config"])
f.close()
self._logger.info(
2018-08-15 02:34:06 +03:00
"Writing Klipper config to {}".format(filepath)
2018-08-09 03:16:07 +03:00
)
# Restart klipply to reload config
self._printer.commands(self._settings.get(["configuration", "reload_command"]))
2018-08-09 03:16:07 +03:00
self.logInfo("Reloading Klipper Configuration.")
except IOError:
self._logger.error(
"Error: Couldn't write Klipper config file: {}".format(filepath)
)
2018-08-09 10:25:05 +03:00
data.pop("config", None) # we dont want to write the klipper conf to the octoprint settings
2018-08-09 03:16:07 +03:00
else:
octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
def get_settings_restricted_paths(self):
return dict(
admin=[
2018-08-09 10:25:05 +03:00
["connection", "port"],
["configuration", "configpath"],
2018-08-09 10:25:05 +03:00
["configuration", "replace_connection_panel"]
2018-08-09 03:16:07 +03:00
],
user=[
["macros"],
2018-08-09 10:25:05 +03:00
["probe"]
2018-08-09 03:16:07 +03:00
]
)
2018-08-09 10:25:05 +03:00
def get_settings_version(self):
return 2
def on_settings_migrate(self, target, current):
if current is None:
settings = self._settings
2018-08-09 10:25:05 +03:00
if settings.has(["serialport"]):
settings.set(["connection", "port"], settings.get(["serialport"]) )
settings.remove(["serialport"])
if settings.has(["replace_connection_panel"]):
settings.set(
["connection", "replace_connection_panel"],
settings.get(["replace_connection_panel"])
)
settings.remove(["replace_connection_panel"])
if settings.has(["probeHeight"]):
settings.set(["probe", "height"], settings.get(["probeHeight"]))
settings.remove(["probeHeight"])
2018-08-09 10:25:05 +03:00
if settings.has(["probeLift"]):
settings.set(["probe", "lift"], settings.get(["probeLift"]))
settings.remove(["probeLift"])
2018-08-09 10:25:05 +03:00
if settings.has(["probeSpeedXy"]):
settings.set(["probe", "speed_xy"], settings.get(["probeSpeedXy"]))
settings.remove(["probeSpeedXy"])
2018-08-09 10:25:05 +03:00
if settings.has(["probeSpeedZ"]):
settings.set(["probe", "speed_z"], settings.get(["probeSpeedZ"]))
settings.remove(["probeSpeedZ"])
2018-08-09 10:25:05 +03:00
if settings.has(["probePoints"]):
points = settings.get(["probePoints"])
points_new = []
for p in points:
points_new.append(dict(name="", x=int(p["x"]), y=int(p["y"]), z=0))
settings.set(["probe", "points"], points_new)
settings.remove(["probePoints"])
if settings.has(["configPath"]):
settings.set(["config_path"], settings.get(["configPath"]))
settings.remove(["configPath"])
2018-02-02 14:39:06 +03:00
#-- Template Plugin
2018-08-09 03:16:07 +03:00
2018-01-23 17:01:58 +03:00
def get_template_configs(self):
return [
2018-08-09 10:25:05 +03:00
dict(type="navbar", custom_bindings=True),
dict(type="settings", custom_bindings=True),
dict(
type="generic",
name="Assisted Bed Leveling",
template="klipper_leveling_dialog.jinja2",
custom_bindings=True
),
dict(
type="generic",
name="PID Tuning",
template="klipper_pid_tuning_dialog.jinja2",
custom_bindings=True
),
dict(
type="generic",
name="Coordinate Offset",
template="klipper_offset_dialog.jinja2",
custom_bindings=True
),
dict(
type="tab",
name="Klipper",
template="klipper_tab_main.jinja2",
suffix="_main",
custom_bindings=True
),
dict(type="sidebar",
custom_bindings=True,
icon="rocket",
replaces= "connection" if self._settings.get_boolean(["connection", "replace_connection_panel"]) else ""
),
dict(
type="generic",
name="Performance Graph",
template="klipper_graph_dialog.jinja2",
custom_bindings=True
),
dict(
type="generic",
name="Macro Dialog",
template="klipper_param_macro_dialog.jinja2",
custom_bindings=True
2018-08-09 10:25:05 +03:00
)
2018-01-23 17:01:58 +03:00
]
2018-02-02 14:39:06 +03:00
#-- Asset Plugin
2018-01-23 17:01:58 +03:00
def get_assets(self):
return dict(
2018-01-24 19:15:48 +03:00
js=["js/klipper.js",
"js/klipper_settings.js",
"js/klipper_leveling.js",
"js/klipper_pid_tuning.js",
"js/klipper_offset.js",
"js/klipper_param_macro.js",
"js/klipper_graph.js"
],
2018-01-23 17:01:58 +03:00
css=["css/klipper.css"],
less=["css/klipper.less"]
)
2018-02-08 18:38:48 +03:00
#-- Event Handler Plugin
2018-02-08 18:38:48 +03:00
def on_event(self, event, payload):
if "UserLoggedIn" == event:
self.updateStatus("info","Klipper: Standby")
2018-02-08 18:38:48 +03:00
if "Connecting" == event:
self.updateStatus("info", "Klipper: Connecting ...")
2018-02-08 18:38:48 +03:00
elif "Connected" == event:
self.updateStatus("info", "Klipper: Connected to host")
2018-08-09 03:16:07 +03:00
self.logInfo("Connected to host via {} @{}bps".format(payload["port"], payload["baudrate"]))
2018-02-08 18:38:48 +03:00
elif "Disconnected" == event:
self.updateStatus("info", "Klipper: Disconnected from host")
2018-02-08 18:38:48 +03:00
elif "Error" == event:
self.updateStatus("error", "Klipper: Error")
2018-02-08 18:38:48 +03:00
self.logError(payload["error"])
2018-02-02 14:39:06 +03:00
#-- GCODE Hook
2018-02-02 14:39:06 +03:00
def on_parse_gcode(self, comm, line, *args, **kwargs):
2018-02-08 18:38:48 +03:00
if "FIRMWARE_VERSION" in line:
printerInfo = parse_firmware_line(line)
if "FIRMWARE_VERSION" in printerInfo:
2018-08-09 03:16:07 +03:00
self.logInfo("Firmware version: {}".format(printerInfo["FIRMWARE_VERSION"]))
2018-02-08 18:38:48 +03:00
elif "//" in line:
2018-02-02 23:51:24 +03:00
self._message = self._message + line.strip('/')
2018-08-09 10:25:05 +03:00
if not self._parsing_response:
2018-08-09 03:16:07 +03:00
self.updateStatus("info", self._message)
2018-08-09 10:25:05 +03:00
self._parsing_response = True
2018-02-02 14:39:06 +03:00
else:
2018-08-09 10:25:05 +03:00
if self._parsing_response:
self._parsing_response = False
2018-08-09 03:16:07 +03:00
self.logInfo(self._message)
self._message = ""
2018-02-02 14:39:06 +03:00
if "!!" in line:
2018-08-09 03:16:07 +03:00
msg = line.strip('!')
self.updateStatus("error", msg)
self.logError(msg)
2018-02-02 14:39:06 +03:00
return line
2018-02-08 18:38:48 +03:00
def get_api_commands(self):
return dict(
listLogFiles=[],
getStats=["logFile"],
loadConfig=["configFile"]
)
def on_api_command(self, command, data):
if command == "listLogFiles":
files = []
2019-03-13 14:52:14 +03:00
for f in glob.glob(self._settings.get(["configuration", "logpath"]) + "*"):
filesize = os.path.getsize(f)
files.append(dict(
name=os.path.basename(f) + " ({:.1f} KB)".format(filesize / 1000.0),
file=f,
size=filesize
))
return flask.jsonify(data=files)
elif command == "getStats":
if "logFile" in data:
log_analyzer = KlipperLogAnalyzer.KlipperLogAnalyzer(data["logFile"])
return flask.jsonify(log_analyzer.analyze())
elif command == "loadConfig":
kc = Parser()
sections = kc.load(data["configFile"])
return flask.jsonify(sections)
2018-08-15 02:34:06 +03:00
2018-08-09 12:25:28 +03:00
def get_update_information(self):
return dict(
klipper=dict(
displayName=self._plugin_name,
displayVersion=self._plugin_version,
type="github_release",
current=self._plugin_version,
user="AliceGrey",
2018-08-09 12:25:28 +03:00
repo="OctoprintKlipperPlugin",
pip="https://github.com/AliceGrey/OctoprintKlipperPlugin/archive/{target_version}.zip"
2018-08-09 12:25:28 +03:00
)
)
2018-02-02 14:39:06 +03:00
#-- Helpers
2018-02-08 18:38:48 +03:00
def sendMessage(self, type, subtype, payload):
2018-08-09 10:25:05 +03:00
self._plugin_manager.send_plugin_message(
self._identifier,
dict(
time=datetime.datetime.now().strftime("%H:%M:%S"),
type=type,
2018-08-22 17:25:14 +03:00
subtype=subtype,
2018-08-09 10:25:05 +03:00
payload=payload
)
)
2018-02-26 13:46:33 +03:00
def pollStatus(self):
2018-08-09 10:25:05 +03:00
self._printer.commands("STATUS")
2018-02-08 18:38:48 +03:00
def updateStatus(self, type, status):
2018-08-09 10:25:05 +03:00
self.sendMessage("status", type, status)
2018-02-08 18:38:48 +03:00
def logInfo(self, message):
2018-08-09 10:25:05 +03:00
self.sendMessage("log", "info", message)
2018-02-02 14:39:06 +03:00
2018-02-08 18:38:48 +03:00
def logError(self, error):
2018-08-09 10:25:05 +03:00
self.sendMessage("log", "error", error)
2018-02-02 14:39:06 +03:00
__plugin_name__ = "OctoKlipper"
__plugin_pythoncompat__ = ">=2.7,<4"
2018-01-23 17:01:58 +03:00
def __plugin_load__():
global __plugin_implementation__
global __plugin_hooks__
__plugin_implementation__ = KlipperPlugin()
__plugin_hooks__ = {
"octoprint.access.permissions": __plugin_implementation__.get_additional_permissions,
2018-08-09 12:25:28 +03:00
"octoprint.comm.protocol.gcode.received": __plugin_implementation__.on_parse_gcode,
"octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information
2018-01-23 17:01:58 +03:00
}