all: Replace strings with RequestType flags

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-11-05 05:21:10 -05:00
parent 6e8b720d17
commit b18e9cc222
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
23 changed files with 313 additions and 229 deletions

View File

@ -11,6 +11,7 @@ import asyncio
import logging
import email.utils
import xml.etree.ElementTree as etree
from ..common import RequestType
from typing import (
TYPE_CHECKING,
Awaitable,
@ -57,23 +58,23 @@ class Announcements:
)
self.server.register_endpoint(
"/server/announcements/list", ["GET"],
"/server/announcements/list", RequestType.GET,
self._list_announcements
)
self.server.register_endpoint(
"/server/announcements/dismiss", ["POST"],
"/server/announcements/dismiss", RequestType.POST,
self._handle_dismiss_request
)
self.server.register_endpoint(
"/server/announcements/update", ["POST"],
"/server/announcements/update", RequestType.POST,
self._handle_update_request
)
self.server.register_endpoint(
"/server/announcements/feed", ["POST", "DELETE"],
"/server/announcements/feed", RequestType.POST | RequestType.DELETE,
self._handle_feed_request
)
self.server.register_endpoint(
"/server/announcements/feeds", ["GET"],
"/server/announcements/feeds", RequestType.GET,
self._handle_list_feeds
)
self.server.register_notification(
@ -170,13 +171,13 @@ class Announcements:
async def _handle_feed_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
action = web_request.get_action()
req_type = web_request.get_request_type()
name: str = web_request.get("name")
name = name.lower()
changed: bool = False
db: MoonrakerDatabase = self.server.lookup_component("database")
result = "skipped"
if action == "POST":
if req_type == RequestType.POST:
if name not in self.subscriptions:
feed = RssFeed(name, self.entry_mgr, self.dev_mode)
self.subscriptions[name] = feed
@ -187,7 +188,7 @@ class Announcements:
"moonraker", "announcements.stored_feeds", self.stored_feeds
)
result = "added"
elif action == "DELETE":
elif req_type == RequestType.DELETE:
if name not in self.stored_feeds:
raise self.server.error(f"Feed '{name}' not stored")
if name in self.configured_feeds:

View File

@ -20,6 +20,7 @@ import logging
from tornado.web import HTTPError
from libnacl.sign import Signer, Verifier
from ..utils import json_wrapper as jsonw
from ..common import RequestType, TransportType
# Annotation imports
from typing import (
@ -226,32 +227,42 @@ class Authorization:
self.permitted_paths.add("/access/refresh_jwt")
self.permitted_paths.add("/access/info")
self.server.register_endpoint(
"/access/login", ['POST'], self._handle_login,
transports=['http', 'websocket'])
"/access/login", RequestType.POST, self._handle_login,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/logout", ['POST'], self._handle_logout,
transports=['http', 'websocket'])
"/access/logout", RequestType.POST, self._handle_logout,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/refresh_jwt", ['POST'], self._handle_refresh_jwt,
transports=['http', 'websocket'])
"/access/refresh_jwt", RequestType.POST, self._handle_refresh_jwt,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/user", ['GET', 'POST', 'DELETE'],
self._handle_user_request, transports=['http', 'websocket'])
"/access/user", RequestType.all(), self._handle_user_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/users/list", ['GET'], self._handle_list_request,
transports=['http', 'websocket'])
"/access/users/list", RequestType.GET, self._handle_list_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/user/password", ['POST'], self._handle_password_reset,
transports=['http', 'websocket'])
"/access/user/password", RequestType.POST, self._handle_password_reset,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/api_key", ['GET', 'POST'],
self._handle_apikey_request, transports=['http', 'websocket'])
"/access/api_key", RequestType.GET | RequestType.POST,
self._handle_apikey_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/oneshot_token", ['GET'],
self._handle_oneshot_request, transports=['http', 'websocket'])
"/access/oneshot_token", RequestType.GET, self._handle_oneshot_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/info", ['GET'],
self._handle_info_request, transports=['http', 'websocket'])
"/access/info", RequestType.GET, self._handle_info_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
wsm: WebsocketManager = self.server.lookup_component("websockets")
wsm.register_notification("authorization:user_created")
wsm.register_notification(
@ -274,8 +285,7 @@ class Authorization:
self.prune_timer.start(delay=PRUNE_CHECK_TIME)
async def _handle_apikey_request(self, web_request: WebRequest) -> str:
action = web_request.get_action()
if action.upper() == 'POST':
if web_request.get_request_type() == RequestType.POST:
self.api_key = uuid.uuid4().hex
self.users[API_USER]['api_key'] = self.api_key
self._sync_user(API_USER)
@ -360,11 +370,11 @@ class Authorization:
'action': 'user_jwt_refresh'
}
async def _handle_user_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
action = web_request.get_action()
if action == "GET":
async def _handle_user_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
req_type = web_request.get_request_type()
if req_type == RequestType.GET:
user = web_request.get_current_user()
if user is None:
return {
@ -378,10 +388,10 @@ class Authorization:
'source': user.get("source", "moonraker"),
'created_on': user.get('created_on')
}
elif action == "POST":
elif req_type == RequestType.POST:
# Create User
return await self._login_jwt_user(web_request, create=True)
elif action == "DELETE":
elif req_type == RequestType.DELETE:
# Delete User
return self._delete_jwt_user(web_request)
raise self.server.error("Invalid Request Method")

