From 393cfa2a0e3e2280bbf8f6256d59349495ba1e88 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Thu, 3 Mar 2022 18:09:45 -0500 Subject: [PATCH] machine: implement get_public_network method This allows components to determine the current local ip address that routes to the internet. Signed-off-by: Eric Callahan --- moonraker/components/machine.py | 80 +++++++++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 3 deletions(-) diff --git a/moonraker/components/machine.py b/moonraker/components/machine.py index 52e2c74..bc78513 100644 --- a/moonraker/components/machine.py +++ b/moonraker/components/machine.py @@ -13,6 +13,8 @@ import pathlib import logging import asyncio import platform +import socket +import ipaddress import distro # Annotation imports @@ -108,6 +110,11 @@ class Machine: self.server.register_remote_method( "reboot_machine", self.sys_provider.reboot) + # IP network shell commands + shell_cmd: SCMDComp = self.server.load_component( + config, 'shell_command') + self.addr_cmd = shell_cmd.build_shell_command("ip -json address") + self.iwgetid_cmd = shell_cmd.build_shell_command("iwgetid") self.init_evt = asyncio.Event() def _update_log_rollover(self, log: bool = False) -> None: @@ -341,11 +348,10 @@ class Machine: ) -> None: if sequence % NETWORK_UPDATE_SEQUENCE: return - shell_cmd: SCMDComp = self.server.lookup_component('shell_command') - scmd = shell_cmd.build_shell_command("ip -json address") network: Dict[str, Any] = {} try: - resp = await scmd.run_with_response(log_complete=False) + # get network interfaces + resp = await self.addr_cmd.run_with_response(log_complete=False) decoded = json.loads(resp) for interface in decoded: if ( @@ -377,6 +383,74 @@ class Machine: self.server.send_event("machine:net_state_changed", network) self.system_info['network'] = network + async def get_public_network(self) -> Dict[str, Any]: + wifis = await self._get_wifi_interfaces() + public_intf = self._find_public_interface() + ifname = public_intf["ifname"] + is_wifi = ifname in wifis + public_intf["is_wifi"] = is_wifi + if is_wifi: + public_intf["ssid"] = wifis[ifname] + # TODO: Can we detect the private top level domain? That + # would be ideal + public_intf["hostname"] = socket.gethostname() + return public_intf + + def _find_public_interface(self) -> Dict[str, Any]: + src_ip = self._find_public_ip() + networks = self.system_info.get("network", {}) + for ifname, ifinfo in networks.items(): + for addrinfo in ifinfo["ip_addresses"]: + if addrinfo["is_link_local"]: + continue + fam = addrinfo["family"] + addr = addrinfo["address"] + if fam == "ipv6" and src_ip is None: + ip = ipaddress.ip_address(addr) + if ip.is_global: + return { + "ifname": ifname, + "address": addr, + "family": fam + } + elif src_ip == addr: + return { + "ifname": ifname, + "address": addr, + "family": fam + } + return {} + + def _find_public_ip(self) -> Optional[str]: + # Check for an IPv4 Source IP + # NOTE: It should also be possible to extract this from + # the routing table, ie: ip -json route + # It would be an entry with a "gateway" with the lowest + # metric. Might also be able to get IPv6 info from this. + # However, it would be better to use NETLINK for this rather + # than run another shell command + src_ip: Optional[str] = None + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + s.settimeout(0) + s.connect(('10.255.255.255', 1)) + src_ip = s.getsockname()[0] + except Exception: + pass + finally: + s.close() + return src_ip + + async def _get_wifi_interfaces(self) -> Dict[str, Any]: + # get wifi interfaces + wifi_intfs: Dict[str, Any] = {} + resp = await self.iwgetid_cmd.run_with_response(log_complete=False) + if resp: + for line in resp.split("\n"): + parts = line.strip().split(maxsplit=1) + wifi_intfs[parts[0]] = parts[1].split(":")[-1].strip('"') + return wifi_intfs + class BaseProvider: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server()