From 2ceafb0ff54c66e0eb956466ebe5ff3b7b437e00 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sun, 11 Jul 2021 06:45:06 -0400 Subject: [PATCH] git_deploy: replace references to ioloop with eventloop Wrap some additional system calls that could potentially block in "run_in_thread". Signed-off-by: Eric Callahan --- .../components/update_manager/git_deploy.py | 44 +++++++++---------- .../components/update_manager/zip_deploy.py | 2 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/moonraker/components/update_manager/git_deploy.py b/moonraker/components/update_manager/git_deploy.py index 1a0d10a..e8840ff 100644 --- a/moonraker/components/update_manager/git_deploy.py +++ b/moonraker/components/update_manager/git_deploy.py @@ -11,8 +11,6 @@ import pathlib import shutil import re import logging -from concurrent.futures import ThreadPoolExecutor -from tornado.ioloop import IOLoop from .app_deploy import AppDeploy # Annotation imports @@ -179,9 +177,8 @@ class GitDeploy(AppDeploy): if not inst_path.is_file(): self.log_info(f"Unable to open install script: {inst_path}") return None - with ThreadPoolExecutor(max_workers=1) as tpe: - data = await IOLoop.current().run_in_executor( - tpe, inst_path.read_text) + event_loop = self.server.get_event_loop() + data = await event_loop.run_in_thread(inst_path.read_text) packages: List[str] = re.findall(r'PKGLIST="(.*)"', data) packages = [p.lstrip("${PKGLIST}").strip() for p in packages] if not packages: @@ -244,7 +241,7 @@ class GitRepo: self.init_evt: Optional[asyncio.Event] = None self.initialized: bool = False self.git_operation_lock = asyncio.Lock() - self.fetch_timeout_handle: Optional[object] = None + self.fetch_timeout_handle: Optional[asyncio.Handle] = None self.fetch_input_recd: bool = False async def initialize(self, need_fetch: bool = True) -> None: @@ -519,9 +516,10 @@ class GitRepo: async with self.git_operation_lock: self.cmd_helper.notify_update_response( f"Git Repo {self.alias}: Starting Clone Recovery...") + event_loop = self.server.get_event_loop() if self.backup_path.exists(): - shutil.rmtree(self.backup_path) - self._check_lock_file_exists(remove=True) + await event_loop.run_in_thread(shutil.rmtree, self.backup_path) + await self._check_lock_file_exists(remove=True) git_cmd = f"clone {self.origin_url} {self.backup_path}" try: await self._run_git_cmd_async(git_cmd, 1, False, False) @@ -530,8 +528,9 @@ class GitRepo: f"Git Repo {self.alias}: Git Clone Failed") raise self.server.error("Git Clone Error") from e if self.git_path.exists(): - shutil.rmtree(self.git_path) - shutil.move(str(self.backup_path), str(self.git_path)) + await event_loop.run_in_thread(shutil.rmtree, self.git_path) + await event_loop.run_in_thread( + shutil.move, str(self.backup_path), str(self.git_path)) self.cmd_helper.notify_update_response( f"Git Repo {self.alias}: Git Clone Complete") @@ -607,14 +606,15 @@ class GitRepo: def is_current(self) -> bool: return self.current_commit == self.upstream_commit - def _check_lock_file_exists(self, remove: bool = False) -> bool: + async def _check_lock_file_exists(self, remove: bool = False) -> bool: lock_path = self.git_path.joinpath(".git/index.lock") if lock_path.is_file(): if remove: logging.info(f"Git Repo {self.alias}: Git lock file found " "after git process exited, removing") try: - os.remove(lock_path) + event_loop = self.server.get_event_loop() + await event_loop.run_in_thread(os.remove, lock_path) except Exception: pass return True @@ -622,7 +622,7 @@ class GitRepo: async def _wait_for_lock_release(self, timeout: int = 60) -> None: while timeout: - if self._check_lock_file_exists(): + if await self._check_lock_file_exists(): if not timeout % 10: logging.info(f"Git Repo {self.alias}: Git lock file " f"exists, {timeout} seconds remaining " @@ -631,7 +631,7 @@ class GitRepo: timeout -= 1 else: return - self._check_lock_file_exists(remove=True) + await self._check_lock_file_exists(remove=True) async def _repair_loose_objects(self) -> bool: try: @@ -656,6 +656,7 @@ class GitRepo: # gets delayed we do not want to terminate it while the command # is processing. await self._wait_for_lock_release() + event_loop = self.server.get_event_loop() env = os.environ.copy() env.update(GIT_ENV_VARS) if need_git_path: @@ -667,16 +668,15 @@ class GitRepo: env=env) while retries: self.git_messages.clear() - ioloop = IOLoop.current() self.fetch_input_recd = False - self.fetch_timeout_handle = ioloop.call_later( - GIT_ASYNC_TIMEOUT, self._check_process_active, # type: ignore + self.fetch_timeout_handle = event_loop.delay_callback( + GIT_ASYNC_TIMEOUT, self._check_process_active, scmd, cmd) try: await scmd.run(timeout=0) except Exception: pass - ioloop.remove_timeout(self.fetch_timeout_handle) + self.fetch_timeout_handle.cancel() ret = scmd.get_return_code() if ret == 0: self.git_messages.clear() @@ -692,7 +692,7 @@ class GitRepo: f"Unable to repair loose objects, use hard recovery") retries -= 1 await asyncio.sleep(.5) - self._check_lock_file_exists(remove=True) + await self._check_lock_file_exists(remove=True) raise self.server.error(f"Git Command '{cmd}' failed") def _handle_process_output(self, output: bytes) -> None: @@ -716,10 +716,10 @@ class GitRepo: # Received some input, reschedule timeout logging.debug( f"Git Repo {self.alias}: {cmd_name} active, rescheduling") - ioloop = IOLoop.current() + event_loop = self.server.get_event_loop() self.fetch_input_recd = False - self.fetch_timeout_handle = ioloop.call_later( - GIT_ASYNC_TIMEOUT, self._check_process_active, # type: ignore + self.fetch_timeout_handle = event_loop.delay_callback( + GIT_ASYNC_TIMEOUT, self._check_process_active, scmd, cmd_name) else: # Request has timed out with no input, terminate it diff --git a/moonraker/components/update_manager/zip_deploy.py b/moonraker/components/update_manager/zip_deploy.py index 88d60c3..b984132 100644 --- a/moonraker/components/update_manager/zip_deploy.py +++ b/moonraker/components/update_manager/zip_deploy.py @@ -366,7 +366,7 @@ class ZipDeploy(AppDeploy): self.notify_status( f"Download Complete, extracting release to '{self.path}'") event_loop = self.server.get_event_loop() - event_loop.run_in_thread( + await event_loop.run_in_thread( self._extract_release, temp_download_file) await self._update_dependencies(npm_hash, force=force_dep_update) await self._update_repo_state()