update_manager: use component_init method for post initialzation

Clean up retry functionality to use for loops rather than while loops.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-11-02 12:39:47 -04:00 committed by Eric Callahan
parent cbdbe83bbd
commit 15d99be9f0
1 changed files with 47 additions and 41 deletions

View File

@ -133,6 +133,7 @@ class UpdateManager:
self.cmd_request_lock = asyncio.Lock() self.cmd_request_lock = asyncio.Lock()
self.initialized_lock = asyncio.Event() self.initialized_lock = asyncio.Event()
self.init_success: bool = False
self.klippy_identified_evt: Optional[asyncio.Event] = None self.klippy_identified_evt: Optional[asyncio.Event] = None
# Auto Status Refresh # Auto Status Refresh
@ -170,21 +171,24 @@ class UpdateManager:
# Register Ready Event # Register Ready Event
self.server.register_event_handler( self.server.register_event_handler(
"server:klippy_identified", self._set_klipper_repo) "server:klippy_identified", self._set_klipper_repo)
# Initialize GitHub API Rate Limits and configured updaters
self.event_loop.register_callback(
self._initalize_updaters, list(self.updaters.values()))
async def _initalize_updaters(self, async def component_init(self) -> None:
initial_updaters: List[BaseDeploy]
) -> None:
async with self.cmd_request_lock: async with self.cmd_request_lock:
await self.cmd_helper.init_api_rate_limit() if not await self.cmd_helper.init_api_rate_limit():
for updater in initial_updaters: self.server.add_warning(
"update_manager: failed to initalize GitHub API "
"rate limit")
self.server.set_failed_component("update_manager")
self.init_success = False
self.initialized_lock.set()
return
for updater in list(self.updaters.values()):
if isinstance(updater, PackageDeploy): if isinstance(updater, PackageDeploy):
ret = updater.refresh(False) ret = updater.refresh(False)
else: else:
ret = updater.refresh() ret = updater.refresh()
await ret await ret
self.init_success = True
self.initialized_lock.set() self.initialized_lock.set()
if self.refresh_cb is not None: if self.refresh_cb is not None:
self.refresh_cb.start() self.refresh_cb.start()
@ -231,6 +235,11 @@ class UpdateManager:
pstate: str = result.get('print_stats', {}).get('state', "") pstate: str = result.get('print_stats', {}).get('state', "")
return pstate.lower() == "printing" return pstate.lower() == "printing"
async def _check_init_success(self):
await self.initialized_lock.wait()
if not self.init_success:
raise self.server.error("Update Manger Failed to Initialize", 500)
async def _handle_auto_refresh(self) -> None: async def _handle_auto_refresh(self) -> None:
if await self._check_klippy_printing(): if await self._check_klippy_printing():
# Don't Refresh during a print # Don't Refresh during a print
@ -262,7 +271,7 @@ class UpdateManager:
async def _handle_update_request(self, async def _handle_update_request(self,
web_request: WebRequest web_request: WebRequest
) -> str: ) -> str:
await self.initialized_lock.wait() await self._check_init_success()
if await self._check_klippy_printing(): if await self._check_klippy_printing():
raise self.server.error("Update Refused: Klippy is printing") raise self.server.error("Update Refused: Klippy is printing")
app: str = web_request.get_endpoint().split("/")[-1] app: str = web_request.get_endpoint().split("/")[-1]
@ -291,6 +300,7 @@ class UpdateManager:
async def _handle_full_update_request(self, async def _handle_full_update_request(self,
web_request: WebRequest web_request: WebRequest
) -> str: ) -> str:
await self._check_init_success()
async with self.cmd_request_lock: async with self.cmd_request_lock:
app_name = "" app_name = ""
self.cmd_helper.set_update_info('full', id(web_request), self.cmd_helper.set_update_info('full', id(web_request),
@ -376,7 +386,7 @@ class UpdateManager:
async def _handle_status_request(self, async def _handle_status_request(self,
web_request: WebRequest web_request: WebRequest
) -> Dict[str, Any]: ) -> Dict[str, Any]:
await self.initialized_lock.wait() await self._check_init_success()
check_refresh = web_request.get_boolean('refresh', False) check_refresh = web_request.get_boolean('refresh', False)
# Don't refresh if a print is currently in progress or # Don't refresh if a print is currently in progress or
# if an update is in progress. Just return the current # if an update is in progress. Just return the current
@ -419,7 +429,7 @@ class UpdateManager:
async def _handle_repo_recovery(self, async def _handle_repo_recovery(self,
web_request: WebRequest web_request: WebRequest
) -> str: ) -> str:
await self.initialized_lock.wait() await self._check_init_success()
if await self._check_klippy_printing(): if await self._check_klippy_printing():
raise self.server.error( raise self.server.error(
"Recovery Attempt Refused: Klippy is printing") "Recovery Attempt Refused: Klippy is printing")
@ -511,9 +521,9 @@ class CommandHelper:
'github_limit_reset_time': self.gh_limit_reset_time, 'github_limit_reset_time': self.gh_limit_reset_time,
} }
async def init_api_rate_limit(self) -> None: async def init_api_rate_limit(self, retries: int = 5) -> bool:
url = "https://api.github.com/rate_limit" url = "https://api.github.com/rate_limit"
while 1: for i in range(retries):
try: try:
resp = await self.github_api_request(url, is_init=True) resp = await self.github_api_request(url, is_init=True)
assert isinstance(resp, dict) assert isinstance(resp, dict)
@ -523,7 +533,8 @@ class CommandHelper:
self.gh_limit_reset_time = core['reset'] self.gh_limit_reset_time = core['reset']
except Exception: except Exception:
logging.exception("Error Initializing GitHub API Rate Limit") logging.exception("Error Initializing GitHub API Rate Limit")
await asyncio.sleep(30.) if i + 1 < retries:
await asyncio.sleep(2.)
else: else:
reset_time = time.ctime(self.gh_limit_reset_time) reset_time = time.ctime(self.gh_limit_reset_time)
logging.info( logging.info(
@ -532,7 +543,8 @@ class CommandHelper:
f"Rate Limit Remaining: {self.gh_limit_remaining}\n" f"Rate Limit Remaining: {self.gh_limit_remaining}\n"
f"Rate Limit Reset Time: {reset_time}, " f"Rate Limit Reset Time: {reset_time}, "
f"Seconds Since Epoch: {self.gh_limit_reset_time}") f"Seconds Since Epoch: {self.gh_limit_reset_time}")
break return True
return False
async def run_cmd(self, async def run_cmd(self,
cmd: str, cmd: str,
@ -545,11 +557,10 @@ class CommandHelper:
) -> None: ) -> None:
cb = self.notify_update_response if notify else None cb = self.notify_update_response if notify else None
scmd = self.build_shell_command(cmd, callback=cb, env=env, cwd=cwd) scmd = self.build_shell_command(cmd, callback=cb, env=env, cwd=cwd)
while retries: for _ in range(retries):
if await scmd.run(timeout=timeout, sig_idx=sig_idx): if await scmd.run(timeout=timeout, sig_idx=sig_idx):
break break
retries -= 1 else:
if not retries:
raise self.server.error("Shell Command Error") raise self.server.error("Shell Command Error")
async def run_cmd_with_response(self, async def run_cmd_with_response(self,
@ -567,7 +578,8 @@ class CommandHelper:
async def github_api_request(self, async def github_api_request(self,
url: str, url: str,
is_init: Optional[bool] = False is_init: Optional[bool] = False,
retries: int = 5
) -> JsonType: ) -> JsonType:
if self.gh_limit_remaining == 0: if self.gh_limit_remaining == 0:
curtime = time.time() curtime = time.time()
@ -586,8 +598,7 @@ class CommandHelper:
headers = {"Accept": "application/vnd.github.v3+json"} headers = {"Accept": "application/vnd.github.v3+json"}
if etag is not None: if etag is not None:
headers['If-None-Match'] = etag headers['If-None-Match'] = etag
retries = 5 for i in range(retries):
while retries:
try: try:
fut = self.http_client.fetch( fut = self.http_client.fetch(
url, headers=headers, connect_timeout=5., url, headers=headers, connect_timeout=5.,
@ -595,10 +606,9 @@ class CommandHelper:
resp: HTTPResponse resp: HTTPResponse
resp = await asyncio.wait_for(fut, 10.) resp = await asyncio.wait_for(fut, 10.)
except Exception: except Exception:
retries -= 1
if retries > 0:
logging.exception( logging.exception(
f"Error Processing GitHub API request: {url}") f"Error Processing GitHub API request: {url}")
if i + 1 < retries:
await asyncio.sleep(1.) await asyncio.sleep(1.)
continue continue
etag = resp.headers.get('etag', None) etag = resp.headers.get('etag', None)
@ -618,12 +628,9 @@ class CommandHelper:
logging.info(f"Github Request not Modified: {url}") logging.info(f"Github Request not Modified: {url}")
return cached_request.get_cached_result() return cached_request.get_cached_result()
if resp.code != 200: if resp.code != 200:
retries -= 1
if not retries:
raise self.server.error(
f"Github Request failed: {resp.code} {resp.reason}")
logging.info( logging.info(
f"Github request error, {retries} retries remaining") f"Github Request failed: {resp.code} {resp.reason}")
if i + 1 < retries:
await asyncio.sleep(1.) await asyncio.sleep(1.)
continue continue
# Update rate limit on return success # Update rate limit on return success
@ -643,10 +650,10 @@ class CommandHelper:
async def http_download_request(self, async def http_download_request(self,
url: str, url: str,
content_type: str, content_type: str,
timeout: float = 180. timeout: float = 180.,
retries: int = 5
) -> bytes: ) -> bytes:
retries = 5 for i in range(retries):
while retries:
try: try:
fut = self.http_client.fetch( fut = self.http_client.fetch(
url, headers={"Accept": content_type}, url, headers={"Accept": content_type},
@ -654,9 +661,8 @@ class CommandHelper:
resp: HTTPResponse resp: HTTPResponse
resp = await asyncio.wait_for(fut, timeout + 10.) resp = await asyncio.wait_for(fut, timeout + 10.)
except Exception: except Exception:
retries -= 1
logging.exception("Error Processing Download") logging.exception("Error Processing Download")
if not retries: if i + 1 == retries:
raise raise
await asyncio.sleep(1.) await asyncio.sleep(1.)
continue continue
@ -669,12 +675,12 @@ class CommandHelper:
dest: Union[str, pathlib.Path], dest: Union[str, pathlib.Path],
content_type: str, content_type: str,
size: int, size: int,
timeout: float = 180. timeout: float = 180.,
retries: int = 5
) -> None: ) -> None:
if isinstance(dest, str): if isinstance(dest, str):
dest = pathlib.Path(dest) dest = pathlib.Path(dest)
retries = 5 for i in range(retries):
while retries:
dl = StreamingDownload(self, dest, size) dl = StreamingDownload(self, dest, size)
try: try:
fut = self.http_client.fetch( fut = self.http_client.fetch(
@ -684,9 +690,8 @@ class CommandHelper:
resp: HTTPResponse resp: HTTPResponse
resp = await asyncio.wait_for(fut, timeout + 10.) resp = await asyncio.wait_for(fut, timeout + 10.)
except Exception: except Exception:
retries -= 1
logging.exception("Error Processing Download") logging.exception("Error Processing Download")
if not retries: if i + 1 == retries:
raise raise
await asyncio.sleep(1.) await asyncio.sleep(1.)
continue continue
@ -694,6 +699,7 @@ class CommandHelper:
await dl.close() await dl.close()
if resp.code < 400: if resp.code < 400:
return return
raise self.server.error(f"Retries exceeded for request: {url}")
def notify_update_response(self, def notify_update_response(self,
resp: Union[str, bytes], resp: Union[str, bytes],