update_manager: use asyncio locks

Transion away from Tornado's wrappers, as their implementation of
"wait()" can be confusing.  Some of tornado's methods require that
you specify a delay in seconds, others requrire that you specify a
timeout relative to the Unix epoch.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-07-07 08:10:07 -04:00
parent 9133b59dbf
commit 74630f933a
3 changed files with 40 additions and 50 deletions

View File

@ -5,14 +5,13 @@
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations from __future__ import annotations
import asyncio
import os import os
import pathlib import pathlib
import shutil import shutil
import re import re
import logging import logging
import tornado
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from tornado.locks import Condition, Lock
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from .app_deploy import AppDeploy from .app_deploy import AppDeploy
@ -241,20 +240,20 @@ class GitRepo:
sudo service {self.alias} start sudo service {self.alias} start
""" """
self.init_condition: Optional[Condition] = None self.init_evt: Optional[asyncio.Event] = None
self.initialized: bool = False self.initialized: bool = False
self.git_operation_lock = Lock() self.git_operation_lock = asyncio.Lock()
self.fetch_timeout_handle: Optional[object] = None self.fetch_timeout_handle: Optional[object] = None
self.fetch_input_recd: bool = False self.fetch_input_recd: bool = False
async def initialize(self, need_fetch: bool = True) -> None: async def initialize(self, need_fetch: bool = True) -> None:
if self.init_condition is not None: if self.init_evt is not None:
# No need to initialize multiple requests # No need to initialize multiple requests
await self.init_condition.wait() await self.init_evt.wait()
if self.initialized: if self.initialized:
return return
self.initialized = False self.initialized = False
self.init_condition = Condition() self.init_evt = asyncio.Event()
self.git_messages.clear() self.git_messages.clear()
try: try:
await self.update_repo_status() await self.update_repo_status()
@ -337,12 +336,12 @@ class GitRepo:
else: else:
self.initialized = True self.initialized = True
finally: finally:
self.init_condition.notify_all() self.init_evt.set()
self.init_condition = None self.init_evt = None
async def wait_for_init(self) -> None: async def wait_for_init(self) -> None:
if self.init_condition is not None: if self.init_evt is not None:
await self.init_condition.wait() await self.init_evt.wait()
if not self.initialized: if not self.initialized:
raise self.server.error( raise self.server.error(
f"Git Repo {self.alias}: Initialization failure") f"Git Repo {self.alias}: Initialization failure")
@ -625,7 +624,7 @@ class GitRepo:
logging.info(f"Git Repo {self.alias}: Git lock file " logging.info(f"Git Repo {self.alias}: Git lock file "
f"exists, {timeout} seconds remaining " f"exists, {timeout} seconds remaining "
"before removal.") "before removal.")
await tornado.gen.sleep(1.) await asyncio.sleep(1.)
timeout -= 1 timeout -= 1
else: else:
return return
@ -689,7 +688,7 @@ class GitRepo:
raise self.server.error( raise self.server.error(
f"Unable to repair loose objects, use hard recovery") f"Unable to repair loose objects, use hard recovery")
retries -= 1 retries -= 1
await tornado.gen.sleep(.5) await asyncio.sleep(.5)
self._check_lock_file_exists(remove=True) self._check_lock_file_exists(remove=True)
raise self.server.error(f"Git Command '{cmd}' failed") raise self.server.error(f"Git Command '{cmd}' failed")

View File

