diff --git a/moonraker/components/update_manager/update_manager.py b/moonraker/components/update_manager/update_manager.py index 1181426..4c8fc66 100644 --- a/moonraker/components/update_manager/update_manager.py +++ b/moonraker/components/update_manager/update_manager.py @@ -10,6 +10,7 @@ import os import logging import time import tempfile +import pathlib from .common import AppType, get_base_configuration, get_app_type from .base_deploy import BaseDeploy from .app_deploy import AppDeploy @@ -17,6 +18,7 @@ from .git_deploy import GitDeploy from .zip_deploy import ZipDeploy from .system_deploy import PackageDeploy from ...common import RequestType +from ...utils.filelock import AsyncExclusiveFileLock, LockTimeout # Annotation imports from typing import ( @@ -179,7 +181,7 @@ class UpdateManager: return self.updaters async def component_init(self) -> None: - self.instance_tracker.set_instance_id() + await self.instance_tracker.set_instance_id() # Prune stale data from the database umdb = self.cmd_helper.get_umdb() db_keys = await umdb.keys() @@ -497,7 +499,7 @@ class UpdateManager: async def close(self) -> None: if self.refresh_timer is not None: self.refresh_timer.stop() - self.instance_tracker.close() + await self.instance_tracker.close() for updater in self.updaters.values(): ret = updater.close() if ret is not None: @@ -671,88 +673,63 @@ class CommandHelper: class InstanceTracker: def __init__(self, server: Server) -> None: self.server = server - self.inst_id = b"" - self.shm = self._try_open_shm() + self.inst_id = "" + tmpdir = pathlib.Path(tempfile.gettempdir()) + self.inst_file_path = tmpdir.joinpath("moonraker_instance_ids") - def _try_open_shm(self) -> Any: - prev_mask = os.umask(0) - try: - from multiprocessing.shared_memory import SharedMemory - setattr(SharedMemory, "_mode", 438) - try: - return SharedMemory("moonraker_instance_ids", True, 4096) - except FileExistsError: - return SharedMemory("moonraker_instance_ids") - except Exception as e: - self.server.add_log_rollover_item( - "um_multi_instance_msg", - "Failed to open shared memory, update_manager instance tracking " - f"disabled.\n{e.__class__.__name__}: {e}" - ) - return None - finally: - os.umask(prev_mask) - - def get_instance_id(self) -> bytes: + def get_instance_id(self) -> str: machine: Machine = self.server.lookup_component("machine") cur_name = "".join(machine.unit_name.split()) cur_uuid: str = self.server.get_app_args()["instance_uuid"] pid = os.getpid() - return f"{cur_name}:{cur_uuid}:{pid}".encode(errors="ignore") + return f"{cur_name}:{cur_uuid}:{pid}" - def _read_instance_ids(self) -> List[bytes]: - if self.shm is not None: - try: - data = bytearray(self.shm.buf) - idx = data.find(b"\x00") - if idx > 1: - return bytes(data[:idx]).strip().splitlines() - except Exception: - logging.exception("Failed to Read Shared Memory") - return [] + async def _read_instance_ids(self) -> List[str]: + if not self.inst_file_path.exists(): + return [] + eventloop = self.server.get_event_loop() + id_data = await eventloop.run_in_thread(self.inst_file_path.read_text) + return [iid.strip() for iid in id_data.strip().splitlines() if iid.strip()] - def set_instance_id(self) -> None: - if self.shm is None: - return - self.inst_id = self.get_instance_id() - iids = self._read_instance_ids() - if self.inst_id not in iids: - iids.append(self.inst_id) - if len(iids) > 1: - id_str = "\n".join([iid.decode(errors="ignore") for iid in iids]) - self.server.add_log_rollover_item( - "um_multi_instance_msg", - "Multiple instances of Moonraker have the update manager enabled." - f"\n{id_str}" - ) - encoded_ids = b"\n".join(iids) + b"\x00" - if len(encoded_ids) > self.shm.size: - iid = self.inst_id.decode(errors="ignore") - logging.info(f"Not enough storage in shared memory for id {iid}") - return + async def set_instance_id(self) -> None: try: - buf: memoryview = self.shm.buf - buf[:len(encoded_ids)] = encoded_ids + async with AsyncExclusiveFileLock(self.inst_file_path, 2.): + self.inst_id = self.get_instance_id() + iids = await self._read_instance_ids() + if self.inst_id not in iids: + iids.append(self.inst_id) + iid_string = "\n".join(iids) + if len(iids) > 1: + self.server.add_log_rollover_item( + "um_multi_instance_msg", + "Multiple instances of Moonraker have the update " + f"manager enabled.\n{iid_string}" + ) + eventloop = self.server.get_event_loop() + await eventloop.run_in_thread( + self.inst_file_path.write_text, iid_string + ) + except LockTimeout as e: + logging.info(str(e)) except Exception: - logging.exception("Failed to Write Shared Memory") + logging.exception("Failed to set instance id") - def close(self) -> None: - if self.shm is None: - return - # Remove current id and clean up shared memory - iids = self._read_instance_ids() - if self.inst_id in iids: - iids.remove(self.inst_id) + async def close(self) -> None: try: - buf: memoryview = self.shm.buf - null_len = min(self.shm.size, max(len(self.inst_id), 10)) - data = b"\n".join(iids) + b"\x00" if iids else b"\x00" * null_len - buf[:len(data)] = data - self.shm.close() - if not iids: - self.shm.unlink() + async with AsyncExclusiveFileLock(self.inst_file_path, 2.): + # Remove current id + iids = await self._read_instance_ids() + if self.inst_id in iids: + iids.remove(self.inst_id) + iid_string = "\n".join(iids) + eventloop = self.server.get_event_loop() + await eventloop.run_in_thread( + self.inst_file_path.write_text, iid_string + ) + except LockTimeout as e: + logging.info(str(e)) except Exception: - logging.exception("Failed to write/close shared memory") + logging.exception("Failed to remove instance id") def load_component(config: ConfigHelper) -> UpdateManager: