history: use tables for history and totals storage

Signed-off-by: Eric Callahan <arksine.code@gmail.com>

parent 80c762074f
commit 3f0d20ed8c
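The history component previously persisted jobs in the "history" namespace and totals under the "moonraker" namespace; both now live in SQL tables (job_history and job_totals), with one-time migrations performed in each table definition's migrate() hook. One detail worth noting up front (a reader's sketch, not part of the commit): the API continues to expose job uids as zero-padded hex strings, while the new job_history table keys rows by an integer job_id. The conversion used throughout the diff is:

    uid = f"{job_id:06X}"    # e.g. 26 -> "00001A"
    job_id = int(uid, 16)    # e.g. "00001A" -> 26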
@@ -1,5 +1,7 @@
 # History cache for printer jobs
 #
+# Copyright (C) 2024 Eric Callahan <arksine.code@gmail.com>
+#
 # This file may be distributed under the terms of the GNU GPLv3 license.
 
 from __future__ import annotations
@@ -10,7 +12,8 @@ from ..common import (
     JobEvent,
     RequestType,
     HistoryFieldData,
-    FieldTracker
+    FieldTracker,
+    SqlTableDefinition
 )
 
 # Annotation imports
@@ -20,7 +23,8 @@ from typing import (
     Union,
     Optional,
     Dict,
-    List
+    List,
+    Tuple
 )
 
 if TYPE_CHECKING:
@@ -29,13 +33,10 @@ if TYPE_CHECKING:
     from .database import MoonrakerDatabase as DBComp
     from .job_state import JobState
     from .file_manager.file_manager import FileManager
+    from .database import DBProviderWrapper
     Totals = Dict[str, Union[float, int]]
     AuxTotals = List[Dict[str, Any]]
 
-
-HIST_NAMESPACE = "history"
-HIST_VERSION = 1
-MAX_JOBS = 10000
 BASE_TOTALS = {
     "total_jobs": 0,
     "total_time": 0.,
@@ -44,20 +45,142 @@ BASE_TOTALS = {
     "longest_job": 0.,
     "longest_print": 0.
 }
+HIST_TABLE = "job_history"
+TOTALS_TABLE = "job_totals"
+
+def _create_totals_list(
+    job_totals: Dict[str, Any],
+    aux_totals: List[Dict[str, Any]],
+    instance: str = "default"
+) -> List[Tuple[str, str, Any, Any, str]]:
+    """
+    Returns a list of Tuples formatted for SQL Database insertion.
+
+    Fields of each tuple are in the following order:
+        provider, field, maximum, total, instance_id
+    """
+    totals_list: List[Tuple[str, str, Any, Any, str]] = []
+    for key, value in job_totals.items():
+        total = value if key.startswith("total_") else None
+        maximum = value if total is None else None
+        totals_list.append(("history", key, maximum, total, instance))
+    for item in aux_totals:
+        totals_list.append(
+            (
+                item["provider"],
+                item["field"],
+                item["maximum"],
+                item["total"],
+                instance
+            )
+        )
+    return totals_list
+
+class TotalsSqlDefinition(SqlTableDefinition):
+    name = TOTALS_TABLE
+    prototype = (
+        f"""
+        {TOTALS_TABLE} (
+            provider TEXT NOT NULL,
+            field TEXT NOT NULL,
+            maximum REAL,
+            total REAL,
+            instance_id TEXT NOT NULL,
+            PRIMARY KEY (provider, field, instance_id)
+        )
+        """
+    )
+    version = 1
+
+    def migrate(self, last_version: int, db_provider: DBProviderWrapper) -> None:
+        if last_version == 0:
+            # Migrate from "moonraker" namespace to a table
+            logging.info("Migrating history totals from moonraker namespace...")
+            hist_ns: Dict[str, Any] = db_provider.get_item("moonraker", "history", {})
+            job_totals: Dict[str, Any] = hist_ns.get("job_totals", BASE_TOTALS)
+            aux_totals: List[Dict[str, Any]] = hist_ns.get("aux_totals", [])
+            totals_list = _create_totals_list(job_totals, aux_totals)
+            sql_conn = db_provider.connection
+            with sql_conn:
+                sql_conn.executemany(
+                    f"INSERT OR IGNORE INTO {TOTALS_TABLE} VALUES(?, ?, ?, ?, ?)",
+                    totals_list
+                )
+            try:
+                db_provider.delete_item("moonraker", "history")
+            except Exception:
+                pass
+
+class HistorySqlDefinition(SqlTableDefinition):
+    name = HIST_TABLE
+    prototype = (
+        f"""
+        {HIST_TABLE} (
+            job_id INTEGER PRIMARY KEY ASC,
+            user TEXT NOT NULL,
+            filename TEXT,
+            status TEXT NOT NULL,
+            start_time REAL NOT NULL,
+            end_time REAL,
+            print_duration REAL NOT NULL,
+            total_duration REAL NOT NULL,
+            filament_used REAL NOT NULL,
+            metadata pyjson,
+            auxiliary_data pyjson NOT NULL,
+            instance_id TEXT NOT NULL
+        )
+        """
+    )
+    version = 1
+
+    def migrate(self, last_version: int, db_provider: DBProviderWrapper) -> None:
+        if last_version == 0:
+            conn = db_provider.connection
+            for batch in db_provider.iter_namespace("history", 1000):
+                conv_vals: List[Tuple[Any, ...]] = []
+                entry: Dict[str, Any]
+                for entry in batch.values():
+                    try:
+                        conv_vals.append(
+                            (
+                                None,
+                                entry.get("user", "No User"),
+                                entry["filename"],
+                                entry["status"],
+                                entry["start_time"],
+                                entry["end_time"],
+                                entry["print_duration"],
+                                entry["total_duration"],
+                                entry["filament_used"],
+                                entry["metadata"],
+                                entry.get("auxiliary_data", []),
+                                "default"
+                            )
+                        )
+                    except KeyError:
+                        continue
+                if not conv_vals:
+                    continue
+                placeholders = ",".join("?" * len(conv_vals[0]))
+                with conn:
+                    conn.executemany(
+                        f"INSERT INTO {HIST_TABLE} VALUES({placeholders})",
+                        conv_vals
+                    )
+            db_provider.wipe_local_namespace("history")
 
 class History:
     def __init__(self, config: ConfigHelper) -> None:
         self.server = config.get_server()
-        self.file_manager: FileManager = self.server.lookup_component(
-            'file_manager')
+        self.file_manager: FileManager = self.server.lookup_component('file_manager')
         self.request_lock = Lock()
         FieldTracker.class_init(self)
         self.auxiliary_fields: List[HistoryFieldData] = []
         database: DBComp = self.server.lookup_component("database")
-        hist_info: Dict[str, Any]
-        hist_info = database.get_item("moonraker", "history", {}).result()
-        self.job_totals: Totals = hist_info.get("job_totals", dict(BASE_TOTALS))
-        self.aux_totals: AuxTotals = hist_info.get("aux_totals", [])
+        self.history_table = database.register_table(HistorySqlDefinition())
+        self.totals_table = database.register_table(TotalsSqlDefinition())
+        self.job_totals: Totals = dict(BASE_TOTALS)
+        self.aux_totals: AuxTotals = []
 
         self.server.register_event_handler(
             "server:klippy_disconnect", self._handle_disconnect)
@@ -84,41 +207,43 @@ class History:
             self._handle_job_total_reset
         )
 
-        database.register_local_namespace(HIST_NAMESPACE)
-        self.history_ns = database.wrap_namespace(HIST_NAMESPACE,
-                                                  parse_keys=False)
-
         self.current_job: Optional[PrinterJob] = None
-        self.current_job_id: Optional[str] = None
+        self.current_job_id: Optional[int] = None
         self.job_user: str = "No User"
         self.job_paused: bool = False
-        self.next_job_id: int = 0
-        self.cached_job_ids = self.history_ns.keys().result()
-        if self.cached_job_ids:
-            self.next_job_id = int(self.cached_job_ids[-1], 16) + 1
 
     async def component_init(self) -> None:
-        # Check for interupted jobs. If this is the first time, check
-        # the entire database. Otherwise only check the last 20 jobs.
-        interrupted_jobs: Dict[str, Any] = {}
-        database: DBComp = self.server.lookup_component("database")
-        version: int = await database.get_item("moonraker", "history.version", 0)
-        if version != HIST_VERSION:
-            await database.insert_item("moonraker", "history.version", HIST_VERSION)
-        job_ids = self.cached_job_ids if version < 1 else self.cached_job_ids[-20:]
-        jobs: Dict[str, Dict[str, Any]]
-        jobs = await self.history_ns.get_batch(job_ids)
-        for jid, job_data in jobs.items():
-            if job_data.get("status", "") == "in_progress":
-                job_data["status"] = "interrupted"
-                interrupted_jobs[jid] = job_data
+        # Popluate totals
+        valid_aux_totals = [
+            (item.provider, item.name) for item in self.auxiliary_fields
+            if item.has_totals()
+        ]
+        cursor = await self.totals_table.execute(f"SELECT * from {TOTALS_TABLE}")
+        await cursor.set_arraysize(200)
+        for row in await cursor.fetchall():
+            provider, field, maximum, total, _ = tuple(row)
+            if provider == "history":
+                self.job_totals[field] = total if maximum is None else maximum
+            elif (provider, field) in valid_aux_totals:
+                item = dict(row)
+                item.pop("instance_id", None)
+                self.aux_totals.append(item)
+        # Check for interupted jobs
+        cursor = await self.history_table.execute(
+            f"SELECT job_id FROM {HIST_TABLE} WHERE status = 'in_progress'"
+        )
+        interrupted_jobs: List[int] = [row[0] for row in await cursor.fetchall()]
         if interrupted_jobs:
+            async with self.history_table as tx:
+                await tx.execute(
+                    f"UPDATE {HIST_TABLE} SET status = 'interrupted' "
+                    "WHERE status = 'in_progress'"
+                )
             self.server.add_log_rollover_item(
                 "interrupted_history",
                 "The following jobs were detected as interrupted: "
-                f"{list(interrupted_jobs.keys())}"
+                f"{interrupted_jobs}"
            )
-            await self.history_ns.insert_batch(interrupted_jobs)
 
     async def _handle_job_request(self,
                                   web_request: WebRequest
@@ -127,24 +252,37 @@
             req_type = web_request.get_request_type()
             if req_type == RequestType.GET:
                 job_id = web_request.get_str("uid")
-                if job_id not in self.cached_job_ids:
+                cursor = await self.history_table.execute(
+                    f"SELECT * FROM {HIST_TABLE} WHERE job_id = ?", (int(job_id, 16),)
+                )
+                result = await cursor.fetchone()
+                if result is None:
                     raise self.server.error(f"Invalid job uid: {job_id}", 404)
-                job = await self.history_ns[job_id]
+                job = dict(result)
                 return {"job": self._prep_requested_job(job, job_id)}
             if req_type == RequestType.DELETE:
                 all = web_request.get_boolean("all", False)
                 if all:
-                    deljobs = self.cached_job_ids
-                    self.history_ns.clear()
-                    self.cached_job_ids = []
-                    self.next_job_id = 0
+                    cursor = await self.history_table.execute(
+                        f"SELECT job_id FROM {HIST_TABLE} WHERE instance_id = ?",
+                        ("default",)
+                    )
+                    await cursor.set_arraysize(1000)
+                    deljobs = [f"{row[0]:06X}" for row in await cursor.fetchall()]
+                    async with self.history_table as tx:
+                        await tx.execute(
+                            f"DELETE FROM {HIST_TABLE} WHERE instance_id = ?",
+                            ("default",)
+                        )
                     return {'deleted_jobs': deljobs}
 
                 job_id = web_request.get_str("uid")
-                if job_id not in self.cached_job_ids:
-                    raise self.server.error(f"Invalid job uid: {job_id}", 404)
-
-                self.delete_job(job_id)
+                async with self.history_table as tx:
+                    cursor = await tx.execute(
+                        f"DELETE FROM {HIST_TABLE} WHERE job_id = ?", (int(job_id, 16),)
+                    )
+                    if cursor.rowcount < 1:
+                        raise self.server.error(f"Invalid job uid: {job_id}", 404)
                 return {'deleted_jobs': [job_id]}
             raise self.server.error("Invalid Request Method")
 
@@ -152,59 +290,36 @@
                                 web_request: WebRequest
                                 ) -> Dict[str, Any]:
         async with self.request_lock:
-            i = 0
-            count = 0
-            end_num = len(self.cached_job_ids)
-            jobs: List[Dict[str, Any]] = []
-            start_num = 0
-
             before = web_request.get_float("before", -1)
             since = web_request.get_float("since", -1)
             limit = web_request.get_int("limit", 50)
             start = web_request.get_int("start", 0)
-            order = web_request.get_str("order", "desc")
+            order = web_request.get_str("order", "desc").upper()
 
-            if order not in ["asc", "desc"]:
+            if order not in ["ASC", "DESC"]:
                 raise self.server.error(f"Invalid `order` value: {order}", 400)
-            reverse_order = (order == "desc")
-
-            # cached jobs is asc order, find lower and upper boundary
-            if since != -1:
-                while start_num < end_num:
-                    job_id = self.cached_job_ids[start_num]
-                    job: Dict[str, Any] = await self.history_ns[job_id]
-                    if job['start_time'] > since:
-                        break
-                    start_num += 1
-
+            # Build SQL Select Statement
+            values: List[Any] = ["default"]
+            sql_statement = f"SELECT * FROM {HIST_TABLE} WHERE instance_id = ?"
             if before != -1:
-                while end_num > 0:
-                    job_id = self.cached_job_ids[end_num-1]
-                    job = await self.history_ns[job_id]
-                    if job['end_time'] < before:
-                        break
-                    end_num -= 1
-
-            if start_num >= end_num or end_num == 0:
-                return {"count": 0, "jobs": []}
-
-            i = start
-            count = end_num - start_num
-
-            if limit == 0:
-                limit = MAX_JOBS
-
-            while i < count and len(jobs) < limit:
-                if reverse_order:
-                    job_id = self.cached_job_ids[end_num - i - 1]
-                else:
-                    job_id = self.cached_job_ids[start_num + i]
-                job = await self.history_ns[job_id]
+                sql_statement += " and end_time < ?"
+                values.append(before)
+            if since != -1:
+                sql_statement += " and start_time > ?"
+                values.append(since)
+            sql_statement += f" ORDER BY job_id {order}"
+            if limit > 0:
+                sql_statement += " LIMIT ? OFFSET ?"
+                values.append(limit)
+                values.append(start)
+            cursor = await self.history_table.execute(sql_statement, values)
+            await cursor.set_arraysize(1000)
+            jobs: List[Dict[str, Any]] = []
+            for row in await cursor.fetchall():
+                job = dict(row)
+                job_id = f"{row['job_id']:06X}"
                 jobs.append(self._prep_requested_job(job, job_id))
-                i += 1
-
-            return {"count": count, "jobs": jobs}
+            return {"count": len(jobs), "jobs": jobs}
 
     async def _handle_job_totals(
         self, web_request: WebRequest
@@ -223,15 +338,20 @@
             self.job_totals = dict(BASE_TOTALS)
             last_aux_totals = self.aux_totals
             self._update_aux_totals(reset=True)
-            database: DBComp = self.server.lookup_component("database")
-            await database.insert_item("moonraker", "history.job_totals", self.job_totals)
-            await database.insert_item("moonraker", "history.aux_totals", self.aux_totals)
+            totals_list = _create_totals_list(self.job_totals, self.aux_totals)
+            async with self.totals_table as tx:
+                await tx.execute(
+                    f"DELETE FROM {TOTALS_TABLE} WHERE instance_id = ?", ("default",)
+                )
+                await tx.executemany(
+                    f"INSERT INTO {TOTALS_TABLE} VALUES(?, ?, ?, ?, ?)", totals_list
+                )
             return {
                 "last_totals": last_totals,
                 "last_auxiliary_totals": last_aux_totals
             }
 
-    def _on_job_state_changed(
+    async def _on_job_state_changed(
         self,
         job_event: JobEvent,
         prev_stats: Dict[str, Any],
@@ -241,16 +361,16 @@
         if job_event == JobEvent.STARTED:
             if self.current_job is not None:
                 # Finish with the previous state
-                self.finish_job("cancelled", prev_stats)
-            self.add_job(PrinterJob(new_stats))
+                await self.finish_job("cancelled", prev_stats)
+            await self.add_job(PrinterJob(new_stats))
         elif job_event == JobEvent.COMPLETE:
-            self.finish_job("completed", new_stats)
+            await self.finish_job("completed", new_stats)
         elif job_event == JobEvent.ERROR:
-            self.finish_job("error", new_stats)
+            await self.finish_job("error", new_stats)
         elif job_event in (JobEvent.CANCELLED, JobEvent.STANDBY):
             # Cancel on "standby" for backward compatibility with
             # `CLEAR_PAUSE/SDCARD_RESET_FILE` workflow
-            self.finish_job("cancelled", prev_stats)
+            await self.finish_job("cancelled", prev_stats)
 
     def _on_job_requested(self, user: Optional[Dict[str, Any]]) -> None:
         username = (user or {}).get("username", "No User")
@@ -258,92 +378,117 @@
         if self.current_job is not None:
             self.current_job.user = username
 
-    def _handle_shutdown(self) -> None:
+    async def _handle_shutdown(self) -> None:
         jstate: JobState = self.server.lookup_component("job_state")
         last_ps = jstate.get_last_stats()
-        self.finish_job("klippy_shutdown", last_ps)
+        await self.finish_job("klippy_shutdown", last_ps)
 
-    def _handle_disconnect(self) -> None:
+    async def _handle_disconnect(self) -> None:
         jstate: JobState = self.server.lookup_component("job_state")
         last_ps = jstate.get_last_stats()
-        self.finish_job("klippy_disconnect", last_ps)
+        await self.finish_job("klippy_disconnect", last_ps)
 
-    def add_job(self, job: PrinterJob) -> None:
-        if len(self.cached_job_ids) >= MAX_JOBS:
-            self.delete_job(self.cached_job_ids[0])
-        job_id = f"{self.next_job_id:06X}"
-        self.current_job = job
-        self.current_job_id = job_id
-        self.current_job.user = self.job_user
-        self.grab_job_metadata()
-        for field in self.auxiliary_fields:
-            field.tracker.reset()
-        self.current_job.set_aux_data(self.auxiliary_fields)
-        self.history_ns[job_id] = job.get_stats()
-        self.cached_job_ids.append(job_id)
-        self.next_job_id += 1
-        logging.debug(
-            f"History Job Added - Id: {job_id}, File: {job.filename}"
-        )
-        self.send_history_event("added")
+    async def add_job(self, job: PrinterJob) -> None:
+        async with self.request_lock:
+            self.current_job = job
+            self.current_job_id = None
+            self.current_job.user = self.job_user
+            self.grab_job_metadata()
+            for field in self.auxiliary_fields:
+                field.tracker.reset()
+            self.current_job.set_aux_data(self.auxiliary_fields)
+            new_id = await self.save_job(job, None)
+            if new_id is None:
+                logging.info(f"Error saving job, filename '{job.filename}'")
+                return
+            self.current_job_id = new_id
+            job_id = f"{new_id:06X}"
+            self.update_metadata(job_id)
+            logging.debug(
+                f"History Job Added - Id: {job_id}, File: {job.filename}"
+            )
+            self.send_history_event("added")
 
-    def delete_job(self, job_id: Union[int, str]) -> None:
-        if isinstance(job_id, int):
-            job_id = f"{job_id:06X}"
-
-        if job_id in self.cached_job_ids:
-            del self.history_ns[job_id]
-            self.cached_job_ids.remove(job_id)
+    async def save_job(self, job: PrinterJob, job_id: Optional[int]) -> Optional[int]:
+        values: List[Any] = [
+            job_id,
+            job.user,
+            job.filename,
+            job.status,
+            job.start_time,
+            job.end_time,
+            job.print_duration,
+            job.total_duration,
+            job.filament_used,
+            job.metadata,
+            job.auxiliary_data,
+            "default"
+        ]
+        placeholders = ",".join("?" * len(values))
+        async with self.history_table as tx:
+            cursor = await tx.execute(
+                f"REPLACE INTO {HIST_TABLE} VALUES({placeholders})", values
+            )
+        return cursor.lastrowid
+
+    async def delete_job(self, job_id: Union[int, str]) -> None:
+        if isinstance(job_id, str):
+            job_id = int(job_id, 16)
+        async with self.history_table as tx:
+            tx.execute(
+                f"DELETE FROM {HIST_TABLE} WHERE job_id = ?", (job_id,)
+            )
 
-    def finish_job(self, status: str, pstats: Dict[str, Any]) -> None:
-        if self.current_job is None:
-            return
-        cj = self.current_job
-        if (
-            pstats.get('filename') != cj.get('filename') or
-            pstats.get('total_duration', 0.) < cj.get('total_duration')
-        ):
-            # Print stats have been reset, do not update this job with them
-            pstats = {}
-
-        self.current_job.user = self.job_user
-        self.current_job.finish(status, pstats)
-        # Regrab metadata incase metadata wasn't parsed yet due to file upload
-        self.grab_job_metadata()
-        self.current_job.set_aux_data(self.auxiliary_fields)
-        self.save_current_job()
-        self._update_job_totals()
-        logging.debug(
-            f"History Job Finished - Id: {self.current_job_id}, "
-            f"File: {self.current_job.filename}, "
-            f"Status: {status}"
-        )
-        self.send_history_event("finished")
+    async def finish_job(self, status: str, pstats: Dict[str, Any]) -> None:
+        async with self.request_lock:
+            if self.current_job is None or self.current_job_id is None:
+                self._reset_current_job()
+                return
+            if (
+                pstats.get('filename') != self.current_job.filename or
+                pstats.get('total_duration', 0.) < self.current_job.total_duration
+            ):
+                # Print stats have been reset, do not update this job with them
+                pstats = {}
+            self.current_job.user = self.job_user
+            self.current_job.finish(status, pstats)
+            # Regrab metadata incase metadata wasn't parsed yet due to file upload
+            self.grab_job_metadata()
+            self.current_job.set_aux_data(self.auxiliary_fields)
+            job_id = f"{self.current_job_id:06X}"
+            await self.save_job(self.current_job, self.current_job_id)
+            self.update_metadata(job_id)
+            await self._update_job_totals()
+            logging.debug(
+                f"History Job Finished - Id: {job_id}, "
+                f"File: {self.current_job.filename}, "
+                f"Status: {status}"
+            )
+            self.send_history_event("finished")
+            self._reset_current_job()
+
+    def _reset_current_job(self) -> None:
         self.current_job = None
         self.current_job_id = None
         self.job_user = "No User"
 
-    async def get_job(self,
-                      job_id: Union[int, str]
-                      ) -> Optional[Dict[str, Any]]:
-        if isinstance(job_id, int):
-            job_id = f"{job_id:06X}"
-        return await self.history_ns.get(job_id, None)
+    async def get_job(
+        self, job_id: Union[int, str]
+    ) -> Optional[Dict[str, Any]]:
+        if isinstance(job_id, str):
+            job_id = int(job_id, 16)
+        cursor = await self.history_table.execute(
+            f"SELECT * FROM {HIST_TABLE} WHERE job_id = ?", (job_id,)
+        )
+        result = await cursor.fetchone()
+        return dict(result) if result is not None else result
 
     def grab_job_metadata(self) -> None:
         if self.current_job is None:
             return
-        filename: str = self.current_job.get("filename")
+        filename: str = self.current_job.filename
         mdst = self.file_manager.get_metadata_storage()
         metadata: Dict[str, Any] = mdst.get(filename, {})
-        if metadata:
-            # Add the start time and job id to the
-            # persistent metadata storage
-            metadata.update({
-                'print_start_time': self.current_job.get('start_time'),
-                'job_id': self.current_job_id
-            })
-            mdst.insert(filename, metadata.copy())
         # We don't need to store these fields in the
         # job metadata, as they are redundant
         metadata.pop('print_start_time', None)
@@ -352,14 +497,24 @@
             thumb: Dict[str, Any]
             for thumb in metadata['thumbnails']:
                 thumb.pop('data', None)
-        self.current_job.set("metadata", metadata)
+        self.current_job.metadata = metadata
 
-    def save_current_job(self) -> None:
-        if self.current_job is None or self.current_job_id is None:
+    def update_metadata(self, job_id: str) -> None:
+        if self.current_job is None:
             return
-        self.history_ns[self.current_job_id] = self.current_job.get_stats()
+        mdst = self.file_manager.get_metadata_storage()
+        filename: str = self.current_job.filename
+        metadata: Dict[str, Any] = mdst.get(filename, {})
+        if metadata:
+            # Add the start time and job id to the
+            # persistent metadata storage
+            metadata.update({
+                'print_start_time': self.current_job.get('start_time'),
+                'job_id': job_id
+            })
+            mdst.insert(filename, metadata)
 
-    def _update_job_totals(self) -> None:
+    async def _update_job_totals(self) -> None:
         if self.current_job is None:
             return
         job = self.current_job
@@ -370,9 +525,11 @@
         self._maximize_total("longest_job", job.total_duration)
         self._maximize_total("longest_print", job.print_duration)
         self._update_aux_totals()
-        database: DBComp = self.server.lookup_component("database")
-        database.insert_item("moonraker", "history.job_totals", self.job_totals)
-        database.insert_item("moonraker", "history.aux_totals", self.aux_totals)
+        totals_list = _create_totals_list(self.job_totals, self.aux_totals)
+        async with self.totals_table as tx:
+            await tx.executemany(
+                f"REPLACE INTO {TOTALS_TABLE} VALUES(?, ?, ?, ?, ?)", totals_list
+            )
 
     def _accumulate_total(self, field: str, val: Union[int, float]) -> None:
         self.job_totals[field] += val
@@ -391,23 +548,25 @@
     def send_history_event(self, evt_action: str) -> None:
         if self.current_job is None or self.current_job_id is None:
             return
-        job = self._prep_requested_job(
-            self.current_job.get_stats(), self.current_job_id)
-        self.server.send_event("history:history_changed",
-                               {'action': evt_action, 'job': job})
+        job_id = f"{self.current_job_id:06X}"
+        job = self._prep_requested_job(self.current_job.get_stats(), job_id)
+        self.server.send_event(
+            "history:history_changed", {'action': evt_action, 'job': job}
+        )
 
-    def _prep_requested_job(self,
-                            job: Dict[str, Any],
-                            job_id: str
-                            ) -> Dict[str, Any]:
-        mtime = job.get("metadata", {}).get("modified", None)
-        job['job_id'] = job_id
-        job['exists'] = self.file_manager.check_file_exists(
-            "gcodes", job['filename'], mtime
-        )
+    def _prep_requested_job(
+        self, job: Dict[str, Any], job_id: str
+    ) -> Dict[str, Any]:
+        fm = self.file_manager
+        mtime = job.get("metadata", {}).get("modified", None)
+        job["exists"] = fm.check_file_exists("gcodes", job['filename'], mtime)
+        job["job_id"] = job_id
+        job.pop("instance_id", None)
         return job
 
     def register_auxiliary_field(self, new_field: HistoryFieldData) -> None:
+        if new_field.provider == "history":
+            raise self.server.error("Provider name 'history' is reserved")
         for field in self.auxiliary_fields:
             if field == new_field:
                 raise self.server.error(
@@ -421,17 +580,19 @@
             return False
         return not self.job_paused if check_paused else True
 
-    def on_exit(self) -> None:
+    async def on_exit(self) -> None:
+        if self.current_job is None:
+            return
         jstate: JobState = self.server.lookup_component("job_state")
         last_ps = jstate.get_last_stats()
-        self.finish_job("server_exit", last_ps)
+        await self.finish_job("server_exit", last_ps)
 
 class PrinterJob:
     def __init__(self, data: Dict[str, Any] = {}) -> None:
         self.end_time: Optional[float] = None
         self.filament_used: float = 0
-        self.filename: Optional[str] = None
-        self.metadata: Optional[Dict[str, Any]] = None
+        self.filename: str = ""
+        self.metadata: Dict[str, Any] = {}
         self.print_duration: float = 0.
         self.status: str = "in_progress"
         self.start_time = time.time()
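For reference, the jobs-list handler above assembles its SELECT incrementally; with before, since, a positive limit, and start all supplied, the executed statement takes the following shape (a sketch assembled from the builder code, with the placeholders bound to ["default", before, since, limit, start]):

    SELECT * FROM job_history WHERE instance_id = ?
        and end_time < ? and start_time > ?
        ORDER BY job_id DESC LIMIT ? OFFSET ?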