git_deploy: use versions utility

Use the newly implemented versions utility for git version parsing.
This allows for simple access to version information and
version comparison.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-07-04 17:49:46 -04:00
parent bdd1d93708
commit 1a4480e74c
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 55 additions and 94 deletions

View File

@ -13,6 +13,7 @@ import re
import logging
from .app_deploy import AppDeploy
from .common import Channel
from ...utils.versions import GitVersion
# Annotation imports
from typing import (
@ -242,7 +243,6 @@ GIT_REF_FMT = (
)
class GitRepo:
tag_r = re.compile(r"(v?\d+(?:\.\d+){1,2}(-(alpha|beta)(\.\d+)?)?)(-\d+)?")
def __init__(self,
cmd_helper: CommandHelper,
git_path: pathlib.Path,
@ -288,27 +288,26 @@ class GitRepo:
self.git_repo_name: str = storage.get('git_repo_name', "?")
self.git_remote: str = storage.get('git_remote', "?")
self.git_branch: str = storage.get('git_branch', "?")
self.current_version: str = storage.get('current_version', "?")
self.upstream_version: str = storage.get('upstream_version', "?")
if "full_version_string" in storage:
self.current_version = GitVersion(storage["full_version_string"])
else:
self.current_version = GitVersion(storage.get('current_version', "?"))
self.upstream_version = GitVersion(storage.get('upstream_version', "?"))
self.current_commit: str = storage.get('current_commit', "?")
self.upstream_commit: str = storage.get('upstream_commit', "?")
self.rollback_commit: str = storage.get('rollback_commit', self.current_commit)
self.rollback_branch: str = storage.get('rollback_branch', self.git_branch)
self.rollback_version: str = storage.get(
'rollback_version', self.current_version
)
rbv = storage.get('rollback_version', self.current_version)
self.rollback_version = GitVersion(str(rbv))
self.upstream_url: str = storage.get('upstream_url', "?")
self.recovery_url: str = storage.get(
'recovery_url',
self.upstream_url if self.git_remote == "origin" else "?"
)
self.full_version_string: str = storage.get('full_version_string', "?")
self.branches: List[str] = storage.get('branches', [])
self.dirty: bool = storage.get('dirty', False)
self.head_detached: bool = storage.get('head_detached', False)
self.git_messages: List[str] = storage.get('git_messages', [])
self.commits_behind: List[Dict[str, Any]] = storage.get(
'commits_behind', [])
self.commits_behind: List[Dict[str, Any]] = storage.get('commits_behind', [])
self.diverged: bool = storage.get("diverged", False)
self.repo_corrupt: bool = storage.get('corrupt', False)
self._check_warnings()
@ -320,18 +319,16 @@ class GitRepo:
'git_repo_name': self.git_repo_name,
'git_remote': self.git_remote,
'git_branch': self.git_branch,
'current_version': self.current_version,
'upstream_version': self.upstream_version,
'current_version': self.current_version.full_version,
'upstream_version': self.upstream_version.full_version,
'current_commit': self.current_commit,
'upstream_commit': self.upstream_commit,
'rollback_commit': self.rollback_commit,
'rollback_branch': self.rollback_branch,
'rollback_version': self.rollback_version,
'rollback_version': self.rollback_version.full_version,
'upstream_url': self.upstream_url,
'recovery_url': self.recovery_url,
'full_version_string': self.full_version_string,
'branches': self.branches,
'dirty': self.dirty,
'head_detached': self.head_detached,
'git_messages': self.git_messages,
'commits_behind': self.commits_behind,
@ -403,14 +400,12 @@ class GitRepo:
if repo_match is not None:
self.git_repo_name = repo_match.group(1)
self.current_commit = await self.rev_parse("HEAD")
git_desc = await self.describe(
"--always --tags --long --dirty")
self.full_version_string = git_desc.strip()
self.dirty = git_desc.endswith("dirty")
git_desc = await self.describe("--always --tags --long --dirty --abbrev=8")
cur_ver = GitVersion(git_desc.strip())
if self.channel != Channel.DEV:
await self._get_beta_versions(git_desc)
await self._get_beta_versions(cur_ver)
else:
await self._get_dev_versions(git_desc)
await self._get_dev_versions(cur_ver)
# Get Commits Behind
self.commits_behind = []
@ -491,49 +486,37 @@ class GitRepo:
logging.debug(f"Move Request Failed: {resp.error}")
return moved
async def _get_dev_versions(self, current_version: str) -> None:
async def _get_dev_versions(self, current_version: GitVersion) -> None:
self.upstream_commit = await self.rev_parse(
f"{self.git_remote}/{self.git_branch}")
upstream_version = await self.describe(
f"{self.git_remote}/{self.git_branch}"
"--always --tags --long")
)
upstream_ver_str = await self.describe(
f"{self.git_remote}/{self.git_branch} --always --tags --long --abbrev=8"
)
upstream_version = GitVersion(upstream_ver_str)
# Get the latest tag as a fallback for shallow clones
commit, tag = await self._parse_latest_tag()
# Parse Version Info
versions: List[str] = []
for ver in [current_version, upstream_version]:
tag_version = "?"
ver_match = self.tag_r.match(ver)
if ver_match:
tag_version = ver_match.group()
elif tag != "?":
if len(versions) == 0:
# Check for shallow clones
if current_version.is_fallback() and tag != "?":
count = await self.rev_list(f"{tag}..HEAD --count")
full_ver = f"{tag}-{count}-g{ver}-shallow"
self.full_version_string = full_ver
else:
count = await self.rev_list(
f"{tag}..{self.upstream_commit} --count")
tag_version = f"{tag}-{count}"
versions.append(tag_version)
self.current_version, self.upstream_version = versions
current_version = GitVersion(f"{tag}-{count}-g{current_version}-shallow")
if not upstream_version.is_valid_version() and tag != "?":
count = await self.rev_list(f"{tag}..{self.upstream_commit} --count")
upstream_version = GitVersion(f"{tag}-{count}")
self.current_version = current_version
self.upstream_version = upstream_version
async def _get_beta_versions(self, current_version: str) -> None:
async def _get_beta_versions(self, current_version: GitVersion) -> None:
upstream_commit, upstream_tag = await self._parse_latest_tag()
ver_match = self.tag_r.match(current_version)
current_tag = "?"
if ver_match:
current_tag = ver_match.group(1)
elif upstream_tag != "?":
if current_version.is_fallback() and upstream_tag != "?":
count = await self.rev_list(f"{upstream_tag}..HEAD --count")
full_ver = f"{upstream_tag}-{count}-g{current_version}-shallow"
self.full_version_string = full_ver
current_tag = upstream_tag
ver_string = f"{upstream_tag}-{count}-g{current_version}-shallow"
current_version = GitVersion(ver_string)
self.upstream_commit = upstream_commit
if current_tag == upstream_tag:
if self.current_version.tag == upstream_tag or upstream_tag == "?":
self.upstream_commit = self.current_commit
self.current_version = current_tag
self.upstream_version = upstream_tag
self.current_version = current_version
self.upstream_version = GitVersion(upstream_tag)
async def _parse_latest_tag(self) -> Tuple[str, str]:
commit = tag = "?"
@ -542,15 +525,12 @@ class GitRepo:
if not commit:
return "?", "?"
tag = await self.describe(f"--tags {commit}")
except asyncio.CancelledError:
raise
except Exception:
pass
else:
tag_match = self.tag_r.match(tag)
if tag_match is not None:
tag = tag_match.group(1)
else:
tag = "?"
return commit, tag
return "?", "?"
version = GitVersion(tag)
return commit, version.tag
async def wait_for_init(self) -> None:
if self.init_evt is not None:
@ -654,7 +634,7 @@ class GitRepo:
f"Rollback Commit: {self.rollback_commit}\n"
f"Rollback Branch: {self.rollback_branch}\n"
f"Rollback Version: {self.rollback_version}\n"
f"Is Dirty: {self.dirty}\n"
f"Is Dirty: {self.current_version.dirty}\n"
f"Is Detached: {self.head_detached}\n"
f"Commits Behind: {len(self.commits_behind)}\n"
f"Diverged: {self.diverged}"
@ -831,7 +811,7 @@ class GitRepo:
"version": self.current_version
}
def set_rollback_state(self, rb_state: Optional[Dict[str, str]]) -> None:
def set_rollback_state(self, rb_state: Optional[Dict[str, Any]]) -> None:
if rb_state is None:
self.rollback_commit = self.current_commit
if self.head_detached:
@ -893,52 +873,33 @@ class GitRepo:
'repo_name': self.git_repo_name,
'remote_url': self.upstream_url,
'recovery_url': self.recovery_url,
'version': self.current_version,
'remote_version': self.upstream_version,
'rollback_version': self.rollback_version,
'version': self.current_version.short_version,
'remote_version': self.upstream_version.short_version,
'rollback_version': self.rollback_version.short_version,
'current_hash': self.current_commit,
'remote_hash': self.upstream_commit,
'is_dirty': self.dirty,
'is_dirty': self.current_version.dirty,
'detached': self.head_detached,
'commits_behind': self.commits_behind,
'git_messages': self.git_messages,
'full_version_string': self.full_version_string,
'pristine': not self.dirty,
'full_version_string': self.current_version.full_version,
'pristine': not self.current_version.dirty,
'corrupt': self.repo_corrupt,
'warnings': self.repo_warnings
}
def get_version(self, upstream: bool = False) -> Tuple[Any, ...]:
version = self.upstream_version if upstream else self.current_version
return tuple(re.findall(r"\d+", version))
def get_version(self, upstream: bool = False) -> GitVersion:
return self.upstream_version if upstream else self.current_version
def is_detached(self) -> bool:
return self.head_detached
def is_dirty(self) -> bool:
return self.dirty
return self.current_version.dirty
def is_current(self) -> bool:
return self.current_commit == self.upstream_commit
def _convert_semver(self, version: str) -> List[int]:
ver_match = self.tag_r.match(version)
if ver_match is None:
return []
try:
tag = ver_match.group(1)
core = tag.split("-")[0]
if core[0] == "v":
core = core[1:]
base_ver = [int(part) for part in core.split(".")]
while len(base_ver) < 3:
base_ver.append(0)
base_ver.append({"alpha": 0, "beta": 1}.get(ver_match.group(3), 2))
base_ver.append(int(ver_match.group(5)[1:]))
except Exception:
return []
return base_ver
async def _check_lock_file_exists(self, remove: bool = False) -> bool:
lock_path = self.git_path.joinpath(".git/index.lock")
if lock_path.is_file():