zip_deploy: bring up for usage

This performs a significant refactor to the ZipDeploy class, making it near identical to WebClientDeploy.  Zipped applications have the same
"release_info" requirement as web clients.  Unlike web clients, they may also
configure the dependency and service options available to git repos.

The ZipDeploy class can also support web clients, eliminating duplicate code
and the need to keep web_deploy.py.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-01-18 20:04:25 -05:00
parent daad786072
commit 55454a300e
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 341 additions and 345 deletions

View File

@ -1,19 +1,17 @@
# Zip Application Deployment implementation
#
# Copyright (C) 2021 Eric Callahan <arksine.code@gmail.com>
# Copyright (C) 2024 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import os
import pathlib
import shutil
import re
import time
import zipfile
import logging
from .app_deploy import AppDeploy
from .common import Channel
from ...utils import verify_source
from .common import Channel, AppType
from ...utils import source_info
from ...utils import json_wrapper as jsonw
# Annotation imports
@ -24,403 +22,401 @@ from typing import (
Optional,
Dict,
List,
Union,
cast
)
if TYPE_CHECKING:
from ...confighelper import ConfigHelper
from .update_manager import CommandHelper
RINFO_KEYS = [
"git_version", "long_version", "commit_hash", "source_checksum",
"ignored_exts", "ignored_dirs", "build_date", "channel",
"owner_repo", "host_repo", "release_tag"
]
from ..file_manager.file_manager import FileManager
class ZipDeploy(AppDeploy):
def __init__(self, config: ConfigHelper, cmd_helper: CommandHelper) -> None:
super().__init__(config, cmd_helper, "Zip Dist")
self._configure_path(config)
self._configure_virtualenv(config)
self._configure_dependencies(config, node_only=True)
self.origin: str = config.get('origin')
self.official_repo: str = "?"
self.owner: str = "?"
# Extract repo from origin for validation
match = re.match(r"https?://(?:www\.)?github.com/([^/]+/[^.]+)",
self.origin)
if match is not None:
self.official_repo = match.group(1)
self.owner = self.official_repo.split('/')[0]
def __init__(
self,
config: ConfigHelper,
cmd_helper: CommandHelper
) -> None:
super().__init__(config, cmd_helper, "Zip Application")
self._configure_path(config, False)
if self.type == AppType.ZIP:
self._configure_virtualenv(config)
self._configure_dependencies(config)
elif self.type == AppType.WEB:
self.prefix = f"Web Client {self.name}: "
self.repo = config.get('repo').strip().strip("/")
self.owner, self.project_name = self.repo.split("/", 1)
self.persistent_files: List[str] = []
self.warnings: List[str] = []
self.anomalies: List[str] = []
self.version: str = "?"
self.remote_version: str = "?"
self.rollback_version: str = "?"
self.rollback_repo: str = "?"
self.last_error: str = "?"
self._dl_info: Tuple[str, str, int] = ("?", "?", 0)
self._is_fallback: bool = False
self._is_prerelease: bool = False
self._path_writable: bool = False
self._configure_persistent_files(config)
def _configure_persistent_files(self, config: ConfigHelper) -> None:
    """Build the list of paths (relative to the install path) that are
    preserved across release extractions.

    Raises a config error if a reserved file name is configured as
    persistent.
    """
    pfiles = config.getlist('persistent_files', None)
    if pfiles is not None:
        self.persistent_files = [pf.strip("/") for pf in pfiles]
    # These files identify the installed release; persisting them would
    # defeat version detection after an update.
    for fname in (".version", "release_info.json"):
        if fname in self.persistent_files:
            raise config.error(
                "Invalid value for option 'persistent_files': "
                f"'{fname}' can not be persistent."
            )
    if (
        self.type == AppType.ZIP and
        self.virtualenv is not None and
        # The virtualenv must live *inside* the application path for it
        # to be persisted.  The original check was inverted
        # (self.virtualenv in self.path.parents), which would make
        # relative_to() below raise ValueError whenever it passed.
        self.path in self.virtualenv.parents
    ):
        rel_path = str(self.virtualenv.relative_to(self.path))
        if rel_path not in self.persistent_files:
            self.persistent_files.append(rel_path)
    if self.persistent_files:
        self.log_info(f"Configured persistent files: {self.persistent_files}")