@ -5,6 +5,7 @@
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations from __future__ import annotations
import asyncio
import os import os
import pathlib import pathlib
import logging import logging
@ -15,11 +16,8 @@ import zipfile
import time import time
import tempfile import tempfile
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import tornado.gen
import tornado.util
from tornado.ioloop import IOLoop, PeriodicCallback from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.httpclient import AsyncHTTPClient from tornado.httpclient import AsyncHTTPClient
from tornado.locks import Event, Lock
from .base_deploy import BaseDeploy from .base_deploy import BaseDeploy
from .app_deploy import AppDeploy from .app_deploy import AppDeploy
from .git_deploy import GitDeploy from .git_deploy import GitDeploy
@ -134,9 +132,9 @@ class UpdateManager:
raise config.error( raise config.error(
f"Invalid type '{client_type}' for section [{section}]") f"Invalid type '{client_type}' for section [{section}]")
self.cmd_request_lock = Lock() self.cmd_request_lock = asyncio.Lock()
self.initialized_lock = Event() self.initialized_lock = asyncio.Event()
self.klippy_identified_evt: Optional[Event] = None self.klippy_identified_evt: Optional[asyncio.Event] = None
# Auto Status Refresh # Auto Status Refresh
self.last_refresh_time: float = 0 self.last_refresh_time: float = 0
@ -317,16 +315,16 @@ class UpdateManager:
app_name = 'klipper' app_name = 'klipper'
kupdater = self.updaters.get('klipper') kupdater = self.updaters.get('klipper')
if isinstance(kupdater, AppDeploy): if isinstance(kupdater, AppDeploy):
self.klippy_identified_evt = Event() self.klippy_identified_evt = asyncio.Event()
if not await self._check_need_reinstall(app_name): if not await self._check_need_reinstall(app_name):
await kupdater.update() await kupdater.update()
self.cmd_helper.notify_update_response( self.cmd_helper.notify_update_response(
"Waiting for Klippy to reconnect (this may take" "Waiting for Klippy to reconnect (this may take"
" up to 2 minutes)...") " up to 2 minutes)...")
try: try:
await self.klippy_identified_evt.wait( await asyncio.wait_for(
time.time() + 120.) self.klippy_identified_evt.wait(), 120.)
except tornado.util.TimeoutError: except asyncio.TimeoutError:
self.cmd_helper.notify_update_response( self.cmd_helper.notify_update_response(
"Klippy reconnect timed out...") "Klippy reconnect timed out...")
else: else:
@ -516,7 +514,7 @@ class CommandHelper:
self.gh_limit_reset_time = core['reset'] self.gh_limit_reset_time = core['reset']
except Exception: except Exception:
logging.exception("Error Initializing GitHub API Rate Limit") logging.exception("Error Initializing GitHub API Rate Limit")
await tornado.gen.sleep(30.) await asyncio.sleep(30.)
else: else:
reset_time = time.ctime(self.gh_limit_reset_time) reset_time = time.ctime(self.gh_limit_reset_time)
logging.info( logging.info(
@ -582,19 +580,17 @@ class CommandHelper:
retries = 5 retries = 5
while retries: while retries:
try: try:
timeout = time.time() + 10.
fut = self.http_client.fetch( fut = self.http_client.fetch(
url, headers=headers, connect_timeout=5., url, headers=headers, connect_timeout=5.,
request_timeout=5., raise_error=False) request_timeout=5., raise_error=False)
resp: HTTPResponse resp: HTTPResponse
resp = await tornado.gen.with_timeout(timeout, fut) resp = await asyncio.wait_for(fut, 10.)
except Exception: except Exception:
fut.cancel()
retries -= 1 retries -= 1
if retries > 0: if retries > 0:
logging.exception( logging.exception(
f"Error Processing GitHub API request: {url}") f"Error Processing GitHub API request: {url}")
await tornado.gen.sleep(1.) await asyncio.sleep(1.)
continue continue
etag = resp.headers.get('etag', None) etag = resp.headers.get('etag', None)
if etag is not None: if etag is not None:
@ -619,7 +615,7 @@ class CommandHelper:
f"Github Request failed: {resp.code} {resp.reason}") f"Github Request failed: {resp.code} {resp.reason}")
logging.info( logging.info(
f"Github request error, {retries} retries remaining") f"Github request error, {retries} retries remaining")
await tornado.gen.sleep(1.) await asyncio.sleep(1.)
continue continue
# Update rate limit on return success # Update rate limit on return success
if 'X-Ratelimit-Limit' in resp.headers and not is_init: if 'X-Ratelimit-Limit' in resp.headers and not is_init:
@ -643,19 +639,17 @@ class CommandHelper:
retries = 5 retries = 5
while retries: while retries:
try: try:
timeout = time.time() + timeout
fut = self.http_client.fetch( fut = self.http_client.fetch(
url, headers={"Accept": content_type}, url, headers={"Accept": content_type},
connect_timeout=5., request_timeout=timeout) connect_timeout=5., request_timeout=timeout)
resp: HTTPResponse resp: HTTPResponse
resp = await tornado.gen.with_timeout(timeout + 10., fut) resp = await asyncio.wait_for(fut, timeout + 10.)
except Exception: except Exception:
fut.cancel()
retries -= 1 retries -= 1
logging.exception("Error Processing Download") logging.exception("Error Processing Download")
if not retries: if not retries:
raise raise
await tornado.gen.sleep(1.) await asyncio.sleep(1.)
continue continue
return resp.body return resp.body
raise self.server.error( raise self.server.error(
@ -674,20 +668,18 @@ class CommandHelper:
while retries: while retries:
dl = StreamingDownload(self, dest, size) dl = StreamingDownload(self, dest, size)
try: try:
timeout = time.time() + timeout
fut = self.http_client.fetch( fut = self.http_client.fetch(
url, headers={"Accept": content_type}, url, headers={"Accept": content_type},
connect_timeout=5., request_timeout=timeout, connect_timeout=5., request_timeout=timeout,
streaming_callback=dl.on_chunk_recd) streaming_callback=dl.on_chunk_recd)
resp: HTTPResponse resp: HTTPResponse
resp = await tornado.gen.with_timeout(timeout + 10., fut) resp = await asyncio.wait_for(fut, timeout + 10.)
except Exception: except Exception:
fut.cancel()
retries -= 1 retries -= 1
logging.exception("Error Processing Download") logging.exception("Error Processing Download")
if not retries: if not retries:
raise raise
await tornado.gen.sleep(1.) await asyncio.sleep(1.)
continue continue
finally: finally:
await dl.close() await dl.close()
@ -746,7 +738,7 @@ class StreamingDownload:
self.total_recd: int = 0 self.total_recd: int = 0
self.last_pct: int = 0 self.last_pct: int = 0
self.chunk_buffer: List[bytes] = [] self.chunk_buffer: List[bytes] = []
self.busy_evt: Event = Event() self.busy_evt: asyncio.Event = asyncio.Event()
self.busy_evt.set() self.busy_evt.set()
def on_chunk_recd(self, chunk: bytes) -> None: def on_chunk_recd(self, chunk: bytes) -> None:
@ -785,8 +777,8 @@ class PackageDeploy(BaseDeploy):
) -> None: ) -> None:
super().__init__(config, cmd_helper) super().__init__(config, cmd_helper)
self.available_packages: List[str] = [] self.available_packages: List[str] = []
self.refresh_evt: Optional[Event] = None self.refresh_evt: Optional[asyncio.Event] = None
self.mutex: Lock = Lock() self.mutex: asyncio.Lock = asyncio.Lock()
async def refresh(self, fetch_packages: bool = True) -> None: async def refresh(self, fetch_packages: bool = True) -> None:
# TODO: Use python-apt python lib rather than command line for updates # TODO: Use python-apt python lib rather than command line for updates
@ -794,7 +786,7 @@ class PackageDeploy(BaseDeploy):
self.refresh_evt.wait() self.refresh_evt.wait()
return return
async with self.mutex: async with self.mutex:
self.refresh_evt = Event() self.refresh_evt = asyncio.Event()
try: try:
if fetch_packages: if fetch_packages:
await self.cmd_helper.run_cmd( await self.cmd_helper.run_cmd(
@ -856,8 +848,8 @@ class WebClientDeploy(BaseDeploy):
self.version: str = "?" self.version: str = "?"
self.remote_version: str = "?" self.remote_version: str = "?"
self.dl_info: Tuple[str, str, int] = ("?", "?", 0) self.dl_info: Tuple[str, str, int] = ("?", "?", 0)
self.refresh_evt: Optional[Event] = None self.refresh_evt: Optional[asyncio.Event] = None
self.mutex: Lock = Lock() self.mutex: asyncio.Lock = asyncio.Lock()
logging.info(f"\nInitializing Client Updater: '{self.name}'," logging.info(f"\nInitializing Client Updater: '{self.name}',"
f"\npath: {self.path}") f"\npath: {self.path}")
@ -876,7 +868,7 @@ class WebClientDeploy(BaseDeploy):
self.refresh_evt.wait() self.refresh_evt.wait()
return return
async with self.mutex: async with self.mutex:
self.refresh_evt = Event() self.refresh_evt = asyncio.Event()
try: try:
await self._get_local_version() await self._get_local_version()
await self._get_remote_version() await self._get_remote_version()

View File

@ -5,6 +5,7 @@
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations from __future__ import annotations
import asyncio
import os import os
import pathlib import pathlib
import json import json
@ -15,7 +16,6 @@ import tempfile
import zipfile import zipfile
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tornado.locks import Event, Lock
from .app_deploy import AppDeploy from .app_deploy import AppDeploy
from utils import verify_source from utils import verify_source
@ -74,8 +74,8 @@ class ZipDeploy(AppDeploy):
self.python_pkg_list: List[str] = [] self.python_pkg_list: List[str] = []
self.release_download_info: Tuple[str, str, int] = ("?", "?", 0) self.release_download_info: Tuple[str, str, int] = ("?", "?", 0)
self.errors: List[str] = [] self.errors: List[str] = []
self.mutex: Lock = Lock() self.mutex: asyncio.Lock = asyncio.Lock()
self.refresh_event: Optional[Event] = None self.refresh_event: Optional[asyncio.Event] = None
@staticmethod @staticmethod
async def from_application(app: AppDeploy) -> ZipDeploy: async def from_application(app: AppDeploy) -> ZipDeploy:
@ -107,7 +107,7 @@ class ZipDeploy(AppDeploy):
await self.refresh_event.wait() await self.refresh_event.wait()
return return
async with self.mutex: async with self.mutex:
self.refresh_event = Event() self.refresh_event = asyncio.Event()
try: try:
await self._update_repo_state() await self._update_repo_state()
except Exception: except Exception:
@ -138,9 +138,8 @@ class ZipDeploy(AppDeploy):
f"Owner repo mismatch. Received {owner_repo}, " f"Owner repo mismatch. Received {owner_repo}, "
f"official: {self.official_repo}") f"official: {self.official_repo}")
# validate the local source code # validate the local source code
ioloop = IOLoop.current()
with ThreadPoolExecutor(max_workers=1) as tpe: with ThreadPoolExecutor(max_workers=1) as tpe:
res = await ioloop.run_in_executor( res = await IOLoop.current().run_in_executor(
tpe, verify_source, self.path) tpe, verify_source, self.path)
if res is not None: if res is not None:
self.source_checksum, self.pristine = res self.source_checksum, self.pristine = res