shell_command: refactor using asyncio

At times a subprocess created by shell command fails.  Attempt to mitigate this by using asyncio's version of subprocess, which should be more stable when running via the event loop.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-02-23 09:02:40 -05:00
parent 4abfb886f5
commit 75dce0af1a
1 changed files with 120 additions and 75 deletions

View File

@ -5,47 +5,81 @@
# This file may be distributed under the terms of the GNU GPLv3 license.
import os
import shlex
import subprocess
import logging
import asyncio
from tornado import gen
from tornado.ioloop import IOLoop
class SCProcess(asyncio.subprocess.Process):
def initialize(self, cb, log_stderr, program):
self.callback = cb
self.log_stderr = log_stderr
self.program = program
self.partial_data = b""
async def _read_stream_with_cb(self, fd):
transport = self._transport.get_pipe_transport(fd)
if fd == 2:
stream = self.stderr
else:
assert fd == 1
stream = self.stdout
while not stream.at_eof():
output = await stream.readline()
if not output:
break
if fd == 2 and self.log_stderr:
logging.info(f"{self.program}: {output.decode()}")
else:
output = output.rstrip(b'\n')
if output:
self.callback(output)
transport.close()
return output
def cancel(self):
self.stdout.feed_eof()
self.stderr.feed_eof()
self.terminate()
async def communicate_with_cb(self, input=None):
if input is not None:
stdin = self._feed_stdin(input)
else:
stdin = self._noop()
if self.stdout is not None:
stdout = self._read_stream_with_cb(1)
else:
stdout = self._noop()
if self.stderr is not None:
stderr = self._read_stream_with_cb(2)
else:
stderr = self._noop()
stdin, stdout, stderr = await asyncio.tasks.gather(
stdin, stdout, stderr, loop=self._loop)
await self.wait()
class ShellCommand:
def __init__(self, cmd, callback):
self.io_loop = IOLoop.current()
def __init__(self, cmd, callback, log_stderr=False):
self.name = cmd
self.output_cb = callback
cmd = os.path.expanduser(cmd)
self.command = shlex.split(cmd)
self.partial_output = b""
self.program = self.command[0]
self.log_stderr = log_stderr
self.proc = None
self.cancelled = False
self.return_code = None
def _process_output(self, fd, events):
if events & IOLoop.ERROR:
return
try:
data = os.read(fd, 4096)
except Exception:
return
data = self.partial_output + data
lines = data.split(b'\n')
self.partial_output = lines.pop()
for line in lines:
try:
self.output_cb(line)
except Exception:
logging.exception("Error writing command output")
def cancel(self):
self.cancelled = True
if self.proc is not None:
self.proc.cancel()
def get_return_code(self):
return self.return_code
async def run(self, timeout=2., verbose=True):
self.return_code = fd = None
self.partial_output = b""
self.return_code = self.proc = None
self.cancelled = False
if timeout is None:
# Never timeout
@ -54,38 +88,72 @@ class ShellCommand:
# Fire and forget commands cannot be verbose as we can't
# clean up after the process terminates
verbose = False
if not await self._create_subprocess():
return False
if not timeout:
# fire and forget, return from execution
return True
try:
proc = subprocess.Popen(
self.command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if verbose:
ret = self.proc.communicate_with_cb()
else:
ret = self.proc.wait()
await asyncio.wait_for(ret, timeout=timeout)
except asyncio.TimeoutError:
complete = False
self.proc.terminate()
else:
complete = not self.cancelled
return self._check_proc_success(complete)
async def run_with_response(self, timeout=2., retries=1):
while retries > 0:
stdout = stderr = None
if await self._create_subprocess():
try:
ret = self.proc.communicate()
stdout, stderr = await asyncio.wait_for(
ret, timeout=timeout)
except asyncio.TimeoutError:
complete = False
self.proc.terminate()
else:
complete = not self.cancelled
if self.log_stderr:
logging.info(f"{self.program}: {stderr.decode()}")
if self._check_proc_success(complete):
return stdout.decode().rstrip("\n")
elif stdout:
logging.debug(
f"Shell command '{self.name}' output:"
f"\n{stdout.decode()}")
retries -= 1
await gen.sleep(.5)
return None
async def _create_subprocess(self):
loop = asyncio.get_event_loop()
def protocol_factory():
return asyncio.subprocess.SubprocessStreamProtocol(
limit=2**20, loop=loop)
try:
errpipe = asyncio.subprocess.PIPE if self.log_stderr \
else asyncio.subprocess.STDOUT
transport, protocol = await loop.subprocess_exec(
protocol_factory, *self.command,
stdout=asyncio.subprocess.PIPE,
stderr=errpipe)
self.proc = SCProcess(transport, protocol, loop)
self.proc.initialize(self.output_cb, self.log_stderr, self.program)
except Exception:
logging.exception(
f"shell_command: Command ({self.name}) failed")
return False
if verbose:
fd = proc.stdout.fileno()
self.io_loop.add_handler(
fd, self._process_output, IOLoop.READ | IOLoop.ERROR)
elif not timeout:
# fire and forget, return from execution
return True
sleeptime = 0
complete = False
while sleeptime < timeout:
await gen.sleep(.05)
sleeptime += .05
if proc.poll() is not None:
complete = True
break
if self.cancelled:
break
if not complete:
proc.terminate()
if verbose:
if self.partial_output:
self.output_cb(self.partial_output)
self.partial_output = b""
self.io_loop.remove_handler(fd)
self.return_code = proc.returncode
return True
def _check_proc_success(self, complete):
self.return_code = self.proc.returncode
success = self.return_code == 0 and complete
if success:
msg = f"Command ({self.name}) successfully finished"
@ -99,32 +167,9 @@ class ShellCommand:
logging.info(msg)
return success
async def run_with_response(self, timeout=2., retries=1):
result = []
def cb(data):
data = data.strip()
if data:
result.append(data.decode())
prev_cb = self.output_cb
self.output_cb = cb
while 1:
ret = await self.run(timeout)
if not ret or not result:
retries -= 1
if not retries:
return None
await gen.sleep(.5)
result.clear()
continue
break
self.output_cb = prev_cb
return "\n".join(result)
class ShellCommandFactory:
def build_shell_command(self, cmd, callback=None):
return ShellCommand(cmd, callback)
def build_shell_command(self, cmd, callback=None, log_stderr=False):
return ShellCommand(cmd, callback, log_stderr)
def load_plugin(config):
return ShellCommandFactory()