From ce7495ecce6fc09a16b6a40426b80ce8a263aea7 Mon Sep 17 00:00:00 2001 From: Arksine Date: Thu, 13 May 2021 20:16:26 -0400 Subject: [PATCH] shell_command: add annotations Signed-off-by: Eric Callahan --- moonraker/components/shell_command.py | 117 +++++++++++++++++++------- 1 file changed, 86 insertions(+), 31 deletions(-) diff --git a/moonraker/components/shell_command.py b/moonraker/components/shell_command.py index 443a478..b096827 100644 --- a/moonraker/components/shell_command.py +++ b/moonraker/components/shell_command.py @@ -3,6 +3,8 @@ # Copyright (C) 2020 Eric Callahan # # This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations import os import shlex import logging @@ -11,24 +13,48 @@ import asyncio from tornado import gen from utils import ServerError +# Annotation imports +from typing import ( + TYPE_CHECKING, + Optional, + Callable, + Coroutine, + Dict, +) +if TYPE_CHECKING: + from confighelper import ConfigHelper + from asyncio import BaseTransport + OutputCallback = Optional[Callable[[bytes], None]] + class ShellCommandError(ServerError): - def __init__(self, message, return_code, stdout=b"", - stderr=b"", status_code=500): + def __init__(self, + message: str, + return_code: Optional[int], + stdout: Optional[bytes] = b"", + stderr: Optional[bytes] = b"", + status_code: int = 500 + ) -> None: super().__init__(message, status_code=status_code) self.stdout = stdout or b"" self.stderr = stderr or b"" self.return_code = return_code class SCProcess(asyncio.subprocess.Process): - def initialize(self, program_name, std_out_cb, std_err_cb, log_stderr): + def initialize(self, + program_name: str, + std_out_cb: OutputCallback, + std_err_cb: OutputCallback, + log_stderr: bool + ) -> None: self.program_name = program_name self.std_out_cb = std_out_cb self.std_err_cb = std_err_cb self.log_stderr = log_stderr self.cancel_requested = False - async def _read_stream_with_cb(self, fd): - transport = self._transport.get_pipe_transport(fd) + async def _read_stream_with_cb(self, fd: int) -> bytes: + transport: BaseTransport = \ + self._transport.get_pipe_transport(fd) # type: ignore if fd == 2: stream = self.stderr cb = self.std_err_cb @@ -36,6 +62,7 @@ class SCProcess(asyncio.subprocess.Process): assert fd == 1 stream = self.stdout cb = self.std_out_cb + assert stream is not None while not stream.at_eof(): output = await stream.readline() if not output: @@ -48,7 +75,7 @@ class SCProcess(asyncio.subprocess.Process): transport.close() return output - async def cancel(self, sig_idx=1): + async def cancel(self, sig_idx: int = 1) -> None: if self.cancel_requested: return self.cancel_requested = True @@ -73,30 +100,38 @@ class SCProcess(asyncio.subprocess.Process): if self.stderr is not None: self.stderr.feed_eof() - async def communicate_with_cb(self, input=None): + async def communicate_with_cb(self, + input: Optional[bytes] = None + ) -> None: if input is not None: - stdin = self._feed_stdin(input) + stdin: Coroutine = self._feed_stdin(input) # type: ignore else: - stdin = self._noop() + stdin = self._noop() # type: ignore if self.stdout is not None and self.std_out_cb is not None: - stdout = self._read_stream_with_cb(1) + stdout: Coroutine = self._read_stream_with_cb(1) else: - stdout = self._noop() + stdout = self._noop() # type: ignore has_err_output = self.std_err_cb is not None or self.log_stderr if self.stderr is not None and has_err_output: - stderr = self._read_stream_with_cb(2) + stderr: Coroutine = self._read_stream_with_cb(2) else: - stderr = self._noop() + stderr = self._noop() # type: ignore stdin, stdout, stderr = await asyncio.tasks.gather( - stdin, stdout, stderr, loop=self._loop) + stdin, stdout, stderr, loop=self._loop) # type: ignore await self.wait() class ShellCommand: IDX_SIGINT = 0 IDX_SIGTERM = 1 IDX_SIGKILL = 2 - def __init__(self, cmd, std_out_callback, std_err_callback, - env=None, log_stderr=False, cwd=None): + def __init__(self, + cmd: str, + std_out_callback: OutputCallback, + std_err_callback: OutputCallback, + env: Optional[Dict[str, str]] = None, + log_stderr: bool = False, + cwd: Optional[str] = None + ) -> None: self.name = cmd self.std_out_cb = std_out_callback self.std_err_cb = std_err_callback @@ -105,24 +140,28 @@ class ShellCommand: self.log_stderr = log_stderr self.env = env self.cwd = cwd - self.proc = None + self.proc: Optional[SCProcess] = None self.cancelled = False - self.return_code = None + self.return_code: Optional[int] = None - async def cancel(self, sig_idx=1): + async def cancel(self, sig_idx: int = 1) -> None: self.cancelled = True if self.proc is not None: await self.proc.cancel(sig_idx) - def get_return_code(self): + def get_return_code(self) -> Optional[int]: return self.return_code - def _reset_command_data(self): + def _reset_command_data(self) -> None: self.return_code = self.proc = None self.cancelled = False - async def run(self, timeout=2., verbose=True, log_complete=True, - sig_idx=1): + async def run(self, + timeout: float = 2., + verbose: bool = True, + log_complete: bool = True, + sig_idx: int = 1 + ) -> bool: self._reset_command_data() if not timeout: # Never timeout @@ -133,9 +172,10 @@ class ShellCommand: verbose = False if not await self._create_subprocess(): return False + assert self.proc is not None try: if verbose: - ret = self.proc.communicate_with_cb() + ret: Coroutine = self.proc.communicate_with_cb() else: ret = self.proc.wait() await asyncio.wait_for(ret, timeout=timeout) @@ -146,13 +186,18 @@ class ShellCommand: complete = not self.cancelled return self._check_proc_success(complete, log_complete) - async def run_with_response(self, timeout=2., retries=1, - log_complete=True, sig_idx=1): + async def run_with_response(self, + timeout: float = 2., + retries: int = 1, + log_complete: bool = True, + sig_idx: int = 1 + ) -> str: self._reset_command_data() retries = max(1, retries) while retries > 0: stdout = stderr = b"" if await self._create_subprocess(): + assert self.proc is not None try: ret = self.proc.communicate() stdout, stderr = await asyncio.wait_for( @@ -176,7 +221,7 @@ class ShellCommand: f"Error running shell command: '{self.command}'", self.return_code, stdout, stderr) - async def _create_subprocess(self): + async def _create_subprocess(self) -> bool: loop = asyncio.get_event_loop() def protocol_factory(): @@ -200,7 +245,11 @@ class ShellCommand: return False return True - def _check_proc_success(self, complete, log_complete): + def _check_proc_success(self, + complete: bool, + log_complete: bool + ) -> bool: + assert self.proc is not None self.return_code = self.proc.returncode success = self.return_code == 0 and complete if success: @@ -218,10 +267,16 @@ class ShellCommand: class ShellCommandFactory: error = ShellCommandError - def build_shell_command(self, cmd, callback=None, std_err_callback=None, - env=None, log_stderr=False, cwd=None): + def build_shell_command(self, + cmd: str, + callback: OutputCallback = None, + std_err_callback: OutputCallback = None, + env: Optional[Dict[str, str]] = None, + log_stderr: bool = False, + cwd: Optional[str] = None + ) -> ShellCommand: return ShellCommand(cmd, callback, std_err_callback, env, log_stderr, cwd) -def load_component(config): +def load_component(config: ConfigHelper) -> ShellCommandFactory: return ShellCommandFactory()