View File

@ -8,6 +8,7 @@ from __future__ import annotations
import logging
import time
from collections import deque
from ..common import RequestType
# Annotation imports
from typing import (
@ -59,11 +60,13 @@ class DataStore:
# Register endpoints
self.server.register_endpoint(
"/server/temperature_store", ['GET'],
self._handle_temp_store_request)
"/server/temperature_store", RequestType.GET,
self._handle_temp_store_request
)
self.server.register_endpoint(
"/server/gcode_store", ['GET'],
self._handle_gcode_store_request)
"/server/gcode_store", RequestType.GET,
self._handle_gcode_store_request
)
async def _init_sensors(self) -> None:
klippy_apis: APIComp = self.server.lookup_component('klippy_apis')

View File

@ -15,6 +15,7 @@ from threading import Lock as ThreadLock
import lmdb
from ..utils import Sentinel, ServerError
from ..utils import json_wrapper as jsonw
from ..common import RequestType
# Annotation imports
from typing import (
@ -174,15 +175,17 @@ class MoonrakerDatabase:
self.insert_item("moonraker", "database.unsafe_shutdowns",
unsafe_shutdowns + 1)
self.server.register_endpoint(
"/server/database/list", ['GET'], self._handle_list_request)
"/server/database/list", RequestType.GET, self._handle_list_request
)
self.server.register_endpoint(
"/server/database/item", ["GET", "POST", "DELETE"],
self._handle_item_request)
"/server/database/item", RequestType.all(), self._handle_item_request
)
self.server.register_debug_endpoint(
"/debug/database/list", ['GET'], self._handle_list_request)
"/debug/database/list", RequestType.GET, self._handle_list_request
)
self.server.register_debug_endpoint(
"/debug/database/item", ["GET", "POST", "DELETE"],
self._handle_item_request)
"/debug/database/item", RequestType.all(), self._handle_item_request
)
def get_database_path(self) -> str:
return self.database_path
@ -735,7 +738,7 @@ class MoonrakerDatabase:
async def _handle_item_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
action = web_request.get_action()
req_type = web_request.get_request_type()
is_debug = web_request.get_endpoint().startswith("/debug/")
namespace = web_request.get_str("namespace")
if namespace in self.forbidden_namespaces and not is_debug:
@ -744,7 +747,7 @@ class MoonrakerDatabase:
" is forbidden", 403)
key: Any
valid_types: Tuple[type, ...]
if action != "GET":
if req_type != RequestType.GET:
if namespace in self.protected_namespaces and not is_debug:
raise self.server.error(
f"Write access to namespace '{namespace}'"
@ -758,16 +761,17 @@ class MoonrakerDatabase:
raise self.server.error(
"Value for argument 'key' is an invalid type: "
f"{type(key).__name__}")
if action == "GET":
if req_type == RequestType.GET:
val = await self.get_item(namespace, key)
elif action == "POST":
elif req_type == RequestType.POST:
val = web_request.get("value")
await self.insert_item(namespace, key, val)
elif action == "DELETE":
elif req_type == RequestType.DELETE:
val = await self.delete_item(namespace, key, drop_empty_db=True)
if is_debug:
self.debug_counter[action.lower()] += 1
name = req_type.name or str(req_type).split(".", 1)[-1]
self.debug_counter[name.lower()] += 1
await self.insert_item(
"moonraker", "database.debug_counter", self.debug_counter
)

View File

@ -7,7 +7,7 @@ from __future__ import annotations
import asyncio
import pathlib
import logging
from ..common import BaseRemoteConnection
from ..common import BaseRemoteConnection, RequestType, TransportType
from ..utils import get_unix_peer_credentials
# Annotation imports
@ -35,19 +35,19 @@ class ExtensionManager:
self.agent_methods: Dict[int, List[str]] = {}
self.uds_server: Optional[asyncio.AbstractServer] = None
self.server.register_endpoint(
"/connection/register_remote_method", ["POST"],
"/connection/register_remote_method", RequestType.POST,
self._register_agent_method,
transports=["websocket"]
transports=TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/connection/send_event", ["POST"], self._handle_agent_event,
transports=["websocket"]
"/connection/send_event", RequestType.POST, self._handle_agent_event,
transports=TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/server/extensions/list", ["GET"], self._handle_list_extensions
"/server/extensions/list", RequestType.GET, self._handle_list_extensions
)
self.server.register_endpoint(
"/server/extensions/request", ["POST"], self._handle_call_agent
"/server/extensions/request", RequestType.POST, self._handle_call_agent
)
def register_agent(self, connection: BaseRemoteConnection) -> None:

View File

@ -20,6 +20,7 @@ from inotify_simple import INotify
from inotify_simple import flags as iFlags
from ...utils import source_info
from ...utils import json_wrapper as jsonw
from ...common import RequestType, TransportType
# Annotation imports
from typing import (
@ -108,27 +109,37 @@ class FileManager:
# Register file management endpoints
self.server.register_endpoint(
"/server/files/list", ['GET'], self._handle_filelist_request)
"/server/files/list", RequestType.GET, self._handle_filelist_request
)
self.server.register_endpoint(
"/server/files/metadata", ['GET'], self._handle_metadata_request)
"/server/files/metadata", RequestType.GET, self._handle_metadata_request
)
self.server.register_endpoint(
"/server/files/metascan", ['POST'], self._handle_metascan_request)
"/server/files/metascan", RequestType.POST, self._handle_metascan_request
)
self.server.register_endpoint(
"/server/files/thumbnails", ['GET'], self._handle_list_thumbs)
"/server/files/thumbnails", RequestType.GET, self._handle_list_thumbs
)
self.server.register_endpoint(
"/server/files/roots", ['GET'], self._handle_list_roots)
"/server/files/roots", RequestType.GET, self._handle_list_roots
)
self.server.register_endpoint(
"/server/files/directory", ['GET', 'POST', 'DELETE'],
self._handle_directory_request)
"/server/files/directory", RequestType.all(),
self._handle_directory_request
)
self.server.register_endpoint(
"/server/files/move", ['POST'], self._handle_file_move_copy)
"/server/files/move", RequestType.POST, self._handle_file_move_copy
)
self.server.register_endpoint(
"/server/files/copy", ['POST'], self._handle_file_move_copy)
"/server/files/copy", RequestType.POST, self._handle_file_move_copy
)
self.server.register_endpoint(
"/server/files/zip", ['POST'], self._handle_zip_files)
"/server/files/zip", RequestType.POST, self._handle_zip_files
)
self.server.register_endpoint(
"/server/files/delete_file", ['DELETE'], self._handle_file_delete,
transports=["websocket"])
"/server/files/delete_file", RequestType.DELETE, self._handle_file_delete,
transports=TransportType.WEBSOCKET
)
# register client notificaitons
self.server.register_notification("file_manager:filelist_changed")
@ -474,8 +485,8 @@ class FileManager:
) -> Dict[str, Any]:
directory = web_request.get_str('path', "gcodes")
root, dir_path = self._convert_request_path(directory)
method = web_request.get_action()
if method == 'GET':
req_type = web_request.get_request_type()
if req_type == RequestType.GET:
is_extended = web_request.get_boolean('extended', False)
# Get list of files and subdirectories for this target
dir_info = self._list_directory(dir_path, root, is_extended)
@ -483,7 +494,7 @@ class FileManager:
async with self.sync_lock:
self.check_reserved_path(dir_path, True)
action = "create_dir"
if method == 'POST' and root in self.full_access_roots:
if req_type == RequestType.POST and root in self.full_access_roots:
# Create a new directory
self.sync_lock.setup("create_dir", dir_path)
try:
@ -491,7 +502,7 @@ class FileManager:
except Exception as e:
raise self.server.error(str(e))
self.fs_observer.on_item_create(root, dir_path, is_dir=True)
elif method == 'DELETE' and root in self.full_access_roots:
elif req_type == RequestType.DELETE and root in self.full_access_roots:
# Remove a directory
action = "delete_dir"
if directory.strip("/") == root:

