update_manager: refactor GitUpdater

Move git command functionality to its own class outside of the Updater class.  This class is responsible for tracking repo state and executing commands on the repo.  Fetch and pull no longer use built-in command timeouts; instead, a callback is scheduled to check whether the command has reported progress.  A fetch or pull is terminated only when a timeout elapses with no progress reported.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-03-09 12:45:53 -05:00
parent 9632465a78
commit df82730832
1 changed files with 349 additions and 148 deletions

View File

@ -447,10 +447,12 @@ class GitUpdater:
self.server = cmd_helper.get_server()
self.cmd_helper = cmd_helper
self.name = config.get_name().split()[-1]
self.owner = "?"
self.repo_path = path
if path is None:
self.repo_path = os.path.expanduser(config.get('path'))
path = os.path.expanduser(config.get('path'))
self.repo_path = path
self.repo = GitRepo(cmd_helper, path, self.name)
self.init_evt = Event()
self.debug = self.cmd_helper.is_debug_enabled()
self.env = config.get("env", env)
dist_packages = None
if self.env is not None:
@ -491,15 +493,6 @@ class GitUpdater:
raise config.error("Invalid path for option '%s': %s"
% (val, opt))
self.version = self.cur_hash = "?"
self.remote_version = self.remote_hash = "?"
self.init_evt = Event()
self.refresh_condition = None
self.debug = self.cmd_helper.is_debug_enabled()
self.remote = "origin"
self.branch = "master"
self.is_valid = self.is_dirty = self.detached = False
def _get_version_info(self):
ver_path = os.path.join(self.repo_path, "scripts/version.txt")
vinfo = {}
@ -516,7 +509,7 @@ class GitUpdater:
pass
else:
self._log_info(f"Version Info Found: {vinfo}")
vinfo['version'] = tuple(re.findall(r"\d+", self.version))
vinfo['version'] = self.repo.get_version()
return vinfo
def _log_exc(self, msg, traceback=True):
@ -544,145 +537,51 @@ class GitUpdater:
await self.init_evt.wait(timeout)
async def refresh(self):
if self.refresh_condition is None:
self.refresh_condition = Condition()
else:
self.refresh_condition.wait()
return
try:
await self._check_version()
await self._update_repo_state()
except Exception:
logging.exception("Error Refreshing git state")
self.init_evt.set()
self.refresh_condition.notify_all()
self.refresh_condition = None
async def _check_version(self, need_fetch=True):
self.is_valid = self.detached = False
self.cur_hash = self.branch = self.remote = "?"
self.version = self.remote_version = self.owner = "?"
try:
blist = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} branch --list")
if blist.startswith("fatal:"):
self._log_info(f"Invalid git repo at path '{self.repo_path}'")
return
branch = None
for b in blist.split("\n"):
b = b.strip()
if b[0] == "*":
branch = b[2:]
break
if branch is None:
self._log_info(
"Unable to retreive current branch from branch list\n"
f"{blist}")
return
if "HEAD detached" in branch:
bparts = branch.split()[-1].strip("()")
self.remote, self.branch = bparts.split("/")
self.detached = True
else:
self.branch = branch.strip()
self.remote = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} config --get"
f" branch.{self.branch}.remote")
if need_fetch:
env = {
'GIT_HTTP_LOW_SPEED_LIMIT': "1000",
'GIT_HTTP_LOW_SPEED_TIME ': "15"
}
await self.cmd_helper.run_cmd(
f"git -C {self.repo_path} fetch {self.remote} --prune -q",
timeout=20., retries=3, env=env)
remote_url = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} remote get-url {self.remote}")
cur_hash = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} rev-parse HEAD")
remote_hash = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} rev-parse "
f"{self.remote}/{self.branch}")
repo_version = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} describe --always "
"--tags --long --dirty")
remote_version = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} describe {self.remote}/{self.branch}"
" --always --tags --long")
except Exception:
self._log_exc("Error retreiving git info")
return
remote_url = remote_url.strip()
owner_match = re.match(r"https?://[^/]+/([^/]+)", remote_url)
if owner_match is not None:
self.owner = owner_match.group(1)
self.is_dirty = repo_version.endswith("dirty")
versions = []
for ver in [repo_version, remote_version]:
tag_version = "?"
ver_match = re.match(r"v\d+\.\d+\.\d-\d+", ver.strip())
if ver_match:
tag_version = ver_match.group()
versions.append(tag_version)
self.version, self.remote_version = versions
self.cur_hash = cur_hash.strip()
self.remote_hash = remote_hash.strip()
self._log_info(
f"Repo Detected:\nPath: {self.repo_path}\nRemote: {self.remote}\n"
f"Branch: {self.branch}\nRemote URL: {remote_url}\n"
f"Current SHA: {self.cur_hash}\n"
f"Remote SHA: {self.remote_hash}\nVersion: {self.version}\n"
f"Remote Version: {self.remote_version}\n"
f"Is Dirty: {self.is_dirty}\nIs Detached: {self.detached}")
if self.debug:
self.is_valid = True
self._log_info("Debug enabled, bypassing official repo check")
elif self.branch == "master" and self.remote == "origin":
if self.detached:
self._log_info("Detached HEAD detected, repo invalid")
return
remote_url = remote_url.lower()
if remote_url[-4:] != ".git":
remote_url += ".git"
if remote_url == self.origin:
self.is_valid = True
self._log_info("Validity check for git repo passed")
else:
self._log_info(f"Invalid git origin url '{remote_url}'")
else:
async def _update_repo_state(self, need_fetch=True):
self.is_valid = False
await self.repo.initialize(need_fetch=need_fetch)
invalids = self.repo.report_invalids(self.origin)
if invalids:
msgs = '\n'.join(invalids)
self._log_info(
"Git repo not on offical remote/branch: "
f"{self.remote}/{self.branch}")
f"Repo validation checks failed:\n{msgs}")
if self.debug:
self.is_valid = True
self._log_info(
"Repo debug enabled, overriding validity checks")
else:
self._log_info("Updates on repo disabled")
else:
self.is_valid = True
self._log_info("Validity check for git repo passed")
async def update(self, update_deps=False):
await self.check_initialized(20.)
if self.refresh_condition is not None:
self.refresh_condition.wait()
await self.repo.wait_for_init()
if not self.is_valid:
raise self._log_exc("Update aborted, repo is not valid", False)
if self.is_dirty:
raise self._log_exc("Update aborted, repo not valid", False)
if self.repo.is_dirty():
raise self._log_exc(
"Update aborted, repo is has been modified", False)
if self.remote_hash == self.cur_hash:
"Update aborted, repo has been modified", False)
if self.repo.is_current():
# No need to update
return
self._notify_status("Updating Repo...")
try:
env = {
'GIT_HTTP_LOW_SPEED_LIMIT': "1000",
'GIT_HTTP_LOW_SPEED_TIME ': "15"
}
if self.detached:
await self.cmd_helper.run_cmd(
f"git -C {self.repo_path} fetch {self.remote} -q",
timeout=20., retries=3, env=env)
await self.cmd_helper.run_cmd(
f"git -C {self.repo_path} checkout"
f" {self.remote}/{self.branch} -q")
if self.repo.is_detached():
await self.repo.fetch()
await self.repo.checkout()
else:
await self.cmd_helper.run_cmd(
f"git -C {self.repo_path} pull -q", timeout=20.,
retries=3, env=env)
await self.repo.pull()
# Prune stale references.  Do this separately from pull or
# fetch to prevent a timeout during a prune
await self.repo.prune()
except Exception:
raise self._log_exc("Error running 'git pull'")
# Check Semantic Versions
@ -696,7 +595,7 @@ class GitUpdater:
elif need_env_rebuild:
await self._update_virtualenv(True)
# Refresh local repo state
await self._check_version(need_fetch=False)
await self._update_repo_state(need_fetch=False)
if self.name == "moonraker":
# Launch restart async so the request can return
# before the server restarts
@ -800,20 +699,322 @@ class GitUpdater:
raise self._log_exc("Error restarting service")
def get_update_status(self):
    """Return the repo's status dict extended with updater-level flags.

    The dict produced by ``self.repo.get_repo_status()`` is augmented with
    ``is_valid`` and ``debug_enabled`` and returned.
    """
    report = self.repo.get_repo_status()
    report.update({
        'is_valid': self.is_valid,
        'debug_enabled': self.debug,
    })
    return report
# Seconds between checks for fetch/pull output.  A fetch or pull that
# produced no output during this interval is cancelled.
GIT_FETCH_TIMEOUT = 20.
# Extra environment variables merged into the environment of git
# fetch/pull commands.  The names must match git's variable names exactly
# (no surrounding whitespace), otherwise git never sees them.
GIT_FETCH_ENV_VARS = {
    'GIT_HTTP_LOW_SPEED_LIMIT': "1000",
    'GIT_HTTP_LOW_SPEED_TIME': "15"
}
class GitRepo:
    """Tracks the state of a git repository and runs git commands on it.

    Repo details (remote, branch, commits, versions, dirty/detached flags)
    are cached on the instance and refreshed by ``initialize()``.  Every
    method that runs a git command holds ``git_operation_lock`` while the
    command executes.
    """

    def __init__(self, cmd_helper, git_path, alias):
        """Store the helpers and reset all cached repo state to unknown.

        cmd_helper: object providing ``get_server()``, ``run_cmd()``,
            ``run_cmd_with_response()`` and ``build_shell_command()``.
        git_path: path of the repo, passed to ``git -C``.
        alias: name used for this repo in log and error messages.
        """
        self.server = cmd_helper.get_server()
        self.cmd_helper = cmd_helper
        self.alias = alias
        self.git_path = git_path
        self.git_cmd = f"git -C {git_path}"
        self.valid_git_repo = False
        self.git_owner = "?"
        self.git_remote = "?"
        self.git_branch = "?"
        self.current_version = "?"
        self.upstream_version = "?"
        self.current_commit = "?"
        self.upstream_commit = "?"
        self.upstream_url = "?"
        self.branches = []
        self.dirty = False
        self.head_detached = False
        # Set while initialize() is running so other callers can wait on it
        self.init_condition = None
        self.git_operation_lock = Lock()
        # Handle of the pending fetch/pull activity check, if any
        self.fetch_timeout_handle = None
        # True when fetch/pull produced output since the last activity check
        self.fetch_input_recd = False

    async def initialize(self, need_fetch=True):
        """Refresh all cached repo state, optionally fetching first.

        If an initialization is already in progress this call waits for it
        to finish and returns without repeating the work.  Any failure is
        logged and re-raised; waiters are released in either case.
        """
        if self.init_condition is not None:
            # No need to initialize multiple requests
            await self.init_condition.wait()
            return
        self.init_condition = Condition()
        try:
            await self.update_repo_status()
            self._verify_repo()
            if not self.head_detached:
                # lookup remote via git config
                self.git_remote = await self.get_config_item(
                    f"branch.{self.git_branch}.remote")

            # Populate list of current branches
            blist = await self.list_branches()
            self.branches = []
            for branch in blist:
                branch = branch.strip()
                if branch.startswith("*"):
                    # Drop the "* " marker of the checked out branch
                    branch = branch[2:]
                if not branch or branch.startswith("("):
                    # Skip blank lines and "(HEAD detached ...)" entries
                    continue
                self.branches.append(branch)

            if need_fetch:
                await self.fetch()

            self.upstream_url = await self.remote("get-url")
            self.current_commit = await self.rev_parse("HEAD")
            self.upstream_commit = await self.rev_parse(
                f"{self.git_remote}/{self.git_branch}")
            current_version = await self.describe(
                "--always --tags --long --dirty")
            upstream_version = await self.describe(
                f"{self.git_remote}/{self.git_branch} "
                "--always --tags --long")

            # Parse GitHub Owner from URL
            owner_match = re.match(
                r"https?://[^/]+/([^/]+)", self.upstream_url)
            self.git_owner = "?"
            if owner_match is not None:
                self.git_owner = owner_match.group(1)
            self.dirty = current_version.endswith("dirty")

            # Parse Version Info
            self.current_version = self._parse_tag_version(current_version)
            self.upstream_version = self._parse_tag_version(upstream_version)
            self.log_repo_info()
        except Exception:
            logging.exception(f"Git Repo {self.alias}: Initialization failure")
            raise
        finally:
            self.init_condition.notify_all()
            self.init_condition = None

    @staticmethod
    def _parse_tag_version(described):
        """Extract the leading ``vX.Y.Z-N`` part of ``git describe`` output.

        Returns "?" when the text does not start with such a tag.  Each
        numeric component may have any number of digits.
        """
        ver_match = re.match(r"v\d+\.\d+\.\d+-\d+", described)
        if ver_match is None:
            return "?"
        return ver_match.group()

    async def wait_for_init(self):
        """Wait until an in-progress ``initialize()`` call has finished."""
        if self.init_condition is not None:
            await self.init_condition.wait()

    async def update_repo_status(self):
        """Read branch/detached state from the first line of ``git status``.

        Returns True and marks the repo valid on success.  Returns False and
        marks the repo invalid when the command fails, produces no output,
        or reports a fatal error.
        """
        async with self.git_operation_lock:
            try:
                resp = await self.cmd_helper.run_cmd_with_response(
                    f"{self.git_cmd} status -u no")
            except Exception:
                self.valid_git_repo = False
                return False
            resp = resp.strip().split('\n', 1)[0]
            if not resp or resp.startswith("fatal:"):
                # Invalid repo (or nothing to parse)
                self.valid_git_repo = False
                return False
            self.head_detached = resp.startswith("HEAD detached")
            # The last word is the branch name, or the ref/commit HEAD is
            # detached at
            branch_info = resp.split()[-1]
            if self.head_detached:
                bparts = branch_info.split("/", 1)
                if len(bparts) == 2:
                    self.git_remote, self.git_branch = bparts
                else:
                    # Detached on something that is not "<remote>/<branch>";
                    # keep whatever remote/branch was tracked before
                    if self.git_remote == "?":
                        msg = "Resolve by manually checking out" \
                            " a branch via SSH."
                    else:
                        msg = "Defaulting to previously tracked " \
                            f"{self.git_remote}/{self.git_branch}."
                    logging.info(
                        f"Git Repo {self.alias}: HEAD detached on untracked "
                        f"commit {branch_info}. {msg}")
            else:
                self.git_branch = branch_info
            self.valid_git_repo = True
            return True

    def log_repo_info(self):
        """Log a summary of the cached repo state."""
        logging.info(
            f"Git Repo {self.alias} Detected:\n"
            f"Owner: {self.git_owner}\n"
            f"Path: {self.git_path}\n"
            f"Remote: {self.git_remote}\n"
            f"Branch: {self.git_branch}\n"
            f"Remote URL: {self.upstream_url}\n"
            f"Current Commit SHA: {self.current_commit}\n"
            f"Upstream Commit SHA: {self.upstream_commit}\n"
            f"Current Version: {self.current_version}\n"
            f"Upstream Version: {self.upstream_version}\n"
            f"Is Dirty: {self.dirty}\n"
            f"Is Detached: {self.head_detached}")

    def report_invalids(self, valid_origin):
        """Return a list of messages describing failed validity checks.

        The upstream url is lower-cased and given a ".git" suffix before it
        is compared against ``valid_origin``, so ``valid_origin`` presumably
        needs to be in that same form (verify against the caller).  An empty
        list means all checks passed.
        """
        invalids = []
        upstream_url = self.upstream_url.lower()
        if upstream_url[-4:] != ".git":
            upstream_url += ".git"
        if upstream_url != valid_origin:
            invalids.append(f"Unofficial remote url: {self.upstream_url}")
        if self.git_branch != "master" or self.git_remote != "origin":
            invalids.append(
                "Repo not on default remote branch: "
                f"{self.git_remote}/{self.git_branch}")
        if self.head_detached:
            invalids.append("Detached HEAD detected")
        return invalids

    def _verify_repo(self, check_remote=False):
        """Raise ``self.server.error`` if the repo cannot be operated on.

        Raises when the path was not detected as a git repository, and,
        with ``check_remote`` set, when no remote has been detected.
        """
        if not self.valid_git_repo:
            raise self.server.error(
                f"Git Repo {self.alias}: '{self.git_path}' "
                "not a git repository")
        if check_remote:
            if self.git_remote == "?":
                raise self.server.error(
                    f"Git Repo {self.alias}: No valid git remote detected")

    async def fetch(self):
        """Run ``git fetch`` against the tracked remote."""
        self._verify_repo(check_remote=True)
        async with self.git_operation_lock:
            await self._do_fetch_pull(
                f"{self.git_cmd} fetch {self.git_remote}")

    async def pull(self):
        """Run ``git pull``; raises if HEAD is detached."""
        self._verify_repo()
        if self.head_detached:
            raise self.server.error(
                f"Git Repo {self.alias}: Cannot perform pull on a "
                "detached HEAD")
        async with self.git_operation_lock:
            await self._do_fetch_pull(f"{self.git_cmd} pull")

    async def list_branches(self):
        """Return the lines of ``git branch --list`` as a list of strings."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self.cmd_helper.run_cmd_with_response(
                f"{self.git_cmd} branch --list")
            return resp.strip().split("\n")

    async def remote(self, command):
        """Run ``git remote <command> <remote>`` and return its output."""
        self._verify_repo(check_remote=True)
        async with self.git_operation_lock:
            resp = await self.cmd_helper.run_cmd_with_response(
                f"{self.git_cmd} remote {command} {self.git_remote}")
            return resp.strip()

    async def prune(self):
        """Run ``git remote prune`` on the tracked remote."""
        self._verify_repo(check_remote=True)
        async with self.git_operation_lock:
            await self.cmd_helper.run_cmd(
                f"{self.git_cmd} remote prune {self.git_remote}",
                timeout=30.)

    async def describe(self, args=""):
        """Run ``git describe`` with ``args`` and return its output."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self.cmd_helper.run_cmd_with_response(
                f"{self.git_cmd} describe {args}".strip())
            return resp.strip()

    async def rev_parse(self, args=""):
        """Run ``git rev-parse`` with ``args`` and return its output."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self.cmd_helper.run_cmd_with_response(
                f"{self.git_cmd} rev-parse {args}".strip())
            return resp.strip()

    async def get_config_item(self, item):
        """Return the output of ``git config --get <item>``."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self.cmd_helper.run_cmd_with_response(
                f"{self.git_cmd} config --get {item}")
            return resp.strip()

    async def checkout(self, branch=None):
        """Check out ``branch``, defaulting to ``<remote>/<branch>``."""
        self._verify_repo()
        async with self.git_operation_lock:
            branch = branch or f"{self.git_remote}/{self.git_branch}"
            await self.cmd_helper.run_cmd_with_response(
                f"{self.git_cmd} checkout {branch} -q")

    def get_repo_status(self):
        """Return the cached repo state as a new dict."""
        return {
            'remote_alias': self.git_remote,
            'branch': self.git_branch,
            'owner': self.git_owner,
            'version': self.current_version,
            'remote_version': self.upstream_version,
            'current_hash': self.current_commit,
            'remote_hash': self.upstream_commit,
            'is_dirty': self.dirty,
            'detached': self.head_detached
        }

    def get_version(self, upstream=False):
        """Return the digit groups of the current (or upstream) version.

        The result is a tuple of strings, empty when the version is "?".
        """
        version = self.upstream_version if upstream else self.current_version
        return tuple(re.findall(r"\d+", version))

    def is_detached(self):
        """Return True if HEAD was detected as detached."""
        return self.head_detached

    def is_dirty(self):
        """Return True if the described current version ended in "dirty"."""
        return self.dirty

    def is_current(self):
        """Return True if the current and upstream commits are equal."""
        return self.current_commit == self.upstream_commit

    async def _do_fetch_pull(self, cmd, retries=5):
        """Run a fetch/pull command, retrying until it returns 0.

        Raises ``self.server.error`` when no attempt succeeds.
        """
        # Fetch and pull require special handling.  If the request
        # gets delayed we do not want to terminate it while the command
        # is processing.
        env = os.environ.copy()
        env.update(GIT_FETCH_ENV_VARS)
        scmd = self.cmd_helper.build_shell_command(
            cmd, std_err_callback=self._handle_process_output,
            env=env)
        ioloop = IOLoop.current()
        for _ in range(retries):
            self.fetch_input_recd = False
            self.fetch_timeout_handle = ioloop.call_later(
                GIT_FETCH_TIMEOUT, self._check_process_active, scmd)
            try:
                # NOTE(review): timeout=0 presumably disables the command's
                # own timeout; termination is left to _check_process_active
                await scmd.run(timeout=0)
            except Exception:
                # Best effort: success is decided by the return code below
                logging.debug(
                    f"Git Repo {self.alias}: Fetch/Pull raised an exception",
                    exc_info=True)
            finally:
                # Always drop the pending activity check for this attempt
                ioloop.remove_timeout(self.fetch_timeout_handle)
            if scmd.get_return_code() == 0:
                return
        raise self.server.error(f"Git Command '{cmd}' failed")

    def _handle_process_output(self, output):
        """Record that fetch/pull produced output and log it."""
        self.fetch_input_recd = True
        # Undecodable bytes must not raise inside the output callback
        logging.debug(
            f"Git Repo {self.alias}: Fetch/Pull Response\n"
            f"{output.decode(errors='replace')}")

    async def _check_process_active(self, scmd):
        """Activity check scheduled by ``_do_fetch_pull``.

        Does nothing if the command already returned.  If output arrived
        since the last check the check is rescheduled, otherwise the command
        is cancelled.
        """
        ret = scmd.get_return_code()
        if ret is not None:
            logging.debug(f"Git Repo {self.alias}: Fetch/Pull returned")
            return
        if self.fetch_input_recd:
            # Received some input, reschedule timeout
            logging.debug(
                f"Git Repo {self.alias}: Fetch/Pull active, rescheduling")
            ioloop = IOLoop.current()
            self.fetch_input_recd = False
            self.fetch_timeout_handle = ioloop.call_later(
                GIT_FETCH_TIMEOUT, self._check_process_active, scmd)
        else:
            # Request has timed out with no input, terminate it
            logging.debug(f"Git Repo {self.alias}: Fetch/Pull timed out")
            await scmd.cancel()
class PackageUpdater:
def __init__(self, cmd_helper):