machine: support supervisord service info extraction

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-01-31 05:54:31 -05:00
parent 5aa645f974
commit c69441955a
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 101 additions and 15 deletions

View File

@ -20,6 +20,7 @@ import shutil
import distro
import tempfile
import getpass
import configparser
from confighelper import FileSourceWrapper
from utils import MOONRAKER_PATH
@ -756,7 +757,8 @@ class Machine:
if not svc_info:
return
name = svc_info.get("unit_name", "unknown")
msg = f"\nSystemd unit {name}:"
manager = svc_info.get("manager", "systemd").capitalize()
msg = f"\n{manager} unit {name}:"
for key, val in svc_info.items():
if key == "properties":
msg += "\nProperties:"
@ -929,6 +931,7 @@ class SystemdCliProvider(BaseProvider):
unit_name = resp.split(maxsplit=2)[1]
service_info["unit_name"] = unit_name
service_info["is_default"] = True
service_info["manager"] = "systemd"
if unit_name != expected_name:
service_info["is_default"] = False
logging.info(
@ -1180,6 +1183,7 @@ class SystemdDbusProvider(BaseProvider):
unit_name = await unit_intf.get_id() # type: ignore
service_info["unit_name"] = unit_name
service_info["is_default"] = True
service_info["manager"] = "systemd"
if unit_name != expected_name:
service_info["is_default"] = False
logging.info(
@ -1280,7 +1284,11 @@ class SupervisordProvider(BaseProvider):
}
async def _exec_supervisorctl_command(
self, args: str, tries: int = 1, timeout=2.
self,
args: str,
tries: int = 1,
timeout: float = 2.,
success_codes: Optional[List[int]] = None
) -> str:
if self.spv_conf:
cmd = f"supervisorctl -c {self.spv_conf} {args}"
@ -1288,12 +1296,12 @@ class SupervisordProvider(BaseProvider):
cmd = f"supervisorctl {args}"
return await self.shell_cmd.exec_cmd(
cmd, proc_input=None, log_complete=False, retries=tries,
timeout=timeout
timeout=timeout, success_codes=success_codes
)
async def _detect_active_services(self) -> None:
machine: Machine = self.server.lookup_component("machine")
units: Dict[str, Any] = await self._list_processes()
units: Dict[str, Any] = await self._get_process_info()
for unit, info in units.items():
if machine.is_service_allowed(unit):
self.available_services[unit] = {
@ -1301,20 +1309,34 @@ class SupervisordProvider(BaseProvider):
'sub_state': info["state"]
}
async def _list_processes(self) -> Dict[str, Any]:
async def _get_process_info(
self, process_names: Optional[List[str]] = None
) -> Dict[str, Any]:
units: Dict[str, Any] = {}
cmd = "status"
if process_names is not None:
cmd = f"status {' '.join(process_names)}"
try:
resp = await self._exec_supervisorctl_command("status", timeout=6.)
resp = await self._exec_supervisorctl_command(
cmd, timeout=6., success_codes=[0, 3]
)
lines = [line.strip() for line in resp.split("\n") if line.strip()]
except Exception:
return {}
for line in lines:
parts = line.split()
units[parts[0]] = {
"state": parts[1].lower(),
"pid": int(parts[3].rstrip(",")),
"uptime": parts[5]
}
name: str = parts[0]
state: str = parts[1].lower()
if state == "running" and len(parts) >= 6:
units[name] = {
"state": state,
"pid": int(parts[3].rstrip(",")),
"uptime": parts[5]
}
else:
units[name] = {
"state": parts[1].lower()
}
return units
async def _update_service_status(self,
@ -1328,7 +1350,7 @@ class SupervisordProvider(BaseProvider):
try:
# slow reaction for supervisord, timeout set to 6.0
resp = await self.svc_cmd.run_with_response(
log_complete=False, timeout=6.
log_complete=False, timeout=6., success_codes=[0, 3]
)
resp_l = resp.strip().split("\n") # drop lengend
for svc, state in zip(svcs, resp_l):
@ -1346,8 +1368,36 @@ class SupervisordProvider(BaseProvider):
except Exception:
logging.exception("Error processing service state update")
# service files is defined since docker build.
# not needed to implete SV.
async def _find_service_by_pid(
self, expected_name: str, pid: int
) -> Dict[str, Any]:
service_info: Dict[str, Any] = {}
for _ in range(5):
proc_info = await self._get_process_info(
list(self.available_services.keys())
)
service_info["unit_name"] = expected_name
service_info["is_default"] = True
service_info["manager"] = "supervisord"
need_retry = False
for name, info in proc_info.items():
if "pid" not in info:
need_retry |= info["state"] == "starting"
elif info["pid"] == pid:
if name != expected_name:
service_info["unit_name"] = name
service_info["is_default"] = False
logging.info(
"Detected alternate unit name for "
f"{expected_name}: {name}"
)
return service_info
if need_retry:
await asyncio.sleep(1.)
else:
break
return {}
async def extract_service_info(
self,
service: str,
@ -1355,7 +1405,43 @@ class SupervisordProvider(BaseProvider):
properties: Optional[List[str]] = None,
raw: bool = False
) -> Dict[str, Any]:
return {}
service_info = await self._find_service_by_pid(service, pid)
if not service_info:
logging.info(
f"Unable to locate service info for {service}, pid: {pid}"
)
return {}
# locate supervisord.conf
if self.spv_conf:
spv_path = pathlib.Path(self.spv_conf)
if not spv_path.is_file():
logging.info(
f"Invalid supervisord configuration file: {self.spv_conf}"
)
return service_info
else:
default_config_locations = [
"/etc/supervisord.conf",
"/etc/supervisor/supervisord.conf"
]
for conf_path in default_config_locations:
spv_path = pathlib.Path(conf_path)
if spv_path.is_file():
break
else:
logging.info("Failed to locate supervisord.conf")
return service_info
spv_config = configparser.ConfigParser(interpolation=None)
spv_config.read_string(spv_path.read_text())
unit = service_info["unit_name"]
section_name = f"program:{unit}"
if not spv_config.has_section(section_name):
logging.info(
f"Unable to location supervisor section {section_name}"
)
return service_info
service_info["properties"] = dict(spv_config[section_name])
return service_info
# Install validation