View File

@ -6,7 +6,7 @@ from __future__ import annotations
import time
import logging
from asyncio import Lock
from ..common import JobEvent
from ..common import JobEvent, RequestType
# Annotation imports
from typing import (
@ -54,14 +54,19 @@ class History:
self.server.register_notification("history:history_changed")
self.server.register_endpoint(
"/server/history/job", ['GET', 'DELETE'], self._handle_job_request)
"/server/history/job", RequestType.GET | RequestType.DELETE,
self._handle_job_request
)
self.server.register_endpoint(
"/server/history/list", ['GET'], self._handle_jobs_list)
"/server/history/list", RequestType.GET, self._handle_jobs_list
)
self.server.register_endpoint(
"/server/history/totals", ['GET'], self._handle_job_totals)
"/server/history/totals", RequestType.GET, self._handle_job_totals
)
self.server.register_endpoint(
"/server/history/reset_totals", ['POST'],
self._handle_job_total_reset)
"/server/history/reset_totals", RequestType.POST,
self._handle_job_total_reset
)
database.register_local_namespace(HIST_NAMESPACE)
self.history_ns = database.wrap_namespace(HIST_NAMESPACE,
@ -78,14 +83,14 @@ class History:
web_request: WebRequest
) -> Dict[str, Any]:
async with self.request_lock:
action = web_request.get_action()
if action == "GET":
req_type = web_request.get_request_type()
if req_type == RequestType.GET:
job_id = web_request.get_str("uid")
if job_id not in self.cached_job_ids:
raise self.server.error(f"Invalid job uid: {job_id}", 404)
job = await self.history_ns[job_id]
return {"job": self._prep_requested_job(job, job_id)}
if action == "DELETE":
if req_type == RequestType.DELETE:
all = web_request.get_boolean("all", False)
if all:
deljobs = self.cached_job_ids

