machine: implement get_public_network method

This allows components to determine the current local
ip address that routes to the internet.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-03-03 18:09:45 -05:00
parent 9c33eb225c
commit 393cfa2a0e
No known key found for this signature in database
GPG Key ID: 7027245FBBDDF59A
1 changed files with 77 additions and 3 deletions

View File

@ -13,6 +13,8 @@ import pathlib
import logging import logging
import asyncio import asyncio
import platform import platform
import socket
import ipaddress
import distro import distro
# Annotation imports # Annotation imports
@ -108,6 +110,11 @@ class Machine:
self.server.register_remote_method( self.server.register_remote_method(
"reboot_machine", self.sys_provider.reboot) "reboot_machine", self.sys_provider.reboot)
# IP network shell commands
shell_cmd: SCMDComp = self.server.load_component(
config, 'shell_command')
self.addr_cmd = shell_cmd.build_shell_command("ip -json address")
self.iwgetid_cmd = shell_cmd.build_shell_command("iwgetid")
self.init_evt = asyncio.Event() self.init_evt = asyncio.Event()
def _update_log_rollover(self, log: bool = False) -> None: def _update_log_rollover(self, log: bool = False) -> None:
@ -341,11 +348,10 @@ class Machine:
) -> None: ) -> None:
if sequence % NETWORK_UPDATE_SEQUENCE: if sequence % NETWORK_UPDATE_SEQUENCE:
return return
shell_cmd: SCMDComp = self.server.lookup_component('shell_command')
scmd = shell_cmd.build_shell_command("ip -json address")
network: Dict[str, Any] = {} network: Dict[str, Any] = {}
try: try:
resp = await scmd.run_with_response(log_complete=False) # get network interfaces
resp = await self.addr_cmd.run_with_response(log_complete=False)
decoded = json.loads(resp) decoded = json.loads(resp)
for interface in decoded: for interface in decoded:
if ( if (
@ -377,6 +383,74 @@ class Machine:
self.server.send_event("machine:net_state_changed", network) self.server.send_event("machine:net_state_changed", network)
self.system_info['network'] = network self.system_info['network'] = network
async def get_public_network(self) -> Dict[str, Any]:
wifis = await self._get_wifi_interfaces()
public_intf = self._find_public_interface()
ifname = public_intf["ifname"]
is_wifi = ifname in wifis
public_intf["is_wifi"] = is_wifi
if is_wifi:
public_intf["ssid"] = wifis[ifname]
# TODO: Can we detect the private top level domain? That
# would be ideal
public_intf["hostname"] = socket.gethostname()
return public_intf
def _find_public_interface(self) -> Dict[str, Any]:
src_ip = self._find_public_ip()
networks = self.system_info.get("network", {})
for ifname, ifinfo in networks.items():
for addrinfo in ifinfo["ip_addresses"]:
if addrinfo["is_link_local"]:
continue
fam = addrinfo["family"]
addr = addrinfo["address"]
if fam == "ipv6" and src_ip is None:
ip = ipaddress.ip_address(addr)
if ip.is_global:
return {
"ifname": ifname,
"address": addr,
"family": fam
}
elif src_ip == addr:
return {
"ifname": ifname,
"address": addr,
"family": fam
}
return {}
def _find_public_ip(self) -> Optional[str]:
# Check for an IPv4 Source IP
# NOTE: It should also be possible to extract this from
# the routing table, ie: ip -json route
# It would be an entry with a "gateway" with the lowest
# metric. Might also be able to get IPv6 info from this.
# However, it would be better to use NETLINK for this rather
# than run another shell command
src_ip: Optional[str] = None
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.settimeout(0)
s.connect(('10.255.255.255', 1))
src_ip = s.getsockname()[0]
except Exception:
pass
finally:
s.close()
return src_ip
async def _get_wifi_interfaces(self) -> Dict[str, Any]:
# get wifi interfaces
wifi_intfs: Dict[str, Any] = {}
resp = await self.iwgetid_cmd.run_with_response(log_complete=False)
if resp:
for line in resp.split("\n"):
parts = line.strip().split(maxsplit=1)
wifi_intfs[parts[0]] = parts[1].split(":")[-1].strip('"')
return wifi_intfs
class BaseProvider: class BaseProvider:
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()