async def _validate_release_info(self) -> None:
self._is_valid = False
self._is_fallback = False
eventloop = self.server.get_event_loop()
self.warnings.clear()
repo_parent = source_info.find_git_repo(self.path)
homedir = pathlib.Path("~").expanduser()
if not self._path_writable:
self.warnings.append(
f"Location at option 'path: {self.path}' is not writable."
)
elif not self.path.is_dir():
self.warnings.append(
f"Location at option 'path: {self.path}' is not a directory."
)
elif repo_parent is not None and repo_parent != homedir:
self.warnings.append(
f"Location at option 'path: {self.path}' is within a git repo. Found "
f".git folder at '{repo_parent.joinpath('.git')}'"
)
else:
raise config.error(
"Invalid url set for 'origin' option in section "
f"[{config.get_name()}]. Unable to extract owner/repo.")
self.host_repo: str = config.get('host_repo', self.official_repo)
self.package_list: List[str] = []
self.python_pkg_list: List[str] = []
self.release_download_info: Tuple[str, str, int] = ("?", "?", 0)
rinfo = self.path.joinpath("release_info.json")
if rinfo.is_file():
try:
data = await eventloop.run_in_thread(rinfo.read_text)
uinfo: Dict[str, str] = jsonw.loads(data)
project_name = uinfo["project_name"]
owner = uinfo["project_owner"]
self.version = uinfo["version"]
except Exception:
logging.exception("Failed to load release_info.json.")
else:
self._is_valid = True
detected_repo = f"{owner}/{project_name}"
if self.repo.lower() != detected_repo.lower():
self.anomalies.append(
f"Value at option 'repo: {self.repo}' does not match "
f"detected repo '{detected_repo}', falling back to "
"detected version."
)
self.repo = detected_repo
self.owner = owner
self.project_name = project_name
elif self.type == AppType.WEB:
version_path = self.path.joinpath(".version")
if version_path.is_file():
version = await eventloop.run_in_thread(version_path.read_text)
self.version = version.strip()
self._is_valid = await self._detect_fallback()
if not self._is_valid:
self.warnings.append("Failed to validate installation")
if self.server.is_debug_enabled():
self.log_info("Debug Enabled, overriding validity checks")
async def _detect_fallback(self) -> bool:
    # Fallback validation for "web" app types when release_info.json is
    # absent: infer the project from its PWA manifest.  Only the known
    # clients below can be detected this way.  Returns True when a
    # known client was identified.
    fallback_defs = {
        "mainsail": "mainsail-crew",
        "fluidd": "fluidd-core"
    }
    for fname in ("manifest.json", "manifest.webmanifest"):
        manifest = self.path.joinpath(fname)
        eventloop = self.server.get_event_loop()
        if manifest.is_file():
            try:
                # read in a thread so file I/O doesn't block the event loop
                mtext = await eventloop.run_in_thread(manifest.read_text)
                mdata: Dict[str, Any] = jsonw.loads(mtext)
                proj_name: str = mdata["name"].lower()
            except Exception:
                self.log_exc(f"Failed to load json from {manifest}")
                continue
            if proj_name in fallback_defs:
                owner = fallback_defs[proj_name]
                detected_repo = f"{owner}/{proj_name}"
                if detected_repo != self.repo.lower():
                    # Configured repo disagrees with the manifest; trust
                    # the detected values and record the anomaly.
                    self.anomalies.append(
                        f"Value at option 'repo: {self.repo}' does not match "
                        f"detected repo '{detected_repo}', falling back to "
                        "detected version."
                    )
                    self.repo = detected_repo
                    self.owner = owner
                    self.project_name = proj_name
                self._is_fallback = True
                return True
    return False
async def initialize(self) -> Dict[str, Any]:
storage = await super().initialize()
self.source_checksum: str = storage.get("source_checksum", "?")
self.pristine = storage.get('pristine', False)
self.verified = storage.get('verified', False)
self.build_date: int = storage.get('build_date', 0)
self.full_version: str = storage.get('full_version', "?")
self.short_version: str = storage.get('short_version', "?")
self.commit_hash: str = storage.get('commit_hash', "?")
self.lastest_hash: str = storage.get('latest_hash', "?")
self.latest_version: str = storage.get('latest_version', "?")
self.latest_checksum: str = storage.get('latest_checksum', "?")
self.latest_build_date: int = storage.get('latest_build_date', 0)
self.errors: List[str] = storage.get('errors', [])
self.commit_log: List[Dict[str, Any]] = storage.get('commit_log', [])
fm: FileManager = self.server.lookup_component("file_manager")
self._path_writable = not fm.check_reserved_path(
self.path, need_write=True, raise_error=False
)
if self._path_writable and not self.path.joinpath(".writeable").is_file():
fm.add_reserved_path(f"update_manager {self.name}", self.path)
await self._validate_release_info()
if self.version == "?":
self.version = storage.get("version", "?")
self.remote_version = storage.get('remote_version', "?")
self.rollback_version = storage.get('rollback_version', self.version)
self.rollback_repo = storage.get(
'rollback_repo', self.repo if self._is_valid else "?"
)
self.last_error = storage.get('last_error', "")
dl_info: List[Any] = storage.get('dl_info', ["?", "?", 0])
self.dl_info = cast(Tuple[str, str, int], tuple(dl_info))
if not self.needs_refresh():
self._log_zipapp_info()
return storage
def get_persistent_data(self) -> Dict[str, Any]:
storage = super().get_persistent_data()
storage.update({
'source_checksum': self.source_checksum,
'pristine': self.pristine,
'verified': self.verified,
'build_date': self.build_date,
'full_version': self.full_version,
'short_version': self.short_version,
'commit_hash': self.commit_hash,
'latest_hash': self.lastest_hash,
'latest_version': self.latest_version,
'latest_checksum': self.latest_checksum,
'latest_build_date': self.latest_build_date,
'commit_log': self.commit_log,
'errors': self.errors
"version": self.version,
"remote_version": self.remote_version,
"rollback_version": self.rollback_version,
"rollback_repo": self.rollback_repo,
"dl_info": list(self.dl_info),
"last_error": self.last_error
})
return storage
async def _parse_info_file(self, file_name: str) -> Dict[str, Any]:
    # Read and JSON-decode an info file located in the application path.
    # Returns an empty dict when the file is missing or unparseable.
    info_file = self.path.joinpath(file_name)
    if not info_file.exists():
        self.log_info(f"Unable to locate file '{info_file}'")
        return {}
    try:
        # read in a thread so file I/O doesn't block the event loop
        event_loop = self.server.get_event_loop()
        info_bytes = await event_loop.run_in_thread(info_file.read_text)
        info: Dict[str, Any] = jsonw.loads(info_bytes)
    except Exception:
        self.log_exc(f"Unable to parse info file {file_name}")
        info = {}
    return info
def _get_tag_version(self, version_string: str) -> str:
tag_version: str = "?"
ver_match = re.match(r"v\d+\.\d+\.\d-\d+", version_string)
if ver_match:
tag_version = ver_match.group()
return tag_version
async def refresh(self) -> None:
    # Refresh local repo state, re-validate the installed release info,
    # then query GitHub for the latest remote version.  Any failure
    # marks the install unverified rather than propagating.
    try:
        await self._update_repo_state()
        await self._validate_release_info()
        await self._get_remote_version()
    except Exception:
        self.verified = False
        self.log_exc("Error refreshing application state")