View File

@ -8,7 +8,7 @@ from __future__ import annotations
import asyncio
import time
import logging
from ..common import JobEvent
from ..common import JobEvent, RequestType
# Annotation imports
from typing import (
@ -56,16 +56,21 @@ class JobQueue:
self.start_queue)
self.server.register_endpoint(
"/server/job_queue/job", ['POST', 'DELETE'],
self._handle_job_request)
"/server/job_queue/job", RequestType.POST | RequestType.DELETE,
self._handle_job_request
)
self.server.register_endpoint(
"/server/job_queue/pause", ['POST'], self._handle_pause_queue)
"/server/job_queue/pause", RequestType.POST, self._handle_pause_queue
)
self.server.register_endpoint(
"/server/job_queue/start", ['POST'], self._handle_start_queue)
"/server/job_queue/start", RequestType.POST, self._handle_start_queue
)
self.server.register_endpoint(
"/server/job_queue/status", ['GET'], self._handle_queue_status)
"/server/job_queue/status", RequestType.GET, self._handle_queue_status
)
self.server.register_endpoint(
"/server/job_queue/jump", ['POST'], self._handle_jump)
"/server/job_queue/jump", RequestType.POST, self._handle_jump
)
async def _handle_ready(self) -> None:
async with self.lock:
@ -248,23 +253,23 @@ class JobQueue:
'queue_state': self.queue_state
})
async def _handle_job_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
action = web_request.get_action()
if action == "POST":
async def _handle_job_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
req_type = web_request.get_request_type()
if req_type == RequestType.POST:
files = web_request.get_list('filenames')
reset = web_request.get_boolean("reset", False)
# Validate that all files exist before queueing
await self.queue_job(files, reset=reset)
elif action == "DELETE":
elif req_type == RequestType.DELETE:
if web_request.get_boolean("all", False):
await self.delete_job([], all=True)
else:
job_ids = web_request.get_list('job_ids')
await self.delete_job(job_ids)
else:
raise self.server.error(f"Invalid action: {action}")
raise self.server.error(f"Invalid request type: {req_type}")
return {
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state

View File

@ -6,7 +6,7 @@
from __future__ import annotations
from ..utils import Sentinel
from ..common import WebRequest, Subscribable
from ..common import WebRequest, Subscribable, RequestType
# Annotation imports
from typing import (
@ -52,17 +52,23 @@ class KlippyAPI(Subscribable):
# Register GCode Aliases
self.server.register_endpoint(
"/printer/print/pause", ['POST'], self._gcode_pause)
"/printer/print/pause", RequestType.POST, self._gcode_pause
)
self.server.register_endpoint(
"/printer/print/resume", ['POST'], self._gcode_resume)
"/printer/print/resume", RequestType.POST, self._gcode_resume
)
self.server.register_endpoint(
"/printer/print/cancel", ['POST'], self._gcode_cancel)
"/printer/print/cancel", RequestType.POST, self._gcode_cancel
)
self.server.register_endpoint(
"/printer/print/start", ['POST'], self._gcode_start_print)
"/printer/print/start", RequestType.POST, self._gcode_start_print
)
self.server.register_endpoint(
"/printer/restart", ['POST'], self._gcode_restart)
"/printer/restart", RequestType.POST, self._gcode_restart
)
self.server.register_endpoint(
"/printer/firmware_restart", ['POST'], self._gcode_firmware_restart)
"/printer/firmware_restart", RequestType.POST, self._gcode_firmware_restart
)
self.server.register_event_handler(
"server:klippy_disconnect", self._on_klippy_disconnect
)

View File

@ -23,6 +23,7 @@ import configparser
from ..confighelper import FileSourceWrapper
from ..utils import source_info
from ..utils import json_wrapper as jsonw
from ..common import RequestType
# Annotation imports
from typing import (
@ -132,26 +133,29 @@ class Machine:
self.sudo_requests: List[Tuple[SudoCallback, str]] = []
self.server.register_endpoint(
"/machine/reboot", ['POST'], self._handle_machine_request)
"/machine/reboot", RequestType.POST, self._handle_machine_request
)
self.server.register_endpoint(
"/machine/shutdown", ['POST'], self._handle_machine_request)
"/machine/shutdown", RequestType.POST, self._handle_machine_request
)
self.server.register_endpoint(
"/machine/services/restart", ['POST'],
self._handle_service_request)
"/machine/services/restart", RequestType.POST, self._handle_service_request
)
self.server.register_endpoint(
"/machine/services/stop", ['POST'],
self._handle_service_request)
"/machine/services/stop", RequestType.POST, self._handle_service_request
)
self.server.register_endpoint(
"/machine/services/start", ['POST'],
self._handle_service_request)
"/machine/services/start", RequestType.POST, self._handle_service_request
)
self.server.register_endpoint(
"/machine/system_info", ['GET'],
self._handle_sysinfo_request)
"/machine/system_info", RequestType.GET, self._handle_sysinfo_request
)
self.server.register_endpoint(
"/machine/sudo/info", ["GET"], self._handle_sudo_info)
"/machine/sudo/info", RequestType.GET, self._handle_sudo_info
)
self.server.register_endpoint(
"/machine/sudo/password", ["POST"],
self._set_sudo_password)
"/machine/sudo/password", RequestType.POST, self._set_sudo_password
)
self.server.register_notification("machine:service_state_changed")
self.server.register_notification("machine:sudo_alert")

