machine: add support for parsing network info
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
7a99f83396
commit
b33183f60f
|
@ -7,6 +7,7 @@
|
|||
from __future__ import annotations
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import pathlib
|
||||
import logging
|
||||
import asyncio
|
||||
|
@ -18,6 +19,7 @@ from typing import (
|
|||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
List
|
||||
)
|
||||
if TYPE_CHECKING:
|
||||
from confighelper import ConfigHelper
|
||||
|
@ -39,7 +41,7 @@ SD_MFGRS = {
|
|||
'03': "Sandisk",
|
||||
'74': "PNY"
|
||||
}
|
||||
|
||||
IP_FAMILIES = {'inet': 'ipv4', 'inet6': 'ipv6'}
|
||||
class Machine:
|
||||
def __init__(self, config: ConfigHelper) -> None:
|
||||
self.server = config.get_server()
|
||||
|
@ -105,6 +107,7 @@ class Machine:
|
|||
if not self.inside_container:
|
||||
await self._check_virt_status()
|
||||
await self._find_active_services()
|
||||
await self.parse_network_interfaces(notify=False)
|
||||
self._update_log_rollover()
|
||||
|
||||
async def _handle_machine_request(self, web_request: WebRequest) -> str:
|
||||
|
@ -411,5 +414,37 @@ class Machine:
|
|||
except Exception:
|
||||
logging.exception("Error processing service state update")
|
||||
|
||||
async def parse_network_interfaces(self, notify: bool = True) -> None:
|
||||
shell_cmd: SCMDComp = self.server.lookup_component('shell_command')
|
||||
scmd = shell_cmd.build_shell_command("ip -json address")
|
||||
network: Dict[str, Any] = {}
|
||||
try:
|
||||
resp = await scmd.run_with_response(log_complete=False)
|
||||
decoded = json.loads(resp)
|
||||
for interface in decoded:
|
||||
if (
|
||||
interface['operstate'] != "UP" or
|
||||
interface['link_type'] == "loopback"
|
||||
):
|
||||
continue
|
||||
addresses: List[Dict[str, Any]] = [
|
||||
{
|
||||
'family': IP_FAMILIES[addr['family']],
|
||||
'address': addr['local']
|
||||
}
|
||||
for addr in interface.get('addr_info', [])
|
||||
if 'family' in addr and 'local' in addr
|
||||
]
|
||||
network[interface['ifname']] = {
|
||||
'mac_address': interface['address'],
|
||||
'ip_addresses': addresses
|
||||
}
|
||||
except Exception:
|
||||
logging.exception("Error processing network update")
|
||||
prev_network = self.system_info.get('network', {})
|
||||
if notify and network != prev_network:
|
||||
self.server.send_event("machine:net_state_changed", network)
|
||||
self.system_info['network'] = network
|
||||
|
||||
def load_component(config: ConfigHelper) -> Machine:
|
||||
return Machine(config)
|
||||
|
|
Loading…
Reference in New Issue