From b1a232efcedff7488e368153df9eb6222ad00fa4 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sun, 23 Jan 2022 08:31:56 -0500 Subject: [PATCH] update_manager: remove redundant locks All requests to update, refresh, recover, or reinstall must acquire the command lock. Given that the individual Deployment implementations are not (and should not be) called from outside of a request the locks they use to prevent unwanted re-entry are redundant, confusing, and could potential result in a deadlock if used improperly. Signed-off-by: Eric Callahan --- .../update_manager/update_manager.py | 154 ++++++++---------- .../components/update_manager/zip_deploy.py | 80 ++++----- 2 files changed, 101 insertions(+), 133 deletions(-) diff --git a/moonraker/components/update_manager/update_manager.py b/moonraker/components/update_manager/update_manager.py index c901938..272beb6 100644 --- a/moonraker/components/update_manager/update_manager.py +++ b/moonraker/components/update_manager/update_manager.py @@ -783,8 +783,6 @@ class PackageDeploy(BaseDeploy): storage = self._load_storage() self.use_packagekit = config.getboolean("enable_packagekit", True) self.available_packages: List[str] = storage.get('packages', []) - self.refresh_evt: Optional[asyncio.Event] = None - self.mutex: asyncio.Lock = asyncio.Lock() async def initialize(self) -> None: provider: BasePackageProvider @@ -833,26 +831,19 @@ class PackageDeploy(BaseDeploy): return provider async def refresh(self) -> None: - if self.refresh_evt is not None: - self.refresh_evt.wait() - return - async with self.mutex: - self.refresh_evt = asyncio.Event() - try: - # Do not force a refresh until the server has started - if self.server.is_running(): - await self._update_package_cache(force=True) - self.available_packages = await self.provider.get_packages() - pkg_msg = "\n".join(self.available_packages) - logging.info( - f"Detected {len(self.available_packages)} package updates:" - f"\n{pkg_msg}") - except Exception: - logging.exception("Error Refreshing System Packages") - self.refresh_evt.set() - self.refresh_evt = None - # Update Persistent Storage - self._save_state() + try: + # Do not force a refresh until the server has started + if self.server.is_running(): + await self._update_package_cache(force=True) + self.available_packages = await self.provider.get_packages() + pkg_msg = "\n".join(self.available_packages) + logging.info( + f"Detected {len(self.available_packages)} package updates:" + f"\n{pkg_msg}") + except Exception: + logging.exception("Error Refreshing System Packages") + # Update Persistent Storage + self._save_state() def get_persistent_data(self) -> Dict[str, Any]: storage = super().get_persistent_data() @@ -860,20 +851,19 @@ class PackageDeploy(BaseDeploy): return storage async def update(self) -> bool: - async with self.mutex: - if not self.available_packages: - return False - self.cmd_helper.notify_update_response("Updating packages...") - try: - await self._update_package_cache(force=True, notify=True) - await self.provider.upgrade_system() - except Exception: - raise self.server.error("Error updating system packages") - self.available_packages = [] - self._save_state() - self.cmd_helper.notify_update_response( - "Package update finished...", is_complete=True) - return True + if not self.available_packages: + return False + self.cmd_helper.notify_update_response("Updating packages...") + try: + await self._update_package_cache(force=True, notify=True) + await self.provider.upgrade_system() + except Exception: + raise self.server.error("Error updating system packages") + self.available_packages = [] + self._save_state() + self.cmd_helper.notify_update_response( + "Package update finished...", is_complete=True) + return True async def _update_package_cache(self, force: bool = False, @@ -1304,8 +1294,6 @@ class WebClientDeploy(BaseDeploy): dl_info: List[Any] = storage.get('dl_info', ["?", "?", 0]) self.dl_info: Tuple[str, str, int] = cast( Tuple[str, str, int], tuple(dl_info)) - self.refresh_evt: Optional[asyncio.Event] = None - self.mutex: asyncio.Lock = asyncio.Lock() logging.info(f"\nInitializing Client Updater: '{self.name}'," f"\nChannel: {self.channel}" f"\npath: {self.path}") @@ -1320,19 +1308,12 @@ class WebClientDeploy(BaseDeploy): self.version = "?" async def refresh(self) -> None: - if self.refresh_evt is not None: - self.refresh_evt.wait() - return - async with self.mutex: - self.refresh_evt = asyncio.Event() - try: - await self._get_local_version() - await self._get_remote_version() - except Exception: - logging.exception("Error Refreshing Client") - self.refresh_evt.set() - self.refresh_evt = None - self._save_state() + try: + await self._get_local_version() + await self._get_remote_version() + except Exception: + logging.exception("Error Refreshing Client") + self._save_state() async def _get_remote_version(self) -> None: # Remote state @@ -1375,45 +1356,44 @@ class WebClientDeploy(BaseDeploy): return storage async def update(self) -> bool: - async with self.mutex: + if self.remote_version == "?": + await self._get_remote_version() if self.remote_version == "?": - await self._get_remote_version() - if self.remote_version == "?": - raise self.server.error( - f"Client {self.repo}: Unable to locate update") - dl_url, content_type, size = self.dl_info - if dl_url == "?": raise self.server.error( - f"Client {self.repo}: Invalid download url") - if self.version == self.remote_version: - # Already up to date - return False - event_loop = self.server.get_event_loop() + f"Client {self.repo}: Unable to locate update") + dl_url, content_type, size = self.dl_info + if dl_url == "?": + raise self.server.error( + f"Client {self.repo}: Invalid download url") + if self.version == self.remote_version: + # Already up to date + return False + event_loop = self.server.get_event_loop() + self.cmd_helper.notify_update_response( + f"Updating Web Client {self.name}...") + self.cmd_helper.notify_update_response( + f"Downloading Client: {self.name}") + with tempfile.TemporaryDirectory( + suffix=self.name, prefix="client") as tempdirname: + tempdir = pathlib.Path(tempdirname) + temp_download_file = tempdir.joinpath(f"{self.name}.zip") + temp_persist_dir = tempdir.joinpath(self.name) + await self.cmd_helper.streaming_download_request( + dl_url, temp_download_file, content_type, size) self.cmd_helper.notify_update_response( - f"Updating Web Client {self.name}...") - self.cmd_helper.notify_update_response( - f"Downloading Client: {self.name}") - with tempfile.TemporaryDirectory( - suffix=self.name, prefix="client") as tempdirname: - tempdir = pathlib.Path(tempdirname) - temp_download_file = tempdir.joinpath(f"{self.name}.zip") - temp_persist_dir = tempdir.joinpath(self.name) - await self.cmd_helper.streaming_download_request( - dl_url, temp_download_file, content_type, size) - self.cmd_helper.notify_update_response( - f"Download Complete, extracting release to '{self.path}'") - await event_loop.run_in_thread( - self._extract_release, temp_persist_dir, - temp_download_file) - self.version = self.remote_version - version_path = self.path.joinpath(".version") - if not version_path.exists(): - await event_loop.run_in_thread( - version_path.write_text, self.version) - self.cmd_helper.notify_update_response( - f"Client Update Finished: {self.name}", is_complete=True) - self._save_state() - return True + f"Download Complete, extracting release to '{self.path}'") + await event_loop.run_in_thread( + self._extract_release, temp_persist_dir, + temp_download_file) + self.version = self.remote_version + version_path = self.path.joinpath(".version") + if not version_path.exists(): + await event_loop.run_in_thread( + version_path.write_text, self.version) + self.cmd_helper.notify_update_response( + f"Client Update Finished: {self.name}", is_complete=True) + self._save_state() + return True def _extract_release(self, persist_dir: pathlib.Path, diff --git a/moonraker/components/update_manager/zip_deploy.py b/moonraker/components/update_manager/zip_deploy.py index b74cce5..ae42fac 100644 --- a/moonraker/components/update_manager/zip_deploy.py +++ b/moonraker/components/update_manager/zip_deploy.py @@ -5,7 +5,6 @@ # This file may be distributed under the terms of the GNU GPLv3 license. from __future__ import annotations -import asyncio import os import pathlib import json @@ -74,8 +73,6 @@ class ZipDeploy(AppDeploy): self.package_list: List[str] = [] self.python_pkg_list: List[str] = [] self.release_download_info: Tuple[str, str, int] = ("?", "?", 0) - self.mutex: asyncio.Lock = asyncio.Lock() - self.refresh_event: Optional[asyncio.Event] = None @staticmethod async def from_application(app: AppDeploy) -> ZipDeploy: @@ -125,18 +122,11 @@ class ZipDeploy(AppDeploy): return tag_version async def refresh(self) -> None: - if self.refresh_event is not None: - await self.refresh_event.wait() - return - async with self.mutex: - self.refresh_event = asyncio.Event() - try: - await self._update_repo_state() - except Exception: - self.verified = False - self.log_exc("Error refreshing application state") - self.refresh_event.set() - self.refresh_event = None + try: + await self._update_repo_state() + except Exception: + self.verified = False + self.log_exc("Error refreshing application state") async def _update_repo_state(self) -> None: self.errors = [] @@ -372,42 +362,40 @@ class ZipDeploy(AppDeploy): zf.extractall(self.path) async def update(self, force_dep_update: bool = False) -> bool: - async with self.mutex: - if not self._is_valid: - raise self.log_exc("Update aborted, repo not valid", False) - if self.short_version == self.latest_version: - # already up to date - return False - self.cmd_helper.notify_update_response( - f"Updating Application {self.name}...") - npm_hash = await self._get_file_hash(self.npm_pkg_json) - dl_url, content_type, size = self.release_download_info - self.notify_status("Starting Download...") - with tempfile.TemporaryDirectory( - suffix=self.name, prefix="app") as tempdirname: - tempdir = pathlib.Path(tempdirname) - temp_download_file = tempdir.joinpath(f"{self.name}.zip") - await self.cmd_helper.streaming_download_request( - dl_url, temp_download_file, content_type, size) - self.notify_status( - f"Download Complete, extracting release to '{self.path}'") - event_loop = self.server.get_event_loop() - await event_loop.run_in_thread( - self._extract_release, temp_download_file) - await self._update_dependencies(npm_hash, force=force_dep_update) - await self._update_repo_state() - await self.restart_service() - self.notify_status("Update Finished...", is_complete=True) - return True + if not self._is_valid: + raise self.log_exc("Update aborted, repo not valid", False) + if self.short_version == self.latest_version: + # already up to date + return False + self.cmd_helper.notify_update_response( + f"Updating Application {self.name}...") + npm_hash = await self._get_file_hash(self.npm_pkg_json) + dl_url, content_type, size = self.release_download_info + self.notify_status("Starting Download...") + with tempfile.TemporaryDirectory( + suffix=self.name, prefix="app") as tempdirname: + tempdir = pathlib.Path(tempdirname) + temp_download_file = tempdir.joinpath(f"{self.name}.zip") + await self.cmd_helper.streaming_download_request( + dl_url, temp_download_file, content_type, size) + self.notify_status( + f"Download Complete, extracting release to '{self.path}'") + event_loop = self.server.get_event_loop() + await event_loop.run_in_thread( + self._extract_release, temp_download_file) + await self._update_dependencies(npm_hash, force=force_dep_update) + await self._update_repo_state() + await self.restart_service() + self.notify_status("Update Finished...", is_complete=True) + return True async def recover(self, hard: bool = False, force_dep_update: bool = False ) -> None: - async with self.mutex: - url = f"https://api.github.com/repos/{self.host_repo}/releases" - releases = await self._fetch_github_releases(url) - await self._process_latest_release(releases[1]) + url = f"https://api.github.com/repos/{self.host_repo}/releases" + releases = await self._fetch_github_releases(url) + await self._process_latest_release(releases[1]) await self.update(force_dep_update=force_dep_update) async def reinstall(self) -> None: