python_deploy: support for updating python apps

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-15 13:40:48 -05:00
parent 17dc05c9b7
commit b8921ca593
1 changed files with 416 additions and 0 deletions

View File

@ -0,0 +1,416 @@
# Python Package Update Deployment
#
# Copyright (C) 2024 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import asyncio
import re
import logging
from enum import Enum
from ...utils.source_info import normalize_project_name, load_distribution_info
from ...utils.versions import PyVersion, GitVersion
from ...utils import pip_utils, json_wrapper
from .app_deploy import AppDeploy, Channel, DISTRO_ALIASES
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Optional,
Dict,
List,
cast
)
if TYPE_CHECKING:
from ...confighelper import ConfigHelper
from ...utils.source_info import PackageInfo
from ...components.file_manager.file_manager import FileManager
from .update_manager import CommandHelper
class PackageSource(Enum):
PIP = 0
GITHUB = 1
UNKNOWN = 2
class PythonDeploy(AppDeploy):
def __init__(self, config: ConfigHelper, cmd_helper: CommandHelper) -> None:
super().__init__(config, cmd_helper, "Python Package")
self._configure_virtualenv(config)
if self.virtualenv is None:
raise config.error(
f"[{config.get_name()}]: Option 'virtualenv' must specify a valid "
"the path to a Python virtualenv"
)
fm: FileManager = self.server.lookup_component("file_manager")
fm.add_reserved_path(f"update_manager {self.name}", self.virtualenv)
self._configure_managed_services(config)
self.primary_branch = config.get("primary_branch", None)
self.project_name = config.get("project_name", self.name)
self.source: PackageSource = PackageSource.UNKNOWN
self.repo_url: str = ""
self.repo_owner: str = ""
self.repo_name: str = ""
self.current_version: PyVersion = PyVersion("?")
self.git_version: GitVersion = GitVersion("?")
self.current_sha: str = "?"
self.upstream_version: PyVersion = self.current_version
self.upstream_sha: str = "?"
self.rollback_ref: str = "?"
self.warnings: List[str] = []
package_info = load_distribution_info(self.virtualenv, self.project_name)
self._detect_update_source(package_info)
self._update_current_version(package_info)
self.changelog: str = self._get_url(["changelog"], package_info)
self.system_deps = self._parse_system_dependencies(package_info)
self._is_valid = len(self.warnings) == 0
async def initialize(self) -> Dict[str, Any]:
storage = await super().initialize()
self.upstream_sha = storage.get("upstream_commit", "?")
self.upstream_version = PyVersion(storage.get("upstream_version", "?"))
self.rollback_ref = storage.get("rollback_ref", "?")
if not self.needs_refresh():
self._log_package_info()
return storage
def get_persistent_data(self) -> Dict[str, Any]:
storage = super().get_persistent_data()
storage["upstream_commit"] = self.upstream_sha
storage["upstream_version"] = self.upstream_version.full_version
storage["rollback_ref"] = self.rollback_ref
return storage
def get_update_status(self) -> Dict[str, Any]:
status = super().get_update_status()
status.update({
"detected_type": "python_package",
"branch": self.primary_branch,
"owner": self.repo_owner,
"repo_name": self.repo_name,
"version": self.current_version.short_version,
"remote_version": self.upstream_version.short_version,
"current_hash": self.current_sha,
"remote_hash": self.upstream_sha,
"is_dirty": self.git_version.dirty,
"changelog_url": self.changelog,
"full_version_string": self.current_version.full_version,
"pristine": not self.git_version.dirty,
"warnings": self.warnings
})
return status
def _add_warning(self, msg: str) -> None:
self.warnings.append(msg)
self.log_info(msg)
def _detect_update_source(self, package_info: PackageInfo) -> None:
self.source = PackageSource.UNKNOWN
direct_url_data = package_info.direct_url_data
if direct_url_data is None:
self.source = PackageSource.PIP
self.repo_url = self._get_url(["repository", "repo"], package_info)
return
self.log_debug(f"Direct URL info: {direct_url_data}")
vcs_info: Dict[str, str] = direct_url_data.get("vcs_info", {})
if vcs_info.get("vcs", "") != "git":
self._add_warning(
"Package installed from source other than pypi or git: "
f"{direct_url_data}"
)
return
try:
self.current_sha = vcs_info["commit_id"]
self.repo_url = direct_url_data["url"]
except KeyError:
self._add_warning("Failed to retrive direct_url vcs info")
return
url_match = re.match(
r"https://(?:www\.)?github\.com/(?P<owner>.+?)/(?P<proj>.+?)(?:\.git|$)",
self.repo_url, re.IGNORECASE
)
if url_match is None:
self._add_warning(f"Invalid repo url: {self.repo_url}")
return
self.source = PackageSource.GITHUB
self.repo_owner = url_match["owner"] or "?"
self.repo_name = url_match["proj"] or "?"
def _get_url(self, keys: List[str], package_info: PackageInfo) -> str:
release_info = package_info.release_info
primary = keys[0]
if release_info is not None:
urls: Dict[str, Any] = release_info.get("urls", {})
for name, url in urls.items():
if name.lower() in keys:
return url
self.log_debug(f"Unable to find {primary} url in release info")
# Fallback to Metadata
metadata = package_info.metadata
md_urls: Optional[List[str]] = metadata.get_all("Project-URL", None)
if md_urls is not None:
for url in md_urls:
key, url = url.split(",", maxsplit=1)
key = key.lower().strip()
if key in keys:
return url.strip()
self.log_info(f"Unable to find {primary} url in metadata")
return ""
def _update_current_version(self, package_info: PackageInfo) -> bool:
pkg_verson = ""
release_info = package_info.release_info
metadata = package_info.metadata
if release_info is not None:
self.current_sha = release_info.get("commit_sha", self.current_sha)
self.git_version = GitVersion(release_info.get("git_version", "?"))
pkg_verson = release_info.get("package_version", "")
if "Version" in metadata:
pkg_verson = metadata["Version"]
if not pkg_verson:
self._add_warning("Failed to detect package version")
return False
self.current_version = PyVersion(pkg_verson)
if not self.current_version.is_valid_version():
self._add_warning("Failed to parse package version")
return False
local = self.current_version.local
if self.current_sha == "?":
if self.source != PackageSource.GITHUB:
self.current_sha = "not-specified"
elif local:
self.current_sha = local[1:].split(".", 1)[0]
if not self.git_version.is_valid_version():
self.git_version = self.current_version.convert_to_git()
return True
def _parse_system_dependencies(self, package_info: PackageInfo) -> List[str]:
rinfo = package_info.release_info
if rinfo is None:
return []
dep_info = rinfo.get("system_dependencies", {})
for distro_id in DISTRO_ALIASES:
if distro_id in dep_info:
if not dep_info[distro_id]:
self.log_info(
f"Package release_info contains an empty system "
f"package definition for linux distro '{distro_id}'"
)
return dep_info[distro_id]
else:
self.log_info(
"Package release_info has no package definition "
f" for linux distro '{DISTRO_ALIASES[0]}'"
)
return []
async def _update_local_state(self) -> None:
self.warnings.clear()
eventloop = self.server.get_event_loop()
try:
assert self.virtualenv is not None
package_info = await eventloop.run_in_thread(
load_distribution_info, self.virtualenv, self.project_name
)
except self.server.error:
self._add_warning("Failed to parse package info")
else:
self.git_version = GitVersion("?")
self.current_sha = "?"
self.current_version = PyVersion("?")
self._detect_update_source(package_info)
self._update_current_version(package_info)
self.changelog = self._get_url(["changelog"], package_info)
self.system_deps = self._parse_system_dependencies(package_info)
self._is_valid = len(self.warnings) == 0
async def refresh(self) -> None:
try:
if self.source == PackageSource.PIP:
await self._refresh_pip()
elif self.source == PackageSource.GITHUB:
await self._refresh_github()
else:
self.log_info("Cannot refresh, package source is unknown")
except asyncio.CancelledError:
raise
except Exception:
self.log_exc(f"Error Refreshing Python Package: {self.name}")
self._log_package_info()
self._save_state()
async def _refresh_pip(self) -> None:
# Perform a dry-run install to see if an update is available.
# Curently this is the most reliable way to fetch the latest
# version from an index, as we can't assume configurations
# will use PyPI.
self.log_info("Requesting package info via PIP...")
norm_name = normalize_project_name(self.project_name)
assert self.pip_cmd is not None
pip_args = f"install -U --quiet --dry-run --no-deps --report - {norm_name}"
pip_exec = pip_utils.AsyncPipExecutor(self.pip_cmd, self.server)
resp = await pip_exec.call_pip_with_response(pip_args)
data: Dict[str, Any] = json_wrapper.loads(resp)
install_data: List[Dict[str, Any]] = data.get("install", [])
if not install_data:
# No update available
return
metadata: Dict[str, Any] = install_data[0].get("metadata", {})
name: str = normalize_project_name(metadata.get("name", ""))
if len(install_data) > 1 and name != norm_name:
for inst in install_data[1:]:
md: Dict[str, Any] = inst.get("metadata", {})
name = normalize_project_name(md.get("name", ""))
if name == norm_name:
metadata = md
break
else:
raise self.server.error("Failed to find metadata for package")
version: str = metadata.get("version", "?")
self.upstream_version = PyVersion(version)
if self.current_version < self.upstream_version:
self.upstream_sha = "update-available"
else:
self.upstream_sha = self.current_sha
async def _refresh_github(self) -> None:
repo = f"{self.repo_owner}/{self.repo_name}"
client = self.cmd_helper.get_http_client()
if self.channel == Channel.DEV:
resource = f"/repos/{repo}/commits?per_page=1"
if self.primary_branch is not None:
resource += f"&sha={self.primary_branch}"
resp = await client.github_api_request(
resource, attempts=3, retry_pause_time=.5
)
if resp.status_code != 304 and resp.has_error():
self.log_info(f"Github Request Error - {resp.error}")
return
commit_list: List[Dict[str, Any]] = cast(list, resp.json())
if not commit_list:
self.log_info("No commits found")
return
self.upstream_sha = commit_list[0]["sha"]
self.upstream_version = self.current_version
if self.upstream_sha != self.current_sha:
local_part = f"g{self.upstream_sha[:8]}"
bumped = self.current_version.bump_local_version(local_part)
self.upstream_version = bumped
return
if self.channel == Channel.STABLE:
resource = f"repos/{repo}/releases/latest"
else:
resource = f"repos/{repo}/releases?per_page=1"
resp = await client.github_api_request(
resource, attempts=3, retry_pause_time=.5
)
if resp.status_code != 304 and resp.has_error():
self.log_info(f"Github Request Error - {resp.error}")
return
release = resp.json()
result: Dict[str, Any] = {}
if isinstance(release, list):
if release:
result = release[0]
else:
result = release
if not result:
self.log_info("No releases found")
self.upstream_sha = self.current_sha
self.upstream_version = self.current_version
return
self.upstream_version = PyVersion(result["tag_name"])
if self.upstream_version > self.current_version:
self.upstream_sha = "update-available"
async def update(self, rollback: bool = False) -> bool:
project_name = normalize_project_name(self.project_name)
assert self.pip_cmd is not None
pip_args: str
if not self.upstream_version.is_valid_version():
# Can't update without a valid upstream
return False
pip_exec = pip_utils.AsyncPipExecutor(
self.pip_cmd, self.server, self.cmd_helper.notify_update_response
)
current_ref = self.current_version.tag
if self.source == PackageSource.PIP:
# We can't depend on the SHA being available for PyPI packages,
# so we must compare versions
if (
self.current_version.is_valid_version() and
self.upstream_version <= self.current_version
):
return False
pip_args = f"install -U {project_name}"
if rollback:
pip_args += f"=={self.rollback_ref}"
elif self.source == PackageSource.GITHUB:
if self.current_sha == self.upstream_sha:
return False
repo = f"{self.repo_owner}/{self.repo_name}"
pip_args = f"install -U git+https://github.com/{repo}"
if rollback:
pip_args += f"@{self.rollback_ref}"
elif self.channel == "dev":
current_ref = self.current_sha
if self.primary_branch is not None:
pip_args += f"@{self.primary_branch}"
else:
pip_args += f"@{self.upstream_version.tag}"
else:
raise self.server.error("Cannot update, package source is unknown")
await self._update_pip(pip_exec)
sys_deps = self.system_deps
source = self.source.name
self.notify_status(f"Updating Python Package {self.name} from {source}...")
await pip_exec.call_pip(pip_args, 3600, sys_env_vars=self.pip_env_vars)
await self._update_local_state()
if not rollback:
self.rollback_ref = current_ref
self.upstream_sha = self.current_sha
self.upstream_version = self.current_version
await self._update_sys_deps(sys_deps)
self._log_package_info()
self._save_state()
await self.restart_service()
self.notify_status("Update Finished...", is_complete=True)
return True
async def recover(
self, hard: bool = False, force_dep_update: bool = False
) -> None:
pass
async def rollback(self) -> bool:
if self.rollback_ref == "?":
return False
await self.update(rollback=True)
return True
async def _update_sys_deps(self, prev_deps: List[str]) -> None:
new_deps = self.system_deps
deps_diff = list(set(new_deps) - set(prev_deps))
if deps_diff:
await self._install_packages(deps_diff)
def _log_package_info(self) -> None:
logging.info(
f"Python Package {self.name} detected:\n"
f"Channel: {self.channel}\n"
f"Package Source: {self.source.name}\n"
f"Repo Owner: {self.repo_owner}\n"
f"Repo Name: {self.repo_name}\n"
f"Repo URL: {self.repo_url}\n"
f"Changelog URL: {self.changelog}\n"
f"Full Version String: {self.current_version.full_version}\n"
f"Current Version: {self.current_version.short_version}\n"
f"Current Commit SHA: {self.current_sha}\n"
f"Upstream Version: {self.upstream_version.short_version}\n"
f"Upstream Commit SHA: {self.upstream_sha}\n"
f"Converted Git Version: {self.git_version}\n"
f"Rollback Ref: {self.rollback_ref}\n"
)