update_manager: remove redundant locks

All requests to update, refresh, recover, or reinstall must acquire
the command lock. Given that the individual Deployment implementations
are not (and should not be) called from outside of a request the locks they
use to prevent unwanted re-entry are redundant, confusing, and could
potential result in a deadlock if used improperly.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-01-23 08:31:56 -05:00 committed by Eric Callahan
parent 4d473969db
commit b1a232efce
2 changed files with 101 additions and 133 deletions

View File

@ -783,8 +783,6 @@ class PackageDeploy(BaseDeploy):
storage = self._load_storage() storage = self._load_storage()
self.use_packagekit = config.getboolean("enable_packagekit", True) self.use_packagekit = config.getboolean("enable_packagekit", True)
self.available_packages: List[str] = storage.get('packages', []) self.available_packages: List[str] = storage.get('packages', [])
self.refresh_evt: Optional[asyncio.Event] = None
self.mutex: asyncio.Lock = asyncio.Lock()
async def initialize(self) -> None: async def initialize(self) -> None:
provider: BasePackageProvider provider: BasePackageProvider
@ -833,26 +831,19 @@ class PackageDeploy(BaseDeploy):
return provider return provider
async def refresh(self) -> None: async def refresh(self) -> None:
if self.refresh_evt is not None: try:
self.refresh_evt.wait() # Do not force a refresh until the server has started
return if self.server.is_running():
async with self.mutex: await self._update_package_cache(force=True)
self.refresh_evt = asyncio.Event() self.available_packages = await self.provider.get_packages()
try: pkg_msg = "\n".join(self.available_packages)
# Do not force a refresh until the server has started logging.info(
if self.server.is_running(): f"Detected {len(self.available_packages)} package updates:"
await self._update_package_cache(force=True) f"\n{pkg_msg}")
self.available_packages = await self.provider.get_packages() except Exception:
pkg_msg = "\n".join(self.available_packages) logging.exception("Error Refreshing System Packages")
logging.info( # Update Persistent Storage
f"Detected {len(self.available_packages)} package updates:" self._save_state()
f"\n{pkg_msg}")
except Exception:
logging.exception("Error Refreshing System Packages")
self.refresh_evt.set()
self.refresh_evt = None
# Update Persistent Storage
self._save_state()
def get_persistent_data(self) -> Dict[str, Any]: def get_persistent_data(self) -> Dict[str, Any]:
storage = super().get_persistent_data() storage = super().get_persistent_data()
@ -860,20 +851,19 @@ class PackageDeploy(BaseDeploy):
return storage return storage
async def update(self) -> bool: async def update(self) -> bool:
async with self.mutex: if not self.available_packages:
if not self.available_packages: return False
return False self.cmd_helper.notify_update_response("Updating packages...")
self.cmd_helper.notify_update_response("Updating packages...") try:
try: await self._update_package_cache(force=True, notify=True)
await self._update_package_cache(force=True, notify=True) await self.provider.upgrade_system()
await self.provider.upgrade_system() except Exception:
except Exception: raise self.server.error("Error updating system packages")
raise self.server.error("Error updating system packages") self.available_packages = []
self.available_packages = [] self._save_state()
self._save_state() self.cmd_helper.notify_update_response(
self.cmd_helper.notify_update_response( "Package update finished...", is_complete=True)
"Package update finished...", is_complete=True) return True
return True
async def _update_package_cache(self, async def _update_package_cache(self,
force: bool = False, force: bool = False,
@ -1304,8 +1294,6 @@ class WebClientDeploy(BaseDeploy):
dl_info: List[Any] = storage.get('dl_info', ["?", "?", 0]) dl_info: List[Any] = storage.get('dl_info', ["?", "?", 0])
self.dl_info: Tuple[str, str, int] = cast( self.dl_info: Tuple[str, str, int] = cast(
Tuple[str, str, int], tuple(dl_info)) Tuple[str, str, int], tuple(dl_info))
self.refresh_evt: Optional[asyncio.Event] = None
self.mutex: asyncio.Lock = asyncio.Lock()
logging.info(f"\nInitializing Client Updater: '{self.name}'," logging.info(f"\nInitializing Client Updater: '{self.name}',"
f"\nChannel: {self.channel}" f"\nChannel: {self.channel}"
f"\npath: {self.path}") f"\npath: {self.path}")
@ -1320,19 +1308,12 @@ class WebClientDeploy(BaseDeploy):
self.version = "?" self.version = "?"
async def refresh(self) -> None: async def refresh(self) -> None:
if self.refresh_evt is not None: try:
self.refresh_evt.wait() await self._get_local_version()
return await self._get_remote_version()
async with self.mutex: except Exception:
self.refresh_evt = asyncio.Event() logging.exception("Error Refreshing Client")
try: self._save_state()
await self._get_local_version()
await self._get_remote_version()
except Exception:
logging.exception("Error Refreshing Client")
self.refresh_evt.set()
self.refresh_evt = None
self._save_state()
async def _get_remote_version(self) -> None: async def _get_remote_version(self) -> None:
# Remote state # Remote state
@ -1375,45 +1356,44 @@ class WebClientDeploy(BaseDeploy):
return storage return storage
async def update(self) -> bool: async def update(self) -> bool:
async with self.mutex: if self.remote_version == "?":
await self._get_remote_version()
if self.remote_version == "?": if self.remote_version == "?":
await self._get_remote_version()
if self.remote_version == "?":
raise self.server.error(
f"Client {self.repo}: Unable to locate update")
dl_url, content_type, size = self.dl_info
if dl_url == "?":
raise self.server.error( raise self.server.error(
f"Client {self.repo}: Invalid download url") f"Client {self.repo}: Unable to locate update")
if self.version == self.remote_version: dl_url, content_type, size = self.dl_info
# Already up to date if dl_url == "?":
return False raise self.server.error(
event_loop = self.server.get_event_loop() f"Client {self.repo}: Invalid download url")
if self.version == self.remote_version:
# Already up to date
return False
event_loop = self.server.get_event_loop()
self.cmd_helper.notify_update_response(
f"Updating Web Client {self.name}...")
self.cmd_helper.notify_update_response(
f"Downloading Client: {self.name}")
with tempfile.TemporaryDirectory(
suffix=self.name, prefix="client") as tempdirname:
tempdir = pathlib.Path(tempdirname)
temp_download_file = tempdir.joinpath(f"{self.name}.zip")
temp_persist_dir = tempdir.joinpath(self.name)
await self.cmd_helper.streaming_download_request(
dl_url, temp_download_file, content_type, size)
self.cmd_helper.notify_update_response( self.cmd_helper.notify_update_response(
f"Updating Web Client {self.name}...") f"Download Complete, extracting release to '{self.path}'")
self.cmd_helper.notify_update_response( await event_loop.run_in_thread(
f"Downloading Client: {self.name}") self._extract_release, temp_persist_dir,
with tempfile.TemporaryDirectory( temp_download_file)
suffix=self.name, prefix="client") as tempdirname: self.version = self.remote_version
tempdir = pathlib.Path(tempdirname) version_path = self.path.joinpath(".version")
temp_download_file = tempdir.joinpath(f"{self.name}.zip") if not version_path.exists():
temp_persist_dir = tempdir.joinpath(self.name) await event_loop.run_in_thread(
await self.cmd_helper.streaming_download_request( version_path.write_text, self.version)
dl_url, temp_download_file, content_type, size) self.cmd_helper.notify_update_response(
self.cmd_helper.notify_update_response( f"Client Update Finished: {self.name}", is_complete=True)
f"Download Complete, extracting release to '{self.path}'") self._save_state()
await event_loop.run_in_thread( return True
self._extract_release, temp_persist_dir,
temp_download_file)
self.version = self.remote_version
version_path = self.path.joinpath(".version")
if not version_path.exists():
await event_loop.run_in_thread(
version_path.write_text, self.version)
self.cmd_helper.notify_update_response(
f"Client Update Finished: {self.name}", is_complete=True)
self._save_state()
return True
def _extract_release(self, def _extract_release(self,
persist_dir: pathlib.Path, persist_dir: pathlib.Path,

View File

@ -5,7 +5,6 @@
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations from __future__ import annotations
import asyncio
import os import os
import pathlib import pathlib
import json import json
@ -74,8 +73,6 @@ class ZipDeploy(AppDeploy):
self.package_list: List[str] = [] self.package_list: List[str] = []
self.python_pkg_list: List[str] = [] self.python_pkg_list: List[str] = []
self.release_download_info: Tuple[str, str, int] = ("?", "?", 0) self.release_download_info: Tuple[str, str, int] = ("?", "?", 0)
self.mutex: asyncio.Lock = asyncio.Lock()
self.refresh_event: Optional[asyncio.Event] = None
@staticmethod @staticmethod
async def from_application(app: AppDeploy) -> ZipDeploy: async def from_application(app: AppDeploy) -> ZipDeploy:
@ -125,18 +122,11 @@ class ZipDeploy(AppDeploy):
return tag_version return tag_version
async def refresh(self) -> None: async def refresh(self) -> None:
if self.refresh_event is not None: try:
await self.refresh_event.wait() await self._update_repo_state()
return except Exception:
async with self.mutex: self.verified = False
self.refresh_event = asyncio.Event() self.log_exc("Error refreshing application state")
try:
await self._update_repo_state()
except Exception:
self.verified = False
self.log_exc("Error refreshing application state")
self.refresh_event.set()
self.refresh_event = None
async def _update_repo_state(self) -> None: async def _update_repo_state(self) -> None:
self.errors = [] self.errors = []
@ -372,42 +362,40 @@ class ZipDeploy(AppDeploy):
zf.extractall(self.path) zf.extractall(self.path)
async def update(self, force_dep_update: bool = False) -> bool: async def update(self, force_dep_update: bool = False) -> bool:
async with self.mutex: if not self._is_valid:
if not self._is_valid: raise self.log_exc("Update aborted, repo not valid", False)
raise self.log_exc("Update aborted, repo not valid", False) if self.short_version == self.latest_version:
if self.short_version == self.latest_version: # already up to date
# already up to date return False
return False self.cmd_helper.notify_update_response(
self.cmd_helper.notify_update_response( f"Updating Application {self.name}...")
f"Updating Application {self.name}...") npm_hash = await self._get_file_hash(self.npm_pkg_json)
npm_hash = await self._get_file_hash(self.npm_pkg_json) dl_url, content_type, size = self.release_download_info
dl_url, content_type, size = self.release_download_info self.notify_status("Starting Download...")
self.notify_status("Starting Download...") with tempfile.TemporaryDirectory(
with tempfile.TemporaryDirectory( suffix=self.name, prefix="app") as tempdirname:
suffix=self.name, prefix="app") as tempdirname: tempdir = pathlib.Path(tempdirname)
tempdir = pathlib.Path(tempdirname) temp_download_file = tempdir.joinpath(f"{self.name}.zip")
temp_download_file = tempdir.joinpath(f"{self.name}.zip") await self.cmd_helper.streaming_download_request(
await self.cmd_helper.streaming_download_request( dl_url, temp_download_file, content_type, size)
dl_url, temp_download_file, content_type, size) self.notify_status(
self.notify_status( f"Download Complete, extracting release to '{self.path}'")
f"Download Complete, extracting release to '{self.path}'") event_loop = self.server.get_event_loop()
event_loop = self.server.get_event_loop() await event_loop.run_in_thread(
await event_loop.run_in_thread( self._extract_release, temp_download_file)
self._extract_release, temp_download_file) await self._update_dependencies(npm_hash, force=force_dep_update)
await self._update_dependencies(npm_hash, force=force_dep_update) await self._update_repo_state()
await self._update_repo_state() await self.restart_service()
await self.restart_service() self.notify_status("Update Finished...", is_complete=True)
self.notify_status("Update Finished...", is_complete=True) return True
return True
async def recover(self, async def recover(self,
hard: bool = False, hard: bool = False,
force_dep_update: bool = False force_dep_update: bool = False
) -> None: ) -> None:
async with self.mutex: url = f"https://api.github.com/repos/{self.host_repo}/releases"
url = f"https://api.github.com/repos/{self.host_repo}/releases" releases = await self._fetch_github_releases(url)
releases = await self._fetch_github_releases(url) await self._process_latest_release(releases[1])
await self._process_latest_release(releases[1])
await self.update(force_dep_update=force_dep_update) await self.update(force_dep_update=force_dep_update)
async def reinstall(self) -> None: async def reinstall(self) -> None: