moonraker: refactor KlippyConnection

Move the KlippyConnection class into its own module.  Refactor
init to use loops rather than callbacks, this reduces complexity
of tracking and cancelling callback handles.

All Klippy state previously tracked by the Server is now in the
KlippyConnection.  This improves testing and makes the code
less ambiguous, ie: the `server.make_request()` method is not
as clear as `klippy.request()`.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-02-09 15:48:24 -05:00
parent 737cf8a2cb
commit b4ddffd5d1
6 changed files with 586 additions and 485 deletions

View File

@ -44,6 +44,7 @@ if TYPE_CHECKING:
from moonraker import Server from moonraker import Server
from eventloop import EventLoop from eventloop import EventLoop
from confighelper import ConfigHelper from confighelper import ConfigHelper
from klippy_connection import KlippyConnection as Klippy
from components.file_manager.file_manager import FileManager from components.file_manager.file_manager import FileManager
import components.authorization import components.authorization
MessageDelgate = Optional[tornado.httputil.HTTPMessageDelegate] MessageDelgate = Optional[tornado.httputil.HTTPMessageDelegate]
@ -135,7 +136,8 @@ class InternalTransport(APITransport):
# Request to Klippy # Request to Klippy
method = api_def.jrpc_methods[0] method = api_def.jrpc_methods[0]
action = "" action = ""
cb = self.server.make_request klippy: Klippy = self.server.lookup_component("klippy_connection")
cb = klippy.request
self.callbacks[method] = (ep, action, cb) self.callbacks[method] = (ep, action, cb)
else: else:
for method, action in \ for method, action in \
@ -620,7 +622,8 @@ class DynamicRequestHandler(AuthorizedRequestHandler):
conn: Optional[WebSocket] conn: Optional[WebSocket]
) -> Any: ) -> Any:
assert isinstance(self.callback, str) assert isinstance(self.callback, str)
return await self.server.make_request( klippy: Klippy = self.server.lookup_component("klippy_connection")
return await klippy.request(
WebRequest(self.callback, args, conn=conn, WebRequest(self.callback, args, conn=conn,
ip_addr=self.request.remote_ip, ip_addr=self.request.remote_ip,
user=self.current_user)) user=self.current_user))

View File

@ -22,6 +22,7 @@ from typing import (
if TYPE_CHECKING: if TYPE_CHECKING:
from confighelper import ConfigHelper from confighelper import ConfigHelper
from websockets import WebRequest from websockets import WebRequest
from klippy_connection import KlippyConnection as Klippy
Subscription = Dict[str, Optional[List[Any]]] Subscription = Dict[str, Optional[List[Any]]]
_T = TypeVar("_T") _T = TypeVar("_T")
@ -39,6 +40,7 @@ SENTINEL = SentinelClass.get_instance()
class KlippyAPI(Subscribable): class KlippyAPI(Subscribable):
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.klippy: Klippy = self.server.lookup_component("klippy_connection")
app_args = self.server.get_app_args() app_args = self.server.get_app_args()
self.version = app_args.get('software_version') self.version = app_args.get('software_version')
# Maintain a subscription for all moonraker requests, as # Maintain a subscription for all moonraker requests, as
@ -84,7 +86,7 @@ class KlippyAPI(Subscribable):
default: Any = SENTINEL default: Any = SENTINEL
) -> Any: ) -> Any:
try: try:
result = await self.server.make_request( result = await self.klippy.request(
WebRequest(method, params, conn=self)) WebRequest(method, params, conn=self))
except self.server.error: except self.server.error:
if isinstance(default, SentinelClass): if isinstance(default, SentinelClass):
@ -102,17 +104,26 @@ class KlippyAPI(Subscribable):
return result return result
async def start_print(self, filename: str) -> str: async def start_print(self, filename: str) -> str:
# WARNING: Do not call this method from within the following
# event handlers:
# klippy_identified, klippy_started, klippy_ready, klippy_disconnect
# Doing so will result in a deadlock
# XXX - validate that file is on disk # XXX - validate that file is on disk
if filename[0] == '/': if filename[0] == '/':
filename = filename[1:] filename = filename[1:]
# Escape existing double quotes in the file name # Escape existing double quotes in the file name
filename = filename.replace("\"", "\\\"") filename = filename.replace("\"", "\\\"")
script = f'SDCARD_PRINT_FILE FILENAME="{filename}"' script = f'SDCARD_PRINT_FILE FILENAME="{filename}"'
await self.server.wait_connection_initialized() await self.klippy.wait_connected()
return await self.run_gcode(script) return await self.run_gcode(script)
async def do_restart(self, gc: str) -> str: async def do_restart(self, gc: str) -> str:
await self.server.wait_connection_initialized() # WARNING: Do not call this method from within the following
# event handlers:
# klippy_identified, klippy_started, klippy_ready, klippy_disconnect
# Doing so will result in a deadlock
# XXX - validate that file is on disk
await self.klippy.wait_connected()
try: try:
result = await self.run_gcode(gc) result = await self.run_gcode(gc)
except self.server.error as e: except self.server.error as e:

View File

@ -31,6 +31,7 @@ from typing import (
if TYPE_CHECKING: if TYPE_CHECKING:
from app import APIDefinition from app import APIDefinition
from confighelper import ConfigHelper from confighelper import ConfigHelper
from klippy_connection import KlippyConnection as Klippy
FlexCallback = Callable[[bytes], Optional[Coroutine]] FlexCallback = Callable[[bytes], Optional[Coroutine]]
RPCCallback = Callable[..., Coroutine] RPCCallback = Callable[..., Coroutine]
@ -141,6 +142,7 @@ class MQTTClient(APITransport, Subscribable):
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.event_loop = self.server.get_event_loop() self.event_loop = self.server.get_event_loop()
self.klippy: Klippy = self.server.lookup_component("klippy_connection")
self.address: str = config.get('address') self.address: str = config.get('address')
self.port: int = config.getint('port', 1883) self.port: int = config.getint('port', 1883)
user = config.gettemplate('username', None) user = config.gettemplate('username', None)
@ -271,7 +273,7 @@ class MQTTClient(APITransport, Subscribable):
if self.status_objs: if self.status_objs:
args = {'objects': self.status_objs} args = {'objects': self.status_objs}
try: try:
await self.server.make_request( await self.klippy.request(
WebRequest("objects/subscribe", args, conn=self)) WebRequest("objects/subscribe", args, conn=self))
except self.server.error: except self.server.error:
pass pass
@ -607,7 +609,7 @@ class MQTTClient(APITransport, Subscribable):
def _generate_remote_callback(self, endpoint: str) -> RPCCallback: def _generate_remote_callback(self, endpoint: str) -> RPCCallback:
async def func(**kwargs) -> Any: async def func(**kwargs) -> Any:
self._check_timestamp(kwargs) self._check_timestamp(kwargs)
result = await self.server.make_request( result = await self.klippy.request(
WebRequest(endpoint, kwargs)) WebRequest(endpoint, kwargs))
return result return result
return func return func

View File

@ -0,0 +1,543 @@
# KlippyConnection - manage unix socket connection to Klipper
#
# Copyright (C) 2022 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license
from __future__ import annotations
import os
import time
import logging
import json
import getpass
import confighelper
import asyncio
from utils import ServerError
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Optional,
Callable,
Coroutine,
Dict,
List,
)
if TYPE_CHECKING:
from app import MoonrakerApp
from websockets import WebRequest, Subscribable
from components.data_store import DataStore
from components.klippy_apis import KlippyAPI
from components.file_manager.file_manager import FileManager
FlexCallback = Callable[..., Optional[Coroutine]]
INIT_TIME = .25
LOG_ATTEMPT_INTERVAL = int(2. / INIT_TIME + .5)
MAX_LOG_ATTEMPTS = 10 * LOG_ATTEMPT_INTERVAL
UNIX_BUFFER_LIMIT = 2 * 1024 * 1024
class KlippyConnection:
def __init__(self, config: confighelper.ConfigHelper) -> None:
self.server = config.get_server()
self.uds_address: str = config.get(
'klippy_uds_address', "/tmp/klippy_uds")
self.writer: Optional[asyncio.StreamWriter] = None
self.connection_mutex: asyncio.Lock = asyncio.Lock()
self.event_loop = self.server.get_event_loop()
self.log_no_access = True
# Connection State
self.connection_task: Optional[asyncio.Task] = None
self.closing: bool = False
self._klippy_info: Dict[str, Any] = {}
self.init_list: List[str] = []
self.init_attempts: int = 0
self._state: str = "disconnected"
self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {}
# Setup remote methods accessable to Klippy. Note that all
# registered remote methods should be of the notification type,
# they do not return a response to Klippy after execution
self.pending_requests: Dict[int, KlippyRequest] = {}
self.remote_methods: Dict[str, FlexCallback] = {}
self.klippy_reg_methods: List[str] = []
self.register_remote_method(
'process_gcode_response', self._process_gcode_response,
need_klippy_reg=False)
self.register_remote_method(
'process_status_update', self._process_status_update,
need_klippy_reg=False)
self.server.register_component("klippy_connection", self)
@property
def klippy_apis(self) -> KlippyAPI:
return self.server.lookup_component("klippy_apis")
@property
def state(self) -> str:
return self._state
@property
def klippy_info(self) -> Dict[str, Any]:
return self._klippy_info
async def wait_connected(self) -> bool:
if (
self.connection_task is None or
self.connection_task.done()
):
return self.is_connected()
try:
await self.connection_task
except Exception:
pass
return self.is_connected()
async def _read_stream(self, reader: asyncio.StreamReader) -> None:
errors_remaining: int = 10
while not reader.at_eof():
try:
data = await reader.readuntil(b'\x03')
except (ConnectionError, asyncio.IncompleteReadError):
break
except asyncio.CancelledError:
logging.exception("Klippy Stream Read Cancelled")
raise
except Exception:
logging.exception("Klippy Stream Read Error")
errors_remaining -= 1
if not errors_remaining or not self.is_connected():
break
continue
errors_remaining = 10
try:
decoded_cmd = json.loads(data[:-1])
self._process_command(decoded_cmd)
except Exception:
logging.exception(
f"Error processing Klippy Host Response: {data.decode()}")
if not self.closing:
logging.debug("Klippy Disconnection From _read_stream()")
await self.close()
async def _write_request(self, request: KlippyRequest) -> None:
if self.writer is None or self.closing:
self.pending_requests.pop(request.id, None)
request.notify(ServerError("Klippy Host not connected", 503))
return
data = json.dumps(request.to_dict()).encode() + b"\x03"
try:
self.writer.write(data)
await self.writer.drain()
except asyncio.CancelledError:
self.pending_requests.pop(request.id, None)
request.notify(ServerError("Klippy Write Request Cancelled", 503))
raise
except Exception:
self.pending_requests.pop(request.id, None)
request.notify(ServerError("Klippy Write Request Error", 503))
if not self.closing:
logging.debug("Klippy Disconnection From _write_request()")
await self.close()
def register_remote_method(self,
method_name: str,
cb: FlexCallback,
need_klippy_reg: bool = True
) -> None:
if method_name in self.remote_methods:
raise self.server.error(
f"Remote method ({method_name}) already registered")
if self.server.is_running():
raise self.server.error(
f"Failed to register remote method {method_name}, "
"methods must be registered during initialization")
self.remote_methods[method_name] = cb
if need_klippy_reg:
# These methods need to be registered with Klippy
self.klippy_reg_methods.append(method_name)
def connect(self) -> Awaitable[bool]:
if (
self.is_connected() or
not self.server.is_running() or
(self.connection_task is not None and
not self.connection_task.done())
):
# already connecting
fut = self.event_loop.create_future()
fut.set_result(self.is_connected())
return fut
self.connection_task = self.event_loop.create_task(self._do_connect())
return self.connection_task
async def _do_connect(self) -> bool:
async with self.connection_mutex:
while self.writer is None:
await asyncio.sleep(INIT_TIME)
if self.closing or not self.server.is_running():
return False
if not os.path.exists(self.uds_address):
continue
if not os.access(self.uds_address, os.R_OK | os.W_OK):
if self.log_no_access:
user = getpass.getuser()
logging.info(
f"Cannot connect to Klippy, Linux user '{user}' "
"lacks permission to open Unix Domain Socket: "
f"{self.uds_address}")
self.log_no_access = False
continue
self.log_no_access = True
try:
reader, writer = await asyncio.open_unix_connection(
self.uds_address, limit=UNIX_BUFFER_LIMIT)
except asyncio.CancelledError:
raise
except Exception:
continue
logging.info("Klippy Connection Established")
self.writer = writer
self.event_loop.create_task(self._read_stream(reader))
return await self._init_klippy_connection()
async def _init_klippy_connection(self) -> bool:
self.init_list = []
self.init_attempts = 0
self._state = "initializing"
webhooks_err_logged = False
gcout_err_logged = False
while self.server.is_running():
await asyncio.sleep(INIT_TIME)
# Subscribe to "webhooks"
# Register "webhooks" subscription
if "webhooks_sub" not in self.init_list:
try:
await self.klippy_apis.subscribe_objects(
{'webhooks': None})
except ServerError as e:
if not webhooks_err_logged:
webhooks_err_logged = True
logging.info(
f"{e}\nUnable to subscribe to webhooks object")
else:
logging.info("Webhooks Subscribed")
self.init_list.append("webhooks_sub")
# Subscribe to Gcode Output
if "gcode_output_sub" not in self.init_list:
try:
await self.klippy_apis.subscribe_gcode_output()
except ServerError as e:
if not gcout_err_logged:
gcout_err_logged = True
logging.info(
f"{e}\nUnable to register gcode output "
"subscription")
else:
logging.info("GCode Output Subscribed")
self.init_list.append("gcode_output_sub")
if "startup_complete" not in self.init_list:
await self._check_ready()
if len(self.init_list) == 4:
logging.debug("Klippy Connection Initialized")
return True
elif not self.is_connected():
break
else:
self.init_attempts += 1
logging.debug("Klippy Connection Failed to Init")
return False
async def _request_endpoints(self) -> None:
result = await self.klippy_apis.list_endpoints(default=None)
if result is None:
return
endpoints = result.get('endpoints', [])
app: MoonrakerApp = self.server.lookup_component("application")
for ep in endpoints:
app.register_remote_handler(ep)
async def _check_ready(self) -> None:
send_id = "identified" not in self.init_list
result: Dict[str, Any]
try:
result = await self.klippy_apis.get_klippy_info(send_id)
except ServerError as e:
if self.init_attempts % LOG_ATTEMPT_INTERVAL == 0 and \
self.init_attempts <= MAX_LOG_ATTEMPTS:
logging.info(
f"{e}\nKlippy info request error. This indicates that\n"
f"Klippy may have experienced an error during startup.\n"
f"Please check klippy.log for more information")
return
self._klippy_info = dict(result)
self._state = result.get('state', "unknown")
if send_id:
self.init_list.append("identified")
await self.server.send_event("server:klippy_identified")
if self._state != "startup":
self.init_list.append('startup_complete')
await self._request_endpoints()
await self.server.send_event("server:klippy_started",
self._state)
if self._state != "ready":
msg = result.get('state_message', "Klippy Not Ready")
logging.info("\n" + msg)
else:
await self._verify_klippy_requirements()
# register methods with klippy
for method in self.klippy_reg_methods:
try:
await self.klippy_apis.register_method(method)
except ServerError:
logging.exception(
f"Unable to register method '{method}'")
logging.info("Klippy ready")
await self.server.send_event("server:klippy_ready")
async def _verify_klippy_requirements(self) -> None:
result = await self.klippy_apis.get_object_list(default=None)
if result is None:
logging.info(
f"Unable to retrieve Klipper Object List")
return
req_objs = set(["virtual_sdcard", "display_status", "pause_resume"])
missing_objs = req_objs - set(result)
if missing_objs:
err_str = ", ".join([f"[{o}]" for o in missing_objs])
logging.info(
f"\nWarning, unable to detect the following printer "
f"objects:\n{err_str}\nPlease add the the above sections "
f"to printer.cfg for full Moonraker functionality.")
if "virtual_sdcard" not in missing_objs:
# Update the gcode path
query_res = await self.klippy_apis.query_objects(
{'configfile': None}, default=None)
if query_res is None:
logging.info(f"Unable to set SD Card path")
else:
config = query_res.get('configfile', {}).get('config', {})
vsd_config = config.get('virtual_sdcard', {})
vsd_path = vsd_config.get('path', None)
if vsd_path is not None:
file_manager: FileManager = self.server.lookup_component(
'file_manager')
file_manager.register_directory('gcodes', vsd_path,
full_access=True)
else:
logging.info(
"Configuration for [virtual_sdcard] not found,"
" unable to set SD Card path")
def _process_command(self, cmd: Dict[str, Any]) -> None:
method = cmd.get('method', None)
if method is not None:
# This is a remote method called from klippy
if method in self.remote_methods:
params = cmd.get('params', {})
self.event_loop.register_callback(
self._execute_method, method, **params)
else:
logging.info(f"Unknown method received: {method}")
return
# This is a response to a request, process
req_id = cmd.get('id', None)
request: Optional[KlippyRequest]
request = self.pending_requests.pop(req_id, None)
if request is None:
logging.info(
f"No request matching request ID: {req_id}, "
f"response: {cmd}")
return
if 'result' in cmd:
result = cmd['result']
if not result:
result = "ok"
else:
err = cmd.get('error', "Malformed Klippy Response")
result = ServerError(err, 400)
request.notify(result)
async def _execute_method(self, method_name: str, **kwargs) -> None:
try:
ret = self.remote_methods[method_name](**kwargs)
if ret is not None:
await ret
except Exception:
logging.exception(f"Error running remote method: {method_name}")
def _process_gcode_response(self, response: str) -> None:
self.server.send_event("server:gcode_response", response)
def _process_status_update(self,
eventtime: float,
status: Dict[str, Any]
) -> None:
if 'webhooks' in status:
# XXX - process other states (startup, ready, error, etc)?
state: Optional[str] = status['webhooks'].get('state', None)
if state is not None:
if state == "shutdown":
logging.info("Klippy has shutdown")
self.server.send_event("server:klippy_shutdown")
self._state = state
for conn, sub in self.subscriptions.items():
conn_status: Dict[str, Any] = {}
for name, fields in sub.items():
if name in status:
val: Dict[str, Any] = dict(status[name])
if fields is not None:
val = {k: v for k, v in val.items() if k in fields}
if val:
conn_status[name] = val
conn.send_status(conn_status, eventtime)
async def request(self, web_request: WebRequest) -> Any:
if not self.is_connected():
raise ServerError("Klippy Host not connected", 503)
rpc_method = web_request.get_endpoint()
if rpc_method == "objects/subscribe":
return await self._request_subscripton(web_request)
else:
if rpc_method == "gcode/script":
script = web_request.get_str('script', "")
data_store: DataStore
data_store = self.server.lookup_component('data_store')
data_store.store_gcode_command(script)
return await self._request_standard(web_request)
async def _request_subscripton(self,
web_request: WebRequest
) -> Dict[str, Any]:
args = web_request.get_args()
conn = web_request.get_connection()
# Build the subscription request from a superset of all client
# subscriptions
sub = args.get('objects', {})
if conn is None:
raise self.server.error(
"No connection associated with subscription request")
self.subscriptions[conn] = sub
all_subs: Dict[str, Any] = {}
# request superset of all client subscriptions
for sub in self.subscriptions.values():
for obj, items in sub.items():
if obj in all_subs:
pi = all_subs[obj]
if items is None or pi is None:
all_subs[obj] = None
else:
uitems = list(set(pi) | set(items))
all_subs[obj] = uitems
else:
all_subs[obj] = items
args['objects'] = all_subs
args['response_template'] = {'method': "process_status_update"}
result = await self._request_standard(web_request)
# prune the status response
pruned_status = {}
all_status = result['status']
sub = self.subscriptions.get(conn, {})
for obj, fields in all_status.items():
if obj in sub:
valid_fields = sub[obj]
if valid_fields is None:
pruned_status[obj] = fields
else:
pruned_status[obj] = {k: v for k, v in fields.items()
if k in valid_fields}
result['status'] = pruned_status
return result
async def _request_standard(self, web_request: WebRequest) -> Any:
rpc_method = web_request.get_endpoint()
args = web_request.get_args()
# Create a base klippy request
base_request = KlippyRequest(rpc_method, args)
self.pending_requests[base_request.id] = base_request
self.event_loop.register_callback(self._write_request, base_request)
return await base_request.wait()
def remove_subscription(self, conn: Subscribable) -> None:
self.subscriptions.pop(conn, None)
def is_connected(self) -> bool:
return self.writer is not None and not self.closing
async def _on_connection_closed(self) -> None:
self.init_list = []
self._state = "disconnected"
for request in self.pending_requests.values():
request.notify(ServerError("Klippy Disconnected", 503))
self.pending_requests = {}
self.subscriptions = {}
logging.info("Klippy Connection Removed")
await self.server.send_event("server:klippy_disconnect")
if self.server.is_running():
# Reconnect if server is running
loop = self.event_loop
self.connection_task = loop.create_task(self._do_connect())
async def close(self, wait_closed: bool = False) -> None:
if self.closing:
if wait_closed:
await self.connection_mutex.acquire()
self.connection_mutex.release()
return
self.closing = True
if (
self.connection_task is not None and
not self.connection_task.done()
):
self.connection_task.cancel()
async with self.connection_mutex:
if self.writer is not None:
try:
self.writer.close()
await self.writer.wait_closed()
except Exception:
logging.exception("Error closing Klippy Unix Socket")
self.writer = None
await self._on_connection_closed()
self.closing = False
# Basic KlippyRequest class, easily converted to dict for json encoding
class KlippyRequest:
def __init__(self, rpc_method: str, params: Dict[str, Any]) -> None:
self.id = id(self)
self.rpc_method = rpc_method
self.params = params
self._event = asyncio.Event()
self.response: Any = None
async def wait(self) -> Any:
# Log pending requests every 60 seconds
start_time = time.time()
while True:
try:
await asyncio.wait_for(self._event.wait(), 60.)
except asyncio.TimeoutError:
pending_time = time.time() - start_time
logging.info(
f"Request '{self.rpc_method}' pending: "
f"{pending_time:.2f} seconds")
self._event.clear()
continue
break
if isinstance(self.response, ServerError):
raise self.response
return self.response
def notify(self, response: Any) -> None:
if self._event.is_set():
return
self.response = response
self._event.set()
def to_dict(self) -> Dict[str, Any]:
return {'id': self.id, 'method': self.rpc_method,
'params': self.params}