View File

@ -303,14 +303,18 @@ class MQTTClient(APITransport, Subscribable):
self.pending_responses: List[asyncio.Future] = []
self.pending_acks: Dict[int, asyncio.Future] = {}
# We don't need to register these endpoints over the MQTT transport as they
# are redundant. MQTT clients can already publish and subscribe.
ep_transports = TransportType.all() & ~TransportType.MQTT
self.server.register_endpoint(
"/server/mqtt/publish", ["POST"],
self._handle_publish_request,
transports=["http", "websocket", "internal"])
"/server/mqtt/publish", RequestType.POST, self._handle_publish_request,
transports=ep_transports
)
self.server.register_endpoint(
"/server/mqtt/subscribe", ["POST"],
"/server/mqtt/subscribe", RequestType.POST,
self._handle_subscription_request,
transports=["http", "websocket", "internal"])
transports=ep_transports
)
# Subscribe to API requests
self.json_rpc = JsonRPC(self.server, transport="MQTT")

View File

@ -10,7 +10,7 @@ import apprise
import logging
import pathlib
import re
from ..common import JobEvent
from ..common import JobEvent, RequestType
# Annotation imports
from typing import (
@ -76,10 +76,10 @@ class Notifier:
def register_endpoints(self, config: ConfigHelper):
self.server.register_endpoint(
"/server/notifiers/list", ["GET"], self._handle_notifier_list
"/server/notifiers/list", RequestType.GET, self._handle_notifier_list
)
self.server.register_debug_endpoint(
"/debug/notifiers/test", ["POST"], self._handle_notifier_test
"/debug/notifiers/test", RequestType.POST, self._handle_notifier_test
)
async def _handle_notifier_list(

View File

@ -6,6 +6,7 @@
from __future__ import annotations
import logging
from ..common import RequestType, TransportType
# Annotation imports
from typing import (
@ -65,22 +66,27 @@ class OctoPrintCompat:
# Version & Server information
self.server.register_endpoint(
'/api/version', ['GET'], self._get_version,
transports=['http'], wrap_result=False)
'/api/version', RequestType.GET, self._get_version,
transports=TransportType.HTTP, wrap_result=False
)
self.server.register_endpoint(
'/api/server', ['GET'], self._get_server,
transports=['http'], wrap_result=False)
'/api/server', RequestType.GET, self._get_server,
transports=TransportType.HTTP, wrap_result=False
)
# Login, User & Settings
self.server.register_endpoint(
'/api/login', ['POST'], self._post_login_user,
transports=['http'], wrap_result=False)
'/api/login', RequestType.POST, self._post_login_user,
transports=TransportType.HTTP, wrap_result=False
)
self.server.register_endpoint(
'/api/currentuser', ['GET'], self._post_login_user,
transports=['http'], wrap_result=False)
'/api/currentuser', RequestType.GET, self._post_login_user,
transports=TransportType.HTTP, wrap_result=False
)
self.server.register_endpoint(
'/api/settings', ['GET'], self._get_settings,
transports=['http'], wrap_result=False)
'/api/settings', RequestType.GET, self._get_settings,
transports=TransportType.HTTP, wrap_result=False
)
# File operations
# Note that file upload is handled in file_manager.py
@ -88,30 +94,34 @@ class OctoPrintCompat:
# Job operations
self.server.register_endpoint(
'/api/job', ['GET'], self._get_job,
transports=['http'], wrap_result=False)
'/api/job', RequestType.GET, self._get_job,
transports=TransportType.HTTP, wrap_result=False
)
# TODO: start/cancel/restart/pause jobs
# Printer operations
self.server.register_endpoint(
'/api/printer', ['GET'], self._get_printer,
transports=['http'], wrap_result=False)
'/api/printer', RequestType.GET, self._get_printer,
transports=TransportType.HTTP, wrap_result=False)
self.server.register_endpoint(
'/api/printer/command', ['POST'], self._post_command,
transports=['http'], wrap_result=False)
'/api/printer/command', RequestType.POST, self._post_command,
transports=TransportType.HTTP, wrap_result=False
)
# TODO: head/tool/bed/chamber specific read/issue
# Printer profiles
self.server.register_endpoint(
'/api/printerprofiles', ['GET'], self._get_printerprofiles,
transports=['http'], wrap_result=False)
'/api/printerprofiles', RequestType.GET, self._get_printerprofiles,
transports=TransportType.HTTP, wrap_result=False
)
# Upload Handlers
self.server.register_upload_handler(
"/api/files/local", location_prefix="api/files/moonraker")
self.server.register_endpoint(
"/api/files/moonraker/(?P<relative_path>.+)", ['POST'],
self._select_file, transports=['http'], wrap_result=False)
"/api/files/moonraker/(?P<relative_path>.+)", RequestType.POST,
self._select_file, transports=TransportType.HTTP, wrap_result=False
)
# System
# TODO: shutdown/reboot/restart operations

