diff --git a/moonraker/components/update_manager/update_manager.py b/moonraker/components/update_manager/update_manager.py index fca00e0..2833b7d 100644 --- a/moonraker/components/update_manager/update_manager.py +++ b/moonraker/components/update_manager/update_manager.py @@ -7,20 +7,16 @@ from __future__ import annotations import asyncio import os -import pathlib import logging -import shutil -import zipfile import time import tempfile -import json -from ...utils import source_info -from .common import AppType, Channel, get_base_configuration, get_app_type +from .common import AppType, get_base_configuration, get_app_type from .base_deploy import BaseDeploy from .app_deploy import AppDeploy from .git_deploy import GitDeploy from .zip_deploy import ZipDeploy from .system_deploy import PackageDeploy +from .web_deploy import WebClientDeploy # Annotation imports from typing import ( @@ -30,7 +26,6 @@ from typing import ( Callable, Optional, Set, - Tuple, Type, Union, Dict, @@ -47,7 +42,6 @@ if TYPE_CHECKING: from ..database import NamespaceWrapper from ..machine import Machine from ..http_client import HttpClient - from ..file_manager.file_manager import FileManager from ...eventloop import FlexTimer JsonType = Union[List[Any], Dict[str, Any]] _T = TypeVar("_T") @@ -654,362 +648,5 @@ class CommandHelper: return await eventloop.run_in_thread(_createdir, suffix, prefix) -class WebClientDeploy(BaseDeploy): - def __init__(self, - config: ConfigHelper, - cmd_helper: CommandHelper - ) -> None: - super().__init__(config, cmd_helper, prefix="Web Client") - self.repo = config.get('repo').strip().strip("/") - self.owner, self.project_name = self.repo.split("/", 1) - self.path = pathlib.Path(config.get("path")).expanduser().resolve() - self.type = AppType.from_string(config.get('type')) - self.channel = Channel.from_string(config.get("channel", "stable")) - if self.channel == Channel.DEV: - self.server.add_warning( - f"Invalid Channel '{self.channel}' for config " - f"section [{config.get_name()}], type: {self.type}. " - f"Must be one of the following: stable, beta. " - f"Falling back to beta channel" - ) - self.channel = Channel.BETA - self.info_tags: List[str] = config.getlist("info_tags", []) - self.persistent_files: List[str] = [] - self.warnings: List[str] = [] - self.version: str = "?" - pfiles = config.getlist('persistent_files', None) - if pfiles is not None: - self.persistent_files = [pf.strip("/") for pf in pfiles] - if ".version" in self.persistent_files: - raise config.error( - "Invalid value for option 'persistent_files': " - "'.version' can not be persistent") - self._valid: bool = True - self._is_prerelease: bool = False - self._is_fallback: bool = False - self._path_writable: bool = False - - async def _validate_client_info(self) -> None: - self._valid = False - self._is_fallback = False - eventloop = self.server.get_event_loop() - self.warnings.clear() - repo_parent = source_info.find_git_repo(self.path) - homedir = pathlib.Path("~").expanduser() - if not self._path_writable: - self.warnings.append( - f"Location at option 'path: {self.path}' is not writable." - ) - elif not self.path.is_dir(): - self.warnings.append( - f"Location at option 'path: {self.path}' is not a directory." - ) - elif repo_parent is not None and repo_parent != homedir: - self.warnings.append( - f"Location at option 'path: {self.path}' is within a git repo. Found " - f".git folder at '{repo_parent.joinpath('.git')}'" - ) - else: - rinfo = self.path.joinpath("release_info.json") - if rinfo.is_file(): - try: - data = await eventloop.run_in_thread(rinfo.read_text) - uinfo: Dict[str, str] = json.loads(data) - project_name = uinfo["project_name"] - owner = uinfo["project_owner"] - self.version = uinfo["version"] - except Exception: - logging.exception("Failed to load release_info.json.") - else: - self._valid = True - detected_repo = f"{owner}/{project_name}" - if self.repo.lower() != detected_repo.lower(): - self.warnings.append( - f"Value at option 'repo: {self.repo}' does not match " - f"detected repo '{detected_repo}', falling back to " - "detected version." - ) - self.repo = detected_repo - self.owner = owner - self.project_name = project_name - else: - version_path = self.path.joinpath(".version") - if version_path.is_file(): - version = await eventloop.run_in_thread(version_path.read_text) - self.version = version.strip() - self._valid = await self._detect_fallback() - if not self._valid: - self.warnings.append("Failed to validate client installation") - if self.server.is_debug_enabled(): - self.log_info("Debug Enabled, overriding validity checks") - - async def _detect_fallback(self) -> bool: - fallback_defs = { - "mainsail": "mainsail-crew", - "fluidd": "fluidd-core" - } - for fname in ("manifest.json", "manifest.webmanifest"): - manifest = self.path.joinpath(fname) - eventloop = self.server.get_event_loop() - if manifest.is_file(): - try: - mtext = await eventloop.run_in_thread(manifest.read_text) - mdata: Dict[str, Any] = json.loads(mtext) - proj_name: str = mdata["name"].lower() - except Exception: - self.log_exc(f"Failed to load json from {manifest}") - continue - if proj_name in fallback_defs: - owner = fallback_defs[proj_name] - detected_repo = f"{owner}/{proj_name}" - if detected_repo != self.repo.lower(): - self.warnings.append( - f"Value at option 'repo: {self.repo}' does not match " - f"detected repo '{detected_repo}', falling back to " - "detected version." - ) - self.repo = detected_repo - self.owner = owner - self.project_name = proj_name - self._is_fallback = True - return True - return False - - async def initialize(self) -> Dict[str, Any]: - fm: FileManager = self.server.lookup_component("file_manager") - self._path_writable = not fm.check_reserved_path( - self.path, need_write=True, raise_error=False - ) - if self._path_writable: - fm.add_reserved_path(f"update_manager {self.name}", self.path) - await self._validate_client_info() - storage = await super().initialize() - if self.version == "?": - self.version = storage.get("version", "?") - self.remote_version: str = storage.get('remote_version', "?") - self.rollback_version: str = storage.get('rollback_version', self.version) - self.rollback_repo: str = storage.get( - 'rollback_repo', self.repo if self._valid else "?" - ) - self.last_error: str = storage.get('last_error', "") - dl_info: List[Any] = storage.get('dl_info', ["?", "?", 0]) - self.dl_info: Tuple[str, str, int] = cast( - Tuple[str, str, int], tuple(dl_info)) - if not self.needs_refresh(): - self._log_client_info() - return storage - - def _log_client_info(self) -> None: - warn_str = "" - if self.warnings: - warn_str = "\nWarnings:\n" - warn_str += "\n".join([f" {item}" for item in self.warnings]) - dl_url, content_type, size = self.dl_info - logging.info( - f"Web Client {self.name} Detected:\n" - f"Repo: {self.repo}\n" - f"Channel: {self.channel}\n" - f"Path: {self.path}\n" - f"Local Version: {self.version}\n" - f"Remote Version: {self.remote_version}\n" - f"Valid: {self._valid}\n" - f"Fallback Client Detected: {self._is_fallback}\n" - f"Pre-release: {self._is_prerelease}\n" - f"Download Url: {dl_url}\n" - f"Download Size: {size}\n" - f"Content Type: {content_type}\n" - f"Rollback Version: {self.rollback_version}\n" - f"Rollback Repo: {self.rollback_repo}" - f"{warn_str}" - ) - - async def refresh(self) -> None: - try: - if not self._valid: - await self._validate_client_info() - await self._get_remote_version() - except Exception: - logging.exception("Error Refreshing Client") - self._log_client_info() - self._save_state() - - async def _fetch_github_version( - self, repo: Optional[str] = None, tag: Optional[str] = None - ) -> Dict[str, Any]: - if repo is None: - if not self._valid: - self.log_info("Invalid Web Installation, aborting remote refresh") - return {} - repo = self.repo - if tag is not None: - resource = f"repos/{repo}/releases/tags/{tag}" - elif self.channel == Channel.STABLE: - resource = f"repos/{repo}/releases/latest" - else: - resource = f"repos/{repo}/releases?per_page=1" - client = self.cmd_helper.get_http_client() - resp = await client.github_api_request( - resource, attempts=3, retry_pause_time=.5 - ) - release: Union[List[Any], Dict[str, Any]] = {} - if resp.status_code == 304: - if resp.content: - # Not modified, however we need to restore state from - # cached content - release = resp.json() - else: - # Either not necessary or not possible to restore from cache - return {} - elif resp.has_error(): - self.log_info(f"Github Request Error - {resp.error}") - self.last_error = str(resp.error) - return {} - else: - release = resp.json() - result: Dict[str, Any] = {} - if isinstance(release, list): - if release: - result = release[0] - else: - result = release - self.last_error = "" - return result - - async def _get_remote_version(self) -> None: - result = await self._fetch_github_version() - if not result: - return - self.remote_version = result.get('name', "?") - release_asset: Dict[str, Any] = result.get('assets', [{}])[0] - dl_url: str = release_asset.get('browser_download_url', "?") - content_type: str = release_asset.get('content_type', "?") - size: int = release_asset.get('size', 0) - self.dl_info = (dl_url, content_type, size) - self._is_prerelease = result.get('prerelease', False) - - def get_persistent_data(self) -> Dict[str, Any]: - storage = super().get_persistent_data() - storage['version'] = self.version - storage['remote_version'] = self.remote_version - storage['rollback_version'] = self.rollback_version - storage['rollback_repo'] = self.rollback_repo - storage['dl_info'] = list(self.dl_info) - storage['last_error'] = self.last_error - return storage - - async def update( - self, rollback_info: Optional[Tuple[str, str, int]] = None - ) -> bool: - if not self._valid: - raise self.server.error( - f"Web Client {self.name}: Invalid install detected, aborting update" - ) - if rollback_info is not None: - dl_url, content_type, size = rollback_info - start_msg = f"Rolling Back Web Client {self.name}..." - else: - if self.remote_version == "?": - await self._get_remote_version() - if self.remote_version == "?": - raise self.server.error( - f"Client {self.repo}: Unable to locate update" - ) - dl_url, content_type, size = self.dl_info - if self.version == self.remote_version: - # Already up to date - return False - start_msg = f"Updating Web Client {self.name}..." - if dl_url == "?": - raise self.server.error(f"Client {self.repo}: Invalid download url") - current_version = self.version - event_loop = self.server.get_event_loop() - self.cmd_helper.notify_update_response(start_msg) - self.cmd_helper.notify_update_response(f"Downloading Client: {self.name}") - td = await self.cmd_helper.create_tempdir(self.name, "client") - try: - tempdir = pathlib.Path(td.name) - temp_download_file = tempdir.joinpath(f"{self.name}.zip") - temp_persist_dir = tempdir.joinpath(self.name) - client = self.cmd_helper.get_http_client() - await client.download_file( - dl_url, content_type, temp_download_file, size, - self.cmd_helper.on_download_progress) - self.cmd_helper.notify_update_response( - f"Download Complete, extracting release to '{self.path}'") - await event_loop.run_in_thread( - self._extract_release, temp_persist_dir, - temp_download_file) - finally: - await event_loop.run_in_thread(td.cleanup) - self.version = self.remote_version - await self._validate_client_info() - if self._valid and rollback_info is None: - self.rollback_version = current_version - self.rollback_repo = self.repo - msg = f"Client Update Finished: {self.name}" - if rollback_info is not None: - msg = f"Rollback Complete: {self.name}" - self.cmd_helper.notify_update_response(msg, is_complete=True) - self._log_client_info() - self._save_state() - return True - - async def rollback(self) -> bool: - if self.rollback_version == "?" or self.rollback_repo == "?": - raise self.server.error("Incomplete Rollback Data") - if self.rollback_version == self.version: - return False - result = await self._fetch_github_version( - self.rollback_repo, self.rollback_version - ) - if not result: - raise self.server.error("Failed to retrieve release asset data") - release_asset: Dict[str, Any] = result.get('assets', [{}])[0] - dl_url: str = release_asset.get('browser_download_url', "?") - content_type: str = release_asset.get('content_type', "?") - size: int = release_asset.get('size', 0) - dl_info = (dl_url, content_type, size) - return await self.update(dl_info) - - def _extract_release(self, - persist_dir: pathlib.Path, - release_file: pathlib.Path - ) -> None: - if not persist_dir.exists(): - os.mkdir(persist_dir) - if self.path.is_dir(): - # find and move persistent files - for fname in os.listdir(self.path): - src_path = self.path.joinpath(fname) - if fname in self.persistent_files: - dest_dir = persist_dir.joinpath(fname).parent - os.makedirs(dest_dir, exist_ok=True) - shutil.move(str(src_path), str(dest_dir)) - shutil.rmtree(self.path) - os.mkdir(self.path) - with zipfile.ZipFile(release_file) as zf: - zf.extractall(self.path) - # Move temporary files back into - for fname in os.listdir(persist_dir): - src_path = persist_dir.joinpath(fname) - dest_dir = self.path.joinpath(fname).parent - os.makedirs(dest_dir, exist_ok=True) - shutil.move(str(src_path), str(dest_dir)) - - def get_update_status(self) -> Dict[str, Any]: - return { - 'name': self.name, - 'owner': self.owner, - 'version': self.version, - 'remote_version': self.remote_version, - 'rollback_version': self.rollback_version, - 'configured_type': str(self.type), - 'channel': str(self.channel), - 'info_tags': self.info_tags, - 'last_error': self.last_error, - 'is_valid': self._valid, - 'warnings': self.warnings - } - def load_component(config: ConfigHelper) -> UpdateManager: return UpdateManager(config) diff --git a/moonraker/components/update_manager/web_deploy.py b/moonraker/components/update_manager/web_deploy.py new file mode 100644 index 0000000..1050664 --- /dev/null +++ b/moonraker/components/update_manager/web_deploy.py @@ -0,0 +1,391 @@ +# Provides updates for Klipper and Moonraker +# +# Copyright (C) 2020 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations +import os +import pathlib +import logging +import shutil +import zipfile +import json +from ...utils import source_info +from .common import AppType, Channel +from .base_deploy import BaseDeploy + +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Optional, + Tuple, + Union, + Dict, + List, + cast +) +if TYPE_CHECKING: + from ...confighelper import ConfigHelper + from ..file_manager.file_manager import FileManager + from .update_manager import CommandHelper + JsonType = Union[List[Any], Dict[str, Any]] + + +class WebClientDeploy(BaseDeploy): + def __init__(self, + config: ConfigHelper, + cmd_helper: CommandHelper + ) -> None: + super().__init__(config, cmd_helper, prefix="Web Client") + self.repo = config.get('repo').strip().strip("/") + self.owner, self.project_name = self.repo.split("/", 1) + self.path = pathlib.Path(config.get("path")).expanduser().resolve() + self.type = AppType.from_string(config.get('type')) + self.channel = Channel.from_string(config.get("channel", "stable")) + if self.channel == Channel.DEV: + self.server.add_warning( + f"Invalid Channel '{self.channel}' for config " + f"section [{config.get_name()}], type: {self.type}. " + f"Must be one of the following: stable, beta. " + f"Falling back to beta channel" + ) + self.channel = Channel.BETA + self.info_tags: List[str] = config.getlist("info_tags", []) + self.persistent_files: List[str] = [] + self.warnings: List[str] = [] + self.version: str = "?" + pfiles = config.getlist('persistent_files', None) + if pfiles is not None: + self.persistent_files = [pf.strip("/") for pf in pfiles] + if ".version" in self.persistent_files: + raise config.error( + "Invalid value for option 'persistent_files': " + "'.version' can not be persistent") + self._valid: bool = True + self._is_prerelease: bool = False + self._is_fallback: bool = False + self._path_writable: bool = False + + async def _validate_client_info(self) -> None: + self._valid = False + self._is_fallback = False + eventloop = self.server.get_event_loop() + self.warnings.clear() + repo_parent = source_info.find_git_repo(self.path) + homedir = pathlib.Path("~").expanduser() + if not self._path_writable: + self.warnings.append( + f"Location at option 'path: {self.path}' is not writable." + ) + elif not self.path.is_dir(): + self.warnings.append( + f"Location at option 'path: {self.path}' is not a directory." + ) + elif repo_parent is not None and repo_parent != homedir: + self.warnings.append( + f"Location at option 'path: {self.path}' is within a git repo. Found " + f".git folder at '{repo_parent.joinpath('.git')}'" + ) + else: + rinfo = self.path.joinpath("release_info.json") + if rinfo.is_file(): + try: + data = await eventloop.run_in_thread(rinfo.read_text) + uinfo: Dict[str, str] = json.loads(data) + project_name = uinfo["project_name"] + owner = uinfo["project_owner"] + self.version = uinfo["version"] + except Exception: + logging.exception("Failed to load release_info.json.") + else: + self._valid = True + detected_repo = f"{owner}/{project_name}" + if self.repo.lower() != detected_repo.lower(): + self.warnings.append( + f"Value at option 'repo: {self.repo}' does not match " + f"detected repo '{detected_repo}', falling back to " + "detected version." + ) + self.repo = detected_repo + self.owner = owner + self.project_name = project_name + else: + version_path = self.path.joinpath(".version") + if version_path.is_file(): + version = await eventloop.run_in_thread(version_path.read_text) + self.version = version.strip() + self._valid = await self._detect_fallback() + if not self._valid: + self.warnings.append("Failed to validate client installation") + if self.server.is_debug_enabled(): + self.log_info("Debug Enabled, overriding validity checks") + + async def _detect_fallback(self) -> bool: + fallback_defs = { + "mainsail": "mainsail-crew", + "fluidd": "fluidd-core" + } + for fname in ("manifest.json", "manifest.webmanifest"): + manifest = self.path.joinpath(fname) + eventloop = self.server.get_event_loop() + if manifest.is_file(): + try: + mtext = await eventloop.run_in_thread(manifest.read_text) + mdata: Dict[str, Any] = json.loads(mtext) + proj_name: str = mdata["name"].lower() + except Exception: + self.log_exc(f"Failed to load json from {manifest}") + continue + if proj_name in fallback_defs: + owner = fallback_defs[proj_name] + detected_repo = f"{owner}/{proj_name}" + if detected_repo != self.repo.lower(): + self.warnings.append( + f"Value at option 'repo: {self.repo}' does not match " + f"detected repo '{detected_repo}', falling back to " + "detected version." + ) + self.repo = detected_repo + self.owner = owner + self.project_name = proj_name + self._is_fallback = True + return True + return False + + async def initialize(self) -> Dict[str, Any]: + fm: FileManager = self.server.lookup_component("file_manager") + self._path_writable = not fm.check_reserved_path( + self.path, need_write=True, raise_error=False + ) + if self._path_writable: + fm.add_reserved_path(f"update_manager {self.name}", self.path) + await self._validate_client_info() + storage = await super().initialize() + if self.version == "?": + self.version = storage.get("version", "?") + self.remote_version: str = storage.get('remote_version', "?") + self.rollback_version: str = storage.get('rollback_version', self.version) + self.rollback_repo: str = storage.get( + 'rollback_repo', self.repo if self._valid else "?" + ) + self.last_error: str = storage.get('last_error', "") + dl_info: List[Any] = storage.get('dl_info', ["?", "?", 0]) + self.dl_info: Tuple[str, str, int] = cast( + Tuple[str, str, int], tuple(dl_info)) + if not self.needs_refresh(): + self._log_client_info() + return storage + + def _log_client_info(self) -> None: + warn_str = "" + if self.warnings: + warn_str = "\nWarnings:\n" + warn_str += "\n".join([f" {item}" for item in self.warnings]) + dl_url, content_type, size = self.dl_info + logging.info( + f"Web Client {self.name} Detected:\n" + f"Repo: {self.repo}\n" + f"Channel: {self.channel}\n" + f"Path: {self.path}\n" + f"Local Version: {self.version}\n" + f"Remote Version: {self.remote_version}\n" + f"Valid: {self._valid}\n" + f"Fallback Client Detected: {self._is_fallback}\n" + f"Pre-release: {self._is_prerelease}\n" + f"Download Url: {dl_url}\n" + f"Download Size: {size}\n" + f"Content Type: {content_type}\n" + f"Rollback Version: {self.rollback_version}\n" + f"Rollback Repo: {self.rollback_repo}" + f"{warn_str}" + ) + + async def refresh(self) -> None: + try: + if not self._valid: + await self._validate_client_info() + await self._get_remote_version() + except Exception: + logging.exception("Error Refreshing Client") + self._log_client_info() + self._save_state() + + async def _fetch_github_version( + self, repo: Optional[str] = None, tag: Optional[str] = None + ) -> Dict[str, Any]: + if repo is None: + if not self._valid: + self.log_info("Invalid Web Installation, aborting remote refresh") + return {} + repo = self.repo + if tag is not None: + resource = f"repos/{repo}/releases/tags/{tag}" + elif self.channel == Channel.STABLE: + resource = f"repos/{repo}/releases/latest" + else: + resource = f"repos/{repo}/releases?per_page=1" + client = self.cmd_helper.get_http_client() + resp = await client.github_api_request( + resource, attempts=3, retry_pause_time=.5 + ) + release: Union[List[Any], Dict[str, Any]] = {} + if resp.status_code == 304: + if resp.content: + # Not modified, however we need to restore state from + # cached content + release = resp.json() + else: + # Either not necessary or not possible to restore from cache + return {} + elif resp.has_error(): + self.log_info(f"Github Request Error - {resp.error}") + self.last_error = str(resp.error) + return {} + else: + release = resp.json() + result: Dict[str, Any] = {} + if isinstance(release, list): + if release: + result = release[0] + else: + result = release + self.last_error = "" + return result + + async def _get_remote_version(self) -> None: + result = await self._fetch_github_version() + if not result: + return + self.remote_version = result.get('name', "?") + release_asset: Dict[str, Any] = result.get('assets', [{}])[0] + dl_url: str = release_asset.get('browser_download_url', "?") + content_type: str = release_asset.get('content_type', "?") + size: int = release_asset.get('size', 0) + self.dl_info = (dl_url, content_type, size) + self._is_prerelease = result.get('prerelease', False) + + def get_persistent_data(self) -> Dict[str, Any]: + storage = super().get_persistent_data() + storage['version'] = self.version + storage['remote_version'] = self.remote_version + storage['rollback_version'] = self.rollback_version + storage['rollback_repo'] = self.rollback_repo + storage['dl_info'] = list(self.dl_info) + storage['last_error'] = self.last_error + return storage + + async def update( + self, rollback_info: Optional[Tuple[str, str, int]] = None + ) -> bool: + if not self._valid: + raise self.server.error( + f"Web Client {self.name}: Invalid install detected, aborting update" + ) + if rollback_info is not None: + dl_url, content_type, size = rollback_info + start_msg = f"Rolling Back Web Client {self.name}..." + else: + if self.remote_version == "?": + await self._get_remote_version() + if self.remote_version == "?": + raise self.server.error( + f"Client {self.repo}: Unable to locate update" + ) + dl_url, content_type, size = self.dl_info + if self.version == self.remote_version: + # Already up to date + return False + start_msg = f"Updating Web Client {self.name}..." + if dl_url == "?": + raise self.server.error(f"Client {self.repo}: Invalid download url") + current_version = self.version + event_loop = self.server.get_event_loop() + self.cmd_helper.notify_update_response(start_msg) + self.cmd_helper.notify_update_response(f"Downloading Client: {self.name}") + td = await self.cmd_helper.create_tempdir(self.name, "client") + try: + tempdir = pathlib.Path(td.name) + temp_download_file = tempdir.joinpath(f"{self.name}.zip") + temp_persist_dir = tempdir.joinpath(self.name) + client = self.cmd_helper.get_http_client() + await client.download_file( + dl_url, content_type, temp_download_file, size, + self.cmd_helper.on_download_progress) + self.cmd_helper.notify_update_response( + f"Download Complete, extracting release to '{self.path}'") + await event_loop.run_in_thread( + self._extract_release, temp_persist_dir, + temp_download_file) + finally: + await event_loop.run_in_thread(td.cleanup) + self.version = self.remote_version + await self._validate_client_info() + if self._valid and rollback_info is None: + self.rollback_version = current_version + self.rollback_repo = self.repo + msg = f"Client Update Finished: {self.name}" + if rollback_info is not None: + msg = f"Rollback Complete: {self.name}" + self.cmd_helper.notify_update_response(msg, is_complete=True) + self._log_client_info() + self._save_state() + return True + + async def rollback(self) -> bool: + if self.rollback_version == "?" or self.rollback_repo == "?": + raise self.server.error("Incomplete Rollback Data") + if self.rollback_version == self.version: + return False + result = await self._fetch_github_version( + self.rollback_repo, self.rollback_version + ) + if not result: + raise self.server.error("Failed to retrieve release asset data") + release_asset: Dict[str, Any] = result.get('assets', [{}])[0] + dl_url: str = release_asset.get('browser_download_url', "?") + content_type: str = release_asset.get('content_type', "?") + size: int = release_asset.get('size', 0) + dl_info = (dl_url, content_type, size) + return await self.update(dl_info) + + def _extract_release(self, + persist_dir: pathlib.Path, + release_file: pathlib.Path + ) -> None: + if not persist_dir.exists(): + os.mkdir(persist_dir) + if self.path.is_dir(): + # find and move persistent files + for fname in os.listdir(self.path): + src_path = self.path.joinpath(fname) + if fname in self.persistent_files: + dest_dir = persist_dir.joinpath(fname).parent + os.makedirs(dest_dir, exist_ok=True) + shutil.move(str(src_path), str(dest_dir)) + shutil.rmtree(self.path) + os.mkdir(self.path) + with zipfile.ZipFile(release_file) as zf: + zf.extractall(self.path) + # Move temporary files back into + for fname in os.listdir(persist_dir): + src_path = persist_dir.joinpath(fname) + dest_dir = self.path.joinpath(fname).parent + os.makedirs(dest_dir, exist_ok=True) + shutil.move(str(src_path), str(dest_dir)) + + def get_update_status(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'owner': self.owner, + 'version': self.version, + 'remote_version': self.remote_version, + 'rollback_version': self.rollback_version, + 'configured_type': str(self.type), + 'channel': str(self.channel), + 'info_tags': self.info_tags, + 'last_error': self.last_error, + 'is_valid': self._valid, + 'warnings': self.warnings + }