update_manager: improve corrupt repo detection

It appears that "git status" will not detect some repo issues; these are
only found after a fetch.  When this condition is detected, save the repo
state and report that the repo is corrupt and invalid.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-10-20 10:34:01 -04:00
parent 74f43cad6e
commit e4a670a380
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
2 changed files with 76 additions and 34 deletions

View File

@ -160,6 +160,13 @@ class GitDeploy(AppDeploy):
else: else:
await self.repo.pull() await self.repo.pull()
except Exception: except Exception:
if self.repo.repo_corrupt:
self._is_valid = False
self._save_state()
event_loop = self.server.get_event_loop()
event_loop.delay_callback(
.2, self.cmd_helper.notify_update_refreshed
)
raise self.log_exc("Error updating git repo") raise self.log_exc("Error updating git repo")
async def _update_dependencies(self, async def _update_dependencies(self,
@ -219,7 +226,6 @@ GIT_MAX_LOG_CNT = 100
GIT_LOG_FMT = ( GIT_LOG_FMT = (
"\"sha:%H%x1Dauthor:%an%x1Ddate:%ct%x1Dsubject:%s%x1Dmessage:%b%x1E\"" "\"sha:%H%x1Dauthor:%an%x1Ddate:%ct%x1Dsubject:%s%x1Dmessage:%b%x1E\""
) )
GIT_OBJ_ERR = "fatal: loose object"
GIT_REF_FMT = ( GIT_REF_FMT = (
"'%(if)%(*objecttype)%(then)%(*objecttype) (*objectname)" "'%(if)%(*objecttype)%(then)%(*objecttype) (*objectname)"
"%(else)%(objecttype) %(objectname)%(end) %(refname)'" "%(else)%(objecttype) %(objectname)%(end) %(refname)'"
@ -288,6 +294,7 @@ class GitRepo:
self.repo_verified: bool = storage.get( self.repo_verified: bool = storage.get(
"verified", storage.get("is_valid", False) "verified", storage.get("is_valid", False)
) )
self.repo_corrupt: bool = storage.get('corrupt', False)
def get_persistent_data(self) -> Dict[str, Any]: def get_persistent_data(self) -> Dict[str, Any]:
return { return {
@ -309,7 +316,8 @@ class GitRepo:
'commits_behind': self.commits_behind, 'commits_behind': self.commits_behind,
'tag_data': self.tag_data, 'tag_data': self.tag_data,
'diverged': self.diverged, 'diverged': self.diverged,
'verified': self.repo_verified 'verified': self.repo_verified,
'corrupt': self.repo_corrupt
} }
async def initialize(self, need_fetch: bool = True) -> None: async def initialize(self, need_fetch: bool = True) -> None:
@ -562,13 +570,15 @@ class GitRepo:
async def update_repo_status(self) -> bool: async def update_repo_status(self) -> bool:
async with self.git_operation_lock: async with self.git_operation_lock:
self.valid_git_repo = False
if self.repo_corrupt:
return False
if not self.git_path.joinpath(".git").exists(): if not self.git_path.joinpath(".git").exists():
logging.info( logging.info(
f"Git Repo {self.alias}: path '{self.git_path}'" f"Git Repo {self.alias}: path '{self.git_path}'"
" is not a valid git repo") " is not a valid git repo")
return False return False
await self._wait_for_lock_release() await self._wait_for_lock_release()
self.valid_git_repo = False
retries = 3 retries = 3
while retries: while retries:
self.git_messages.clear() self.git_messages.clear()
@ -579,9 +589,8 @@ class GitRepo:
retries -= 1 retries -= 1
resp = None resp = None
# Attempt to recover from "loose object" error # Attempt to recover from "loose object" error
if retries and GIT_OBJ_ERR in "\n".join(self.git_messages): if retries and self.repo_corrupt:
ret = await self._repair_loose_objects() if not await self._repair_loose_objects():
if not ret:
# Since we are unable to recover, immediately # Since we are unable to recover, immediately
# return # return
return False return False
@ -620,10 +629,19 @@ class GitRepo:
"merge-base --is-ancestor HEAD " "merge-base --is-ancestor HEAD "
f"{self.git_remote}/{self.git_branch}" f"{self.git_remote}/{self.git_branch}"
) )
try: for _ in range(3):
await self._run_git_cmd(cmd, retries=1) try:
except self.cmd_helper.scmd_error: await self._run_git_cmd(
return True cmd, retries=1, corrupt_msg="error: "
)
except self.cmd_helper.scmd_error as err:
if err.return_code == 1:
return True
if self.repo_corrupt:
raise
else:
break
await asyncio.sleep(.5)
return False return False
def log_repo_info(self) -> None: def log_repo_info(self) -> None:
@ -684,6 +702,7 @@ class GitRepo:
if self.is_beta: if self.is_beta:
reset_cmd = f"reset --hard {self.upstream_commit}" reset_cmd = f"reset --hard {self.upstream_commit}"
await self._run_git_cmd(reset_cmd, retries=2) await self._run_git_cmd(reset_cmd, retries=2)
self.repo_corrupt = False
async def fetch(self) -> None: async def fetch(self) -> None:
self._verify_repo(check_remote=True) self._verify_repo(check_remote=True)
@ -784,6 +803,7 @@ class GitRepo:
await event_loop.run_in_thread(shutil.rmtree, self.git_path) await event_loop.run_in_thread(shutil.rmtree, self.git_path)
await event_loop.run_in_thread( await event_loop.run_in_thread(
shutil.move, str(self.backup_path), str(self.git_path)) shutil.move, str(self.backup_path), str(self.git_path))
self.repo_corrupt = False
self.cmd_helper.notify_update_response( self.cmd_helper.notify_update_response(
f"Git Repo {self.alias}: Git Clone Complete") f"Git Repo {self.alias}: Git Clone Complete")
@ -865,7 +885,8 @@ class GitRepo:
'commits_behind': self.commits_behind, 'commits_behind': self.commits_behind,
'git_messages': self.git_messages, 'git_messages': self.git_messages,
'full_version_string': self.full_version_string, 'full_version_string': self.full_version_string,
'pristine': not self.dirty 'pristine': not self.dirty,
'corrupt': self.repo_corrupt
} }
def get_version(self, upstream: bool = False) -> Tuple[Any, ...]: def get_version(self, upstream: bool = False) -> Tuple[Any, ...]:
@ -947,6 +968,7 @@ class GitRepo:
return False return False
if notify: if notify:
self.cmd_helper.notify_update_response("Loose objects repaired") self.cmd_helper.notify_update_response("Loose objects repaired")
self.repo_corrupt = False
return True return True
async def _run_git_cmd_async(self, async def _run_git_cmd_async(self,
@ -984,11 +1006,13 @@ class GitRepo:
if ret == 0: if ret == 0:
self.git_messages.clear() self.git_messages.clear()
return return
elif fix_loose: elif self.repo_corrupt and fix_loose:
if GIT_OBJ_ERR in "\n".join(self.git_messages): if await self._repair_loose_objects(notify=True):
ret = await self._repair_loose_objects(notify=True) # Only attempt to repair loose objects once. Re-run
if ret: # the command once.
break fix_loose = False
retries = 2
else:
# since the attept to repair failed, bypass retries # since the attept to repair failed, bypass retries
# and immediately raise an exception # and immediately raise an exception
raise self.server.error( raise self.server.error(
@ -1002,6 +1026,8 @@ class GitRepo:
self.fetch_input_recd = True self.fetch_input_recd = True
out = output.decode().strip() out = output.decode().strip()
if out: if out:
if out.startswith("fatal: "):
self.repo_corrupt = True
self.git_messages.append(out) self.git_messages.append(out)
self.cmd_helper.notify_update_response(out) self.cmd_helper.notify_update_response(out)
logging.debug( logging.debug(
@ -1034,7 +1060,8 @@ class GitRepo:
git_args: str, git_args: str,
timeout: float = 20., timeout: float = 20.,
retries: int = 5, retries: int = 5,
env: Optional[Dict[str, str]] = None env: Optional[Dict[str, str]] = None,
corrupt_msg: str = "fatal: "
) -> str: ) -> str:
try: try:
return await self.cmd_helper.run_cmd_with_response( return await self.cmd_helper.run_cmd_with_response(
@ -1043,8 +1070,16 @@ class GitRepo:
except self.cmd_helper.scmd_error as e: except self.cmd_helper.scmd_error as e:
stdout = e.stdout.decode().strip() stdout = e.stdout.decode().strip()
stderr = e.stderr.decode().strip() stderr = e.stderr.decode().strip()
msg_lines: List[str] = []
if stdout: if stdout:
msg_lines.extend(stdout.split("\n"))
self.git_messages.append(stdout) self.git_messages.append(stdout)
if stderr: if stderr:
msg_lines.extend(stdout.split("\n"))
self.git_messages.append(stderr) self.git_messages.append(stderr)
for line in msg_lines:
line = line.strip().lower()
if line.startswith(corrupt_msg):
self.repo_corrupt = True
break
raise raise

View File

@ -26,6 +26,7 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
Awaitable, Awaitable,
Callable,
Optional, Optional,
Set, Set,
Tuple, Tuple,
@ -76,7 +77,7 @@ class UpdateManager:
config, self.channel config, self.channel
) )
auto_refresh_enabled = config.getboolean('enable_auto_refresh', False) auto_refresh_enabled = config.getboolean('enable_auto_refresh', False)
self.cmd_helper = CommandHelper(config) self.cmd_helper = CommandHelper(config, self.get_updaters)
self.updaters: Dict[str, BaseDeploy] = {} self.updaters: Dict[str, BaseDeploy] = {}
if config.getboolean('enable_system_updates', True): if config.getboolean('enable_system_updates', True):
self.updaters['system'] = PackageDeploy(config, self.cmd_helper) self.updaters['system'] = PackageDeploy(config, self.cmd_helper)
@ -165,6 +166,9 @@ class UpdateManager:
self.server.register_event_handler( self.server.register_event_handler(
"server:klippy_identified", self._set_klipper_repo) "server:klippy_identified", self._set_klipper_repo)
def get_updaters(self) -> Dict[str, BaseDeploy]:
return self.updaters
async def component_init(self) -> None: async def component_init(self) -> None:
# Prune stale data from the database # Prune stale data from the database
umdb = self.cmd_helper.get_umdb() umdb = self.cmd_helper.get_umdb()
@ -218,13 +222,7 @@ class UpdateManager:
await self.updaters['klipper'].initialize() await self.updaters['klipper'].initialize()
await self.updaters['klipper'].refresh() await self.updaters['klipper'].refresh()
if notify: if notify:
vinfo: Dict[str, Any] = {} self.cmd_helper.notify_update_refreshed()
for name, updater in self.updaters.items():
vinfo[name] = updater.get_update_status()
uinfo = self.cmd_helper.get_rate_limit_stats()
uinfo['version_info'] = vinfo
uinfo['busy'] = self.cmd_helper.is_update_busy()
self.server.send_event("update_manager:update_refreshed", uinfo)
async def _check_klippy_printing(self) -> bool: async def _check_klippy_printing(self) -> bool:
kapi: APIComp = self.server.lookup_component('klippy_apis') kapi: APIComp = self.server.lookup_component('klippy_apis')
@ -243,7 +241,6 @@ class UpdateManager:
# Don't Refresh during a print # Don't Refresh during a print
logging.info("Klippy is printing, auto refresh aborted") logging.info("Klippy is printing, auto refresh aborted")
return eventtime + UPDATE_REFRESH_INTERVAL return eventtime + UPDATE_REFRESH_INTERVAL
vinfo: Dict[str, Any] = {}
need_notify = False need_notify = False
machine: Machine = self.server.lookup_component("machine") machine: Machine = self.server.lookup_component("machine")
if machine.validation_enabled(): if machine.validation_enabled():
@ -259,17 +256,13 @@ class UpdateManager:
if updater.needs_refresh(): if updater.needs_refresh():
await updater.refresh() await updater.refresh()
need_notify = True need_notify = True
vinfo[name] = updater.get_update_status()
except Exception: except Exception:
logging.exception("Unable to Refresh Status") logging.exception("Unable to Refresh Status")
return eventtime + UPDATE_REFRESH_INTERVAL return eventtime + UPDATE_REFRESH_INTERVAL
finally: finally:
self.initial_refresh_complete = True self.initial_refresh_complete = True
if need_notify: if need_notify:
uinfo = self.cmd_helper.get_rate_limit_stats() self.cmd_helper.notify_update_refreshed()
uinfo['version_info'] = vinfo
uinfo['busy'] = self.cmd_helper.is_update_busy()
self.server.send_event("update_manager:update_refreshed", uinfo)
return eventtime + UPDATE_REFRESH_INTERVAL return eventtime + UPDATE_REFRESH_INTERVAL
async def _handle_update_request(self, async def _handle_update_request(self,
@ -437,8 +430,8 @@ class UpdateManager:
if check_refresh: if check_refresh:
event_loop = self.server.get_event_loop() event_loop = self.server.get_event_loop()
event_loop.delay_callback( event_loop.delay_callback(
.2, self.server.send_event, .2, self.cmd_helper.notify_update_refreshed
"update_manager:update_refreshed", ret) )
return ret return ret
async def _handle_repo_recovery(self, async def _handle_repo_recovery(self,
@ -474,8 +467,13 @@ class UpdateManager:
self.refresh_timer.stop() self.refresh_timer.stop()
class CommandHelper: class CommandHelper:
def __init__(self, config: ConfigHelper) -> None: def __init__(
self,
config: ConfigHelper,
get_updater_cb: Callable[[], Dict[str, BaseDeploy]]
) -> None:
self.server = config.get_server() self.server = config.get_server()
self.get_updaters = get_updater_cb
self.http_client: HttpClient self.http_client: HttpClient
self.http_client = self.server.lookup_component("http_client") self.http_client = self.server.lookup_component("http_client")
config.getboolean('enable_repo_debug', False, deprecate=True) config.getboolean('enable_repo_debug', False, deprecate=True)
@ -588,6 +586,15 @@ class CommandHelper:
sig_idx=sig_idx) sig_idx=sig_idx)
return result return result
def notify_update_refreshed(self):
vinfo: Dict[str, Any] = {}
for name, updater in self.get_updaters().items():
vinfo[name] = updater.get_update_status()
uinfo = self.get_rate_limit_stats()
uinfo['version_info'] = vinfo
uinfo['busy'] = self.is_update_busy()
self.server.send_event("update_manager:update_refreshed", uinfo)
def notify_update_response(self, def notify_update_response(self,
resp: Union[str, bytes], resp: Union[str, bytes],
is_complete: bool = False is_complete: bool = False