moonraker: move common classes to common.py

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-23 19:34:04 -05:00
parent 160f6d64be
commit 6d73c60a38
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
26 changed files with 199 additions and 165 deletions

View File

@ -22,12 +22,11 @@ from tornado.escape import url_unescape, url_escape
from tornado.routing import Rule, PathMatches, AnyMatches
from tornado.http1connection import HTTP1Connection
from tornado.log import access_log
from .common import WebRequest, APIDefinition, APITransport
from .utils import ServerError, source_info
from .websockets import (
WebRequest,
WebsocketManager,
WebSocket,
APITransport,
BridgeSocket
)
from streaming_form_data import StreamingFormDataParser
@ -111,24 +110,6 @@ class MutableRouter(tornado.web.ReversibleRuleRouter):
except Exception:
logging.exception(f"Unable to remove rule: {pattern}")
class APIDefinition:
def __init__(self,
endpoint: str,
http_uri: str,
jrpc_methods: List[str],
request_methods: Union[str, List[str]],
transports: List[str],
callback: Optional[APICallback],
need_object_parser: bool):
self.endpoint = endpoint
self.uri = http_uri
self.jrpc_methods = jrpc_methods
if not isinstance(request_methods, list):
request_methods = [request_methods]
self.request_methods = request_methods
self.supported_transports = transports
self.callback = callback
self.need_object_parser = need_object_parser
class InternalTransport(APITransport):
def __init__(self, server: Server) -> None:

169
moonraker/common.py Normal file
View File

