shell_command: Use a lock to prevent re-entrant calls

Re-entrant calls to "run" and "run_with_response" would poison the process state resulting in unexpected behavior.  Use a lock to prevent this.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-05-21 08:18:34 -04:00
parent e7193dbb5f
commit f1ba8e3d9b
1 changed files with 65 additions and 60 deletions

View File

@ -11,6 +11,7 @@ import logging
import signal import signal
import asyncio import asyncio
from tornado import gen from tornado import gen
from tornado.locks import Lock
from utils import ServerError from utils import ServerError
# Annotation imports # Annotation imports
@ -146,6 +147,7 @@ class ShellCommand:
self.proc: Optional[SCProcess] = None self.proc: Optional[SCProcess] = None
self.cancelled = False self.cancelled = False
self.return_code: Optional[int] = None self.return_code: Optional[int] = None
self.run_lock = Lock()
async def cancel(self, sig_idx: int = 1) -> None: async def cancel(self, sig_idx: int = 1) -> None:
self.cancelled = True self.cancelled = True
@ -165,32 +167,33 @@ class ShellCommand:
log_complete: bool = True, log_complete: bool = True,
sig_idx: int = 1 sig_idx: int = 1
) -> bool: ) -> bool:
self.factory.add_running_command(self) async with self.run_lock:
self._reset_command_data() self.factory.add_running_command(self)
if not timeout: self._reset_command_data()
# Never timeout if not timeout:
timeout = 9999999999999999. # Never timeout
if self.std_out_cb is None and self.std_err_cb is None and \ timeout = 9999999999999999.
not self.log_stderr: if self.std_out_cb is None and self.std_err_cb is None and \
# No callbacks set so output cannot be verbose not self.log_stderr:
verbose = False # No callbacks set so output cannot be verbose
if not await self._create_subprocess(): verbose = False
self.factory.remove_running_command(self) if not await self._create_subprocess():
return False self.factory.remove_running_command(self)
assert self.proc is not None return False
try: assert self.proc is not None
if verbose: try:
ret: Coroutine = self.proc.communicate_with_cb() if verbose:
ret: Coroutine = self.proc.communicate_with_cb()
else:
ret = self.proc.wait()
await asyncio.wait_for(ret, timeout=timeout)
except asyncio.TimeoutError:
complete = False
await self.proc.cancel(sig_idx)
else: else:
ret = self.proc.wait() complete = not self.cancelled
await asyncio.wait_for(ret, timeout=timeout) self.factory.remove_running_command(self)
except asyncio.TimeoutError: return self._check_proc_success(complete, log_complete)
complete = False
await self.proc.cancel(sig_idx)
else:
complete = not self.cancelled
self.factory.remove_running_command(self)
return self._check_proc_success(complete, log_complete)
async def run_with_response(self, async def run_with_response(self,
timeout: float = 2., timeout: float = 2.,
@ -198,41 +201,43 @@ class ShellCommand:
log_complete: bool = True, log_complete: bool = True,
sig_idx: int = 1 sig_idx: int = 1
) -> str: ) -> str:
self.factory.add_running_command(self) async with self.run_lock:
retries = max(1, retries) self.factory.add_running_command(self)
while retries > 0: retries = max(1, retries)
self._reset_command_data() while retries > 0:
timed_out = False self._reset_command_data()
stdout = stderr = b"" timed_out = False
if await self._create_subprocess(): stdout = stderr = b""
assert self.proc is not None if await self._create_subprocess():
try: assert self.proc is not None
ret = self.proc.communicate() try:
stdout, stderr = await asyncio.wait_for( ret = self.proc.communicate()
ret, timeout=timeout) stdout, stderr = await asyncio.wait_for(
except asyncio.TimeoutError: ret, timeout=timeout)
complete = False except asyncio.TimeoutError:
timed_out = True complete = False
await self.proc.cancel(sig_idx) timed_out = True
else: await self.proc.cancel(sig_idx)
complete = not self.cancelled else:
if self.log_stderr and stderr: complete = not self.cancelled
logging.info(f"{self.command[0]}: {stderr.decode()}") if self.log_stderr and stderr:
if self._check_proc_success(complete, log_complete): logging.info(
self.factory.remove_running_command(self) f"{self.command[0]}: {stderr.decode()}")
return stdout.decode().rstrip("\n") if self._check_proc_success(complete, log_complete):
if stdout: self.factory.remove_running_command(self)
logging.debug( return stdout.decode().rstrip("\n")
f"Shell command '{self.name}' output:" if stdout:
f"\n{stdout.decode()}") logging.debug(
if self.cancelled and not timed_out: f"Shell command '{self.name}' output:"
break f"\n{stdout.decode()}")
retries -= 1 if self.cancelled and not timed_out:
await gen.sleep(.5) break
self.factory.remove_running_command(self) retries -= 1
raise ShellCommandError( await gen.sleep(.5)
f"Error running shell command: '{self.command}'", self.factory.remove_running_command(self)
self.return_code, stdout, stderr) raise ShellCommandError(
f"Error running shell command: '{self.command}'",
self.return_code, stdout, stderr)
async def _create_subprocess(self) -> bool: async def _create_subprocess(self) -> bool:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()