async def _update_repo_state(self) -> None:
self.errors = []
self._is_valid = False
self.verified = False
release_info = await self._parse_info_file(".release_info")
dep_info = await self._parse_info_file(".dependencies")
for key in RINFO_KEYS:
if key not in release_info:
self._add_error(f"Missing release info item: {key}")
self.full_version = release_info.get('long_version', "?")
self.short_version = self._get_tag_version(
release_info.get('git_version', ""))
self.commit_hash = release_info.get('commit_hash', "?")
self.build_date = release_info.get('build_date', 0)
owner_repo = release_info.get('owner_repo', "?")
if self.official_repo != owner_repo:
self._add_error(
f"Owner repo mismatch. Received {owner_repo}, "
f"official: {self.official_repo}")
# validate the local source code
event_loop = self.server.get_event_loop()
res = await event_loop.run_in_thread(verify_source, self.path)
if res is not None:
self.source_checksum, self.pristine = res
if self.name in ["moonraker", "klipper"]:
self.server.add_log_rollover_item(
f"{self.name}_validation",
f"{self.name} checksum: {self.source_checksum}, "
f"pristine: {self.pristine}")
else:
self._add_error("Unable to validate source checksum")
self.source_checksum = ""
self.pristine = False
self.package_list = sorted(dep_info.get(
'debian', {}).get('packages', []))
self.python_pkg_list = sorted(dep_info.get('python', []))
# Retrieve version info from github to check for updates and
# validate local release info
host_repo = release_info.get('host_repo', "?")
release_tag = release_info.get('release_tag', "?")
if host_repo != self.host_repo:
self._add_error(
f"Host repo mismatch, received: {host_repo}, "
f"expected: {self.host_repo}. This could result in "
" a failed update.")
resource = f"repos/{self.host_repo}/releases"
current_release, latest_release = await self._fetch_github_releases(
resource, release_tag)
await self._validate_current_release(release_info, current_release)
if not self.errors:
self.verified = True
await self._process_latest_release(latest_release)
self._save_state()
logging.exception("Error Refreshing Client")
self._log_zipapp_info()
self._save_state()
async def _fetch_github_releases(self,
                                 resource: str,
                                 current_tag: Optional[str] = None
                                 ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    # Fetch the release list from the GitHub API endpoint 'resource' and
    # return (current_release, latest_release).  'current_release' is the
    # release whose tag matches 'current_tag' (empty when current_tag is
    # None or not found); 'latest_release' is the newest release valid
    # for the configured channel.  Both are empty dicts on request failure.
    try:
        client = self.cmd_helper.get_http_client()
        resp = await client.github_api_request(resource, attempts=3)
        resp.raise_for_status()
        releases = resp.json()
        assert isinstance(releases, list)
    except Exception:
        self.log_exc("Error fetching releases from GitHub")
        return {}, {}
    release: Dict[str, Any]
    latest_release: Dict[str, Any] = {}
    current_release: Dict[str, Any] = {}
    # Releases are returned newest first; take the first acceptable one
    for release in releases:
        if not latest_release:
            if self.channel != Channel.STABLE:
                # Allow the beta channel to update regardless
                latest_release = release
            elif not release['prerelease']:
                # This is a stable release on the stable channel
                latest_release = release
        if current_tag is not None:
            if not current_release and release['tag_name'] == current_tag:
                current_release = release
            if latest_release and current_release:
                break
        elif latest_release:
            break
    return current_release, latest_release
async def _validate_current_release(self,
release_info: Dict[str, Any],
release: Dict[str, Any]
) -> None:
if not release:
self._add_error("Unable to find current release on GitHub")
return
asset_info = self._get_asset_urls(release, ["RELEASE_INFO"])
if "RELEASE_INFO" not in asset_info:
self._add_error(
"RELEASE_INFO not found in current release assets")
info_url, content_type, size = asset_info['RELEASE_INFO']
async def _fetch_github_version(
self, repo: Optional[str] = None, tag: Optional[str] = None
) -> Dict[str, Any]:
if repo is None:
if not self._is_valid:
self.log_info("Invalid Installation, aborting remote refresh")
return {}
repo = self.repo
if tag is not None:
resource = f"repos/{repo}/releases/tags/{tag}"
elif self.channel == Channel.STABLE:
resource = f"repos/{repo}/releases/latest"
else:
resource = f"repos/{repo}/releases?per_page=1"
client = self.cmd_helper.get_http_client()
rinfo_bytes = await client.get_file(info_url, content_type)
github_rinfo: Dict[str, Any] = jsonw.loads(rinfo_bytes)
if github_rinfo.get(self.name, {}) != release_info:
self._add_error(
"Local release info does not match the remote")
resp = await client.github_api_request(
resource, attempts=3, retry_pause_time=.5
)
release: Union[List[Any], Dict[str, Any]] = {}
if resp.status_code == 304:
if resp.content:
# Not modified, however we need to restore state from
# cached content
release = resp.json()
else:
# Either not necessary or not possible to restore from cache
return {}
elif resp.has_error():
self.log_info(f"Github Request Error - {resp.error}")
self.last_error = str(resp.error)
return {}
else:
self.log_info("Current Release Info Validated")
release = resp.json()
result: Dict[str, Any] = {}
if isinstance(release, list):
if release:
result = release[0]
else:
result = release
self.last_error = ""
return result
async def _process_latest_release(self, release: Dict[str, Any]):
if not release:
self._add_error("Unable to find latest release on GitHub")
async def _get_remote_version(self) -> None:
result = await self._fetch_github_version()
if not result:
return
zip_file_name = f"{self.name}.zip"
asset_names = ["RELEASE_INFO", "COMMIT_LOG", zip_file_name]
asset_info = self._get_asset_urls(release, asset_names)
if "RELEASE_INFO" in asset_info:
asset_url, content_type, size = asset_info['RELEASE_INFO']
client = self.cmd_helper.get_http_client()
rinfo_bytes = await client.get_file(asset_url, content_type)
update_release_info: Dict[str, Any] = jsonw.loads(rinfo_bytes)
update_info = update_release_info.get(self.name, {})
self.lastest_hash = update_info.get('commit_hash', "?")
self.latest_checksum = update_info.get('source_checksum', "?")
self.latest_version = self._get_tag_version(
update_info.get('git_version', "?"))
self.latest_build_date = update_info.get('build_date', 0)
else:
self._add_error(
"RELEASE_INFO not found in latest release assets")
self.commit_log = []
if self.short_version != self.latest_version:
# Only report commit log if versions change
if "COMMIT_LOG" in asset_info:
asset_url, content_type, size = asset_info['COMMIT_LOG']
client = self.cmd_helper.get_http_client()
commit_bytes = await client.get_file(asset_url, content_type)
commit_info: Dict[str, Any] = jsonw.loads(commit_bytes)
self.commit_log = commit_info.get(self.name, [])
if zip_file_name in asset_info:
self.release_download_info = asset_info[zip_file_name]
self._is_valid = True
else:
self.release_download_info = ("?", "?", 0)
self._add_error(f"Release asset {zip_file_name} not found")
def _get_asset_urls(self,
release: Dict[str, Any],
filenames: List[str]
) -> Dict[str, Tuple[str, str, int]]:
asset_info: Dict[str, Tuple[str, str, int]] = {}
asset: Dict[str, Any]
for asset in release.get('assets', []):
name = asset['name']
if name in filenames:
rinfo_url = asset['browser_download_url']
content_type = asset['content_type']
size = asset['size']
asset_info[name] = (rinfo_url, content_type, size)
filenames.remove(name)
if not filenames:
break
return asset_info
def _add_error(self, warning: str):
    # Log and record a validation error.  Accumulated errors prevent the
    # repo state from being marked verified.
    self.log_info(warning)
    self.errors.append(warning)
self.remote_version = result.get('name', "?")
release_asset: Dict[str, Any] = result.get('assets', [{}])[0]
dl_url: str = release_asset.get('browser_download_url', "?")
content_type: str = release_asset.get('content_type', "?")
size: int = release_asset.get('size', 0)
self.dl_info = (dl_url, content_type, size)
self._is_prerelease = result.get('prerelease', False)
def _log_zipapp_info(self):
warn_str = ""
if self.warnings or self.anomalies:
warn_str = "\nWarnings:\n"
warn_str += "\n".join(
[f" {item}" for item in self.warnings + self.anomalies]
)
dl_url, content_type, size = self.dl_info
self.log_info(
"\nZip Application Distribution Detected\n"
f" Valid: {self._is_valid}\n"
f" Verified: {self.verified}\n"
f" Channel: {self.channel}\n"
f" Repo: {self.official_repo}\n"
f" Path: {self.path}\n"
f" Pristine: {self.pristine}\n"
f" Commits Behind: {len(self.commit_log)}\n"
f"Current Release Info:\n"
f" Source Checksum: {self.source_checksum}\n"
f" Commit SHA: {self.commit_hash}\n"
f" Long Version: {self.full_version}\n"
f" Short Version: {self.short_version}\n"
f" Build Date: {time.ctime(self.build_date)}\n"
f"Latest Available Release Info:\n"
f" Source Checksum: {self.latest_checksum}\n"
f" Commit SHA: {self.lastest_hash}\n"
f" Version: {self.latest_version}\n"
f" Build Date: {time.ctime(self.latest_build_date)}\n"
f" Download URL: {self.release_download_info[0]}\n"
f" Content Type: {self.release_download_info[1]}\n"
f" Download Size: {self.release_download_info[2]}"
f"Detected\n"
f"Repo: {self.repo}\n"
f"Channel: {self.channel}\n"
f"Path: {self.path}\n"
f"Local Version: {self.version}\n"
f"Remote Version: {self.remote_version}\n"
f"Valid: {self._is_valid}\n"
f"Fallback Detected: {self._is_fallback}\n"
f"Pre-release: {self._is_prerelease}\n"
f"Download Url: {dl_url}\n"
f"Download Size: {size}\n"
f"Content Type: {content_type}\n"
f"Rollback Version: {self.rollback_version}\n"
f"Rollback Repo: {self.rollback_repo}"
f"{warn_str}"
)
async def _update_dependencies(self,
                               npm_hash,
                               force: bool = False
                               ) -> None:
    # Install the system, python, and node dependencies listed in the
    # extracted release's ".dependencies" file.  Unless 'force' is set,
    # each category is only (re)installed when its new list differs
    # from the previously stored one.
    new_deps = await self._parse_info_file('.dependencies')
    system_pkgs = sorted(
        new_deps.get('debian', {}).get('packages', []))
    python_pkgs = sorted(new_deps.get('python', []))
    if system_pkgs:
        if force or system_pkgs != self.package_list:
            await self._install_packages(system_pkgs)
    if python_pkgs:
        if force or python_pkgs != self.python_pkg_list:
            await self._update_python_requirements(python_pkgs)
    # npm_hash is the stored hash of package.json from the previous
    # refresh (presumably Optional[str] — confirm against
    # AppDeploy._get_file_hash); node packages are reinstalled when the
    # file changed.
    ret = await self._check_need_update(npm_hash, self.npm_pkg_json)
    if force or ret:
        if self.npm_pkg_json is not None:
            self.notify_status("Updating Node Packages...")
            try:
                await self.cmd_helper.run_cmd(
                    "npm ci --only=prod", notify=True, timeout=600.,
                    cwd=str(self.path))
            except Exception:
                # Best effort: a failed npm install should not abort
                # the overall update
                self.notify_status("Node Package Update failed")
def _extract_release(self, release_zip: pathlib.Path) -> None:
def _extract_release(
self, persist_dir: pathlib.Path, release_file: pathlib.Path
) -> None:
if not persist_dir.exists():
persist_dir.mkdir()
if self.path.is_dir():
# find and move persistent files
for src_path in self.path.iterdir():
fname = src_path.name
if fname in self.persistent_files:
dest_path = persist_dir.joinpath(fname)
dest_dir = dest_path.parent
dest_dir.mkdir(parents=True, exist_ok=True)
shutil.move(str(src_path), str(dest_path))
shutil.rmtree(self.path)
os.mkdir(self.path)
with zipfile.ZipFile(release_zip) as zf:
self.path.mkdir()
with zipfile.ZipFile(release_file) as zf:
for zip_entry in zf.filelist:
dest = pathlib.Path(zf.extract(zip_entry, str(self.path)))
dest.chmod((zip_entry.external_attr >> 16) & 0o777)
# Move temporary files back into
for src_path in persist_dir.iterdir():
dest_path = self.path.joinpath(src_path.name)
dest_dir = dest_path.parent
dest_dir.mkdir(parents=True, exist_ok=True)
shutil.move(str(src_path), str(dest_path))
async def update(self, force_dep_update: bool = False) -> bool:
async def update(
self,
rollback_info: Optional[Tuple[str, str, int]] = None,
is_recover: bool = False,
force_dep_update: bool = False
) -> bool:
if not self._is_valid:
raise self.log_exc("Update aborted, repo not valid", False)
if self.short_version == self.latest_version:
# already up to date
return False
self.cmd_helper.notify_update_response(
f"Updating Application {self.name}...")
npm_hash = await self._get_file_hash(self.npm_pkg_json)
dl_url, content_type, size = self.release_download_info
self.notify_status("Starting Download...")
raise self.server.error(
f"{self.prefix}Invalid install detected, aborting update"
)
if rollback_info is not None:
dl_url, content_type, size = rollback_info
start_msg = "Rolling Back..." if not is_recover else "Recovering..."
else:
if self.remote_version == "?":
await self._get_remote_version()
if self.remote_version == "?":
raise self.server.error(
f"{self.prefix}Unable to locate update"
)
dl_url, content_type, size = self.dl_info
if self.version == self.remote_version:
# Already up to date
return False
start_msg = "Updating..."
if dl_url == "?":
raise self.server.error(f"{self.prefix}Invalid download url")
current_version = self.version
event_loop = self.server.get_event_loop()
self.notify_status(start_msg)
self.notify_status("Downloading Release...")
dep_info: Optional[Dict[str, Any]] = None
if self.type == AppType.ZIP:
dep_info = await self._collect_dependency_info()
td = await self.cmd_helper.create_tempdir(self.name, "app")
try:
tempdir = pathlib.Path(td.name)
temp_download_file = tempdir.joinpath(f"{self.name}.zip")
temp_persist_dir = tempdir.joinpath(self.name)
client = self.cmd_helper.get_http_client()
await client.download_file(
dl_url, content_type, temp_download_file, size,
self.cmd_helper.on_download_progress)
self.cmd_helper.on_download_progress
)
self.notify_status(
f"Download Complete, extracting release to '{self.path}'")
event_loop = self.server.get_event_loop()
f"Download Complete, extracting release to '{self.path}'"
)
await event_loop.run_in_thread(
self._extract_release, temp_download_file)
self._extract_release, temp_persist_dir, temp_download_file
)
finally:
await event_loop.run_in_thread(td.cleanup)
await self._update_dependencies(npm_hash, force=force_dep_update)
await self._update_repo_state()
if dep_info is not None:
await self._update_dependencies(dep_info, force_dep_update)
self.version = self.remote_version
await self._validate_release_info()
if self._is_valid and rollback_info is None:
self.rollback_version = current_version
self.rollback_repo = self.repo
self._log_zipapp_info()
self._save_state()
await self.restart_service()
self.notify_status("Update Finished...", is_complete=True)
msg = "Update Finished..." if rollback_info is None else "Rollback Complete"
self.notify_status(msg, is_complete=True)
return True
async def recover(self,
hard: bool = False,
force_dep_update: bool = False
) -> None:
res = f"repos/{self.host_repo}/releases"
releases = await self._fetch_github_releases(res)
await self._process_latest_release(releases[1])
await self.update(force_dep_update=force_dep_update)
async def recover(
self, hard: bool = False, force_dep_update: bool = False
) -> None:
await self.update(self.dl_info, True, force_dep_update)
async def reinstall(self) -> None:
    # Perform a fresh install, e.g. after a channel swap.
    # Clear the persistent storage prior to a channel swap.
    # After the next update is complete new data will be
    # restored.
    umdb = self.cmd_helper.get_umdb()
    await umdb.pop(self.name, None)
    await self.initialize()
    await self.recover(force_dep_update=True)
