diff --git a/moonraker/components/shell_command.py b/moonraker/components/shell_command.py index cac5054..86760b8 100644 --- a/moonraker/components/shell_command.py +++ b/moonraker/components/shell_command.py @@ -11,6 +11,7 @@ import logging import signal import asyncio from tornado import gen +from tornado.locks import Lock from utils import ServerError # Annotation imports @@ -146,6 +147,7 @@ class ShellCommand: self.proc: Optional[SCProcess] = None self.cancelled = False self.return_code: Optional[int] = None + self.run_lock = Lock() async def cancel(self, sig_idx: int = 1) -> None: self.cancelled = True @@ -165,32 +167,33 @@ class ShellCommand: log_complete: bool = True, sig_idx: int = 1 ) -> bool: - self.factory.add_running_command(self) - self._reset_command_data() - if not timeout: - # Never timeout - timeout = 9999999999999999. - if self.std_out_cb is None and self.std_err_cb is None and \ - not self.log_stderr: - # No callbacks set so output cannot be verbose - verbose = False - if not await self._create_subprocess(): - self.factory.remove_running_command(self) - return False - assert self.proc is not None - try: - if verbose: - ret: Coroutine = self.proc.communicate_with_cb() + async with self.run_lock: + self.factory.add_running_command(self) + self._reset_command_data() + if not timeout: + # Never timeout + timeout = 9999999999999999. + if self.std_out_cb is None and self.std_err_cb is None and \ + not self.log_stderr: + # No callbacks set so output cannot be verbose + verbose = False + if not await self._create_subprocess(): + self.factory.remove_running_command(self) + return False + assert self.proc is not None + try: + if verbose: + ret: Coroutine = self.proc.communicate_with_cb() + else: + ret = self.proc.wait() + await asyncio.wait_for(ret, timeout=timeout) + except asyncio.TimeoutError: + complete = False + await self.proc.cancel(sig_idx) else: - ret = self.proc.wait() - await asyncio.wait_for(ret, timeout=timeout) - except asyncio.TimeoutError: - complete = False - await self.proc.cancel(sig_idx) - else: - complete = not self.cancelled - self.factory.remove_running_command(self) - return self._check_proc_success(complete, log_complete) + complete = not self.cancelled + self.factory.remove_running_command(self) + return self._check_proc_success(complete, log_complete) async def run_with_response(self, timeout: float = 2., @@ -198,41 +201,43 @@ class ShellCommand: log_complete: bool = True, sig_idx: int = 1 ) -> str: - self.factory.add_running_command(self) - retries = max(1, retries) - while retries > 0: - self._reset_command_data() - timed_out = False - stdout = stderr = b"" - if await self._create_subprocess(): - assert self.proc is not None - try: - ret = self.proc.communicate() - stdout, stderr = await asyncio.wait_for( - ret, timeout=timeout) - except asyncio.TimeoutError: - complete = False - timed_out = True - await self.proc.cancel(sig_idx) - else: - complete = not self.cancelled - if self.log_stderr and stderr: - logging.info(f"{self.command[0]}: {stderr.decode()}") - if self._check_proc_success(complete, log_complete): - self.factory.remove_running_command(self) - return stdout.decode().rstrip("\n") - if stdout: - logging.debug( - f"Shell command '{self.name}' output:" - f"\n{stdout.decode()}") - if self.cancelled and not timed_out: - break - retries -= 1 - await gen.sleep(.5) - self.factory.remove_running_command(self) - raise ShellCommandError( - f"Error running shell command: '{self.command}'", - self.return_code, stdout, stderr) + async with self.run_lock: + self.factory.add_running_command(self) + retries = max(1, retries) + while retries > 0: + self._reset_command_data() + timed_out = False + stdout = stderr = b"" + if await self._create_subprocess(): + assert self.proc is not None + try: + ret = self.proc.communicate() + stdout, stderr = await asyncio.wait_for( + ret, timeout=timeout) + except asyncio.TimeoutError: + complete = False + timed_out = True + await self.proc.cancel(sig_idx) + else: + complete = not self.cancelled + if self.log_stderr and stderr: + logging.info( + f"{self.command[0]}: {stderr.decode()}") + if self._check_proc_success(complete, log_complete): + self.factory.remove_running_command(self) + return stdout.decode().rstrip("\n") + if stdout: + logging.debug( + f"Shell command '{self.name}' output:" + f"\n{stdout.decode()}") + if self.cancelled and not timed_out: + break + retries -= 1 + await gen.sleep(.5) + self.factory.remove_running_command(self) + raise ShellCommandError( + f"Error running shell command: '{self.command}'", + self.return_code, stdout, stderr) async def _create_subprocess(self) -> bool: loop = asyncio.get_event_loop()