@ -0,0 +1,169 @@
# Common classes used throughout Moonraker
#
# Copyright (C) 2023 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license
from __future__ import annotations
import ipaddress
from .utils import ServerError, Sentinel
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Optional,
Callable,
Coroutine,
Type,
TypeVar,
Union,
Dict,
List,
)
if TYPE_CHECKING:
from .websockets import BaseSocketClient
from .components.authorization import Authorization
_T = TypeVar("_T")
_C = TypeVar("_C", str, bool, float, int)
IPUnion = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
ConvType = Union[str, bool, float, int]
ArgVal = Union[None, int, float, bool, str]
RPCCallback = Callable[..., Coroutine]
AuthComp = Optional[Authorization]
class Subscribable:
def send_status(self,
status: Dict[str, Any],
eventtime: float
) -> None:
raise NotImplementedError
class WebRequest:
def __init__(self,
endpoint: str,
args: Dict[str, Any],
action: Optional[str] = "",
conn: Optional[Subscribable] = None,
ip_addr: str = "",
user: Optional[Dict[str, Any]] = None
) -> None:
self.endpoint = endpoint
self.action = action or ""
self.args = args
self.conn = conn
self.ip_addr: Optional[IPUnion] = None
try:
self.ip_addr = ipaddress.ip_address(ip_addr)
except Exception:
self.ip_addr = None
self.current_user = user
def get_endpoint(self) -> str:
return self.endpoint
def get_action(self) -> str:
return self.action
def get_args(self) -> Dict[str, Any]:
return self.args
def get_subscribable(self) -> Optional[Subscribable]:
return self.conn
def get_client_connection(self) -> Optional[BaseSocketClient]:
if isinstance(self.conn, BaseSocketClient):
return self.conn
return None
def get_ip_address(self) -> Optional[IPUnion]:
return self.ip_addr
def get_current_user(self) -> Optional[Dict[str, Any]]:
return self.current_user
def _get_converted_arg(self,
key: str,
default: Union[Sentinel, _T],
dtype: Type[_C]
) -> Union[_C, _T]:
if key not in self.args:
if default is Sentinel.MISSING:
raise ServerError(f"No data for argument: {key}")
return default
val = self.args[key]
try:
if dtype is not bool:
return dtype(val)
else:
if isinstance(val, str):
val = val.lower()
if val in ["true", "false"]:
return True if val == "true" else False # type: ignore
elif isinstance(val, bool):
return val # type: ignore
raise TypeError
except Exception:
raise ServerError(
f"Unable to convert argument [{key}] to {dtype}: "
f"value recieved: {val}")
def get(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, Any]:
val = self.args.get(key, default)
if val is Sentinel.MISSING:
raise ServerError(f"No data for argument: {key}")
return val
def get_str(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[str, _T]:
return self._get_converted_arg(key, default, str)
def get_int(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[int, _T]:
return self._get_converted_arg(key, default, int)
def get_float(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[float, _T]:
return self._get_converted_arg(key, default, float)
def get_boolean(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[bool, _T]:
return self._get_converted_arg(key, default, bool)
class APIDefinition:
def __init__(self,
endpoint: str,
http_uri: str,
jrpc_methods: List[str],
request_methods: Union[str, List[str]],
transports: List[str],
callback: Optional[Callable[[WebRequest], Coroutine]],
need_object_parser: bool):
self.endpoint = endpoint
self.uri = http_uri
self.jrpc_methods = jrpc_methods
if not isinstance(request_methods, list):
request_methods = [request_methods]
self.request_methods = request_methods
self.supported_transports = transports
self.callback = callback
self.need_object_parser = need_object_parser
class APITransport:
def register_api_handler(self, api_def: APIDefinition) -> None:
raise NotImplementedError
def remove_api_handler(self, api_def: APIDefinition) -> None:
raise NotImplementedError

View File

@ -22,7 +22,7 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .http_client import HttpClient
from .database import MoonrakerDatabase

View File

@ -35,7 +35,8 @@ from typing import (
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest, WebsocketManager
from ..common import WebRequest
from ..websockets import WebsocketManager
from tornado.httputil import HTTPServerRequest
from tornado.web import RequestHandler
from .database import MoonrakerDatabase as DBComp

View File

@ -21,7 +21,7 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .klippy_apis import KlippyAPI as APIComp
GCQueue = Deque[Dict[str, Any]]
TempStore = Dict[str, Dict[str, Deque[float]]]

View File

@ -34,7 +34,7 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
DBRecord = Union[int, float, bool, str, List[Any], Dict[str, Any]]
DBType = Optional[DBRecord]
_T = TypeVar("_T")

View File

@ -24,7 +24,7 @@ from typing import (
if TYPE_CHECKING:
from ..server import Server
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from ..klippy_connection import KlippyConnection as Klippy
UNIX_BUFFER_LIMIT = 20 * 1024 * 1024

View File

@ -41,7 +41,7 @@ from typing import (
if TYPE_CHECKING:
from inotify_simple import Event as InotifyEvent
from ...confighelper import ConfigHelper
from ...websockets import WebRequest
from ...common import WebRequest
from ...klippy_connection import KlippyConnection
from .. import database
from .. import klippy_apis

View File

@ -18,7 +18,7 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .database import MoonrakerDatabase as DBComp
from .job_state import JobState
from .file_manager.file_manager import FileManager

View File

@ -20,7 +20,7 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .klippy_apis import KlippyAPI
from .file_manager.file_manager import FileManager

View File

@ -6,7 +6,7 @@
from __future__ import annotations
from ..utils import Sentinel
from ..websockets import WebRequest, Subscribable
from ..common import WebRequest, Subscribable
# Annotation imports
from typing import (
@ -21,7 +21,6 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..klippy_connection import KlippyConnection as Klippy
Subscription = Dict[str, Optional[List[Any]]]
_T = TypeVar("_T")

View File

@ -40,7 +40,7 @@ from typing import (
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from ..app import MoonrakerApp
from ..klippy_connection import KlippyConnection
from .shell_command import ShellCommandFactory as SCMDComp

View File

@ -13,7 +13,8 @@ import pathlib
import ssl
from collections import deque
import paho.mqtt.client as paho_mqtt
from ..websockets import Subscribable, WebRequest, JsonRPC, APITransport
from ..common import Subscribable, WebRequest, APITransport
from ..websockets import JsonRPC
# Annotation imports
from typing import (

View File

@ -19,7 +19,7 @@ from typing import (
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .http_client import HttpClient
from .klippy_apis import KlippyAPI as APIComp

View File

@ -16,7 +16,7 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .klippy_apis import KlippyAPI as APIComp
from .file_manager.file_manager import FileManager
from .job_queue import JobQueue

View File

@ -28,7 +28,7 @@ from typing import (
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .machine import Machine
from .klippy_apis import KlippyAPI as APIComp
from .mqtt import MQTTClient

View File

@ -27,7 +27,8 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest, WebsocketManager
from ..common import WebRequest
from ..websockets import WebsocketManager
from . import shell_command
STAT_CALLBACK = Callable[[int], Optional[Awaitable]]

View File

@ -28,7 +28,7 @@ from typing import (
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .mqtt import MQTTClient
SENSOR_UPDATE_TIME = 1.0

View File

@ -18,7 +18,7 @@ import logging.handlers
import tempfile
from queue import SimpleQueue
from ..loghelper import LocalQueueHandler
from ..websockets import Subscribable, WebRequest
from ..common import Subscribable, WebRequest
from typing import (
TYPE_CHECKING,

View File

@ -39,7 +39,7 @@ from typing import (
if TYPE_CHECKING:
from ...server import Server
from ...confighelper import ConfigHelper
from ...websockets import WebRequest
from ...common import WebRequest
from ...klippy_connection import KlippyConnection
from ..shell_command import ShellCommandFactory as SCMDComp
from ..database import MoonrakerDatabase as DBComp

View File

@ -22,7 +22,7 @@ from typing import (
if TYPE_CHECKING:
from ..server import Server
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
from .database import MoonrakerDatabase
from .machine import Machine
from .shell_command import ShellCommandFactory

View File

@ -29,7 +29,7 @@ from typing import (
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..websockets import WebRequest
from ..common import WebRequest
class OnOff(str, Enum):
on: str = "on"

View File

@ -31,7 +31,7 @@ from typing import (
if TYPE_CHECKING:
from .server import Server
from .app import MoonrakerApp
from .websockets import WebRequest, Subscribable
from .common import WebRequest, Subscribable
from .confighelper import ConfigHelper
from .components.klippy_apis import KlippyAPI
from .components.file_manager.file_manager import FileManager

View File

@ -24,7 +24,7 @@ from typing import (
if TYPE_CHECKING:
from .server import Server
from .websockets import WebRequest
from .common import WebRequest
from .klippy_connection import KlippyConnection
# Coroutine friendly QueueHandler courtesy of Martjin Pieters:

View File

@ -38,7 +38,8 @@ from typing import (
TypeVar,
)
if TYPE_CHECKING:
from .websockets import WebRequest, WebsocketManager
from .common import WebRequest
from .websockets import WebsocketManager
from .components.file_manager.file_manager import FileManager
from .components.machine import Machine
from .components.extensions import ExtensionManager

View File

@ -12,6 +12,7 @@ import asyncio
import copy
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.web import HTTPError
from .common import WebRequest, Subscribable, APITransport, APIDefinition
from .utils import ServerError, Sentinel
# Annotation imports
@ -23,20 +24,16 @@ from typing import (
Callable,
Coroutine,
Tuple,
Type,
TypeVar,
Union,
Dict,
List,
)
if TYPE_CHECKING:
from .server import Server
from .app import APIDefinition
from .klippy_connection import KlippyConnection as Klippy
from .components.extensions import ExtensionManager
from .components.authorization import Authorization
_T = TypeVar("_T")
_C = TypeVar("_C", str, bool, float, int)
IPUnion = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
ConvType = Union[str, bool, float, int]
ArgVal = Union[None, int, float, bool, str]
@ -45,115 +42,6 @@ if TYPE_CHECKING:
CLIENT_TYPES = ["web", "mobile", "desktop", "display", "bot", "agent", "other"]
class Subscribable:
def send_status(self,
status: Dict[str, Any],
eventtime: float
) -> None:
raise NotImplementedError
class WebRequest:
def __init__(self,
endpoint: str,
args: Dict[str, Any],
action: Optional[str] = "",
conn: Optional[Subscribable] = None,
ip_addr: str = "",
user: Optional[Dict[str, Any]] = None
) -> None:
self.endpoint = endpoint
self.action = action or ""
self.args = args
self.conn = conn
self.ip_addr: Optional[IPUnion] = None
try:
self.ip_addr = ipaddress.ip_address(ip_addr)
except Exception:
self.ip_addr = None
self.current_user = user
def get_endpoint(self) -> str:
return self.endpoint
def get_action(self) -> str:
return self.action
def get_args(self) -> Dict[str, Any]:
return self.args
def get_subscribable(self) -> Optional[Subscribable]:
return self.conn
def get_client_connection(self) -> Optional[BaseSocketClient]:
if isinstance(self.conn, BaseSocketClient):
return self.conn
return None
def get_ip_address(self) -> Optional[IPUnion]:
return self.ip_addr
def get_current_user(self) -> Optional[Dict[str, Any]]:
return self.current_user
def _get_converted_arg(self,
key: str,
default: Union[Sentinel, _T],
dtype: Type[_C]
) -> Union[_C, _T]:
if key not in self.args:
if default is Sentinel.MISSING:
raise ServerError(f"No data for argument: {key}")
return default
val = self.args[key]
try:
if dtype is not bool:
return dtype(val)
else:
if isinstance(val, str):
val = val.lower()
if val in ["true", "false"]:
return True if val == "true" else False # type: ignore
elif isinstance(val, bool):
return val # type: ignore
raise TypeError
except Exception:
raise ServerError(
f"Unable to convert argument [{key}] to {dtype}: "
f"value recieved: {val}")
def get(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, Any]:
val = self.args.get(key, default)
if val is Sentinel.MISSING:
raise ServerError(f"No data for argument: {key}")
return val
def get_str(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[str, _T]:
return self._get_converted_arg(key, default, str)
def get_int(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[int, _T]:
return self._get_converted_arg(key, default, int)
def get_float(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[float, _T]:
return self._get_converted_arg(key, default, float)
def get_boolean(self,
key: str,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[bool, _T]:
return self._get_converted_arg(key, default, bool)
class JsonRPC:
def __init__(
self, server: Server, transport: str = "Websocket"
@ -339,13 +227,6 @@ class JsonRPC:
'id': req_id
}
class APITransport:
def register_api_handler(self, api_def: APIDefinition) -> None:
raise NotImplementedError
def remove_api_handler(self, api_def: APIDefinition) -> None:
raise NotImplementedError
class WebsocketManager(APITransport):
def __init__(self, server: Server) -> None:
self.server = server