async def rollback(self) -> bool:
    """Roll the application back to the previously installed release.

    Returns False when already at the rollback version, True after a
    successful rollback.  Raises a server error when rollback data is
    incomplete or the release cannot be fetched from GitHub.
    """
    if self.rollback_version == "?" or self.rollback_repo == "?":
        raise self.server.error("Incomplete Rollback Data", False)
    if self.rollback_version == self.version:
        return False
    result = await self._fetch_github_version(
        self.rollback_repo, self.rollback_version
    )
    if not result:
        raise self.server.error("Failed to retrieve release asset data")
    # Guard against a release with an *empty* asset list: the original
    # result.get('assets', [{}])[0] raised IndexError in that case.
    # Falling back to an empty dict lets update() report the invalid
    # download url instead.
    release_asset: Dict[str, Any] = (result.get('assets') or [{}])[0]
    dl_url: str = release_asset.get('browser_download_url', "?")
    content_type: str = release_asset.get('content_type', "?")
    size: int = release_asset.get('size', 0)
    dl_info = (dl_url, content_type, size)
    return await self.update(dl_info)
def get_update_status(self) -> Dict[str, Any]:
status = super().get_update_status()
# XXX - Currently this reports status matching
# that of the git repo so as to not break existing
# client functionality. In the future it would be
# good to report values that are specifc
status.update({
'detected_type': "zip",
'remote_alias': "origin",
'branch': "master",
'name': self.name,
'repo_name': self.project_name,
'owner': self.owner,
'version': self.short_version,
'remote_version': self.latest_version,
'current_hash': self.commit_hash,
'remote_hash': self.lastest_hash,
'is_dirty': False,
'detached': not self.verified,
'commits_behind': self.commit_log,
'git_messages': self.errors,
'full_version_string': self.full_version,
'pristine': self.pristine,
'version': self.version,
'remote_version': self.remote_version,
'rollback_version': self.rollback_version,
'last_error': self.last_error,
'warnings': self.warnings,
'anomalies': self.anomalies
})
return status