machine: refactor systemd cli implementation

Move all systemd cli calls to its own provider class, inherted from
a base provider class.  This is in preparation for multiple provider
implementations.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-01-13 18:26:15 -05:00 committed by Eric Callahan
parent 9b6161a6b0
commit 0d6791a320
1 changed files with 163 additions and 128 deletions

View File

@ -19,13 +19,15 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
Dict, Dict,
List List,
Optional
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from confighelper import ConfigHelper from confighelper import ConfigHelper
from websockets import WebRequest from websockets import WebRequest
from . import shell_command from .shell_command import ShellCommandFactory as SCMDComp
SCMDComp = shell_command.ShellCommandFactory from .proc_stats import ProcStats
ALLOWED_SERVICES = [ ALLOWED_SERVICES = [
"moonraker", "klipper", "webcamd", "MoonCord", "moonraker", "klipper", "webcamd", "MoonCord",
@ -42,6 +44,8 @@ SD_MFGRS = {
'74': "PNY" '74': "PNY"
} }
IP_FAMILIES = {'inet': 'ipv4', 'inet6': 'ipv6'} IP_FAMILIES = {'inet': 'ipv4', 'inet6': 'ipv6'}
NETWORK_UPDATE_SEQUENCE = 10
class Machine: class Machine:
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
@ -50,7 +54,6 @@ class Machine:
dist_info.update(distro.info()) dist_info.update(distro.info())
dist_info['release_info'] = distro.distro_release_info() dist_info['release_info'] = distro.distro_release_info()
self.inside_container = False self.inside_container = False
self.virt_id = "none"
self.system_info: Dict[str, Any] = { self.system_info: Dict[str, Any] = {
'cpu_info': self._get_cpu_info(), 'cpu_info': self._get_cpu_info(),
'sd_info': self._get_sdcard_info(), 'sd_info': self._get_sdcard_info(),
@ -58,7 +61,7 @@ class Machine:
'virtualization': self._check_inside_container() 'virtualization': self._check_inside_container()
} }
self._update_log_rollover(log=True) self._update_log_rollover(log=True)
self.available_services: Dict[str, Dict[str, str]] = {} self.sys_provider = SystemdCliProvider(config)
self.server.register_endpoint( self.server.register_endpoint(
"/machine/reboot", ['POST'], self._handle_machine_request) "/machine/reboot", ['POST'], self._handle_machine_request)
@ -81,9 +84,9 @@ class Machine:
# Register remote methods # Register remote methods
self.server.register_remote_method( self.server.register_remote_method(
"shutdown_machine", self.shutdown_machine) "shutdown_machine", self.sys_provider.shutdown)
self.server.register_remote_method( self.server.register_remote_method(
"reboot_machine", self.reboot_machine) "reboot_machine", self.sys_provider.reboot)
self.init_evt = asyncio.Event() self.init_evt = asyncio.Event()
@ -105,38 +108,39 @@ class Machine:
pass pass
async def component_init(self): async def component_init(self):
await self.sys_provider.initialize()
if not self.inside_container: if not self.inside_container:
await self._check_virt_status() virt_info = await self.sys_provider.check_virt_status()
await self._find_active_services() self.system_info['virtualization'] = virt_info
await self.parse_network_interfaces(notify=False) await self._parse_network_interfaces(0, notify=False)
self._update_log_rollover() pstats: ProcStats = self.server.lookup_component('proc_stats')
pstats.register_stat_callback(self._parse_network_interfaces)
available_svcs = self.sys_provider.get_available_services()
avail_list = list(available_svcs.keys())
self.system_info['available_services'] = avail_list
self.system_info['service_state'] = available_svcs
self.init_evt.set()
async def _handle_machine_request(self, web_request: WebRequest) -> str: async def _handle_machine_request(self, web_request: WebRequest) -> str:
ep = web_request.get_endpoint() ep = web_request.get_endpoint()
if self.inside_container: if self.inside_container:
virt_id = self.system_info['virtualization'].get('virt_id', "none")
raise self.server.error( raise self.server.error(
f"Cannot {ep.split('/')[-1]} from within a " f"Cannot {ep.split('/')[-1]} from within a "
f"{self.virt_id} container") f"{virt_id} container")
if ep == "/machine/shutdown": if ep == "/machine/shutdown":
await self.shutdown_machine() await self.sys_provider.shutdown()
elif ep == "/machine/reboot": elif ep == "/machine/reboot":
await self.reboot_machine() await self.sys_provider.reboot()
else: else:
raise self.server.error("Unsupported machine request") raise self.server.error("Unsupported machine request")
return "ok" return "ok"
async def shutdown_machine(self) -> None:
await self._execute_cmd("sudo shutdown now")
async def reboot_machine(self) -> None:
await self._execute_cmd("sudo shutdown -r now")
async def do_service_action(self, async def do_service_action(self,
action: str, action: str,
service_name: str service_name: str
) -> None: ) -> None:
await self._execute_cmd( await self.sys_provider.do_service_action(action, service_name)
f'sudo systemctl {action} {service_name}')
async def _handle_service_request(self, web_request: WebRequest) -> str: async def _handle_service_request(self, web_request: WebRequest) -> str:
name: str = web_request.get('service') name: str = web_request.get('service')
@ -147,11 +151,10 @@ class Machine:
f"Service action '{action}' not available for moonraker") f"Service action '{action}' not available for moonraker")
event_loop = self.server.get_event_loop() event_loop = self.server.get_event_loop()
event_loop.register_callback(self.do_service_action, action, name) event_loop.register_callback(self.do_service_action, action, name)
elif name in self.available_services: elif self.sys_provider.is_service_available(name):
await self.do_service_action(action, name) await self.do_service_action(action, name)
else: else:
if name in ALLOWED_SERVICES and \ if name in ALLOWED_SERVICES:
name not in self.available_services:
raise self.server.error(f"Service '{name}' not installed") raise self.server.error(f"Service '{name}' not installed")
raise self.server.error( raise self.server.error(
f"Service '{name}' not allowed") f"Service '{name}' not allowed")
@ -162,15 +165,6 @@ class Machine:
) -> Dict[str, Any]: ) -> Dict[str, Any]:
return {'system_info': self.system_info} return {'system_info': self.system_info}
async def _execute_cmd(self, cmd: str) -> None:
shell_cmd: SCMDComp = self.server.lookup_component('shell_command')
scmd = shell_cmd.build_shell_command(cmd, None)
try:
await scmd.run(timeout=2., verbose=False)
except Exception:
logging.exception(f"Error running cmd '{cmd}'")
raise
def get_system_info(self) -> Dict[str, Any]: def get_system_info(self) -> Dict[str, Any]:
return self.system_info return self.system_info
@ -282,31 +276,6 @@ class Machine:
logging.info("Error Reading /proc/meminfo") logging.info("Error Reading /proc/meminfo")
return cpu_info return cpu_info
async def _find_active_services(self):
shell_cmd: SCMDComp = self.server.lookup_component('shell_command')
scmd = shell_cmd.build_shell_command(
"systemctl list-units --all --type=service --plain --no-legend")
try:
resp = await scmd.run_with_response()
lines = resp.split('\n')
services = [line.split()[0].strip() for line in lines
if ".service" in line.strip()]
except Exception:
services = []
for svc in services:
sname = svc.rsplit('.', 1)[0]
for allowed in ALLOWED_SERVICES:
if sname.startswith(allowed):
self.available_services[sname] = {
'active_state': "unknown",
'sub_state': "unknown"
}
avail_list = list(self.available_services.keys())
self.system_info['available_services'] = avail_list
self.system_info['service_state'] = self.available_services
await self.update_service_status(notify=False)
self.init_evt.set()
def _check_inside_container(self) -> Dict[str, Any]: def _check_inside_container(self) -> Dict[str, Any]:
cgroup_file = pathlib.Path(CGROUP_PATH) cgroup_file = pathlib.Path(CGROUP_PATH)
virt_type = virt_id = "none" virt_type = virt_id = "none"
@ -341,81 +310,17 @@ class Machine:
virt_id = "docker" virt_id = "docker"
except Exception: except Exception:
logging.exception(f"Error reading {SCHED_PATH}") logging.exception(f"Error reading {SCHED_PATH}")
self.virt_id = virt_id
return { return {
'virt_type': virt_type, 'virt_type': virt_type,
'virt_identifier': virt_id 'virt_identifier': virt_id
} }
async def _check_virt_status(self) -> None: async def _parse_network_interfaces(self,
# Fallback virtualization check sequence: int,
virt_id = virt_type = "none" notify: bool = True
) -> None:
shell_cmd: SCMDComp = self.server.lookup_component('shell_command') if sequence % NETWORK_UPDATE_SEQUENCE:
# Check for any form of virtualization. This will report the innermost
# virtualization type in the event that nested virtualization is used
scmd = shell_cmd.build_shell_command("systemd-detect-virt")
try:
resp = await scmd.run_with_response()
except shell_cmd.error:
pass
else:
virt_id = resp.strip()
if virt_id != "none":
# Check explicitly for container virtualization
scmd = shell_cmd.build_shell_command(
"systemd-detect-virt --container")
try:
resp = await scmd.run_with_response()
except shell_cmd.error:
virt_type = "vm"
else:
if virt_id == resp.strip():
virt_type = "container"
else:
# Moonraker is run from within a VM inside a container
virt_type = "vm"
logging.info(
f"Virtualized Environment Detected, Type: {virt_type} "
f"id: {virt_id}")
else:
logging.info("No Virtualization Detected")
self.virt_id = virt_id
self.system_info['virtualization'] = {
'virt_type': virt_type,
'virt_identifier': virt_id
}
async def update_service_status(self, notify: bool = True) -> None:
if not self.available_services:
return return
svcs = self.system_info['available_services']
shell_cmd: SCMDComp = self.server.lookup_component('shell_command')
scmd = shell_cmd.build_shell_command(
"systemctl show -p ActiveState,SubState --value "
f"{' '.join(svcs)}")
try:
resp = await scmd.run_with_response(log_complete=False)
for svc, state in zip(svcs, resp.strip().split('\n\n')):
active_state, sub_state = state.split('\n', 1)
new_state: Dict[str, str] = {
'active_state': active_state,
'sub_state': sub_state
}
if self.available_services[svc] != new_state:
self.available_services[svc] = new_state
if notify:
self.server.send_event(
"machine:service_state_changed",
{svc: new_state})
except Exception:
logging.exception("Error processing service state update")
async def parse_network_interfaces(self, notify: bool = True) -> None:
shell_cmd: SCMDComp = self.server.lookup_component('shell_command') shell_cmd: SCMDComp = self.server.lookup_component('shell_command')
scmd = shell_cmd.build_shell_command("ip -json address") scmd = shell_cmd.build_shell_command("ip -json address")
network: Dict[str, Any] = {} network: Dict[str, Any] = {}
@ -452,5 +357,135 @@ class Machine:
self.server.send_event("machine:net_state_changed", network) self.server.send_event("machine:net_state_changed", network)
self.system_info['network'] = network self.system_info['network'] = network
class BaseProvider:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.available_services: Dict[str, Dict[str, str]] = {}
self.shell_cmd: SCMDComp = self.server.load_component(
config, 'shell_command')
async def initialize(self) -> None:
pass
async def shutdown(self) -> None:
await self.shell_cmd.exec_cmd(f"sudo shutdown now")
async def reboot(self) -> None:
await self.shell_cmd.exec_cmd(f"sudo shutdown -r now")
async def do_service_action(self,
action: str,
service_name: str
) -> None:
pass
async def check_virt_status(self) -> Dict[str, Any]:
pass
def is_service_available(self, service: str) -> bool:
return service in self.available_services
def get_available_services(self) -> Dict[str, Dict[str, str]]:
return self.available_services
class SystemdCliProvider(BaseProvider):
async def initialize(self) -> None:
await self._detect_active_services()
if self.available_services:
svcs = list(self.available_services.keys())
self.svc_cmd = self.shell_cmd.build_shell_command(
"systemctl show -p ActiveState,SubState --value "
f"{' '.join(svcs)}")
await self._update_service_status(0, notify=True)
pstats: ProcStats = self.server.lookup_component('proc_stats')
pstats.register_stat_callback(self._update_service_status)
async def do_service_action(self,
action: str,
service_name: str
) -> None:
await self.shell_cmd.exec_cmd(
f'sudo systemctl {action} {service_name}')
async def check_virt_status(self) -> Dict[str, Any]:
# Fallback virtualization check
virt_id = virt_type = "none"
# Check for any form of virtualization. This will report the innermost
# virtualization type in the event that nested virtualization is used
try:
resp: str = await self.shell_cmd.exec_cmd("systemd-detect-virt")
except self.shell_cmd.error:
pass
else:
virt_id = resp.strip()
if virt_id != "none":
# Check explicitly for container virtualization
try:
resp = await self.shell_cmd.exec_cmd(
"systemd-detect-virt --container")
except self.shell_cmd.error:
virt_type = "vm"
else:
if virt_id == resp.strip():
virt_type = "container"
else:
# Moonraker is run from within a VM inside a container
virt_type = "vm"
logging.info(
f"Virtualized Environment Detected, Type: {virt_type} "
f"id: {virt_id}")
else:
logging.info("No Virtualization Detected")
return {
'virt_type': virt_type,
'virt_identifier': virt_id
}
async def _detect_active_services(self):
try:
resp: str = await self.shell_cmd.exec_cmd(
"systemctl list-units --all --type=service --plain"
" --no-legend")
lines = resp.split('\n')
services = [line.split()[0].strip() for line in lines
if ".service" in line.strip()]
except Exception:
services = []
for svc in services:
sname = svc.rsplit('.', 1)[0]
for allowed in ALLOWED_SERVICES:
if sname.startswith(allowed):
self.available_services[sname] = {
'active_state': "unknown",
'sub_state': "unknown"
}
async def _update_service_status(self,
sequence: int,
notify: bool = True
) -> None:
if sequence % 2:
# Update every other sequence
return
svcs = list(self.available_services.keys())
try:
resp = await self.svc_cmd.run_with_response(log_complete=False)
for svc, state in zip(svcs, resp.strip().split('\n\n')):
active_state, sub_state = state.split('\n', 1)
new_state: Dict[str, str] = {
'active_state': active_state,
'sub_state': sub_state
}
if self.available_services[svc] != new_state:
self.available_services[svc] = new_state
if notify:
self.server.send_event(
"machine:service_state_changed",
{svc: new_state})
except Exception:
logging.exception("Error processing service state update")
def load_component(config: ConfigHelper) -> Machine: def load_component(config: ConfigHelper) -> Machine:
return Machine(config) return Machine(config)