diff --git a/docs/web_api.md b/docs/web_api.md index f3e4481..e48d91c 100644 --- a/docs/web_api.md +++ b/docs/web_api.md @@ -3757,6 +3757,37 @@ When the state of a service changes the following notification is sent: The example above shows that the `klipper` service has changed to `inactive`. +#### Job Queue Changed +Moonraker will send a `job_queue_changed` notification when a change is +detected to the queue state or the queue itself: + +```json +{ + "jsonrpc": "2.0", + "method": "notify_job_queue_changed", + "params": [ + { + "action": "state_changed", + "updated_queue": null, + "queue_state": "paused" + } + ] +} +``` + +The object sent with the notification contains the following fields: + +- `action`: The action taken to the queue which led to the notification. + Will be a string set to one of the following values: + - `state_changed`: The queue state has changed + - `jobs_added`: One or more jobs were added to the queue + - `jobs_removed`: One or more jobs were removed from the queue + - `job_loaded`: A job was popped off the queue and successfull started +- `updated_queue`: If the queue itself is changed this will be a list + containing each item in the queue. If the queue has not changed this will + be `null`. +- `queue_state`: The current queue state + ### Appendix #### Websocket setup diff --git a/moonraker/components/job_queue.py b/moonraker/components/job_queue.py index 025fd03..46e8814 100644 --- a/moonraker/components/job_queue.py +++ b/moonraker/components/job_queue.py @@ -52,6 +52,7 @@ class JobQueue: self.server.register_event_handler( "job_state:cancelled", self._on_job_abort) + self.server.register_notification("job_queue:job_queue_changed") self.server.register_remote_method("pause_job_queue", self.pause_queue) self.server.register_remote_method("start_job_queue", self.start_queue) @@ -73,14 +74,14 @@ class JobQueue: # start a queued print if self.queue_state in ['ready', 'paused']: event_loop = self.server.get_event_loop() - self.queue_state = "loading" + self._set_queue_state("loading") self.pop_queue_handle = event_loop.delay_callback( 0.01, self._pop_job, False) async def _handle_shutdown(self) -> None: await self.pause_queue() - if not self.queued_jobs: - self.queue_state = "ready" + if not self.queued_jobs and self.automatic: + self._set_queue_state("ready") async def _on_job_complete(self, prev_stats: Dict[str, Any], @@ -90,7 +91,7 @@ class JobQueue: # Transition to the next job in the queue if self.queue_state == "ready" and self.queued_jobs: event_loop = self.server.get_event_loop() - self.queue_state = "loading" + self._set_queue_state("loading") self.pop_queue_handle = event_loop.delay_callback( self.job_delay, self._pop_job) @@ -100,7 +101,7 @@ class JobQueue: ) -> None: async with self.lock: if self.queued_jobs: - self.queue_state = "paused" + self._set_queue_state("paused") async def _pop_job(self, need_transition: bool = True) -> None: self.pop_queue_handle = None @@ -108,14 +109,15 @@ class JobQueue: if self.queue_state == "paused": return if not self.queued_jobs: - self.queue_state = "ready" + qs = "ready" if self.automatic else "paused" + self._set_queue_state(qs) return kapis: KlippyAPI = self.server.lookup_component('klippy_apis') uid, job = list(self.queued_jobs.items())[0] filename = str(job) can_print = await self._check_can_print() if not can_print or self.queue_state != "loading": - self.queue_state = "paused" + self._set_queue_state("paused") return try: if self.job_transition_gcode and need_transition: @@ -125,17 +127,18 @@ class JobQueue: if self.queue_state != "loading": raise self.server.error( "Queue State Changed during Transition Gcode") - self.queue_state = "starting" + self._set_queue_state("starting") await kapis.start_print(filename) except self.server.error: logging.exception(f"Error Loading print: {filename}") - self.queue_state = "paused" + self._set_queue_state("paused") else: self.queued_jobs.pop(uid, None) if self.queue_state == "starting": # If the queue was not paused while starting the print, set_ready = not self.queued_jobs or self.automatic self.queue_state = "ready" if set_ready else "paused" + self._send_queue_event(action="job_loaded") async def _check_can_print(self) -> bool: # Query the latest stats @@ -152,18 +155,45 @@ class JobQueue: return False return True - async def queue_job(self, filename: str, + async def queue_job(self, + filenames: Union[str, List[str]], check_exists: bool = True ) -> None: async with self.lock: # Make sure that the file exists + if isinstance(filenames, str): + filenames = [filenames] if check_exists: - self._check_job_file(filename) - queued_job = QueuedJob(filename) - self.queued_jobs[queued_job.job_id] = queued_job + # Make sure all files exist before adding them to the queue + for fname in filenames: + self._check_job_file(fname) + for fname in filenames: + queued_job = QueuedJob(fname) + self.queued_jobs[queued_job.job_id] = queued_job + self._send_queue_event(action="jobs_added") + + async def delete_job(self, + job_ids: Union[str, List[str]], + all: bool = False + ) -> None: + async with self.lock: + if not self.queued_jobs: + # No jobs in queue, nothing to delete + return + if all: + self.queued_jobs.clear() + elif job_ids: + if isinstance(job_ids, str): + job_ids = [job_ids] + for uid in job_ids: + self.queued_jobs.pop(uid, None) + else: + # Don't notify, nothing was deleted + return + self._send_queue_event(action="jobs_removed") async def pause_queue(self) -> None: - self.queue_state = "paused" + self._set_queue_state("paused") if self.pop_queue_handle is not None: self.pop_queue_handle.cancel() self.pop_queue_handle = None @@ -176,13 +206,12 @@ class JobQueue: async with self.lock: if self.queue_state != "loading": if self.queued_jobs and await self._check_can_print(): - self.queue_state = "loading" + self._set_queue_state("loading") event_loop = self.server.get_event_loop() self.pop_queue_handle = event_loop.delay_callback( 0.01, self._pop_job) else: - self.queue_state = "ready" - + self._set_queue_state("ready") def _job_map_to_list(self) -> List[Dict[str, Any]]: cur_time = time.time() return [job.as_dict(cur_time) for @@ -194,6 +223,24 @@ class JobQueue: raise self.server.error( f"G-Code File {job_name} does not exist") + def _set_queue_state(self, new_state: str) -> None: + if new_state != self.queue_state: + self.queue_state = new_state + self._send_queue_event() + + def _send_queue_event(self, action: str = "state_changed"): + updated_queue: Optional[List[Dict[str, Any]]] = None + if action != "state_changed": + updated_queue = self._job_map_to_list() + event_loop = self.server.get_event_loop() + event_loop.delay_callback( + .05, self.server.send_event, "job_queue:job_queue_changed", + { + 'action': action, + 'updated_queue': updated_queue, + 'queue_state': self.queue_state + }) + async def _handle_job_request(self, web_request: WebRequest ) -> Dict[str, Any]: @@ -203,22 +250,16 @@ class JobQueue: if isinstance(files, str): files = [f.strip() for f in files.split(',') if f.strip()] # Validate that all files exist before queueing - for fname in files: - self._check_job_file(fname) - for fname in files: - await self.queue_job(fname, check_exists=False) + await self.queue_job(files) elif action == "DELETE": if web_request.get_boolean("all", False): - async with self.lock: - self.queued_jobs.clear() + await self.delete_job([], all=True) else: job_ids: Union[List[str], str] = web_request.get('job_ids') if isinstance(job_ids, str): job_ids = [f.strip() for f in job_ids.split(',') if f.strip()] - async with self.lock: - for uid in job_ids: - self.queued_jobs.pop(uid, None) + await self.delete_job(job_ids) else: raise self.server.error(f"Invalid action: {action}") return {