View File

@ -12,6 +12,7 @@ import asyncio
import time
from urllib.parse import quote, urlencode
from ..utils import json_wrapper as jsonw
from ..common import RequestType
# Annotation imports
from typing import (
@ -74,20 +75,24 @@ class PrinterPower:
self.devices[dev.get_name()] = dev
self.server.register_endpoint(
"/machine/device_power/devices", ['GET'],
self._handle_list_devices)
"/machine/device_power/devices", RequestType.GET, self._handle_list_devices
)
self.server.register_endpoint(
"/machine/device_power/status", ['GET'],
self._handle_batch_power_request)
"/machine/device_power/status", RequestType.GET,
self._handle_batch_power_request
)
self.server.register_endpoint(
"/machine/device_power/on", ['POST'],
self._handle_batch_power_request)
"/machine/device_power/on", RequestType.POST,
self._handle_batch_power_request
)
self.server.register_endpoint(
"/machine/device_power/off", ['POST'],
self._handle_batch_power_request)
"/machine/device_power/off", RequestType.POST,
self._handle_batch_power_request
)
self.server.register_endpoint(
"/machine/device_power/device", ['GET', 'POST'],
self._handle_single_power_request)
"/machine/device_power/device", RequestType.GET | RequestType.POST,
self._handle_single_power_request
)
self.server.register_remote_method(
"set_device_power", self.set_device_power)
self.server.register_event_handler(
@ -122,34 +127,35 @@ class PrinterPower:
)
await dev.process_request("on")
async def _handle_list_devices(self,
web_request: WebRequest
) -> Dict[str, Any]:
async def _handle_list_devices(
self, web_request: WebRequest
) -> Dict[str, Any]:
dev_list = [d.get_device_info() for d in self.devices.values()]
output = {"devices": dev_list}
return output
async def _handle_single_power_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
async def _handle_single_power_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
dev_name: str = web_request.get_str('device')
req_action = web_request.get_action()
req_type = web_request.get_request_type()
if dev_name not in self.devices:
raise self.server.error(f"No valid device named {dev_name}")
dev = self.devices[dev_name]
if req_action == 'GET':
if req_type == RequestType.GET:
action = "status"
elif req_action == "POST":
elif req_type == RequestType.POST:
action = web_request.get_str('action').lower()
if action not in ["on", "off", "toggle"]:
raise self.server.error(
f"Invalid requested action '{action}'")
raise self.server.error(f"Invalid requested action '{action}'")
else:
raise self.server.error(f"Invalid Request Type: {req_type}")
result = await dev.process_request(action)
return {dev_name: result}
async def _handle_batch_power_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
async def _handle_batch_power_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
args = web_request.get_args()
ep = web_request.get_endpoint()
if not args:

View File

@ -15,6 +15,7 @@ import pathlib
import logging
from collections import deque
from ..utils import ioctl_macros
from ..common import RequestType
# Annotation imports
from typing import (
@ -79,9 +80,11 @@ class ProcStats:
self.cpu_stats_file = pathlib.Path(CPU_STAT_PATH)
self.meminfo_file = pathlib.Path(MEM_AVAIL_PATH)
self.server.register_endpoint(
"/machine/proc_stats", ["GET"], self._handle_stat_request)
"/machine/proc_stats", RequestType.GET, self._handle_stat_request
)
self.server.register_event_handler(
"server:klippy_shutdown", self._handle_shutdown)
"server:klippy_shutdown", self._handle_shutdown
)
self.server.register_notification("proc_stats:proc_stat_update")
self.proc_stat_queue: Deque[Dict[str, Any]] = deque(maxlen=30)
self.last_update_time = time.time()

View File

