diff --git a/moonraker/plugins/shell_command.py b/moonraker/plugins/shell_command.py index e033baf..41c7ff9 100644 --- a/moonraker/plugins/shell_command.py +++ b/moonraker/plugins/shell_command.py @@ -5,47 +5,81 @@ # This file may be distributed under the terms of the GNU GPLv3 license. import os import shlex -import subprocess import logging +import asyncio from tornado import gen -from tornado.ioloop import IOLoop + +class SCProcess(asyncio.subprocess.Process): + def initialize(self, cb, log_stderr, program): + self.callback = cb + self.log_stderr = log_stderr + self.program = program + self.partial_data = b"" + + async def _read_stream_with_cb(self, fd): + transport = self._transport.get_pipe_transport(fd) + if fd == 2: + stream = self.stderr + else: + assert fd == 1 + stream = self.stdout + while not stream.at_eof(): + output = await stream.readline() + if not output: + break + if fd == 2 and self.log_stderr: + logging.info(f"{self.program}: {output.decode()}") + else: + output = output.rstrip(b'\n') + if output: + self.callback(output) + transport.close() + return output + + def cancel(self): + self.stdout.feed_eof() + self.stderr.feed_eof() + self.terminate() + + async def communicate_with_cb(self, input=None): + if input is not None: + stdin = self._feed_stdin(input) + else: + stdin = self._noop() + if self.stdout is not None: + stdout = self._read_stream_with_cb(1) + else: + stdout = self._noop() + if self.stderr is not None: + stderr = self._read_stream_with_cb(2) + else: + stderr = self._noop() + stdin, stdout, stderr = await asyncio.tasks.gather( + stdin, stdout, stderr, loop=self._loop) + await self.wait() class ShellCommand: - def __init__(self, cmd, callback): - self.io_loop = IOLoop.current() + def __init__(self, cmd, callback, log_stderr=False): self.name = cmd self.output_cb = callback cmd = os.path.expanduser(cmd) self.command = shlex.split(cmd) - self.partial_output = b"" + self.program = self.command[0] + self.log_stderr = log_stderr + self.proc = None self.cancelled = False self.return_code = None - def _process_output(self, fd, events): - if events & IOLoop.ERROR: - return - try: - data = os.read(fd, 4096) - except Exception: - return - data = self.partial_output + data - lines = data.split(b'\n') - self.partial_output = lines.pop() - for line in lines: - try: - self.output_cb(line) - except Exception: - logging.exception("Error writing command output") - def cancel(self): self.cancelled = True + if self.proc is not None: + self.proc.cancel() def get_return_code(self): return self.return_code async def run(self, timeout=2., verbose=True): - self.return_code = fd = None - self.partial_output = b"" + self.return_code = self.proc = None self.cancelled = False if timeout is None: # Never timeout @@ -54,38 +88,72 @@ class ShellCommand: # Fire and forget commands cannot be verbose as we can't # clean up after the process terminates verbose = False + if not await self._create_subprocess(): + return False + if not timeout: + # fire and forget, return from execution + return True try: - proc = subprocess.Popen( - self.command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + if verbose: + ret = self.proc.communicate_with_cb() + else: + ret = self.proc.wait() + await asyncio.wait_for(ret, timeout=timeout) + except asyncio.TimeoutError: + complete = False + self.proc.terminate() + else: + complete = not self.cancelled + return self._check_proc_success(complete) + + async def run_with_response(self, timeout=2., retries=1): + while retries > 0: + stdout = stderr = None + if await self._create_subprocess(): + try: + ret = self.proc.communicate() + stdout, stderr = await asyncio.wait_for( + ret, timeout=timeout) + except asyncio.TimeoutError: + complete = False + self.proc.terminate() + else: + complete = not self.cancelled + if self.log_stderr: + logging.info(f"{self.program}: {stderr.decode()}") + if self._check_proc_success(complete): + return stdout.decode().rstrip("\n") + elif stdout: + logging.debug( + f"Shell command '{self.name}' output:" + f"\n{stdout.decode()}") + retries -= 1 + await gen.sleep(.5) + return None + + async def _create_subprocess(self): + loop = asyncio.get_event_loop() + + def protocol_factory(): + return asyncio.subprocess.SubprocessStreamProtocol( + limit=2**20, loop=loop) + try: + errpipe = asyncio.subprocess.PIPE if self.log_stderr \ + else asyncio.subprocess.STDOUT + transport, protocol = await loop.subprocess_exec( + protocol_factory, *self.command, + stdout=asyncio.subprocess.PIPE, + stderr=errpipe) + self.proc = SCProcess(transport, protocol, loop) + self.proc.initialize(self.output_cb, self.log_stderr, self.program) except Exception: logging.exception( f"shell_command: Command ({self.name}) failed") return False - if verbose: - fd = proc.stdout.fileno() - self.io_loop.add_handler( - fd, self._process_output, IOLoop.READ | IOLoop.ERROR) - elif not timeout: - # fire and forget, return from execution - return True - sleeptime = 0 - complete = False - while sleeptime < timeout: - await gen.sleep(.05) - sleeptime += .05 - if proc.poll() is not None: - complete = True - break - if self.cancelled: - break - if not complete: - proc.terminate() - if verbose: - if self.partial_output: - self.output_cb(self.partial_output) - self.partial_output = b"" - self.io_loop.remove_handler(fd) - self.return_code = proc.returncode + return True + + def _check_proc_success(self, complete): + self.return_code = self.proc.returncode success = self.return_code == 0 and complete if success: msg = f"Command ({self.name}) successfully finished" @@ -99,32 +167,9 @@ class ShellCommand: logging.info(msg) return success - async def run_with_response(self, timeout=2., retries=1): - result = [] - - def cb(data): - data = data.strip() - if data: - result.append(data.decode()) - prev_cb = self.output_cb - self.output_cb = cb - while 1: - ret = await self.run(timeout) - if not ret or not result: - retries -= 1 - if not retries: - return None - await gen.sleep(.5) - result.clear() - continue - break - self.output_cb = prev_cb - return "\n".join(result) - - class ShellCommandFactory: - def build_shell_command(self, cmd, callback=None): - return ShellCommand(cmd, callback) + def build_shell_command(self, cmd, callback=None, log_stderr=False): + return ShellCommand(cmd, callback, log_stderr) def load_plugin(config): return ShellCommandFactory()