git_deploy: replace references to ioloop with eventloop
Wrap some additional system calls that could potentially block in "run_in_thread". Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
912840bbcd
commit
2ceafb0ff5
|
@ -11,8 +11,6 @@ import pathlib
|
||||||
import shutil
|
import shutil
|
||||||
import re
|
import re
|
||||||
import logging
|
import logging
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
from tornado.ioloop import IOLoop
|
|
||||||
from .app_deploy import AppDeploy
|
from .app_deploy import AppDeploy
|
||||||
|
|
||||||
# Annotation imports
|
# Annotation imports
|
||||||
|
@ -179,9 +177,8 @@ class GitDeploy(AppDeploy):
|
||||||
if not inst_path.is_file():
|
if not inst_path.is_file():
|
||||||
self.log_info(f"Unable to open install script: {inst_path}")
|
self.log_info(f"Unable to open install script: {inst_path}")
|
||||||
return None
|
return None
|
||||||
with ThreadPoolExecutor(max_workers=1) as tpe:
|
event_loop = self.server.get_event_loop()
|
||||||
data = await IOLoop.current().run_in_executor(
|
data = await event_loop.run_in_thread(inst_path.read_text)
|
||||||
tpe, inst_path.read_text)
|
|
||||||
packages: List[str] = re.findall(r'PKGLIST="(.*)"', data)
|
packages: List[str] = re.findall(r'PKGLIST="(.*)"', data)
|
||||||
packages = [p.lstrip("${PKGLIST}").strip() for p in packages]
|
packages = [p.lstrip("${PKGLIST}").strip() for p in packages]
|
||||||
if not packages:
|
if not packages:
|
||||||
|
@ -244,7 +241,7 @@ class GitRepo:
|
||||||
self.init_evt: Optional[asyncio.Event] = None
|
self.init_evt: Optional[asyncio.Event] = None
|
||||||
self.initialized: bool = False
|
self.initialized: bool = False
|
||||||
self.git_operation_lock = asyncio.Lock()
|
self.git_operation_lock = asyncio.Lock()
|
||||||
self.fetch_timeout_handle: Optional[object] = None
|
self.fetch_timeout_handle: Optional[asyncio.Handle] = None
|
||||||
self.fetch_input_recd: bool = False
|
self.fetch_input_recd: bool = False
|
||||||
|
|
||||||
async def initialize(self, need_fetch: bool = True) -> None:
|
async def initialize(self, need_fetch: bool = True) -> None:
|
||||||
|
@ -519,9 +516,10 @@ class GitRepo:
|
||||||
async with self.git_operation_lock:
|
async with self.git_operation_lock:
|
||||||
self.cmd_helper.notify_update_response(
|
self.cmd_helper.notify_update_response(
|
||||||
f"Git Repo {self.alias}: Starting Clone Recovery...")
|
f"Git Repo {self.alias}: Starting Clone Recovery...")
|
||||||
|
event_loop = self.server.get_event_loop()
|
||||||
if self.backup_path.exists():
|
if self.backup_path.exists():
|
||||||
shutil.rmtree(self.backup_path)
|
await event_loop.run_in_thread(shutil.rmtree, self.backup_path)
|
||||||
self._check_lock_file_exists(remove=True)
|
await self._check_lock_file_exists(remove=True)
|
||||||
git_cmd = f"clone {self.origin_url} {self.backup_path}"
|
git_cmd = f"clone {self.origin_url} {self.backup_path}"
|
||||||
try:
|
try:
|
||||||
await self._run_git_cmd_async(git_cmd, 1, False, False)
|
await self._run_git_cmd_async(git_cmd, 1, False, False)
|
||||||
|
@ -530,8 +528,9 @@ class GitRepo:
|
||||||
f"Git Repo {self.alias}: Git Clone Failed")
|
f"Git Repo {self.alias}: Git Clone Failed")
|
||||||
raise self.server.error("Git Clone Error") from e
|
raise self.server.error("Git Clone Error") from e
|
||||||
if self.git_path.exists():
|
if self.git_path.exists():
|
||||||
shutil.rmtree(self.git_path)
|
await event_loop.run_in_thread(shutil.rmtree, self.git_path)
|
||||||
shutil.move(str(self.backup_path), str(self.git_path))
|
await event_loop.run_in_thread(
|
||||||
|
shutil.move, str(self.backup_path), str(self.git_path))
|
||||||
self.cmd_helper.notify_update_response(
|
self.cmd_helper.notify_update_response(
|
||||||
f"Git Repo {self.alias}: Git Clone Complete")
|
f"Git Repo {self.alias}: Git Clone Complete")
|
||||||
|
|
||||||
|
@ -607,14 +606,15 @@ class GitRepo:
|
||||||
def is_current(self) -> bool:
|
def is_current(self) -> bool:
|
||||||
return self.current_commit == self.upstream_commit
|
return self.current_commit == self.upstream_commit
|
||||||
|
|
||||||
def _check_lock_file_exists(self, remove: bool = False) -> bool:
|
async def _check_lock_file_exists(self, remove: bool = False) -> bool:
|
||||||
lock_path = self.git_path.joinpath(".git/index.lock")
|
lock_path = self.git_path.joinpath(".git/index.lock")
|
||||||
if lock_path.is_file():
|
if lock_path.is_file():
|
||||||
if remove:
|
if remove:
|
||||||
logging.info(f"Git Repo {self.alias}: Git lock file found "
|
logging.info(f"Git Repo {self.alias}: Git lock file found "
|
||||||
"after git process exited, removing")
|
"after git process exited, removing")
|
||||||
try:
|
try:
|
||||||
os.remove(lock_path)
|
event_loop = self.server.get_event_loop()
|
||||||
|
await event_loop.run_in_thread(os.remove, lock_path)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return True
|
return True
|
||||||
|
@ -622,7 +622,7 @@ class GitRepo:
|
||||||
|
|
||||||
async def _wait_for_lock_release(self, timeout: int = 60) -> None:
|
async def _wait_for_lock_release(self, timeout: int = 60) -> None:
|
||||||
while timeout:
|
while timeout:
|
||||||
if self._check_lock_file_exists():
|
if await self._check_lock_file_exists():
|
||||||
if not timeout % 10:
|
if not timeout % 10:
|
||||||
logging.info(f"Git Repo {self.alias}: Git lock file "
|
logging.info(f"Git Repo {self.alias}: Git lock file "
|
||||||
f"exists, {timeout} seconds remaining "
|
f"exists, {timeout} seconds remaining "
|
||||||
|
@ -631,7 +631,7 @@ class GitRepo:
|
||||||
timeout -= 1
|
timeout -= 1
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
self._check_lock_file_exists(remove=True)
|
await self._check_lock_file_exists(remove=True)
|
||||||
|
|
||||||
async def _repair_loose_objects(self) -> bool:
|
async def _repair_loose_objects(self) -> bool:
|
||||||
try:
|
try:
|
||||||
|
@ -656,6 +656,7 @@ class GitRepo:
|
||||||
# gets delayed we do not want to terminate it while the command
|
# gets delayed we do not want to terminate it while the command
|
||||||
# is processing.
|
# is processing.
|
||||||
await self._wait_for_lock_release()
|
await self._wait_for_lock_release()
|
||||||
|
event_loop = self.server.get_event_loop()
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
env.update(GIT_ENV_VARS)
|
env.update(GIT_ENV_VARS)
|
||||||
if need_git_path:
|
if need_git_path:
|
||||||
|
@ -667,16 +668,15 @@ class GitRepo:
|
||||||
env=env)
|
env=env)
|
||||||
while retries:
|
while retries:
|
||||||
self.git_messages.clear()
|
self.git_messages.clear()
|
||||||
ioloop = IOLoop.current()
|
|
||||||
self.fetch_input_recd = False
|
self.fetch_input_recd = False
|
||||||
self.fetch_timeout_handle = ioloop.call_later(
|
self.fetch_timeout_handle = event_loop.delay_callback(
|
||||||
GIT_ASYNC_TIMEOUT, self._check_process_active, # type: ignore
|
GIT_ASYNC_TIMEOUT, self._check_process_active,
|
||||||
scmd, cmd)
|
scmd, cmd)
|
||||||
try:
|
try:
|
||||||
await scmd.run(timeout=0)
|
await scmd.run(timeout=0)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
ioloop.remove_timeout(self.fetch_timeout_handle)
|
self.fetch_timeout_handle.cancel()
|
||||||
ret = scmd.get_return_code()
|
ret = scmd.get_return_code()
|
||||||
if ret == 0:
|
if ret == 0:
|
||||||
self.git_messages.clear()
|
self.git_messages.clear()
|
||||||
|
@ -692,7 +692,7 @@ class GitRepo:
|
||||||
f"Unable to repair loose objects, use hard recovery")
|
f"Unable to repair loose objects, use hard recovery")
|
||||||
retries -= 1
|
retries -= 1
|
||||||
await asyncio.sleep(.5)
|
await asyncio.sleep(.5)
|
||||||
self._check_lock_file_exists(remove=True)
|
await self._check_lock_file_exists(remove=True)
|
||||||
raise self.server.error(f"Git Command '{cmd}' failed")
|
raise self.server.error(f"Git Command '{cmd}' failed")
|
||||||
|
|
||||||
def _handle_process_output(self, output: bytes) -> None:
|
def _handle_process_output(self, output: bytes) -> None:
|
||||||
|
@ -716,10 +716,10 @@ class GitRepo:
|
||||||
# Received some input, reschedule timeout
|
# Received some input, reschedule timeout
|
||||||
logging.debug(
|
logging.debug(
|
||||||
f"Git Repo {self.alias}: {cmd_name} active, rescheduling")
|
f"Git Repo {self.alias}: {cmd_name} active, rescheduling")
|
||||||
ioloop = IOLoop.current()
|
event_loop = self.server.get_event_loop()
|
||||||
self.fetch_input_recd = False
|
self.fetch_input_recd = False
|
||||||
self.fetch_timeout_handle = ioloop.call_later(
|
self.fetch_timeout_handle = event_loop.delay_callback(
|
||||||
GIT_ASYNC_TIMEOUT, self._check_process_active, # type: ignore
|
GIT_ASYNC_TIMEOUT, self._check_process_active,
|
||||||
scmd, cmd_name)
|
scmd, cmd_name)
|
||||||
else:
|
else:
|
||||||
# Request has timed out with no input, terminate it
|
# Request has timed out with no input, terminate it
|
||||||
|
|
|
@ -366,7 +366,7 @@ class ZipDeploy(AppDeploy):
|
||||||
self.notify_status(
|
self.notify_status(
|
||||||
f"Download Complete, extracting release to '{self.path}'")
|
f"Download Complete, extracting release to '{self.path}'")
|
||||||
event_loop = self.server.get_event_loop()
|
event_loop = self.server.get_event_loop()
|
||||||
event_loop.run_in_thread(
|
await event_loop.run_in_thread(
|
||||||
self._extract_release, temp_download_file)
|
self._extract_release, temp_download_file)
|
||||||
await self._update_dependencies(npm_hash, force=force_dep_update)
|
await self._update_dependencies(npm_hash, force=force_dep_update)
|
||||||
await self._update_repo_state()
|
await self._update_repo_state()
|
||||||
|
|
Loading…
Reference in New Issue