From 3f0d20ed8cca7f5244cf4e20b71a75d86cbd2d19 Mon Sep 17 00:00:00 2001
From: Eric Callahan
Date: Wed, 8 May 2024 13:58:08 -0400
Subject: [PATCH] history: use tables for history and totals storage

Signed-off-by: Eric Callahan
---
 moonraker/components/history.py | 543 +++++++++++++++++++++-----------
 1 file changed, 352 insertions(+), 191 deletions(-)

diff --git a/moonraker/components/history.py b/moonraker/components/history.py
index c0e8680..e2ae3f0 100644
--- a/moonraker/components/history.py
+++ b/moonraker/components/history.py
@@ -1,5 +1,7 @@
 # History cache for printer jobs
 #
+# Copyright (C) 2024 Eric Callahan
+#
 # This file may be distributed under the terms of the GNU GPLv3 license.
 from __future__ import annotations
@@ -10,7 +12,8 @@ from ..common import (
     JobEvent,
     RequestType,
     HistoryFieldData,
-    FieldTracker
+    FieldTracker,
+    SqlTableDefinition
 )
 
 # Annotation imports
@@ -20,7 +23,8 @@ from typing import (
     Union,
     Optional,
     Dict,
-    List
+    List,
+    Tuple
 )
 
 if TYPE_CHECKING:
@@ -29,13 +33,10 @@ if TYPE_CHECKING:
     from .database import MoonrakerDatabase as DBComp
     from .job_state import JobState
     from .file_manager.file_manager import FileManager
+    from .database import DBProviderWrapper
 
 Totals = Dict[str, Union[float, int]]
 AuxTotals = List[Dict[str, Any]]
-
-HIST_NAMESPACE = "history"
-HIST_VERSION = 1
-MAX_JOBS = 10000
 
 BASE_TOTALS = {
     "total_jobs": 0,
     "total_time": 0.,
     "total_print_time": 0.,
     "total_filament_used": 0.,
     "longest_job": 0.,
     "longest_print": 0.
 }
+HIST_TABLE = "job_history"
+TOTALS_TABLE = "job_totals"
+
+def _create_totals_list(
+    job_totals: Dict[str, Any],
+    aux_totals: List[Dict[str, Any]],
+    instance: str = "default"
+) -> List[Tuple[str, str, Any, Any, str]]:
+    """
+    Returns a list of Tuples formatted for SQL Database insertion.
+
+    Fields of each tuple are in the following order:
+    provider, field, maximum, total, instance_id
+    """
+    totals_list: List[Tuple[str, str, Any, Any, str]] = []
+    for key, value in job_totals.items():
+        total = value if key.startswith("total_") else None
+        maximum = value if total is None else None
+        totals_list.append(("history", key, maximum, total, instance))
+    for item in aux_totals:
+        totals_list.append(
+            (
+                item["provider"],
+                item["field"],
+                item["maximum"],
+                item["total"],
+                instance
+            )
+        )
+    return totals_list
+
+class TotalsSqlDefinition(SqlTableDefinition):
+    name = TOTALS_TABLE
+    prototype = (
+        f"""
+        {TOTALS_TABLE} (
+            provider TEXT NOT NULL,
+            field TEXT NOT NULL,
+            maximum REAL,
+            total REAL,
+            instance_id TEXT NOT NULL,
+            PRIMARY KEY (provider, field, instance_id)
+        )
+        """
+    )
+    version = 1
+
+    def migrate(self, last_version: int, db_provider: DBProviderWrapper) -> None:
+        if last_version == 0:
+            # Migrate from "moonraker" namespace to a table
+            logging.info("Migrating history totals from moonraker namespace...")
+            hist_ns: Dict[str, Any] = db_provider.get_item("moonraker", "history", {})
+            job_totals: Dict[str, Any] = hist_ns.get("job_totals", BASE_TOTALS)
+            aux_totals: List[Dict[str, Any]] = hist_ns.get("aux_totals", [])
+            totals_list = _create_totals_list(job_totals, aux_totals)
+            sql_conn = db_provider.connection
+            with sql_conn:
+                sql_conn.executemany(
+                    f"INSERT OR IGNORE INTO {TOTALS_TABLE} VALUES(?, ?, ?, ?, ?)",
+                    totals_list
+                )
+            try:
+                db_provider.delete_item("moonraker", "history")
+            except Exception:
+                pass
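
For illustration, the row layout _create_totals_list() builds and the totals
table stores; the values below are hypothetical:

    rows = _create_totals_list(
        {"total_jobs": 4, "longest_print": 3600.0},
        [{"provider": "spoolman", "field": "spool_id", "maximum": None, "total": 2}]
    )
    # rows == [
    #     ("history", "total_jobs", None, 4, "default"),          # "total_*" keys fill the total column
    #     ("history", "longest_print", 3600.0, None, "default"),  # all other keys fill maximum
    #     ("spoolman", "spool_id", None, 2, "default")
    # ]
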
+
+class HistorySqlDefinition(SqlTableDefinition):
+    name = HIST_TABLE
+    prototype = (
+        f"""
+        {HIST_TABLE} (
+            job_id INTEGER PRIMARY KEY ASC,
+            user TEXT NOT NULL,
+            filename TEXT,
+            status TEXT NOT NULL,
+            start_time REAL NOT NULL,
+            end_time REAL,
+            print_duration REAL NOT NULL,
+            total_duration REAL NOT NULL,
+            filament_used REAL NOT NULL,
+            metadata pyjson,
+            auxiliary_data pyjson NOT NULL,
+            instance_id TEXT NOT NULL
+        )
+        """
+    )
+    version = 1
+
+    def migrate(self, last_version: int, db_provider: DBProviderWrapper) -> None:
+        if last_version == 0:
+            conn = db_provider.connection
+            for batch in db_provider.iter_namespace("history", 1000):
+                conv_vals: List[Tuple[Any, ...]] = []
+                entry: Dict[str, Any]
+                for entry in batch.values():
+                    try:
+                        conv_vals.append(
+                            (
+                                None,
+                                entry.get("user", "No User"),
+                                entry["filename"],
+                                entry["status"],
+                                entry["start_time"],
+                                entry["end_time"],
+                                entry["print_duration"],
+                                entry["total_duration"],
+                                entry["filament_used"],
+                                entry["metadata"],
+                                entry.get("auxiliary_data", []),
+                                "default"
+                            )
+                        )
+                    except KeyError:
+                        continue
+                if not conv_vals:
+                    continue
+                placeholders = ",".join("?" * len(conv_vals[0]))
+                with conn:
+                    conn.executemany(
+                        f"INSERT INTO {HIST_TABLE} VALUES({placeholders})",
+                        conv_vals
+                    )
+            db_provider.wipe_local_namespace("history")
+
 class History:
     def __init__(self, config: ConfigHelper) -> None:
         self.server = config.get_server()
-        self.file_manager: FileManager = self.server.lookup_component(
-            'file_manager')
+        self.file_manager: FileManager = self.server.lookup_component('file_manager')
         self.request_lock = Lock()
         FieldTracker.class_init(self)
         self.auxiliary_fields: List[HistoryFieldData] = []
         database: DBComp = self.server.lookup_component("database")
-        hist_info: Dict[str, Any]
-        hist_info = database.get_item("moonraker", "history", {}).result()
-        self.job_totals: Totals = hist_info.get("job_totals", dict(BASE_TOTALS))
-        self.aux_totals: AuxTotals = hist_info.get("aux_totals", [])
+        self.history_table = database.register_table(HistorySqlDefinition())
+        self.totals_table = database.register_table(TotalsSqlDefinition())
+        self.job_totals: Totals = dict(BASE_TOTALS)
+        self.aux_totals: AuxTotals = []
 
         self.server.register_event_handler(
             "server:klippy_disconnect", self._handle_disconnect)
@@ -84,41 +207,43 @@ class History:
             self._handle_job_total_reset
         )
 
-        database.register_local_namespace(HIST_NAMESPACE)
-        self.history_ns = database.wrap_namespace(HIST_NAMESPACE,
-                                                  parse_keys=False)
-
         self.current_job: Optional[PrinterJob] = None
-        self.current_job_id: Optional[str] = None
+        self.current_job_id: Optional[int] = None
         self.job_user: str = "No User"
         self.job_paused: bool = False
-        self.next_job_id: int = 0
-        self.cached_job_ids = self.history_ns.keys().result()
-        if self.cached_job_ids:
-            self.next_job_id = int(self.cached_job_ids[-1], 16) + 1
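
As a minimal sketch of the registration pattern above (the table name,
columns, and attribute below are hypothetical, not part of this patch):

    class ExampleSqlDefinition(SqlTableDefinition):
        name = "example_table"
        prototype = "example_table (key TEXT PRIMARY KEY, value REAL)"
        version = 1

        def migrate(self, last_version: int, db_provider: DBProviderWrapper) -> None:
            pass  # brand new table, nothing to migrate

    # inside a component's __init__, mirroring History.__init__:
    #     self.example_table = database.register_table(ExampleSqlDefinition())
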
 
     async def component_init(self) -> None:
-        # Check for interupted jobs. If this is the first time, check
-        # the entire database. Otherwise only check the last 20 jobs.
-        interrupted_jobs: Dict[str, Any] = {}
-        database: DBComp = self.server.lookup_component("database")
-        version: int = await database.get_item("moonraker", "history.version", 0)
-        if version != HIST_VERSION:
-            await database.insert_item("moonraker", "history.version", HIST_VERSION)
-        job_ids = self.cached_job_ids if version < 1 else self.cached_job_ids[-20:]
-        jobs: Dict[str, Dict[str, Any]]
-        jobs = await self.history_ns.get_batch(job_ids)
-        for jid, job_data in jobs.items():
-            if job_data.get("status", "") == "in_progress":
-                job_data["status"] = "interrupted"
-                interrupted_jobs[jid] = job_data
+        # Populate totals
+        valid_aux_totals = [
+            (item.provider, item.name) for item in self.auxiliary_fields
+            if item.has_totals()
+        ]
+        cursor = await self.totals_table.execute(f"SELECT * from {TOTALS_TABLE}")
+        await cursor.set_arraysize(200)
+        for row in await cursor.fetchall():
+            provider, field, maximum, total, _ = tuple(row)
+            if provider == "history":
+                self.job_totals[field] = total if maximum is None else maximum
+            elif (provider, field) in valid_aux_totals:
+                item = dict(row)
+                item.pop("instance_id", None)
+                self.aux_totals.append(item)
+        # Check for interrupted jobs
+        cursor = await self.history_table.execute(
+            f"SELECT job_id FROM {HIST_TABLE} WHERE status = 'in_progress'"
+        )
+        interrupted_jobs: List[int] = [row[0] for row in await cursor.fetchall()]
         if interrupted_jobs:
+            async with self.history_table as tx:
+                await tx.execute(
+                    f"UPDATE {HIST_TABLE} SET status = 'interrupted' "
+                    "WHERE status = 'in_progress'"
+                )
             self.server.add_log_rollover_item(
                 "interrupted_history",
                 "The following jobs were detected as interrupted: "
-                f"{list(interrupted_jobs.keys())}"
+                f"{interrupted_jobs}"
             )
-            await self.history_ns.insert_batch(interrupted_jobs)
 
     async def _handle_job_request(self,
                                   web_request: WebRequest
                                   ) -> Dict[str, Any]:
         async with self.request_lock:
             req_type = web_request.get_request_type()
             if req_type == RequestType.GET:
                 job_id = web_request.get_str("uid")
-                if job_id not in self.cached_job_ids:
+                cursor = await self.history_table.execute(
+                    f"SELECT * FROM {HIST_TABLE} WHERE job_id = ?", (int(job_id, 16),)
+                )
+                result = await cursor.fetchone()
+                if result is None:
                     raise self.server.error(f"Invalid job uid: {job_id}", 404)
-                job = await self.history_ns[job_id]
+                job = dict(result)
                 return {"job": self._prep_requested_job(job, job_id)}
             if req_type == RequestType.DELETE:
                 all = web_request.get_boolean("all", False)
                 if all:
-                    deljobs = self.cached_job_ids
-                    self.history_ns.clear()
-                    self.cached_job_ids = []
-                    self.next_job_id = 0
+                    cursor = await self.history_table.execute(
+                        f"SELECT job_id FROM {HIST_TABLE} WHERE instance_id = ?",
+                        ("default",)
+                    )
+                    await cursor.set_arraysize(1000)
+                    deljobs = [f"{row[0]:06X}" for row in await cursor.fetchall()]
+                    async with self.history_table as tx:
+                        await tx.execute(
+                            f"DELETE FROM {HIST_TABLE} WHERE instance_id = ?",
+                            ("default",)
+                        )
                     return {'deleted_jobs': deljobs}
 
                 job_id = web_request.get_str("uid")
-                if job_id not in self.cached_job_ids:
+                async with self.history_table as tx:
+                    cursor = await tx.execute(
+                        f"DELETE FROM {HIST_TABLE} WHERE job_id = ?", (int(job_id, 16),)
+                    )
+                if cursor.rowcount < 1:
                     raise self.server.error(f"Invalid job uid: {job_id}", 404)
-
-                self.delete_job(job_id)
                 return {'deleted_jobs': [job_id]}
             raise self.server.error("Invalid Request Method")
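
For illustration, the uid round-trip these handlers rely on: jobs are stored
under an INTEGER PRIMARY KEY but exposed to clients as zero-padded uppercase
hex strings, matching the legacy namespace keys (id value hypothetical):

    job_id = 26                    # integer primary key assigned by SQLite
    uid = f"{job_id:06X}"          # "00001A", the client-facing uid
    assert int(uid, 16) == job_id  # reverse conversion used on requests
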
 
     async def _handle_jobs_list(self,
                                 web_request: WebRequest
                                 ) -> Dict[str, Any]:
         async with self.request_lock:
-            i = 0
-            count = 0
-            end_num = len(self.cached_job_ids)
-            jobs: List[Dict[str, Any]] = []
-            start_num = 0
-
             before = web_request.get_float("before", -1)
             since = web_request.get_float("since", -1)
             limit = web_request.get_int("limit", 50)
             start = web_request.get_int("start", 0)
-            order = web_request.get_str("order", "desc")
+            order = web_request.get_str("order", "desc").upper()
 
-            if order not in ["asc", "desc"]:
+            if order not in ["ASC", "DESC"]:
                 raise self.server.error(f"Invalid `order` value: {order}", 400)
-
-            reverse_order = (order == "desc")
-
-            # cached jobs is asc order, find lower and upper boundary
-            if since != -1:
-                while start_num < end_num:
-                    job_id = self.cached_job_ids[start_num]
-                    job: Dict[str, Any] = await self.history_ns[job_id]
-                    if job['start_time'] > since:
-                        break
-                    start_num += 1
-
+            # Build SQL Select Statement
+            values: List[Any] = ["default"]
+            sql_statement = f"SELECT * FROM {HIST_TABLE} WHERE instance_id = ?"
             if before != -1:
-                while end_num > 0:
-                    job_id = self.cached_job_ids[end_num-1]
-                    job = await self.history_ns[job_id]
-                    if job['end_time'] < before:
-                        break
-                    end_num -= 1
-
-            if start_num >= end_num or end_num == 0:
-                return {"count": 0, "jobs": []}
-
-            i = start
-            count = end_num - start_num
-
-            if limit == 0:
-                limit = MAX_JOBS
-
-            while i < count and len(jobs) < limit:
-                if reverse_order:
-                    job_id = self.cached_job_ids[end_num - i - 1]
-                else:
-                    job_id = self.cached_job_ids[start_num + i]
-                job = await self.history_ns[job_id]
+                sql_statement += " and end_time < ?"
+                values.append(before)
+            if since != -1:
+                sql_statement += " and start_time > ?"
+                values.append(since)
+            sql_statement += f" ORDER BY job_id {order}"
+            if limit > 0:
+                sql_statement += " LIMIT ? OFFSET ?"
+                values.append(limit)
+                values.append(start)
+            cursor = await self.history_table.execute(sql_statement, values)
+            await cursor.set_arraysize(1000)
+            jobs: List[Dict[str, Any]] = []
+            for row in await cursor.fetchall():
+                job = dict(row)
+                job_id = f"{row['job_id']:06X}"
                 jobs.append(self._prep_requested_job(job, job_id))
-                i += 1
-
-            return {"count": count, "jobs": jobs}
+            return {"count": len(jobs), "jobs": jobs}
 
     async def _handle_job_totals(
         self, web_request: WebRequest
     ) -> Dict[str, Any]:
@@ -223,15 +338,20 @@
             self.job_totals = dict(BASE_TOTALS)
             last_aux_totals = self.aux_totals
             self._update_aux_totals(reset=True)
-            database: DBComp = self.server.lookup_component("database")
-            await database.insert_item("moonraker", "history.job_totals", self.job_totals)
-            await database.insert_item("moonraker", "history.aux_totals", self.aux_totals)
+            totals_list = _create_totals_list(self.job_totals, self.aux_totals)
+            async with self.totals_table as tx:
+                await tx.execute(
+                    f"DELETE FROM {TOTALS_TABLE} WHERE instance_id = ?", ("default",)
+                )
+                await tx.executemany(
+                    f"INSERT INTO {TOTALS_TABLE} VALUES(?, ?, ?, ?, ?)", totals_list
+                )
             return {
                 "last_totals": last_totals,
                 "last_auxiliary_totals": last_aux_totals
             }
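
As a sketch, a request with before, limit, and start set and order=asc would
produce the following statement (parameters are bound, never inlined):

    # sql_statement:
    #     SELECT * FROM job_history WHERE instance_id = ?
    #         and end_time < ? ORDER BY job_id ASC LIMIT ? OFFSET ?
    # values: ["default", before, limit, start]
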
 
-    def _on_job_state_changed(
+    async def _on_job_state_changed(
         self,
         job_event: JobEvent,
         prev_stats: Dict[str, Any],
         new_stats: Dict[str, Any]
     ) -> None:
         self.job_paused = False
         if job_event == JobEvent.STARTED:
             if self.current_job is not None:
                 # Finish with the previous state
-                self.finish_job("cancelled", prev_stats)
-            self.add_job(PrinterJob(new_stats))
+                await self.finish_job("cancelled", prev_stats)
+            await self.add_job(PrinterJob(new_stats))
         elif job_event == JobEvent.COMPLETE:
-            self.finish_job("completed", new_stats)
+            await self.finish_job("completed", new_stats)
         elif job_event == JobEvent.ERROR:
-            self.finish_job("error", new_stats)
+            await self.finish_job("error", new_stats)
         elif job_event in (JobEvent.CANCELLED, JobEvent.STANDBY):
             # Cancel on "standby" for backward compatibility with
             # `CLEAR_PAUSE/SDCARD_RESET_FILE` workflow
-            self.finish_job("cancelled", prev_stats)
+            await self.finish_job("cancelled", prev_stats)
 
     def _on_job_requested(self, user: Optional[Dict[str, Any]]) -> None:
         username = (user or {}).get("username", "No User")
         self.job_user = username
         if self.current_job is not None:
             self.current_job.user = username
 
-    def _handle_shutdown(self) -> None:
+    async def _handle_shutdown(self) -> None:
         jstate: JobState = self.server.lookup_component("job_state")
         last_ps = jstate.get_last_stats()
-        self.finish_job("klippy_shutdown", last_ps)
+        await self.finish_job("klippy_shutdown", last_ps)
 
-    def _handle_disconnect(self) -> None:
+    async def _handle_disconnect(self) -> None:
         jstate: JobState = self.server.lookup_component("job_state")
         last_ps = jstate.get_last_stats()
-        self.finish_job("klippy_disconnect", last_ps)
+        await self.finish_job("klippy_disconnect", last_ps)
 
-    def add_job(self, job: PrinterJob) -> None:
-        if len(self.cached_job_ids) >= MAX_JOBS:
-            self.delete_job(self.cached_job_ids[0])
-        job_id = f"{self.next_job_id:06X}"
-        self.current_job = job
-        self.current_job_id = job_id
-        self.current_job.user = self.job_user
-        self.grab_job_metadata()
-        for field in self.auxiliary_fields:
-            field.tracker.reset()
-        self.current_job.set_aux_data(self.auxiliary_fields)
-        self.history_ns[job_id] = job.get_stats()
-        self.cached_job_ids.append(job_id)
-        self.next_job_id += 1
-        logging.debug(
-            f"History Job Added - Id: {job_id}, File: {job.filename}"
-        )
-        self.send_history_event("added")
+    async def add_job(self, job: PrinterJob) -> None:
+        async with self.request_lock:
+            self.current_job = job
+            self.current_job_id = None
+            self.current_job.user = self.job_user
+            self.grab_job_metadata()
+            for field in self.auxiliary_fields:
+                field.tracker.reset()
+            self.current_job.set_aux_data(self.auxiliary_fields)
+            new_id = await self.save_job(job, None)
+            if new_id is None:
+                logging.info(f"Error saving job, filename '{job.filename}'")
+                return
+            self.current_job_id = new_id
+            job_id = f"{new_id:06X}"
+            self.update_metadata(job_id)
+            logging.debug(
+                f"History Job Added - Id: {job_id}, File: {job.filename}"
+            )
+            self.send_history_event("added")
 
-    def delete_job(self, job_id: Union[int, str]) -> None:
-        if isinstance(job_id, int):
-            job_id = f"{job_id:06X}"
+    async def save_job(self, job: PrinterJob, job_id: Optional[int]) -> Optional[int]:
+        values: List[Any] = [
+            job_id,
+            job.user,
+            job.filename,
+            job.status,
+            job.start_time,
+            job.end_time,
+            job.print_duration,
+            job.total_duration,
+            job.filament_used,
+            job.metadata,
+            job.auxiliary_data,
+            "default"
+        ]
+        placeholders = ",".join("?" * len(values))
+        async with self.history_table as tx:
+            cursor = await tx.execute(
+                f"REPLACE INTO {HIST_TABLE} VALUES({placeholders})", values
+            )
+        return cursor.lastrowid
 
-        if job_id in self.cached_job_ids:
-            del self.history_ns[job_id]
-            self.cached_job_ids.remove(job_id)
+    async def delete_job(self, job_id: Union[int, str]) -> None:
+        if isinstance(job_id, str):
+            job_id = int(job_id, 16)
+        async with self.history_table as tx:
+            await tx.execute(
+                f"DELETE FROM {HIST_TABLE} WHERE job_id = ?", (job_id,)
+            )
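
save_job() leans on SQLite REPLACE semantics: with job_id=None the INTEGER
PRIMARY KEY is auto-assigned and lastrowid yields the new id, while an
existing id overwrites that row in place. A rough usage sketch:

    new_id = await self.save_job(job, None)   # insert; returns a fresh job_id
    await self.save_job(job, new_id)          # later saves update the same row
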
 
-    def finish_job(self, status: str, pstats: Dict[str, Any]) -> None:
-        if self.current_job is None:
-            return
-        cj = self.current_job
-        if (
-            pstats.get('filename') != cj.get('filename') or
-            pstats.get('total_duration', 0.) < cj.get('total_duration')
-        ):
-            # Print stats have been reset, do not update this job with them
-            pstats = {}
+    async def finish_job(self, status: str, pstats: Dict[str, Any]) -> None:
+        async with self.request_lock:
+            if self.current_job is None or self.current_job_id is None:
+                self._reset_current_job()
+                return
+            if (
+                pstats.get('filename') != self.current_job.filename or
+                pstats.get('total_duration', 0.) < self.current_job.total_duration
+            ):
+                # Print stats have been reset, do not update this job with them
+                pstats = {}
+            self.current_job.user = self.job_user
+            self.current_job.finish(status, pstats)
+            # Regrab metadata in case metadata wasn't parsed yet due to file upload
+            self.grab_job_metadata()
+            self.current_job.set_aux_data(self.auxiliary_fields)
+            job_id = f"{self.current_job_id:06X}"
+            await self.save_job(self.current_job, self.current_job_id)
+            self.update_metadata(job_id)
+            await self._update_job_totals()
+            logging.debug(
+                f"History Job Finished - Id: {job_id}, "
+                f"File: {self.current_job.filename}, "
+                f"Status: {status}"
+            )
+            self.send_history_event("finished")
+            self._reset_current_job()
 
-        self.current_job.user = self.job_user
-        self.current_job.finish(status, pstats)
-        # Regrab metadata incase metadata wasn't parsed yet due to file upload
-        self.grab_job_metadata()
-        self.current_job.set_aux_data(self.auxiliary_fields)
-        self.save_current_job()
-        self._update_job_totals()
-        logging.debug(
-            f"History Job Finished - Id: {self.current_job_id}, "
-            f"File: {self.current_job.filename}, "
-            f"Status: {status}"
-        )
-        self.send_history_event("finished")
+    def _reset_current_job(self) -> None:
         self.current_job = None
         self.current_job_id = None
         self.job_user = "No User"
 
-    async def get_job(self,
-                      job_id: Union[int, str]
-                      ) -> Optional[Dict[str, Any]]:
-        if isinstance(job_id, int):
-            job_id = f"{job_id:06X}"
-        return await self.history_ns.get(job_id, None)
+    async def get_job(
+        self, job_id: Union[int, str]
+    ) -> Optional[Dict[str, Any]]:
+        if isinstance(job_id, str):
+            job_id = int(job_id, 16)
+        cursor = await self.history_table.execute(
+            f"SELECT * FROM {HIST_TABLE} WHERE job_id = ?", (job_id,)
+        )
+        result = await cursor.fetchone()
+        return dict(result) if result is not None else result
 
     def grab_job_metadata(self) -> None:
         if self.current_job is None:
             return
-        filename: str = self.current_job.get("filename")
+        filename: str = self.current_job.filename
         mdst = self.file_manager.get_metadata_storage()
         metadata: Dict[str, Any] = mdst.get(filename, {})
-        if metadata:
-            # Add the start time and job id to the
-            # persistent metadata storage
-            metadata.update({
-                'print_start_time': self.current_job.get('start_time'),
-                'job_id': self.current_job_id
-            })
-            mdst.insert(filename, metadata.copy())
 
         # We don't need to store these fields in the
         # job metadata, as they are redundant
         metadata.pop('print_start_time', None)
@@ -352,14 +497,24 @@
             thumb: Dict[str, Any]
             for thumb in metadata['thumbnails']:
                 thumb.pop('data', None)
-        self.current_job.set("metadata", metadata)
+        self.current_job.metadata = metadata
 
-    def save_current_job(self) -> None:
-        if self.current_job is None or self.current_job_id is None:
+    def update_metadata(self, job_id: str) -> None:
+        if self.current_job is None:
             return
-        self.history_ns[self.current_job_id] = self.current_job.get_stats()
+        mdst = self.file_manager.get_metadata_storage()
+        filename: str = self.current_job.filename
+        metadata: Dict[str, Any] = mdst.get(filename, {})
+        if metadata:
+            # Add the start time and job id to the
+            # persistent metadata storage
+            metadata.update({
+                'print_start_time': self.current_job.get('start_time'),
+                'job_id': job_id
+            })
+            mdst.insert(filename, metadata)
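
A row fetched through get_job() deserializes to a plain dict keyed by the
job_history columns; for illustration (hypothetical values):

    # {"job_id": 26, "user": "No User", "filename": "part.gcode",
    #  "status": "completed", "start_time": 1715000000.0,
    #  "end_time": 1715003600.0, "print_duration": 3450.0,
    #  "total_duration": 3600.0, "filament_used": 910.5,
    #  "metadata": {}, "auxiliary_data": [], "instance_id": "default"}
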
 
-    def _update_job_totals(self) -> None:
+    async def _update_job_totals(self) -> None:
         if self.current_job is None:
             return
         job = self.current_job
@@ -370,9 +525,11 @@
         self._maximize_total("longest_job", job.total_duration)
         self._maximize_total("longest_print", job.print_duration)
         self._update_aux_totals()
-        database: DBComp = self.server.lookup_component("database")
-        database.insert_item("moonraker", "history.job_totals", self.job_totals)
-        database.insert_item("moonraker", "history.aux_totals", self.aux_totals)
+        totals_list = _create_totals_list(self.job_totals, self.aux_totals)
+        async with self.totals_table as tx:
+            await tx.executemany(
+                f"REPLACE INTO {TOTALS_TABLE} VALUES(?, ?, ?, ?, ?)", totals_list
+            )
 
     def _accumulate_total(self, field: str, val: Union[int, float]) -> None:
         self.job_totals[field] += val
@@ -391,23 +548,25 @@
     def send_history_event(self, evt_action: str) -> None:
         if self.current_job is None or self.current_job_id is None:
             return
-        job = self._prep_requested_job(
-            self.current_job.get_stats(), self.current_job_id)
-        self.server.send_event("history:history_changed",
-                               {'action': evt_action, 'job': job})
-
-    def _prep_requested_job(self,
-                            job: Dict[str, Any],
-                            job_id: str
-                            ) -> Dict[str, Any]:
-        mtime = job.get("metadata", {}).get("modified", None)
-        job['job_id'] = job_id
-        job['exists'] = self.file_manager.check_file_exists(
-            "gcodes", job['filename'], mtime
+        job_id = f"{self.current_job_id:06X}"
+        job = self._prep_requested_job(self.current_job.get_stats(), job_id)
+        self.server.send_event(
+            "history:history_changed", {'action': evt_action, 'job': job}
         )
+
+    def _prep_requested_job(
+        self, job: Dict[str, Any], job_id: str
+    ) -> Dict[str, Any]:
+        fm = self.file_manager
+        mtime = job.get("metadata", {}).get("modified", None)
+        job["exists"] = fm.check_file_exists("gcodes", job['filename'], mtime)
+        job["job_id"] = job_id
+        job.pop("instance_id", None)
         return job
 
     def register_auxiliary_field(self, new_field: HistoryFieldData) -> None:
+        if new_field.provider == "history":
+            raise self.server.error("Provider name 'history' is reserved")
         for field in self.auxiliary_fields:
             if field == new_field:
                 raise self.server.error(
@@ -421,17 +580,19 @@
             return False
         return not self.job_paused if check_paused else True
 
-    def on_exit(self) -> None:
+    async def on_exit(self) -> None:
+        if self.current_job is None:
+            return
         jstate: JobState = self.server.lookup_component("job_state")
         last_ps = jstate.get_last_stats()
-        self.finish_job("server_exit", last_ps)
+        await self.finish_job("server_exit", last_ps)
 
 
 class PrinterJob:
     def __init__(self, data: Dict[str, Any] = {}) -> None:
         self.end_time: Optional[float] = None
         self.filament_used: float = 0
-        self.filename: Optional[str] = None
-        self.metadata: Optional[Dict[str, Any]] = None
+        self.filename: str = ""
+        self.metadata: Dict[str, Any] = {}
         self.print_duration: float = 0.
         self.status: str = "in_progress"
         self.start_time = time.time()