pip_utils: add async method to invoke pip directly

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-05-19 16:45:16 -04:00
parent ecb6ebcc18
commit 11d15f96d1
1 changed files with 35 additions and 0 deletions

View File

@ -165,6 +165,41 @@ class AsyncPipExecutor:
def get_shell_cmd(self) -> ShellCommandFactory: def get_shell_cmd(self) -> ShellCommandFactory:
return self.server.lookup_component("shell_command") return self.server.lookup_component("shell_command")
async def call_pip_with_response(
self,
args: str,
timeout: float = 30.,
attempts: int = 3,
sys_env_vars: Optional[Dict[str, Any]] = None
) -> str:
env: Optional[Dict[str, str]] = None
if sys_env_vars is not None:
env = dict(os.environ)
env.update(sys_env_vars)
shell_cmd = self.get_shell_cmd()
return await shell_cmd.exec_cmd(
f"{self.pip_cmd} {args}", attempts=attempts,
timeout=timeout, env=env, log_stderr=True
)
async def call_pip(
self,
args: str,
timeout: float = 30.,
attempts: int = 3,
sys_env_vars: Optional[Dict[str, Any]] = None
) -> None:
env: Optional[Dict[str, str]] = None
if sys_env_vars is not None:
env = dict(os.environ)
env.update(sys_env_vars)
shell_cmd = self.get_shell_cmd()
await shell_cmd.run_cmd_async(
f"{self.pip_cmd} {args}", self.notify_callback,
timeout=timeout, attempts=attempts, env=env,
log_stderr=True
)
async def get_pip_version(self) -> PipVersionInfo: async def get_pip_version(self) -> PipVersionInfo:
resp: str = await self.get_shell_cmd().exec_cmd( resp: str = await self.get_shell_cmd().exec_cmd(
f"{self.pip_cmd} --version", timeout=30., attempts=3, log_stderr=True f"{self.pip_cmd} --version", timeout=30., attempts=3, log_stderr=True