shell_command: add exec_cmd method to factory
This allows for users to execute a method directly rather than create a command. This is useful for fire once commands that do not need to inspect the return code unless there is an error. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
89d0bbdb63
commit
f4c5e808c3
|
@ -15,6 +15,7 @@ from utils import ServerError
|
|||
# Annotation imports
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Awaitable,
|
||||
List,
|
||||
Optional,
|
||||
Callable,
|
||||
|
@ -349,6 +350,23 @@ class ShellCommandFactory:
|
|||
return ShellCommand(self, cmd, callback, std_err_callback, env,
|
||||
log_stderr, cwd)
|
||||
|
||||
def exec_cmd(self,
|
||||
cmd: str,
|
||||
timeout: float = 2.,
|
||||
retries: int = 1,
|
||||
sig_idx: int = 1,
|
||||
proc_input: Optional[str] = None,
|
||||
log_complete: bool = True,
|
||||
log_stderr: bool = False,
|
||||
env: Optional[Dict[str, str]] = None,
|
||||
cwd: Optional[str] = None
|
||||
) -> Awaitable:
|
||||
scmd = ShellCommand(self, cmd, None, None, env,
|
||||
log_stderr, cwd)
|
||||
coro = scmd.run_with_response(timeout, retries, log_complete,
|
||||
sig_idx, proc_input)
|
||||
return asyncio.create_task(coro)
|
||||
|
||||
async def close(self) -> None:
|
||||
for cmd in self.running_commands:
|
||||
await cmd.cancel()
|
||||
|
|
Loading…
Reference in New Issue