@ -12,6 +12,7 @@ import logging
from collections import defaultdict, deque
from dataclasses import dataclass, replace
from functools import partial
from ..common import RequestType
# Annotation imports
from typing import (
@ -180,17 +181,17 @@ class Sensors:
# Register endpoints
self.server.register_endpoint(
"/server/sensors/list",
["GET"],
RequestType.GET,
self._handle_sensor_list_request,
)
self.server.register_endpoint(
"/server/sensors/info",
["GET"],
RequestType.GET,
self._handle_sensor_info_request,
)
self.server.register_endpoint(
"/server/sensors/measurements",
["GET"],
RequestType.GET,
self._handle_sensor_measurements_request,
)

View File

@ -9,6 +9,7 @@ import asyncio
import datetime
import logging
from typing import TYPE_CHECKING, Dict, Any
from ..common import RequestType
if TYPE_CHECKING:
from typing import Optional
@ -64,12 +65,12 @@ class SpoolManager:
def _register_endpoints(self):
self.server.register_endpoint(
"/server/spoolman/spool_id",
["GET", "POST"],
RequestType.GET | RequestType.POST,
self._handle_spool_id_request,
)
self.server.register_endpoint(
"/server/spoolman/proxy",
["POST"],
RequestType.POST,
self._proxy_spoolman_request,
)
@ -157,7 +158,7 @@ class SpoolManager:
self.extruded = 0
async def _handle_spool_id_request(self, web_request: WebRequest):
if web_request.get_action() == "POST":
if web_request.get_request_type() == RequestType.POST:
spool_id = web_request.get_int("spool_id", None)
await self.set_active_spool(spool_id)
# For GET requests we will simply return the spool_id

View File

@ -17,6 +17,7 @@ from .git_deploy import GitDeploy
from .zip_deploy import ZipDeploy
from .system_deploy import PackageDeploy
from .web_deploy import WebClientDeploy
from ...common import RequestType
# Annotation imports
from typing import (
@ -130,32 +131,32 @@ class UpdateManager:
self._handle_auto_refresh)
self.server.register_endpoint(
"/machine/update/moonraker", ["POST"],
self._handle_update_request)
"/machine/update/moonraker", RequestType.POST, self._handle_update_request
)
self.server.register_endpoint(
"/machine/update/klipper", ["POST"],
self._handle_update_request)
"/machine/update/klipper", RequestType.POST, self._handle_update_request
)
self.server.register_endpoint(
"/machine/update/system", ["POST"],
self._handle_update_request)
"/machine/update/system", RequestType.POST, self._handle_update_request
)
self.server.register_endpoint(
"/machine/update/client", ["POST"],
self._handle_update_request)
"/machine/update/client", RequestType.POST, self._handle_update_request
)
self.server.register_endpoint(
"/machine/update/full", ["POST"],
self._handle_full_update_request)
"/machine/update/full", RequestType.POST, self._handle_full_update_request
)
self.server.register_endpoint(
"/machine/update/status", ["GET"],
self._handle_status_request)
"/machine/update/status", RequestType.GET, self._handle_status_request
)
self.server.register_endpoint(
"/machine/update/refresh", ["POST"],
self._handle_refresh_request)
"/machine/update/refresh", RequestType.POST, self._handle_refresh_request
)
self.server.register_endpoint(
"/machine/update/recover", ["POST"],
self._handle_repo_recovery)
"/machine/update/recover", RequestType.POST, self._handle_repo_recovery
)
self.server.register_endpoint(
"/machine/update/rollback", ["POST"],
self._handle_rollback)
"/machine/update/rollback", RequestType.POST, self._handle_rollback
)
self.server.register_notification("update_manager:update_response")
self.server.register_notification("update_manager:update_refreshed")

View File

