diff --git a/moonraker/components/update_manager/git_deploy.py b/moonraker/components/update_manager/git_deploy.py index 2a7850f..32d6159 100644 --- a/moonraker/components/update_manager/git_deploy.py +++ b/moonraker/components/update_manager/git_deploy.py @@ -5,14 +5,13 @@ # This file may be distributed under the terms of the GNU GPLv3 license. from __future__ import annotations +import asyncio import os import pathlib import shutil import re import logging -import tornado from concurrent.futures import ThreadPoolExecutor -from tornado.locks import Condition, Lock from tornado.ioloop import IOLoop from .app_deploy import AppDeploy @@ -241,20 +240,20 @@ class GitRepo: sudo service {self.alias} start """ - self.init_condition: Optional[Condition] = None + self.init_evt: Optional[asyncio.Event] = None self.initialized: bool = False - self.git_operation_lock = Lock() + self.git_operation_lock = asyncio.Lock() self.fetch_timeout_handle: Optional[object] = None self.fetch_input_recd: bool = False async def initialize(self, need_fetch: bool = True) -> None: - if self.init_condition is not None: + if self.init_evt is not None: # No need to initialize multiple requests - await self.init_condition.wait() + await self.init_evt.wait() if self.initialized: return self.initialized = False - self.init_condition = Condition() + self.init_evt = asyncio.Event() self.git_messages.clear() try: await self.update_repo_status() @@ -337,12 +336,12 @@ class GitRepo: else: self.initialized = True finally: - self.init_condition.notify_all() - self.init_condition = None + self.init_evt.set() + self.init_evt = None async def wait_for_init(self) -> None: - if self.init_condition is not None: - await self.init_condition.wait() + if self.init_evt is not None: + await self.init_evt.wait() if not self.initialized: raise self.server.error( f"Git Repo {self.alias}: Initialization failure") @@ -625,7 +624,7 @@ class GitRepo: logging.info(f"Git Repo {self.alias}: Git lock file " f"exists, {timeout} seconds remaining " "before removal.") - await tornado.gen.sleep(1.) + await asyncio.sleep(1.) timeout -= 1 else: return @@ -689,7 +688,7 @@ class GitRepo: raise self.server.error( f"Unable to repair loose objects, use hard recovery") retries -= 1 - await tornado.gen.sleep(.5) + await asyncio.sleep(.5) self._check_lock_file_exists(remove=True) raise self.server.error(f"Git Command '{cmd}' failed") diff --git a/moonraker/components/update_manager/update_manager.py b/moonraker/components/update_manager/update_manager.py index 8adb511..814f3ee 100644 --- a/moonraker/components/update_manager/update_manager.py +++ b/moonraker/components/update_manager/update_manager.py @@ -5,6 +5,7 @@ # This file may be distributed under the terms of the GNU GPLv3 license. from __future__ import annotations +import asyncio import os import pathlib import logging @@ -15,11 +16,8 @@ import zipfile import time import tempfile from concurrent.futures import ThreadPoolExecutor -import tornado.gen -import tornado.util from tornado.ioloop import IOLoop, PeriodicCallback from tornado.httpclient import AsyncHTTPClient -from tornado.locks import Event, Lock from .base_deploy import BaseDeploy from .app_deploy import AppDeploy from .git_deploy import GitDeploy @@ -134,9 +132,9 @@ class UpdateManager: raise config.error( f"Invalid type '{client_type}' for section [{section}]") - self.cmd_request_lock = Lock() - self.initialized_lock = Event() - self.klippy_identified_evt: Optional[Event] = None + self.cmd_request_lock = asyncio.Lock() + self.initialized_lock = asyncio.Event() + self.klippy_identified_evt: Optional[asyncio.Event] = None # Auto Status Refresh self.last_refresh_time: float = 0 @@ -317,16 +315,16 @@ class UpdateManager: app_name = 'klipper' kupdater = self.updaters.get('klipper') if isinstance(kupdater, AppDeploy): - self.klippy_identified_evt = Event() + self.klippy_identified_evt = asyncio.Event() if not await self._check_need_reinstall(app_name): await kupdater.update() self.cmd_helper.notify_update_response( "Waiting for Klippy to reconnect (this may take" " up to 2 minutes)...") try: - await self.klippy_identified_evt.wait( - time.time() + 120.) - except tornado.util.TimeoutError: + await asyncio.wait_for( + self.klippy_identified_evt.wait(), 120.) + except asyncio.TimeoutError: self.cmd_helper.notify_update_response( "Klippy reconnect timed out...") else: @@ -516,7 +514,7 @@ class CommandHelper: self.gh_limit_reset_time = core['reset'] except Exception: logging.exception("Error Initializing GitHub API Rate Limit") - await tornado.gen.sleep(30.) + await asyncio.sleep(30.) else: reset_time = time.ctime(self.gh_limit_reset_time) logging.info( @@ -582,19 +580,17 @@ class CommandHelper: retries = 5 while retries: try: - timeout = time.time() + 10. fut = self.http_client.fetch( url, headers=headers, connect_timeout=5., request_timeout=5., raise_error=False) resp: HTTPResponse - resp = await tornado.gen.with_timeout(timeout, fut) + resp = await asyncio.wait_for(fut, 10.) except Exception: - fut.cancel() retries -= 1 if retries > 0: logging.exception( f"Error Processing GitHub API request: {url}") - await tornado.gen.sleep(1.) + await asyncio.sleep(1.) continue etag = resp.headers.get('etag', None) if etag is not None: @@ -619,7 +615,7 @@ class CommandHelper: f"Github Request failed: {resp.code} {resp.reason}") logging.info( f"Github request error, {retries} retries remaining") - await tornado.gen.sleep(1.) + await asyncio.sleep(1.) continue # Update rate limit on return success if 'X-Ratelimit-Limit' in resp.headers and not is_init: @@ -643,19 +639,17 @@ class CommandHelper: retries = 5 while retries: try: - timeout = time.time() + timeout fut = self.http_client.fetch( url, headers={"Accept": content_type}, connect_timeout=5., request_timeout=timeout) resp: HTTPResponse - resp = await tornado.gen.with_timeout(timeout + 10., fut) + resp = await asyncio.wait_for(fut, timeout + 10.) except Exception: - fut.cancel() retries -= 1 logging.exception("Error Processing Download") if not retries: raise - await tornado.gen.sleep(1.) + await asyncio.sleep(1.) continue return resp.body raise self.server.error( @@ -674,20 +668,18 @@ class CommandHelper: while retries: dl = StreamingDownload(self, dest, size) try: - timeout = time.time() + timeout fut = self.http_client.fetch( url, headers={"Accept": content_type}, connect_timeout=5., request_timeout=timeout, streaming_callback=dl.on_chunk_recd) resp: HTTPResponse - resp = await tornado.gen.with_timeout(timeout + 10., fut) + resp = await asyncio.wait_for(fut, timeout + 10.) except Exception: - fut.cancel() retries -= 1 logging.exception("Error Processing Download") if not retries: raise - await tornado.gen.sleep(1.) + await asyncio.sleep(1.) continue finally: await dl.close() @@ -746,7 +738,7 @@ class StreamingDownload: self.total_recd: int = 0 self.last_pct: int = 0 self.chunk_buffer: List[bytes] = [] - self.busy_evt: Event = Event() + self.busy_evt: asyncio.Event = asyncio.Event() self.busy_evt.set() def on_chunk_recd(self, chunk: bytes) -> None: @@ -785,8 +777,8 @@ class PackageDeploy(BaseDeploy): ) -> None: super().__init__(config, cmd_helper) self.available_packages: List[str] = [] - self.refresh_evt: Optional[Event] = None - self.mutex: Lock = Lock() + self.refresh_evt: Optional[asyncio.Event] = None + self.mutex: asyncio.Lock = asyncio.Lock() async def refresh(self, fetch_packages: bool = True) -> None: # TODO: Use python-apt python lib rather than command line for updates @@ -794,7 +786,7 @@ class PackageDeploy(BaseDeploy): self.refresh_evt.wait() return async with self.mutex: - self.refresh_evt = Event() + self.refresh_evt = asyncio.Event() try: if fetch_packages: await self.cmd_helper.run_cmd( @@ -856,8 +848,8 @@ class WebClientDeploy(BaseDeploy): self.version: str = "?" self.remote_version: str = "?" self.dl_info: Tuple[str, str, int] = ("?", "?", 0) - self.refresh_evt: Optional[Event] = None - self.mutex: Lock = Lock() + self.refresh_evt: Optional[asyncio.Event] = None + self.mutex: asyncio.Lock = asyncio.Lock() logging.info(f"\nInitializing Client Updater: '{self.name}'," f"\npath: {self.path}") @@ -876,7 +868,7 @@ class WebClientDeploy(BaseDeploy): self.refresh_evt.wait() return async with self.mutex: - self.refresh_evt = Event() + self.refresh_evt = asyncio.Event() try: await self._get_local_version() await self._get_remote_version() diff --git a/moonraker/components/update_manager/zip_deploy.py b/moonraker/components/update_manager/zip_deploy.py index a575403..25c523d 100644 --- a/moonraker/components/update_manager/zip_deploy.py +++ b/moonraker/components/update_manager/zip_deploy.py @@ -5,6 +5,7 @@ # This file may be distributed under the terms of the GNU GPLv3 license. from __future__ import annotations +import asyncio import os import pathlib import json @@ -15,7 +16,6 @@ import tempfile import zipfile from concurrent.futures import ThreadPoolExecutor from tornado.ioloop import IOLoop -from tornado.locks import Event, Lock from .app_deploy import AppDeploy from utils import verify_source @@ -74,8 +74,8 @@ class ZipDeploy(AppDeploy): self.python_pkg_list: List[str] = [] self.release_download_info: Tuple[str, str, int] = ("?", "?", 0) self.errors: List[str] = [] - self.mutex: Lock = Lock() - self.refresh_event: Optional[Event] = None + self.mutex: asyncio.Lock = asyncio.Lock() + self.refresh_event: Optional[asyncio.Event] = None @staticmethod async def from_application(app: AppDeploy) -> ZipDeploy: @@ -107,7 +107,7 @@ class ZipDeploy(AppDeploy): await self.refresh_event.wait() return async with self.mutex: - self.refresh_event = Event() + self.refresh_event = asyncio.Event() try: await self._update_repo_state() except Exception: @@ -138,9 +138,8 @@ class ZipDeploy(AppDeploy): f"Owner repo mismatch. Received {owner_repo}, " f"official: {self.official_repo}") # validate the local source code - ioloop = IOLoop.current() with ThreadPoolExecutor(max_workers=1) as tpe: - res = await ioloop.run_in_executor( + res = await IOLoop.current().run_in_executor( tpe, verify_source, self.path) if res is not None: self.source_checksum, self.pristine = res