diff --git a/moonraker/components/job_queue.py b/moonraker/components/job_queue.py new file mode 100644 index 0000000..7777fa2 --- /dev/null +++ b/moonraker/components/job_queue.py @@ -0,0 +1,283 @@ +# Print Job Queue Implementation +# +# Copyright (C) 2021 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations +import asyncio +import time +import logging + +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Optional, + Dict, + List, + Union, +) +if TYPE_CHECKING: + from confighelper import ConfigHelper + from websockets import WebRequest + from .klippy_apis import KlippyAPI + from .file_manager.file_manager import FileManager + +class JobQueue: + def __init__(self, config: ConfigHelper) -> None: + self.server = config.get_server() + self.queued_jobs: Dict[str, QueuedJob] = {} + self.queue_state: str = "ready" + self.lock = asyncio.Lock() + self.load_on_start = config.getboolean("load_on_startup", False) + self.job_delay = config.getfloat("job_transition_delay", 0.01) + if self.job_delay <= 0.: + raise config.error( + "Value for option 'job_transition_delay' in section [job_queue]" + " must be above 0.0") + self.job_transition_gcode = config.get( + "job_transition_gcode", "").strip() + self.pop_queue_handle: Optional[asyncio.TimerHandle] = None + + self.server.register_event_handler( + "server:klippy_ready", self._handle_ready) + self.server.register_event_handler( + "server:klippy_shutdown", self._handle_shutdown) + self.server.register_event_handler( + "job_state:complete", self._on_job_complete) + self.server.register_event_handler( + "job_state:error", self._on_job_abort) + self.server.register_event_handler( + "job_state:cancelled", self._on_job_abort) + + self.server.register_endpoint( + "/server/job_queue/job", ['POST', 'DELETE'], + self._handle_job_request) + self.server.register_endpoint( + "/server/job_queue/pause", ['POST'], self._handle_pause_queue) + self.server.register_endpoint( + "/server/job_queue/resume", ['POST'], self._handle_resume_queue) + self.server.register_endpoint( + "/server/job_queue/status", ['GET'], self._handle_queue_status) + + async def _handle_ready(self) -> None: + async with self.lock: + if not self.load_on_start or not self.queued_jobs: + return + # start a queued print + if self.queue_state in ['ready', 'paused']: + event_loop = self.server.get_event_loop() + self.queue_state = "loading" + self.pop_queue_handle = event_loop.delay_callback( + 0.01, self._pop_job, False) + + async def _handle_shutdown(self) -> None: + await self.pause_queue() + if not self.queued_jobs: + self.queue_state = "ready" + + async def _on_job_complete(self, + prev_stats: Dict[str, Any], + new_stats: Dict[str, Any] + ) -> None: + async with self.lock: + # Transition to the next job in the queue + if self.queue_state == "ready" and self.queued_jobs: + event_loop = self.server.get_event_loop() + self.queue_state = "loading" + self.pop_queue_handle = event_loop.delay_callback( + self.job_delay, self._pop_job) + + async def _on_job_abort(self, + prev_stats: Dict[str, Any], + new_stats: Dict[str, Any] + ) -> None: + async with self.lock: + if self.queued_jobs: + self.queue_state = "paused" + + async def _pop_job(self, need_transition: bool = True) -> None: + self.pop_queue_handle = None + async with self.lock: + if self.queue_state == "paused": + return + if not self.queued_jobs: + self.queue_state = "ready" + return + kapis: KlippyAPI = self.server.lookup_component('klippy_apis') + uid, job = list(self.queued_jobs.items())[0] + filename = str(job) + can_print = await self._check_can_print() + if not can_print or self.queue_state != "loading": + self.queue_state = "paused" + return + try: + if self.job_transition_gcode and need_transition: + await kapis.run_gcode(self.job_transition_gcode) + # Check to see if the queue was paused while running + # the job transition gcode + if self.queue_state != "loading": + raise self.server.error( + "Queue State Changed during Transition Gcode") + self.queue_state = "starting" + await kapis.start_print(filename) + except self.server.error: + logging.exception(f"Error Loading print: {filename}") + self.queue_state = "paused" + else: + self.queued_jobs.pop(uid, None) + if self.queue_state == "starting": + # If the queue was not paused while starting the print, + # reset state to "ready" + self.queue_state = "ready" + + async def _check_can_print(self) -> bool: + # Query the latest stats + kapis: KlippyAPI = self.server.lookup_component('klippy_apis') + try: + result = await kapis.query_objects({"print_stats": None}) + except Exception: + # Klippy not connected + return False + if 'print_stats' not in result: + return False + state: str = result['print_stats']['state'] + if state in ["printing", "paused"]: + return False + return True + + async def queue_job(self, filename: str, + check_exists: bool = True + ) -> bool: + async with self.lock: + # Make sure that the file exists + if check_exists: + self._check_job_file(filename) + can_print = await self._check_can_print() + if ( + self.queue_state == "ready" and + not self.queued_jobs and + can_print + ): + # Printer is ready to accept a print + kapis: KlippyAPI = self.server.lookup_component('klippy_apis') + try: + await kapis.start_print(filename) + except self.server.error: + # Attempt to start print failed, queue the print + pass + else: + return True + queued_job = QueuedJob(filename) + self.queued_jobs[queued_job.job_id] = queued_job + return False + + async def pause_queue(self) -> None: + self.queue_state = "paused" + if self.pop_queue_handle is not None: + self.pop_queue_handle.cancel() + self.pop_queue_handle = None + # Acquire the lock to wait for any pending operations to + # complete + await self.lock.acquire() + self.lock.release() + + def _job_map_to_list(self) -> List[Dict[str, Any]]: + cur_time = time.time() + return [job.as_dict(cur_time) for + job in self.queued_jobs.values()] + + def _check_job_file(self, job_name: str) -> None: + fm: FileManager = self.server.lookup_component('file_manager') + if not fm.check_file_exists("gcodes", job_name): + raise self.server.error( + f"G-Code File {job_name} does not exist") + + async def _handle_job_request(self, + web_request: WebRequest + ) -> Dict[str, Any]: + action = web_request.get_action() + if action == "POST": + files: Union[List[str], str] = web_request.get('filenames') + if isinstance(files, str): + files = [f.strip() for f in files.split(',') if f.strip()] + # Validate that all files exist before queueing + for fname in files: + self._check_job_file(fname) + for fname in files: + await self.queue_job(fname, check_exists=False) + elif action == "DELETE": + if web_request.get_boolean("all", False): + async with self.lock: + self.queued_jobs.clear() + else: + job_ids: Union[List[str], str] = web_request.get('job_ids') + if isinstance(job_ids, str): + job_ids = [f.strip() for f in job_ids.split(',') + if f.strip()] + async with self.lock: + for uid in job_ids: + self.queued_jobs.pop(uid, None) + else: + raise self.server.error(f"Invalid action: {action}") + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } + + async def _handle_pause_queue(self, + web_request: WebRequest + ) -> Dict[str, Any]: + await self.pause_queue() + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } + + async def _handle_resume_queue(self, + web_request: WebRequest + ) -> Dict[str, Any]: + async with self.lock: + if self.queue_state == "paused": + self.queue_state = "ready" + if self.queued_jobs and self.pop_queue_handle is None: + self.queue_state = "loading" + event_loop = self.server.get_event_loop() + self.pop_queue_handle = event_loop.delay_callback( + 0.01, self._pop_job) + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } + + async def _handle_queue_status(self, + web_request: WebRequest + ) -> Dict[str, Any]: + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } + + async def close(self): + await self.pause_queue() + +class QueuedJob: + def __init__(self, filename: str) -> None: + self.filename = filename + self.job_id = f"{id(self):016X}" + self.time_added = time.time() + + def __str__(self) -> str: + return self.filename + + def as_dict(self, cur_time: float) -> Dict[str, Any]: + return { + 'filename': self.filename, + 'job_id': self.job_id, + 'time_added': self.time_added, + 'time_in_queue': cur_time - self.time_added + } + +def load_component(config: ConfigHelper) -> JobQueue: + return JobQueue(config)