job_queue: add websocket notification
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
269897bef7
commit
3df0f3ffe4
|
@ -3757,6 +3757,37 @@ When the state of a service changes the following notification is sent:
|
||||||
|
|
||||||
The example above shows that the `klipper` service has changed to `inactive`.
|
The example above shows that the `klipper` service has changed to `inactive`.
|
||||||
|
|
||||||
|
#### Job Queue Changed
|
||||||
|
Moonraker will send a `job_queue_changed` notification when a change is
|
||||||
|
detected to the queue state or the queue itself:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": "notify_job_queue_changed",
|
||||||
|
"params": [
|
||||||
|
{
|
||||||
|
"action": "state_changed",
|
||||||
|
"updated_queue": null,
|
||||||
|
"queue_state": "paused"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The object sent with the notification contains the following fields:
|
||||||
|
|
||||||
|
- `action`: The action taken to the queue which led to the notification.
|
||||||
|
Will be a string set to one of the following values:
|
||||||
|
- `state_changed`: The queue state has changed
|
||||||
|
- `jobs_added`: One or more jobs were added to the queue
|
||||||
|
- `jobs_removed`: One or more jobs were removed from the queue
|
||||||
|
- `job_loaded`: A job was popped off the queue and successfull started
|
||||||
|
- `updated_queue`: If the queue itself is changed this will be a list
|
||||||
|
containing each item in the queue. If the queue has not changed this will
|
||||||
|
be `null`.
|
||||||
|
- `queue_state`: The current queue state
|
||||||
|
|
||||||
### Appendix
|
### Appendix
|
||||||
|
|
||||||
#### Websocket setup
|
#### Websocket setup
|
||||||
|
|
|
@ -52,6 +52,7 @@ class JobQueue:
|
||||||
self.server.register_event_handler(
|
self.server.register_event_handler(
|
||||||
"job_state:cancelled", self._on_job_abort)
|
"job_state:cancelled", self._on_job_abort)
|
||||||
|
|
||||||
|
self.server.register_notification("job_queue:job_queue_changed")
|
||||||
self.server.register_remote_method("pause_job_queue", self.pause_queue)
|
self.server.register_remote_method("pause_job_queue", self.pause_queue)
|
||||||
self.server.register_remote_method("start_job_queue",
|
self.server.register_remote_method("start_job_queue",
|
||||||
self.start_queue)
|
self.start_queue)
|
||||||
|
@ -73,14 +74,14 @@ class JobQueue:
|
||||||
# start a queued print
|
# start a queued print
|
||||||
if self.queue_state in ['ready', 'paused']:
|
if self.queue_state in ['ready', 'paused']:
|
||||||
event_loop = self.server.get_event_loop()
|
event_loop = self.server.get_event_loop()
|
||||||
self.queue_state = "loading"
|
self._set_queue_state("loading")
|
||||||
self.pop_queue_handle = event_loop.delay_callback(
|
self.pop_queue_handle = event_loop.delay_callback(
|
||||||
0.01, self._pop_job, False)
|
0.01, self._pop_job, False)
|
||||||
|
|
||||||
async def _handle_shutdown(self) -> None:
|
async def _handle_shutdown(self) -> None:
|
||||||
await self.pause_queue()
|
await self.pause_queue()
|
||||||
if not self.queued_jobs:
|
if not self.queued_jobs and self.automatic:
|
||||||
self.queue_state = "ready"
|
self._set_queue_state("ready")
|
||||||
|
|
||||||
async def _on_job_complete(self,
|
async def _on_job_complete(self,
|
||||||
prev_stats: Dict[str, Any],
|
prev_stats: Dict[str, Any],
|
||||||
|
@ -90,7 +91,7 @@ class JobQueue:
|
||||||
# Transition to the next job in the queue
|
# Transition to the next job in the queue
|
||||||
if self.queue_state == "ready" and self.queued_jobs:
|
if self.queue_state == "ready" and self.queued_jobs:
|
||||||
event_loop = self.server.get_event_loop()
|
event_loop = self.server.get_event_loop()
|
||||||
self.queue_state = "loading"
|
self._set_queue_state("loading")
|
||||||
self.pop_queue_handle = event_loop.delay_callback(
|
self.pop_queue_handle = event_loop.delay_callback(
|
||||||
self.job_delay, self._pop_job)
|
self.job_delay, self._pop_job)
|
||||||
|
|
||||||
|
@ -100,7 +101,7 @@ class JobQueue:
|
||||||
) -> None:
|
) -> None:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
if self.queued_jobs:
|
if self.queued_jobs:
|
||||||
self.queue_state = "paused"
|
self._set_queue_state("paused")
|
||||||
|
|
||||||
async def _pop_job(self, need_transition: bool = True) -> None:
|
async def _pop_job(self, need_transition: bool = True) -> None:
|
||||||
self.pop_queue_handle = None
|
self.pop_queue_handle = None
|
||||||
|
@ -108,14 +109,15 @@ class JobQueue:
|
||||||
if self.queue_state == "paused":
|
if self.queue_state == "paused":
|
||||||
return
|
return
|
||||||
if not self.queued_jobs:
|
if not self.queued_jobs:
|
||||||
self.queue_state = "ready"
|
qs = "ready" if self.automatic else "paused"
|
||||||
|
self._set_queue_state(qs)
|
||||||
return
|
return
|
||||||
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
|
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
|
||||||
uid, job = list(self.queued_jobs.items())[0]
|
uid, job = list(self.queued_jobs.items())[0]
|
||||||
filename = str(job)
|
filename = str(job)
|
||||||
can_print = await self._check_can_print()
|
can_print = await self._check_can_print()
|
||||||
if not can_print or self.queue_state != "loading":
|
if not can_print or self.queue_state != "loading":
|
||||||
self.queue_state = "paused"
|
self._set_queue_state("paused")
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
if self.job_transition_gcode and need_transition:
|
if self.job_transition_gcode and need_transition:
|
||||||
|
@ -125,17 +127,18 @@ class JobQueue:
|
||||||
if self.queue_state != "loading":
|
if self.queue_state != "loading":
|
||||||
raise self.server.error(
|
raise self.server.error(
|
||||||
"Queue State Changed during Transition Gcode")
|
"Queue State Changed during Transition Gcode")
|
||||||
self.queue_state = "starting"
|
self._set_queue_state("starting")
|
||||||
await kapis.start_print(filename)
|
await kapis.start_print(filename)
|
||||||
except self.server.error:
|
except self.server.error:
|
||||||
logging.exception(f"Error Loading print: {filename}")
|
logging.exception(f"Error Loading print: {filename}")
|
||||||
self.queue_state = "paused"
|
self._set_queue_state("paused")
|
||||||
else:
|
else:
|
||||||
self.queued_jobs.pop(uid, None)
|
self.queued_jobs.pop(uid, None)
|
||||||
if self.queue_state == "starting":
|
if self.queue_state == "starting":
|
||||||
# If the queue was not paused while starting the print,
|
# If the queue was not paused while starting the print,
|
||||||
set_ready = not self.queued_jobs or self.automatic
|
set_ready = not self.queued_jobs or self.automatic
|
||||||
self.queue_state = "ready" if set_ready else "paused"
|
self.queue_state = "ready" if set_ready else "paused"
|
||||||
|
self._send_queue_event(action="job_loaded")
|
||||||
|
|
||||||
async def _check_can_print(self) -> bool:
|
async def _check_can_print(self) -> bool:
|
||||||
# Query the latest stats
|
# Query the latest stats
|
||||||
|
@ -152,18 +155,45 @@ class JobQueue:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def queue_job(self, filename: str,
|
async def queue_job(self,
|
||||||
|
filenames: Union[str, List[str]],
|
||||||
check_exists: bool = True
|
check_exists: bool = True
|
||||||
) -> None:
|
) -> None:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
# Make sure that the file exists
|
# Make sure that the file exists
|
||||||
|
if isinstance(filenames, str):
|
||||||
|
filenames = [filenames]
|
||||||
if check_exists:
|
if check_exists:
|
||||||
self._check_job_file(filename)
|
# Make sure all files exist before adding them to the queue
|
||||||
queued_job = QueuedJob(filename)
|
for fname in filenames:
|
||||||
self.queued_jobs[queued_job.job_id] = queued_job
|
self._check_job_file(fname)
|
||||||
|
for fname in filenames:
|
||||||
|
queued_job = QueuedJob(fname)
|
||||||
|
self.queued_jobs[queued_job.job_id] = queued_job
|
||||||
|
self._send_queue_event(action="jobs_added")
|
||||||
|
|
||||||
|
async def delete_job(self,
|
||||||
|
job_ids: Union[str, List[str]],
|
||||||
|
all: bool = False
|
||||||
|
) -> None:
|
||||||
|
async with self.lock:
|
||||||
|
if not self.queued_jobs:
|
||||||
|
# No jobs in queue, nothing to delete
|
||||||
|
return
|
||||||
|
if all:
|
||||||
|
self.queued_jobs.clear()
|
||||||
|
elif job_ids:
|
||||||
|
if isinstance(job_ids, str):
|
||||||
|
job_ids = [job_ids]
|
||||||
|
for uid in job_ids:
|
||||||
|
self.queued_jobs.pop(uid, None)
|
||||||
|
else:
|
||||||
|
# Don't notify, nothing was deleted
|
||||||
|
return
|
||||||
|
self._send_queue_event(action="jobs_removed")
|
||||||
|
|
||||||
async def pause_queue(self) -> None:
|
async def pause_queue(self) -> None:
|
||||||
self.queue_state = "paused"
|
self._set_queue_state("paused")
|
||||||
if self.pop_queue_handle is not None:
|
if self.pop_queue_handle is not None:
|
||||||
self.pop_queue_handle.cancel()
|
self.pop_queue_handle.cancel()
|
||||||
self.pop_queue_handle = None
|
self.pop_queue_handle = None
|
||||||
|
@ -176,13 +206,12 @@ class JobQueue:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
if self.queue_state != "loading":
|
if self.queue_state != "loading":
|
||||||
if self.queued_jobs and await self._check_can_print():
|
if self.queued_jobs and await self._check_can_print():
|
||||||
self.queue_state = "loading"
|
self._set_queue_state("loading")
|
||||||
event_loop = self.server.get_event_loop()
|
event_loop = self.server.get_event_loop()
|
||||||
self.pop_queue_handle = event_loop.delay_callback(
|
self.pop_queue_handle = event_loop.delay_callback(
|
||||||
0.01, self._pop_job)
|
0.01, self._pop_job)
|
||||||
else:
|
else:
|
||||||
self.queue_state = "ready"
|
self._set_queue_state("ready")
|
||||||
|
|
||||||
def _job_map_to_list(self) -> List[Dict[str, Any]]:
|
def _job_map_to_list(self) -> List[Dict[str, Any]]:
|
||||||
cur_time = time.time()
|
cur_time = time.time()
|
||||||
return [job.as_dict(cur_time) for
|
return [job.as_dict(cur_time) for
|
||||||
|
@ -194,6 +223,24 @@ class JobQueue:
|
||||||
raise self.server.error(
|
raise self.server.error(
|
||||||
f"G-Code File {job_name} does not exist")
|
f"G-Code File {job_name} does not exist")
|
||||||
|
|
||||||
|
def _set_queue_state(self, new_state: str) -> None:
|
||||||
|
if new_state != self.queue_state:
|
||||||
|
self.queue_state = new_state
|
||||||
|
self._send_queue_event()
|
||||||
|
|
||||||
|
def _send_queue_event(self, action: str = "state_changed"):
|
||||||
|
updated_queue: Optional[List[Dict[str, Any]]] = None
|
||||||
|
if action != "state_changed":
|
||||||
|
updated_queue = self._job_map_to_list()
|
||||||
|
event_loop = self.server.get_event_loop()
|
||||||
|
event_loop.delay_callback(
|
||||||
|
.05, self.server.send_event, "job_queue:job_queue_changed",
|
||||||
|
{
|
||||||
|
'action': action,
|
||||||
|
'updated_queue': updated_queue,
|
||||||
|
'queue_state': self.queue_state
|
||||||
|
})
|
||||||
|
|
||||||
async def _handle_job_request(self,
|
async def _handle_job_request(self,
|
||||||
web_request: WebRequest
|
web_request: WebRequest
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
|
@ -203,22 +250,16 @@ class JobQueue:
|
||||||
if isinstance(files, str):
|
if isinstance(files, str):
|
||||||
files = [f.strip() for f in files.split(',') if f.strip()]
|
files = [f.strip() for f in files.split(',') if f.strip()]
|
||||||
# Validate that all files exist before queueing
|
# Validate that all files exist before queueing
|
||||||
for fname in files:
|
await self.queue_job(files)
|
||||||
self._check_job_file(fname)
|
|
||||||
for fname in files:
|
|
||||||
await self.queue_job(fname, check_exists=False)
|
|
||||||
elif action == "DELETE":
|
elif action == "DELETE":
|
||||||
if web_request.get_boolean("all", False):
|
if web_request.get_boolean("all", False):
|
||||||
async with self.lock:
|
await self.delete_job([], all=True)
|
||||||
self.queued_jobs.clear()
|
|
||||||
else:
|
else:
|
||||||
job_ids: Union[List[str], str] = web_request.get('job_ids')
|
job_ids: Union[List[str], str] = web_request.get('job_ids')
|
||||||
if isinstance(job_ids, str):
|
if isinstance(job_ids, str):
|
||||||
job_ids = [f.strip() for f in job_ids.split(',')
|
job_ids = [f.strip() for f in job_ids.split(',')
|
||||||
if f.strip()]
|
if f.strip()]
|
||||||
async with self.lock:
|
await self.delete_job(job_ids)
|
||||||
for uid in job_ids:
|
|
||||||
self.queued_jobs.pop(uid, None)
|
|
||||||
else:
|
else:
|
||||||
raise self.server.error(f"Invalid action: {action}")
|
raise self.server.error(f"Invalid action: {action}")
|
||||||
return {
|
return {
|
||||||
|
|
Loading…
Reference in New Issue