shell_command: extend output support

It is now possible to redirect stderr to a callback for asynchronous message transfer.   Also added is the env option, allowing custom environment variables to be passed to the subprocess.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-03-06 18:24:49 -05:00
parent 29644bd44c
commit dc165636cb
1 changed files with 24 additions and 18 deletions

View File

@ -11,10 +11,11 @@ import asyncio
from tornado import gen from tornado import gen
class SCProcess(asyncio.subprocess.Process): class SCProcess(asyncio.subprocess.Process):
def initialize(self, cb, log_stderr, program): def initialize(self, program_name, std_out_cb, std_err_cb, log_stderr):
self.callback = cb self.program_name = program_name
self.std_out_cb = std_out_cb
self.std_err_cb = std_err_cb
self.log_stderr = log_stderr self.log_stderr = log_stderr
self.program = program
self.partial_data = b"" self.partial_data = b""
self.cancel_requested = False self.cancel_requested = False
@ -22,19 +23,20 @@ class SCProcess(asyncio.subprocess.Process):
transport = self._transport.get_pipe_transport(fd) transport = self._transport.get_pipe_transport(fd)
if fd == 2: if fd == 2:
stream = self.stderr stream = self.stderr
cb = self.std_err_cb
else: else:
assert fd == 1 assert fd == 1
stream = self.stdout stream = self.stdout
cb = self.std_out_cb
while not stream.at_eof(): while not stream.at_eof():
output = await stream.readline() output = await stream.readline()
if not output: if not output:
break break
if fd == 2 and self.log_stderr: if fd == 2 and self.log_stderr:
logging.info(f"{self.program}: {output.decode()}") logging.info(f"{self.program_name}: {output.decode()}")
else: output = output.rstrip(b'\n')
output = output.rstrip(b'\n') if output and cb is not None:
if output: cb(output)
self.callback(output)
transport.close() transport.close()
return output return output
@ -50,12 +52,12 @@ class SCProcess(asyncio.subprocess.Process):
await asyncio.wait_for(ret, timeout=2.) await asyncio.wait_for(ret, timeout=2.)
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
logging.debug(f"Command '{self.program}' exited with " logging.debug(f"Command '{self.program_name}' exited with "
f"signal: {sig.name}") f"signal: {sig.name}")
exit_success = True exit_success = True
break break
if not exit_success: if not exit_success:
logging.info(f"WARNING: {self.program} did not cleanly exit") logging.info(f"WARNING: {self.program_name} did not cleanly exit")
if self.stdout is not None: if self.stdout is not None:
self.stdout.feed_eof() self.stdout.feed_eof()
if self.stderr is not None: if self.stderr is not None:
@ -79,13 +81,15 @@ class SCProcess(asyncio.subprocess.Process):
await self.wait() await self.wait()
class ShellCommand: class ShellCommand:
def __init__(self, cmd, callback, log_stderr=False): def __init__(self, cmd, std_out_callback, std_err_callback,
env=None, log_stderr=False):
self.name = cmd self.name = cmd
self.output_cb = callback self.std_out_cb = std_out_callback
self.std_err_cb = std_err_callback
cmd = os.path.expanduser(cmd) cmd = os.path.expanduser(cmd)
self.command = shlex.split(cmd) self.command = shlex.split(cmd)
self.program = self.command[0]
self.log_stderr = log_stderr self.log_stderr = log_stderr
self.env = env
self.proc = None self.proc = None
self.cancelled = False self.cancelled = False
self.return_code = None self.return_code = None
@ -142,7 +146,7 @@ class ShellCommand:
else: else:
complete = not self.cancelled complete = not self.cancelled
if self.log_stderr and stderr: if self.log_stderr and stderr:
logging.info(f"{self.program}: {stderr.decode()}") logging.info(f"{self.command[0]}: {stderr.decode()}")
if self._check_proc_success(complete, quiet): if self._check_proc_success(complete, quiet):
return stdout.decode().rstrip("\n") return stdout.decode().rstrip("\n")
elif stdout: elif stdout:
@ -165,9 +169,10 @@ class ShellCommand:
transport, protocol = await loop.subprocess_exec( transport, protocol = await loop.subprocess_exec(
protocol_factory, *self.command, protocol_factory, *self.command,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=errpipe) stderr=errpipe, env=self.env)
self.proc = SCProcess(transport, protocol, loop) self.proc = SCProcess(transport, protocol, loop)
self.proc.initialize(self.output_cb, self.log_stderr, self.program) self.proc.initialize(self.command[0], self.std_out_cb,
self.std_err_cb, self.log_stderr)
except Exception: except Exception:
logging.exception( logging.exception(
f"shell_command: Command ({self.name}) failed") f"shell_command: Command ({self.name}) failed")
@ -191,8 +196,9 @@ class ShellCommand:
return success return success
class ShellCommandFactory: class ShellCommandFactory:
def build_shell_command(self, cmd, callback=None, log_stderr=False): def build_shell_command(self, cmd, callback=None, std_err_callback=None,
return ShellCommand(cmd, callback, log_stderr) env=None, log_stderr=False):
return ShellCommand(cmd, callback, std_err_callback, env, log_stderr)
def load_plugin(config): def load_plugin(config):
return ShellCommandFactory() return ShellCommandFactory()