zeroconf: add support for ssdp discovery

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-06-23 15:00:47 -04:00
parent a55818bb1a
commit 1f0fd699ea
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 281 additions and 1 deletions

View File

@ -8,14 +8,28 @@ import socket
import asyncio import asyncio
import logging import logging
import ipaddress import ipaddress
import random
import uuid
from itertools import cycle
from email.utils import formatdate
from zeroconf import IPVersion from zeroconf import IPVersion
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Optional,
Tuple
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ..confighelper import ConfigHelper from ..confighelper import ConfigHelper
from ..common import WebRequest
from ..app import MoonrakerApp from ..app import MoonrakerApp
from .authorization import Authorization
from .machine import Machine from .machine import Machine
ZC_SERVICE_TYPE = "_moonraker._tcp.local." ZC_SERVICE_TYPE = "_moonraker._tcp.local."
@ -62,6 +76,9 @@ class ZeroconfRegistrar:
if self.bound_all: if self.bound_all:
self.server.register_event_handler( self.server.register_event_handler(
"machine:net_state_changed", self._update_service) "machine:net_state_changed", self._update_service)
self.ssdp_server: Optional[SSDPServer] = None
if config.getboolean("enable_ssdp", False):
self.ssdp_server = SSDPServer(config)
async def component_init(self) -> None: async def component_init(self) -> None:
logging.info("Starting Zeroconf services") logging.info("Starting Zeroconf services")
@ -107,9 +124,20 @@ class ZeroconfRegistrar:
server=f"{server_name}.local.", server=f"{server_name}.local.",
) )
await self.runner.register_services([self.service_info]) await self.runner.register_services([self.service_info])
if self.ssdp_server is not None:
addr = self.cfg_addr if not self.bound_all else machine.public_ip
if not addr:
addr = f"{self.mdns_name}.local"
name = f"{instance_name} ({host})"
if len(name) > 64:
name = instance_name
await self.ssdp_server.start()
self.ssdp_server.register_service(name, addr, hi["port"])
async def close(self) -> None: async def close(self) -> None:
await self.runner.unregister_services([self.service_info]) await self.runner.unregister_services([self.service_info])
if self.ssdp_server is not None:
await self.ssdp_server.stop()
async def _update_service(self, network: Dict[str, Any]) -> None: async def _update_service(self, network: Dict[str, Any]) -> None:
if self.bound_all: if self.bound_all:
@ -127,5 +155,257 @@ class ZeroconfRegistrar:
yield socket.inet_pton(family, addr_info["address"]) yield socket.inet_pton(family, addr_info["address"])
SSDP_ADDR = ("239.255.255.250", 1900)
SSDP_SERVER_ID = "Moonraker SSDP/UPNP Server"
SSDP_MAX_AGE = 1800
SSDP_DEVICE_TYPE = "urn:arksine.github.io:device:Moonraker:1"
SSDP_DEVICE_XML = """
<?xml version="1.0"?>
<root xmlns="urn:schemas-upnp-org:device-1-0" configId="{config_id}">
<specVersion>
<major>2</major>
<minor>0</minor>
</specVersion>
<device>
<deviceType>{device_type}</deviceType>
<friendlyName>{friendly_name}</friendlyName>
<manufacturer>Arksine</manufacturer>
<manufacturerURL>https://github.com/Arksine/moonraker</manufacturerURL>
<modelDescription>API Server for Klipper</modelDescription>
<modelName>Moonraker</modelName>
<modelNumber>{model_number}</modelNumber>
<modelURL>https://github.com/Arksine/moonraker</modelURL>
<serialNumber>{serial_number}</serialNumber>
<UDN>uuid:{device_uuid}</UDN>
<presentationURL>{presentation_url}</presentationURL>
</device>
</root>
""".strip()
class SSDPServer(asyncio.protocols.DatagramProtocol):
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.unique_id = uuid.UUID(self.server.get_app_args()["instance_uuid"])
self.name: str = "Moonraker"
self.base_url: str = ""
self.response_headers: List[str] = []
self.registered: bool = False
self.running: bool = False
self.close_fut: Optional[asyncio.Future] = None
self.response_handle: Optional[asyncio.TimerHandle] = None
eventloop = self.server.get_event_loop()
self.boot_id = int(eventloop.get_loop_time())
self.config_id = 1
self.ad_timer = eventloop.register_timer(self._advertise_presence)
auth: Optional[Authorization]
auth = self.server.load_component(config, "authorization", None)
if auth is not None:
auth.register_permited_path("/server/zeroconf/ssdp")
self.server.register_endpoint(
"/server/zeroconf/ssdp",
["GET"],
self._handle_xml_request,
transports=["http"],
wrap_result=False,
content_type="application/xml"
)
def _create_ssdp_socket(
self,
source_addr: Tuple[str, int] = ("0.0.0.0", 0),
target_addr: Tuple[str, int] = SSDP_ADDR
) -> socket.socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
source_ip = socket.inet_aton(source_addr[0])
target_ip = socket.inet_aton(target_addr[0])
ip_combo = target_ip + source_ip
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, source_ip)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, ip_combo)
return sock
async def start(self) -> None:
if self.running:
return
try:
sock = self._create_ssdp_socket()
sock.settimeout(0)
sock.setblocking(False)
sock.bind(("", SSDP_ADDR[1]))
_loop = asyncio.get_running_loop()
ret = await _loop.create_datagram_endpoint(lambda: self, sock=sock)
self.transport, _ = ret
except (socket.error, OSError):
return
self.running = True
async def stop(self) -> None:
if not self.running:
return
self.running = False
self.ad_timer.stop()
if self.response_handle is not None:
self.response_handle.cancel()
self.response_handle = None
if self.transport.is_closing():
logging.info("Transport already closing")
return
for notification in self._build_notifications("ssdp:byebye"):
self.transport.sendto(notification, SSDP_ADDR)
self.close_fut = self.server.get_event_loop().create_future()
self.transport.close()
try:
await asyncio.wait_for(self.close_fut, 2.)
except asyncio.TimeoutError:
pass
self.close_fut = None
def register_service(
self, name: str, host_name_or_ip: str, port: int
) -> None:
if len(name) > 64:
name = name[:64]
self.name = name
self.base_url = f"http://{host_name_or_ip}:{port}"
self.response_headers = [
f"USN: uuid:{self.unique_id}::upnp:rootdevice",
f"LOCATION: {self.base_url}/server/zeroconf/ssdp",
"ST: upnp:rootdevice",
"EXT:",
f"SERVER: {SSDP_SERVER_ID}",
f"CACHE-CONTROL: max-age={SSDP_MAX_AGE}",
f"BOOTID.UPNP.ORG: {self.boot_id}",
f"CONFIGID.UPNP.ORG: {self.config_id}",
]
self.registered = True
advertisements = self._build_notifications("ssdp:alive")
if self.running:
for ad in advertisements:
self.transport.sendto(ad, SSDP_ADDR)
self.advertisements = cycle(advertisements)
self.ad_timer.start()
async def _handle_xml_request(self, web_request: WebRequest) -> str:
if not self.registered:
raise self.server.error("Moonraker SSDP Device not registered", 404)
app_args = self.server.get_app_args()
return SSDP_DEVICE_XML.format(
device_type=SSDP_DEVICE_TYPE,
config_id=str(self.config_id),
friendly_name=self.name,
model_number=app_args["software_version"],
serial_number=self.unique_id.hex,
device_uuid=str(self.unique_id),
presentation_url=self.base_url
)
def _advertise_presence(self, eventtime: float) -> float:
if self.running and self.registered:
cur_ad = next(self.advertisements)
self.transport.sendto(cur_ad, SSDP_ADDR)
delay = random.uniform(SSDP_MAX_AGE / 6., SSDP_MAX_AGE / 3.)
return eventtime + delay
def connection_made(
self, transport: asyncio.transports.BaseTransport
) -> None:
logging.debug("SSDP Server Connected")
def connection_lost(self, exc: Exception | None) -> None:
logging.debug("SSDP Server Disconnected")
if self.close_fut is not None:
self.close_fut.set_result(None)
def pause_writing(self) -> None:
logging.debug("SSDP Pause Writing Requested")
def resume_writing(self) -> None:
logging.debug("SSDP Resume Writing Requested")
def datagram_received(self, data: bytes, addr: tuple[str | Any, int]) -> None:
if not self.registered:
return
try:
parts = data.decode().split("\r\n\r\n", maxsplit=1)
header = parts[0]
except ValueError as e:
logging.exception("Data Decode Error")
return
hlines = header.splitlines()
ssdp_command = hlines[0].strip()
headers = {}
for line in hlines[1:]:
parts = line.strip().split(":", maxsplit=1)
if len(parts) < 2:
continue
headers[parts[0].upper()] = parts[1].strip()
if (
ssdp_command != "M-SEARCH * HTTP/1.1" or
headers.get("MAN") != '"ssdp:discover"'
):
# Not a discovery request
return
if headers.get("ST") not in ["upnp:rootdevice", "ssdp:all"]:
# Service Type doesn't apply
return
if self.response_handle is not None:
# response in progress
return
if "MX" in headers:
delay_time = random.uniform(0, float(headers["MX"]))
eventloop = self.server.get_event_loop()
self.response_handle = eventloop.delay_callback(
delay_time, self._respond_to_discovery, addr
)
else:
self._respond_to_discovery(addr)
def _respond_to_discovery(self, addr: tuple[str | Any, int]) -> None:
if not self.running:
return
self.response_handle = None
response: List[str] = ["HTTP/1.1 200 OK"]
response.extend(self.response_headers)
response.append(f"DATE: {formatdate(usegmt=True)}")
response.extend(["", ""])
self.transport.sendto("\r\n".join(response).encode(), addr)
def _build_notifications(self, nts: str) -> List[bytes]:
notifications: List[bytes] = []
notify_types = [
("upnp:rootdevice", f"uuid:{self.unique_id}::upnp:rootdevice"),
(f"uuid:{self.unique_id}", f"uuid:{self.unique_id}"),
(SSDP_DEVICE_TYPE, f"uuid:{self.unique_id}::{SSDP_DEVICE_TYPE}")
]
for (nt, usn) in notify_types:
notifications.append(
"\r\n".join([
"NOTIFY * HTTP/1.1",
f"HOST: {SSDP_ADDR[0]}:{SSDP_ADDR[1]}",
f"NTS: {nts}",
f"NT: {nt}",
f"USN: {usn}",
f"LOCATION: {self.base_url}/server/zeroconf/ssdp",
"EXT:",
f"SERVER: {SSDP_SERVER_ID}",
f"CACHE-CONTROL: max-age={SSDP_MAX_AGE}",
f"BOOTID.UPNP.ORG: {self.boot_id}",
f"CONFIGID.UPNP.ORG: {self.config_id}",
"",
""
]).encode()
)
return notifications
def error_received(self, exc: Exception) -> None:
logging.info(f"SSDP Server Error: {exc}")
def load_component(config: ConfigHelper) -> ZeroconfRegistrar: def load_component(config: ConfigHelper) -> ZeroconfRegistrar:
return ZeroconfRegistrar(config) return ZeroconfRegistrar(config)