machine: report canbus info

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-11-16 13:50:00 -05:00
parent bfe20433f9
commit 68f5de6d2d
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 38 additions and 22 deletions

View File

@ -100,7 +100,9 @@ class Machine:
'cpu_info': self._get_cpu_info(),
'sd_info': self._get_sdcard_info(),
'distribution': dist_info,
'virtualization': self._check_inside_container()
'virtualization': self._check_inside_container(),
'network': {},
'canbus': {}
}
self._update_log_rollover(log=True)
providers: Dict[str, type] = {
@ -151,7 +153,7 @@ class Machine:
# IP network shell commands
shell_cmd: SCMDComp = self.server.load_component(
config, 'shell_command')
self.addr_cmd = shell_cmd.build_shell_command("ip -json address")
self.addr_cmd = shell_cmd.build_shell_command("ip -json -det address")
iwgetbin = "/sbin/iwgetid"
if not pathlib.Path(iwgetbin).exists():
iwgetbin = "iwgetid"
@ -567,32 +569,45 @@ class Machine:
if sequence % NETWORK_UPDATE_SEQUENCE:
return
network: Dict[str, Any] = {}
canbus: Dict[str, Any] = {}
try:
# get network interfaces
resp = await self.addr_cmd.run_with_response(log_complete=False)
decoded = json.loads(resp)
decoded: List[Dict[str, Any]] = json.loads(resp)
for interface in decoded:
if (
interface['operstate'] != "UP" or
interface['link_type'] != "ether" or
'address' not in interface
):
if interface['operstate'] != "UP":
continue
addresses: List[Dict[str, Any]] = [
{
'family': IP_FAMILIES[addr['family']],
'address': addr['local'],
'is_link_local': addr.get('scope', "") == "link"
if interface['link_type'] == "can":
infodata: dict = interface.get(
"linkinfo", {}).get("info_data", {})
canbus[interface['ifname']] = {
'tx_queue_len': interface['txqlen'],
'bitrate': infodata.get("bittiming", {}).get(
"bitrate", -1
),
'driver': infodata.get("bittiming_const", {}).get(
"name", "unknown"
)
}
elif (
interface['link_type'] == "ether" and
'address' in interface
):
addresses: List[Dict[str, Any]] = [
{
'family': IP_FAMILIES[addr['family']],
'address': addr['local'],
'is_link_local': addr.get('scope', "") == "link"
}
for addr in interface.get('addr_info', [])
if 'family' in addr and 'local' in addr
]
if not addresses:
continue
network[interface['ifname']] = {
'mac_address': interface['address'],
'ip_addresses': addresses
}
for addr in interface.get('addr_info', [])
if 'family' in addr and 'local' in addr
]
if not addresses:
continue
network[interface['ifname']] = {
'mac_address': interface['address'],
'ip_addresses': addresses
}
except Exception:
logging.exception("Error processing network update")
return
@ -602,6 +617,7 @@ class Machine:
if notify:
self.server.send_event("machine:net_state_changed", network)
self.system_info['network'] = network
self.system_info['canbus'] = canbus
async def get_public_network(self) -> Dict[str, Any]:
wifis = await self._get_wifi_interfaces()