@ -10,6 +10,7 @@ import ipaddress
import socket
import uuid
import logging
from ..common import RequestType
from typing import (
TYPE_CHECKING,
Optional,
@ -50,14 +51,14 @@ class WebcamManager:
self.webcams[webcam.name] = webcam
self.server.register_endpoint(
"/server/webcams/list", ["GET"], self._handle_webcam_list
"/server/webcams/list", RequestType.GET, self._handle_webcam_list
)
self.server.register_endpoint(
"/server/webcams/item", ["GET", "POST", "DELETE"],
"/server/webcams/item", RequestType.all(),
self._handle_webcam_request
)
self.server.register_endpoint(
"/server/webcams/test", ["POST"], self._handle_webcam_test
"/server/webcams/test", RequestType.POST, self._handle_webcam_test
)
self.server.register_notification("webcam:webcams_changed")
self.server.register_event_handler(
@ -163,13 +164,13 @@ class WebcamManager:
return webcam
async def _handle_webcam_request(self, web_request: WebRequest) -> Dict[str, Any]:
action = web_request.get_action()
webcam = self._lookup_camera(web_request, action != "POST")
req_type = web_request.get_request_type()
webcam = self._lookup_camera(web_request, req_type != RequestType.POST)
webcam_data: Dict[str, Any] = {}
if action == "GET":
if req_type == RequestType.GET:
assert webcam is not None
webcam_data = webcam.as_dict()
elif action == "POST":
elif req_type == RequestType.POST:
if webcam is not None:
if webcam.source == "config":
raise self.server.error(
@ -191,7 +192,7 @@ class WebcamManager:
webcam = WebCam.from_web_request(self.server, web_request, uid)
await self._save_cam(webcam)
webcam_data = webcam.as_dict()
elif action == "DELETE":
elif req_type == RequestType.DELETE:
assert webcam is not None
if webcam.source == "config":
raise self.server.error(
@ -200,7 +201,7 @@ class WebcamManager:
)
webcam_data = webcam.as_dict()
self._delete_cam(webcam)
if action != "GET":
if req_type != RequestType.GET:
self.server.send_event(
"webcam:webcams_changed", {"webcams": self._list_webcams()}
)

View File

@ -16,6 +16,7 @@ import serial_asyncio
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from ..utils import json_wrapper as jsonw
from ..common import RequestType
# Annotation imports
from typing import (
@ -388,23 +389,24 @@ class WLED:
# As moonraker is about making things a web api, let's try it
# Yes, this is largely a cut-n-paste from power.py
self.server.register_endpoint(
"/machine/wled/strips", ["GET"],
self._handle_list_strips)
"/machine/wled/strips", RequestType.GET, self._handle_list_strips
)
self.server.register_endpoint(
"/machine/wled/status", ["GET"],
self._handle_batch_wled_request)
"/machine/wled/status", RequestType.GET, self._handle_batch_wled_request
)
self.server.register_endpoint(
"/machine/wled/on", ["POST"],
self._handle_batch_wled_request)
"/machine/wled/on", RequestType.POST, self._handle_batch_wled_request
)
self.server.register_endpoint(
"/machine/wled/off", ["POST"],
self._handle_batch_wled_request)
"/machine/wled/off", RequestType.POST, self._handle_batch_wled_request
)
self.server.register_endpoint(
"/machine/wled/toggle", ["POST"],
self._handle_batch_wled_request)
"/machine/wled/toggle", RequestType.POST, self._handle_batch_wled_request
)
self.server.register_endpoint(
"/machine/wled/strip", ["GET", "POST"],
self._handle_single_wled_request)
"/machine/wled/strip", RequestType.GET | RequestType.POST,
self._handle_single_wled_request
)
async def component_init(self) -> None:
try:
@ -521,19 +523,19 @@ class WLED:
intensity: int = web_request.get_int('intensity', -1)
speed: int = web_request.get_int('speed', -1)
req_action = web_request.get_action()
req_type = web_request.get_request_type()
if strip_name not in self.strips:
raise self.server.error(f"No valid strip named {strip_name}")
strip = self.strips[strip_name]
if req_action == 'GET':
if req_type == RequestType.GET:
return {strip_name: strip.get_strip_info()}
elif req_action == "POST":
elif req_type == RequestType.POST:
action = web_request.get_str('action').lower()
if action not in ["on", "off", "toggle", "control"]:
raise self.server.error(
f"Invalid requested action '{action}'")
result = await self._process_request(strip, action, preset,
brightness, intensity, speed)
raise self.server.error(f"Invalid requested action '{action}'")
result = await self._process_request(
strip, action, preset, brightness, intensity, speed
)
return {strip_name: result}
async def _handle_batch_wled_request(self: WLED,

View File

@ -14,6 +14,7 @@ from itertools import cycle
from email.utils import formatdate
from zeroconf import IPVersion
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
from ..common import RequestType, TransportType
from typing import (
TYPE_CHECKING,
@ -214,9 +215,9 @@ class SSDPServer(asyncio.protocols.DatagramProtocol):
auth.register_permited_path("/server/zeroconf/ssdp")
self.server.register_endpoint(
"/server/zeroconf/ssdp",
["GET"],
RequestType.GET,
self._handle_xml_request,
transports=["http"],
transports=TransportType.HTTP,
wrap_result=False,
content_type="application/xml"
)

View File

@ -12,6 +12,7 @@ import os
import sys
import asyncio
from queue import SimpleQueue as Queue
from .common import RequestType
# Annotation imports
from typing import (
@ -112,7 +113,7 @@ class LogManager:
def set_server(self, server: Server) -> None:
self.server = server
self.server.register_endpoint(
"/server/logs/rollover", ['POST'], self._handle_log_rollover
"/server/logs/rollover", RequestType.POST, self._handle_log_rollover
)
def set_rollover_info(self, name: str, item: str) -> None:

View File

@ -25,6 +25,7 @@ from .app import MoonrakerApp
from .klippy_connection import KlippyConnection
from .utils import ServerError, Sentinel, get_software_info, json_wrapper
from .loghelper import LogManager
from .common import RequestType
# Annotation imports
from typing import (
@ -102,11 +103,14 @@ class Server:
self.add_warning(warning)
self.register_endpoint(
"/server/info", ['GET'], self._handle_info_request)
"/server/info", RequestType.GET, self._handle_info_request
)
self.register_endpoint(
"/server/config", ['GET'], self._handle_config_request)
"/server/config", RequestType.GET, self._handle_config_request
)
self.register_endpoint(
"/server/restart", ['POST'], self._handle_server_restart)
"/server/restart", RequestType.POST, self._handle_server_restart
)
self.register_notification("server:klippy_ready")
self.register_notification("server:klippy_shutdown")
self.register_notification("server:klippy_disconnect",