View File

@ -22,8 +22,6 @@ import io
import time import time
import socket import socket
import logging import logging
import json
import getpass
import signal import signal
import confighelper import confighelper
import utils import utils
@ -31,6 +29,7 @@ import asyncio
from tornado.httpclient import AsyncHTTPClient from tornado.httpclient import AsyncHTTPClient
from eventloop import EventLoop from eventloop import EventLoop
from app import MoonrakerApp from app import MoonrakerApp
from klippy_connection import KlippyConnection
from utils import ServerError, SentinelClass from utils import ServerError, SentinelClass
# Annotation imports # Annotation imports
@ -46,18 +45,11 @@ from typing import (
TypeVar, TypeVar,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from websockets import WebRequest, Subscribable, WebsocketManager from websockets import WebRequest, WebsocketManager
from components.data_store import DataStore
from components.klippy_apis import KlippyAPI
from components.file_manager.file_manager import FileManager from components.file_manager.file_manager import FileManager
FlexCallback = Callable[..., Optional[Coroutine]] FlexCallback = Callable[..., Optional[Coroutine]]
_T = TypeVar("_T") _T = TypeVar("_T")
INIT_TIME = .25
LOG_ATTEMPT_INTERVAL = int(2. / INIT_TIME + .5)
MAX_LOG_ATTEMPTS = 10 * LOG_ATTEMPT_INTERVAL
UNIX_BUFFER_LIMIT = 2 * 1024 * 1024
CORE_COMPONENTS = [ CORE_COMPONENTS = [
'dbus_manager', 'database', 'file_manager', 'klippy_apis', 'dbus_manager', 'database', 'file_manager', 'klippy_apis',
'machine', 'data_store', 'shell_command', 'proc_stats', 'machine', 'data_store', 'shell_command', 'proc_stats',
@ -86,6 +78,7 @@ class Server:
self.port: int = config.getint('port', 7125) self.port: int = config.getint('port', 7125)
self.ssl_port: int = config.getint('ssl_port', 7130) self.ssl_port: int = config.getint('ssl_port', 7130)
self.exit_reason: str = "" self.exit_reason: str = ""
self.server_running: bool = False
# Configure Debug Logging # Configure Debug Logging
self.debug = config.getboolean('enable_debug_logging', False) self.debug = config.getboolean('enable_debug_logging', False)
@ -96,26 +89,12 @@ class Server:
# Event initialization # Event initialization
self.events: Dict[str, List[FlexCallback]] = {} self.events: Dict[str, List[FlexCallback]] = {}
# Klippy Connection Handling
self.klippy_address: str = config.get(
'klippy_uds_address', "/tmp/klippy_uds")
self.klippy_connection = KlippyConnection(
self.process_command, self.on_connection_closed, event_loop)
self.klippy_info: Dict[str, Any] = {}
self.init_list: List[str] = []
self.init_handle: Optional[asyncio.Handle] = None
self.init_attempts: int = 0
self.klippy_state: str = "disconnected"
self.klippy_disconnect_evt: Optional[asyncio.Event] = None
self.connection_init_lock: asyncio.Lock = asyncio.Lock()
self.components: Dict[str, Any] = {} self.components: Dict[str, Any] = {}
self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {}
self.failed_components: List[str] = [] self.failed_components: List[str] = []
self.warnings: List[str] = [] self.warnings: List[str] = []
self.klippy_connection = KlippyConnection(config)
# Tornado Application/Server # Tornado Application/Server
self.server_running: bool = False
self.moonraker_app = app = MoonrakerApp(config) self.moonraker_app = app = MoonrakerApp(config)
self.register_endpoint = app.register_local_handler self.register_endpoint = app.register_local_handler
self.register_static_file_handler = app.register_static_file_handler self.register_static_file_handler = app.register_static_file_handler
@ -135,30 +114,12 @@ class Server:
"/server/config", ['GET'], self._handle_config_request) "/server/config", ['GET'], self._handle_config_request)
self.register_endpoint( self.register_endpoint(
"/server/restart", ['POST'], self._handle_server_restart) "/server/restart", ['POST'], self._handle_server_restart)
self.register_notification("server:klippy_ready") self.register_notification("server:klippy_ready")
self.register_notification("server:klippy_shutdown") self.register_notification("server:klippy_shutdown")
self.register_notification("server:klippy_disconnect", self.register_notification("server:klippy_disconnect",
"klippy_disconnected") "klippy_disconnected")
self.register_notification("server:gcode_response") self.register_notification("server:gcode_response")
# Setup remote methods accessable to Klippy. Note that all
# registered remote methods should be of the notification type,
# they do not return a response to Klippy after execution
self.pending_requests: Dict[int, BaseRequest] = {}
self.remote_methods: Dict[str, FlexCallback] = {}
self.klippy_reg_methods: List[str] = []
self.register_remote_method(
'process_gcode_response', self._process_gcode_response,
need_klippy_reg=False)
self.register_remote_method(
'process_status_update', self._process_status_update,
need_klippy_reg=False)
@property
def klippy_apis(self) -> KlippyAPI:
return self.components['klippy_apis']
def get_app_args(self) -> Dict[str, Any]: def get_app_args(self) -> Dict[str, Any]:
return dict(self.app_args) return dict(self.app_args)
@ -219,11 +180,7 @@ class Server:
self.moonraker_app.listen(self.host, self.port, self.ssl_port) self.moonraker_app.listen(self.host, self.port, self.ssl_port)
self.server_running = True self.server_running = True
if connect_to_klippy: if connect_to_klippy:
await self.connect_klippy() self.klippy_connection.connect()
async def wait_connection_initialized(self) -> None:
async with self.connection_init_lock:
return
def add_log_rollover_item(self, name: str, item: str, def add_log_rollover_item(self, name: str, item: str,
log: bool = True) -> None: log: bool = True) -> None:
@ -347,20 +304,9 @@ class Server:
def register_remote_method(self, def register_remote_method(self,
method_name: str, method_name: str,
cb: FlexCallback, cb: FlexCallback
need_klippy_reg: bool = True
) -> None: ) -> None:
if method_name in self.remote_methods: self.klippy_connection.register_remote_method(method_name, cb)
raise self.error(
f"Remote method ({method_name}) already registered")
if self.server_running:
raise self.error(
f"Failed to register remote method {method_name}, "
"methods must be registered during initialization")
self.remote_methods[method_name] = cb
if need_klippy_reg:
# These methods need to be registered with Klippy
self.klippy_reg_methods.append(method_name)
def get_host_info(self) -> Dict[str, Any]: def get_host_info(self) -> Dict[str, Any]:
return { return {
@ -371,292 +317,10 @@ class Server:
} }
def get_klippy_info(self) -> Dict[str, Any]: def get_klippy_info(self) -> Dict[str, Any]:
return dict(self.klippy_info) return self.klippy_connection.klippy_info
def get_klippy_state(self) -> str: def get_klippy_state(self) -> str:
return self.klippy_state return self.klippy_connection.state
# ***** Klippy Connection *****
async def connect_klippy(self) -> None:
if not self.server_running:
return
if self.klippy_connection.is_connected():
return
ret = await self.klippy_connection.connect(self.klippy_address)
if not ret:
self.event_loop.delay_callback(.25, self.connect_klippy)
return
self.init_handle = self.event_loop.delay_callback(
0.01, self._init_klippy_connection)
def process_command(self, cmd: Dict[str, Any]) -> None:
method = cmd.get('method', None)
if method is not None:
# This is a remote method called from klippy
if method in self.remote_methods:
params = cmd.get('params', {})
self.event_loop.register_callback(
self._execute_method, method, **params)
else:
logging.info(f"Unknown method received: {method}")
return
# This is a response to a request, process
req_id = cmd.get('id', None)
request: Optional[BaseRequest]
request = self.pending_requests.pop(req_id, None)
if request is None:
logging.info(
f"No request matching request ID: {req_id}, "
f"response: {cmd}")
return
if 'result' in cmd:
result = cmd['result']
if not result:
result = "ok"
else:
err = cmd.get('error', "Malformed Klippy Response")
result = ServerError(err, 400)
request.notify(result)
async def _execute_method(self, method_name: str, **kwargs) -> None:
try:
ret = self.remote_methods[method_name](**kwargs)
if ret is not None:
await ret
except Exception:
logging.exception(f"Error running remote method: {method_name}")
def on_connection_closed(self) -> None:
self.init_list = []
self.klippy_state = "disconnected"
for request in self.pending_requests.values():
request.notify(ServerError("Klippy Disconnected", 503))
self.pending_requests = {}
self.subscriptions = {}
logging.info("Klippy Connection Removed")
self.send_event("server:klippy_disconnect")
if self.init_handle is not None:
self.init_handle.cancel()
self.init_handle = None
if self.server_running:
self.event_loop.delay_callback(.25, self.connect_klippy)
if self.klippy_disconnect_evt is not None:
self.klippy_disconnect_evt.set()
async def _init_klippy_connection(self) -> None:
if not self.server_running:
return
async with self.connection_init_lock:
await self._check_ready()
await self._request_endpoints()
# Subscribe to "webhooks"
# Register "webhooks" subscription
if "webhooks_sub" not in self.init_list:
try:
await self.klippy_apis.subscribe_objects(
{'webhooks': None})
except ServerError as e:
logging.info(
f"{e}\nUnable to subscribe to webhooks object")
else:
logging.info("Webhooks Subscribed")
self.init_list.append("webhooks_sub")
# Subscribe to Gcode Output
if "gcode_output_sub" not in self.init_list:
try:
await self.klippy_apis.subscribe_gcode_output()
except ServerError as e:
logging.info(
f"{e}\nUnable to register gcode output subscription")
else:
logging.info("GCode Output Subscribed")
self.init_list.append("gcode_output_sub")
if (
"startup_complete" in self.init_list or
not self.klippy_connection.is_connected()
):
# Either Klippy is ready or the connection dropped
# during initialization. Exit initialization
self.init_attempts = 0
self.init_handle = None
else:
self.init_attempts += 1
self.init_handle = self.event_loop.delay_callback(
INIT_TIME, self._init_klippy_connection)
async def _request_endpoints(self) -> None:
result = await self.klippy_apis.list_endpoints(default=None)
if result is None:
return
endpoints = result.get('endpoints', [])
for ep in endpoints:
self.moonraker_app.register_remote_handler(ep)
async def _check_ready(self) -> None:
send_id = "identified" not in self.init_list
result: Dict[str, Any]
try:
result = await self.klippy_apis.get_klippy_info(send_id)
except ServerError as e:
if self.init_attempts % LOG_ATTEMPT_INTERVAL == 0 and \
self.init_attempts <= MAX_LOG_ATTEMPTS:
logging.info(
f"{e}\nKlippy info request error. This indicates that\n"
f"Klippy may have experienced an error during startup.\n"
f"Please check klippy.log for more information")
return
self.klippy_info = dict(result)
self.klippy_state = result.get('state', "unknown")
if send_id:
self.init_list.append("identified")
await self.send_event("server:klippy_identified")
if self.klippy_state != "startup":
self.init_list.append('startup_complete')
await self.send_event("server:klippy_started", self.klippy_state)
if self.klippy_state != "ready":
msg = result.get('state_message', "Klippy Not Ready")
logging.info("\n" + msg)
else:
await self._verify_klippy_requirements()
# register methods with klippy
for method in self.klippy_reg_methods:
try:
await self.klippy_apis.register_method(method)
except ServerError:
logging.exception(
f"Unable to register method '{method}'")
logging.info("Klippy ready")
await self.send_event("server:klippy_ready")
async def _verify_klippy_requirements(self) -> None:
result = await self.klippy_apis.get_object_list(default=None)
if result is None:
logging.info(
f"Unable to retrieve Klipper Object List")
return
req_objs = set(["virtual_sdcard", "display_status", "pause_resume"])
missing_objs = req_objs - set(result)
if missing_objs:
err_str = ", ".join([f"[{o}]" for o in missing_objs])
logging.info(
f"\nWarning, unable to detect the following printer "
f"objects:\n{err_str}\nPlease add the the above sections "
f"to printer.cfg for full Moonraker functionality.")
if "virtual_sdcard" not in missing_objs:
# Update the gcode path
query_res = await self.klippy_apis.query_objects(
{'configfile': None}, default=None)
if query_res is None:
logging.info(f"Unable to set SD Card path")
else:
config = query_res.get('configfile', {}).get('config', {})
vsd_config = config.get('virtual_sdcard', {})
vsd_path = vsd_config.get('path', None)
if vsd_path is not None:
file_manager: FileManager = self.lookup_component(
'file_manager')
file_manager.register_directory('gcodes', vsd_path,
full_access=True)
else:
logging.info(
"Configuration for [virtual_sdcard] not found,"
" unable to set SD Card path")
def _process_gcode_response(self, response: str) -> None:
self.send_event("server:gcode_response", response)
def _process_status_update(self,
eventtime: float,
status: Dict[str, Any]
) -> None:
if 'webhooks' in status:
# XXX - process other states (startup, ready, error, etc)?
state: Optional[str] = status['webhooks'].get('state', None)
if state is not None:
if state == "shutdown":
logging.info("Klippy has shutdown")
self.send_event("server:klippy_shutdown")
self.klippy_state = state
for conn, sub in self.subscriptions.items():
conn_status: Dict[str, Any] = {}
for name, fields in sub.items():
if name in status:
val: Dict[str, Any] = dict(status[name])
if fields is not None:
val = {k: v for k, v in val.items() if k in fields}
if val:
conn_status[name] = val
conn.send_status(conn_status, eventtime)
async def make_request(self, web_request: WebRequest) -> Any:
rpc_method = web_request.get_endpoint()
if rpc_method == "objects/subscribe":
return await self._request_subscripton(web_request)
else:
if rpc_method == "gcode/script":
script = web_request.get_str('script', "")
data_store: DataStore = self.lookup_component('data_store')
data_store.store_gcode_command(script)
return await self._request_standard(web_request)
async def _request_subscripton(self,
web_request: WebRequest
) -> Dict[str, Any]:
args = web_request.get_args()
conn = web_request.get_connection()
# Build the subscription request from a superset of all client
# subscriptions
sub = args.get('objects', {})
if conn is None:
raise self.error(
"No connection associated with subscription request")
self.subscriptions[conn] = sub
all_subs: Dict[str, Any] = {}
# request superset of all client subscriptions
for sub in self.subscriptions.values():
for obj, items in sub.items():
if obj in all_subs:
pi = all_subs[obj]
if items is None or pi is None:
all_subs[obj] = None
else:
uitems = list(set(pi) | set(items))
all_subs[obj] = uitems
else:
all_subs[obj] = items
args['objects'] = all_subs
args['response_template'] = {'method': "process_status_update"}
result = await self._request_standard(web_request)
# prune the status response
pruned_status = {}
all_status = result['status']
sub = self.subscriptions.get(conn, {})
for obj, fields in all_status.items():
if obj in sub:
valid_fields = sub[obj]
if valid_fields is None:
pruned_status[obj] = fields
else:
pruned_status[obj] = {k: v for k, v in fields.items()
if k in valid_fields}
result['status'] = pruned_status
return result
async def _request_standard(self, web_request: WebRequest) -> Any:
rpc_method = web_request.get_endpoint()
args = web_request.get_args()
# Create a base klippy request
base_request = BaseRequest(rpc_method, args)
self.pending_requests[base_request.id] = base_request
self.event_loop.register_callback(
self.klippy_connection.send_request, base_request)
return await base_request.wait()
def remove_subscription(self, conn: Subscribable) -> None:
self.subscriptions.pop(conn, None)
def _handle_term_signal(self) -> None: def _handle_term_signal(self) -> None:
logging.info(f"Exiting with signal SIGTERM") logging.info(f"Exiting with signal SIGTERM")
@ -686,17 +350,17 @@ class Server:
# Disconnect from Klippy # Disconnect from Klippy
try: try:
if self.klippy_connection.is_connected():
self.klippy_disconnect_evt = asyncio.Event()
await self.klippy_connection.close()
await asyncio.wait_for( await asyncio.wait_for(
self.klippy_disconnect_evt.wait(), 2.) asyncio.shield(self.klippy_connection.close(
self.klippy_disconnect_evt = None wait_closed=True)), 2.)
except Exception: except Exception:
logging.exception("Klippy Disconnect Error") logging.exception("Klippy Disconnect Error")
# Close all components # Close all components
for name, component in self.components.items(): for name, component in self.components.items():
if name in ["application", "websockets", "klippy_connection"]:
# These components have already been closed
continue
if hasattr(component, "close"): if hasattr(component, "close"):
func = getattr(component, "close") func = getattr(component, "close")
try: try:
@ -726,7 +390,7 @@ class Server:
wsm: WebsocketManager = self.lookup_component('websockets') wsm: WebsocketManager = self.lookup_component('websockets')
return { return {
'klippy_connected': self.klippy_connection.is_connected(), 'klippy_connected': self.klippy_connection.is_connected(),
'klippy_state': self.klippy_state, 'klippy_state': self.klippy_connection.state,
'components': list(self.components.keys()), 'components': list(self.components.keys()),
'failed_components': self.failed_components, 'failed_components': self.failed_components,
'registered_directories': reg_dirs, 'registered_directories': reg_dirs,
@ -742,130 +406,6 @@ class Server:
'config': self.config.get_parsed_config() 'config': self.config.get_parsed_config()
} }
class KlippyConnection:
def __init__(self,
on_recd: Callable[[dict], None],
on_close: Callable[[], None],
event_loop: EventLoop
) -> None:
self.writer: Optional[asyncio.StreamWriter] = None
self.connection_mutex: asyncio.Lock = asyncio.Lock()
self.on_recd = on_recd
self.on_close = on_close
self.event_loop = event_loop
self.log_no_access = True
async def connect(self, address: str) -> bool:
if self.is_connected():
await self.close()
async with self.connection_mutex:
if not os.path.exists(address):
return False
if not os.access(address, os.R_OK | os.W_OK):
if self.log_no_access:
user = getpass.getuser()
logging.info(
f"Cannot connect to Klippy, Linux user '{user}' lacks "
f"permission to open Unix Domain Socket: {address}")
self.log_no_access = False
return False
self.log_no_access = True
try:
reader, writer = await asyncio.open_unix_connection(
address, limit=UNIX_BUFFER_LIMIT)
except Exception:
return False
logging.info("Klippy Connection Established")
self.writer = writer
self.event_loop.register_callback(self._read_stream, reader)
return True
async def _read_stream(self, reader: asyncio.StreamReader) -> None:
errors_remaining: int = 10
while not reader.at_eof() and errors_remaining:
try:
data = await reader.readuntil(b'\x03')
except (ConnectionError, asyncio.IncompleteReadError):
break
except Exception:
logging.exception("Klippy Stream Read Error")
errors_remaining -= 1
continue
errors_remaining = 10
try:
decoded_cmd = json.loads(data[:-1])
self.on_recd(decoded_cmd)
except Exception:
logging.exception(
f"Error processing Klippy Host Response: {data.decode()}")
await self.close()
async def send_request(self, request: BaseRequest) -> None:
if self.writer is None:
request.notify(ServerError("Klippy Host not connected", 503))
return
data = json.dumps(request.to_dict()).encode() + b"\x03"
try:
self.writer.write(data)
await self.writer.drain()
except Exception:
request.notify(ServerError("Klippy Host not connected", 503))
await self.close()
def is_connected(self) -> bool:
return self.writer is not None
async def close(self) -> None:
if (
self.connection_mutex.locked() or
self.writer is None
):
return
async with self.connection_mutex:
try:
self.writer.close()
await self.writer.wait_closed()
except Exception:
logging.exception("Error closing Klippy Unix Socket")
self.writer = None
self.on_close()
# Basic WebRequest class, easily converted to dict for json encoding
class BaseRequest:
def __init__(self, rpc_method: str, params: Dict[str, Any]) -> None:
self.id = id(self)
self.rpc_method = rpc_method
self.params = params
self._event = asyncio.Event()
self.response: Any = None
async def wait(self) -> Any:
# Log pending requests every 60 seconds
start_time = time.time()
while True:
try:
await asyncio.wait_for(self._event.wait(), 60.)
except asyncio.TimeoutError:
pending_time = time.time() - start_time
logging.info(
f"Request '{self.rpc_method}' pending: "
f"{pending_time:.2f} seconds")
self._event.clear()
continue
break
if isinstance(self.response, ServerError):
raise self.response
return self.response
def notify(self, response: Any) -> None:
self.response = response
self._event.set()
def to_dict(self) -> Dict[str, Any]:
return {'id': self.id, 'method': self.rpc_method,
'params': self.params}
def main(cmd_line_args: argparse.Namespace) -> None: def main(cmd_line_args: argparse.Namespace) -> None:
cfg_file = cmd_line_args.configfile cfg_file = cmd_line_args.configfile
app_args = {'config_file': cfg_file} app_args = {'config_file': cfg_file}

View File

@ -28,6 +28,7 @@ from typing import (
if TYPE_CHECKING: if TYPE_CHECKING:
from moonraker import Server from moonraker import Server
from app import APIDefinition from app import APIDefinition
from klippy_connection import KlippyConnection as Klippy
import components.authorization import components.authorization
_T = TypeVar("_T") _T = TypeVar("_T")
_C = TypeVar("_C", str, bool, float, int) _C = TypeVar("_C", str, bool, float, int)
@ -270,6 +271,7 @@ class APITransport:
class WebsocketManager(APITransport): class WebsocketManager(APITransport):
def __init__(self, server: Server) -> None: def __init__(self, server: Server) -> None:
self.server = server self.server = server
self.klippy: Klippy = server.lookup_component("klippy_connection")
self.websockets: Dict[int, WebSocket] = {} self.websockets: Dict[int, WebSocket] = {}
self.rpc = JsonRPC() self.rpc = JsonRPC()
self.closed_event: Optional[asyncio.Event] = None self.closed_event: Optional[asyncio.Event] = None
@ -311,7 +313,7 @@ class WebsocketManager(APITransport):
def _generate_callback(self, endpoint: str) -> RPCCallback: def _generate_callback(self, endpoint: str) -> RPCCallback:
async def func(ws: WebSocket, **kwargs) -> Any: async def func(ws: WebSocket, **kwargs) -> Any:
result = await self.server.make_request( result = await self.klippy.request(
WebRequest(endpoint, kwargs, conn=ws, ip_addr=ws.ip_addr, WebRequest(endpoint, kwargs, conn=ws, ip_addr=ws.ip_addr,
user=ws.current_user)) user=ws.current_user))
return result return result
@ -348,7 +350,7 @@ class WebsocketManager(APITransport):
def remove_websocket(self, ws: WebSocket) -> None: def remove_websocket(self, ws: WebSocket) -> None:
old_ws = self.websockets.pop(ws.uid, None) old_ws = self.websockets.pop(ws.uid, None)
if old_ws is not None: if old_ws is not None:
self.server.remove_subscription(old_ws) self.klippy.remove_subscription(old_ws)
logging.debug(f"Websocket Removed: {ws.uid}") logging.debug(f"Websocket Removed: {ws.uid}")
if self.closed_event is not None and not self.websockets: if self.closed_event is not None and not self.websockets:
self.closed_event.set() self.closed_event.set()