webcam: support lookup by UID

Expose the UID assigned to webcams and allow clients to fetch, modify,
and delete items based on the UID.  New additions must not specify a
UID.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-08-11 11:50:24 -04:00
parent ee62d07c68
commit a232260713
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 158 additions and 113 deletions

View File

@ -16,10 +16,10 @@ from typing import (
Dict, Dict,
List, List,
Any, Any,
Tuple
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from asyncio import Future
from ..server import Server from ..server import Server
from ..confighelper import ConfigHelper from ..confighelper import ConfigHelper
from ..common import WebRequest from ..common import WebRequest
@ -68,17 +68,35 @@ class WebcamManager:
machine: Machine = self.server.lookup_component("machine") machine: Machine = self.server.lookup_component("machine")
if machine.public_ip: if machine.public_ip:
self._set_default_host_ip(machine.public_ip) self._set_default_host_ip(machine.public_ip)
all_uids = [wc.uid for wc in self.webcams.values()]
db: MoonrakerDatabase = self.server.lookup_component("database") db: MoonrakerDatabase = self.server.lookup_component("database")
saved_cams: Dict[str, Any] = await db.get_item("webcams", default={}) db_cams: Dict[str, Dict[str, Any]] = await db.get_item("webcams", default={})
for cam_data in saved_cams.values(): ro_info: List[str] = []
# Process configured cams
for uid, cam_data in db_cams.items():
try: try:
cam_data["uid"] = uid
webcam = WebCam.from_database(self.server, cam_data) webcam = WebCam.from_database(self.server, cam_data)
if uid in all_uids:
# Unlikely but possible collision between random UUID4
# and UUID5 generated from a configured webcam.
await db.delete_item("webcams", uid)
webcam.uid = self._get_guaranteed_uuid()
await self._save_cam(webcam, False)
ro_info.append(f"Detected webcam UID collision: {uid}")
all_uids.append(webcam.uid)
if webcam.name in self.webcams: if webcam.name in self.webcams:
ro_info.append(
f"Detected webcam name collision: {webcam.name}, uuid: "
f"{uid}. This camera will be ignored."
)
continue continue
self.webcams[webcam.name] = webcam self.webcams[webcam.name] = webcam
except Exception: except Exception:
logging.exception("Failed to process webcam from db") logging.exception("Failed to process webcam from db")
continue continue
if ro_info:
self.server.add_log_rollover_item("webcam", "\n".join(ro_info))
def _set_default_host_ip(self, ip: str) -> None: def _set_default_host_ip(self, ip: str) -> None:
default_host = "http://127.0.0.1" default_host = "http://127.0.0.1"
@ -101,103 +119,113 @@ class WebcamManager:
def _list_webcams(self) -> List[Dict[str, Any]]: def _list_webcams(self) -> List[Dict[str, Any]]:
return [wc.as_dict() for wc in self.webcams.values()] return [wc.as_dict() for wc in self.webcams.values()]
async def _find_dbcam_by_uuid( def _save_cam(self, webcam: WebCam, save_local: bool = True) -> Future:
self, name: str if save_local:
) -> Tuple[str, Dict[str, Any]]: self.webcams[webcam.name] = webcam
db: MoonrakerDatabase = self.server.lookup_component("database") cam_data: Dict[str, Any] = {}
saved_cams: Dict[str, Dict[str, Any]]
saved_cams = await db.get_item("webcams", default={})
for uid, cam_data in saved_cams.items():
if name == cam_data["name"]:
return uid, cam_data
return "", {}
async def _save_cam(self, webcam: WebCam) -> None:
uid, cam_data = await self._find_dbcam_by_uuid(webcam.name)
if not uid:
uid = str(uuid.uuid4())
for mfield, dbfield in CAM_FIELDS.items(): for mfield, dbfield in CAM_FIELDS.items():
cam_data[dbfield] = getattr(webcam, mfield) cam_data[dbfield] = getattr(webcam, mfield)
cam_data["location"] = webcam.location cam_data["location"] = webcam.location
cam_data["rotation"] = webcam.rotation cam_data["rotation"] = webcam.rotation
cam_data["extra_data"] = webcam.extra_data cam_data["extra_data"] = webcam.extra_data
db: MoonrakerDatabase = self.server.lookup_component("database") db: MoonrakerDatabase = self.server.lookup_component("database")
db.insert_item("webcams", uid, cam_data) return db.insert_item("webcams", webcam.uid, cam_data)
async def _delete_cam(self, webcam: WebCam) -> None: def _delete_cam(self, webcam: WebCam) -> Future:
uid, cam = await self._find_dbcam_by_uuid(webcam.name)
if not uid:
return
db: MoonrakerDatabase = self.server.lookup_component("database") db: MoonrakerDatabase = self.server.lookup_component("database")
db.delete_item("webcams", uid) self.webcams.pop(webcam.name, None)
return db.delete_item("webcams", webcam.uid)
async def _handle_webcam_request( def _get_guaranteed_uuid(self) -> str:
self, web_request: WebRequest cur_uids = [wc.uid for wc in self.webcams.values()]
) -> Dict[str, Any]: while True:
action = web_request.get_action() uid = str(uuid.uuid4())
if uid not in cur_uids:
break
return uid
def get_cam_by_uid(self, uid: str) -> WebCam:
for cam in self.webcams.values():
if cam.uid == uid:
return cam
raise self.server.error(f"Webcam with UID {uid} not found", 404)
def _lookup_camera(
self, web_request: WebRequest, required: bool = True
) -> Optional[WebCam]:
args = web_request.get_args()
if "uid" in args:
return self.get_cam_by_uid(web_request.get_str("uid"))
name = web_request.get_str("name") name = web_request.get_str("name")
webcam = self.webcams.get(name, None)
if required and webcam is None:
raise self.server.error(f"Webcam {name} not found", 404)
return webcam
async def _handle_webcam_request(self, web_request: WebRequest) -> Dict[str, Any]:
action = web_request.get_action()
webcam = self._lookup_camera(web_request, action != "POST")
webcam_data: Dict[str, Any] = {} webcam_data: Dict[str, Any] = {}
if action == "GET": if action == "GET":
if name not in self.webcams: assert webcam is not None
raise self.server.error(f"Webcam {name} not found", 404) webcam_data = webcam.as_dict()
webcam_data = self.webcams[name].as_dict()
elif action == "POST": elif action == "POST":
webcam = self.webcams.get(name, None)
if webcam is not None: if webcam is not None:
if webcam.source == "config": if webcam.source == "config":
raise self.server.error( raise self.server.error(
f"Cannot overwrite webcam '{name}' sourced from " f"Cannot overwrite webcam '{webcam.name}' sourced from "
"Moonraker configuration" "Moonraker configuration"
) )
new_name = web_request.get_str("name", None)
if new_name is not None and webcam.name != new_name:
if new_name in self.webcams:
raise self.server.error(
f"Cannot rename webcam from '{webcam.name}' to "
f"'{new_name}'. Webcam with requested name '{new_name}' "
"already exists."
)
self.webcams.pop(webcam.name, None)
webcam.update(web_request) webcam.update(web_request)
else: else:
webcam = WebCam.from_web_request(self.server, web_request) uid = self._get_guaranteed_uuid()
self.webcams[name] = webcam webcam = WebCam.from_web_request(self.server, web_request, uid)
webcam_data = webcam.as_dict()
await self._save_cam(webcam) await self._save_cam(webcam)
webcam_data = webcam.as_dict()
elif action == "DELETE": elif action == "DELETE":
if name not in self.webcams: assert webcam is not None
raise self.server.error(f"Webcam {name} not found", 404) if webcam.source == "config":
elif self.webcams[name].source == "config":
raise self.server.error( raise self.server.error(
f"Cannot delete webcam '{name}' sourced from " f"Cannot delete webcam '{webcam.name}' sourced from "
"Moonraker configuration" "Moonraker configuration"
) )
webcam = self.webcams.pop(name)
webcam_data = webcam.as_dict() webcam_data = webcam.as_dict()
await self._delete_cam(webcam) self._delete_cam(webcam)
if action != "GET": if action != "GET":
self.server.send_event( self.server.send_event(
"webcam:webcams_changed", {"webcams": self._list_webcams()} "webcam:webcams_changed", {"webcams": self._list_webcams()}
) )
return {"webcam": webcam_data} return {"webcam": webcam_data}
async def _handle_webcam_list( async def _handle_webcam_list(self, web_request: WebRequest) -> Dict[str, Any]:
self, web_request: WebRequest
) -> Dict[str, Any]:
return {"webcams": self._list_webcams()} return {"webcams": self._list_webcams()}
async def _handle_webcam_test( async def _handle_webcam_test(self, web_request: WebRequest) -> Dict[str, Any]:
self, web_request: WebRequest
) -> Dict[str, Any]:
name = web_request.get_str("name")
if name not in self.webcams:
raise self.server.error(f"Webcam '{name}' not found", 404)
client: HttpClient = self.server.lookup_component("http_client") client: HttpClient = self.server.lookup_component("http_client")
cam = self.webcams[name] webcam = self._lookup_camera(web_request)
assert webcam is not None
result: Dict[str, Any] = { result: Dict[str, Any] = {
"name": name, "name": webcam.name,
"snapshot_reachable": False "snapshot_reachable": False
} }
for img_type in ["snapshot", "stream"]: for img_type in ["snapshot", "stream"]:
try: try:
func = getattr(cam, f"get_{img_type}_url") func = getattr(webcam, f"get_{img_type}_url")
result[f"{img_type}_url"] = await func(True) result[f"{img_type}_url"] = await func(True)
except Exception: except Exception:
logging.exception(f"Error Processing {img_type} url") logging.exception(f"Error Processing {img_type} url")
result[f"{img_type}_url"] = "" result[f"{img_type}_url"] = ""
url: str = result["snapshot_url"] url: str = result["snapshot_url"]
if result.get("snapshot_url", "").startswith("http"): if url.startswith("http"):
ret = await client.get(url, connect_timeout=1., request_timeout=1.) ret = await client.get(url, connect_timeout=1., request_timeout=1.)
result["snapshot_reachable"] = not ret.has_error() result["snapshot_reachable"] = not ret.has_error()
return result return result
@ -205,16 +233,17 @@ class WebcamManager:
class WebCam: class WebCam:
_default_host: str = "http://127.0.0.1" _default_host: str = "http://127.0.0.1"
_protected_fields: List[str] = ["source", "uid"]
def __init__(self, server: Server, **kwargs) -> None: def __init__(self, server: Server, **kwargs) -> None:
self._server = server self._server = server
self.name: str = kwargs["name"]
self.enabled: bool = kwargs["enabled"] self.enabled: bool = kwargs["enabled"]
self.icon: str = kwargs["icon"] self.icon: str = kwargs["icon"]
self.aspect_ratio: str = kwargs["aspect_ratio"] self.aspect_ratio: str = kwargs["aspect_ratio"]
self.target_fps: int = kwargs["target_fps"]
self.target_fps_idle: int = kwargs["target_fps_idle"] self.target_fps_idle: int = kwargs["target_fps_idle"]
self.name: str = kwargs["name"]
self.location: str = kwargs["location"] self.location: str = kwargs["location"]
self.service: str = kwargs["service"] self.service: str = kwargs["service"]
self.target_fps: int = kwargs["target_fps"]
self.stream_url: str = kwargs["stream_url"] self.stream_url: str = kwargs["stream_url"]
self.snapshot_url: str = kwargs["snapshot_url"] self.snapshot_url: str = kwargs["snapshot_url"]
self.flip_horizontal: bool = kwargs["flip_horizontal"] self.flip_horizontal: bool = kwargs["flip_horizontal"]
@ -222,6 +251,7 @@ class WebCam:
self.rotation: int = kwargs["rotation"] self.rotation: int = kwargs["rotation"]
self.source: str = kwargs["source"] self.source: str = kwargs["source"]
self.extra_data: Dict[str, Any] = kwargs.get("extra_data", {}) self.extra_data: Dict[str, Any] = kwargs.get("extra_data", {})
self.uid: str = kwargs["uid"]
if self.rotation not in [0, 90, 180, 270]: if self.rotation not in [0, 90, 180, 270]:
raise server.error(f"Invalid value for 'rotation': {self.rotation}") raise server.error(f"Invalid value for 'rotation': {self.rotation}")
prefix, sep, postfix = self.aspect_ratio.partition(":") prefix, sep, postfix = self.aspect_ratio.partition(":")
@ -330,18 +360,24 @@ class WebCam:
return url return url
def update(self, web_request: WebRequest) -> None: def update(self, web_request: WebRequest) -> None:
valid_fields = [
f for f in self.__dict__.keys() if f[0] != "_"
and f not in self._protected_fields
]
for field in web_request.get_args().keys(): for field in web_request.get_args().keys():
if field not in valid_fields:
continue
try: try:
attr = getattr(self, field) attr_type = type(getattr(self, field))
except AttributeError: except AttributeError:
continue continue
if isinstance(attr, bool): if attr_type is bool:
val: Any = web_request.get_boolean(field) val: Any = web_request.get_boolean(field)
elif isinstance(attr, int): elif attr_type is int:
val = web_request.get_int(field) val = web_request.get_int(field)
elif isinstance(attr, float): elif attr_type is float:
val = web_request.get_float(field) val = web_request.get_float(field)
elif isinstance(attr, str): elif attr_type is str:
val = web_request.get_str(field) val = web_request.get_str(field)
else: else:
val = web_request.get(field) val = web_request.get(field)
@ -353,68 +389,77 @@ class WebCam:
@classmethod @classmethod
def from_config(cls, config: ConfigHelper) -> WebCam: def from_config(cls, config: ConfigHelper) -> WebCam:
webcam: Dict[str, Any] = {}
webcam["name"] = config.get_name().split(maxsplit=1)[-1]
webcam["enabled"] = config.getboolean("enabled", True)
webcam["icon"] = config.get("icon", "mdiWebcam")
webcam["aspect_ratio"] = config.get("aspect_ratio", "4:3")
webcam["location"] = config.get("location", "printer")
webcam["service"] = config.get("service", "mjpegstreamer")
webcam["target_fps"] = config.getint("target_fps", 15)
webcam["target_fps_idle"] = config.getint("target_fps_idle", 5)
webcam["stream_url"] = config.get("stream_url")
webcam["snapshot_url"] = config.get("snapshot_url", "")
webcam["flip_horizontal"] = config.getboolean("flip_horizontal", False)
webcam["flip_vertical"] = config.getboolean("flip_vertical", False)
webcam["rotation"] = config.getint("rotation", 0)
webcam["source"] = "config"
server = config.get_server() server = config.get_server()
name = config.get_name().split(maxsplit=1)[-1]
ns = uuid.UUID(server.get_app_args()["instance_uuid"])
try: try:
return cls(config.get_server(), **webcam) return cls(
server,
name=name,
enabled=config.getboolean("enabled", True),
icon=config.get("icon", "mdiWebcam"),
aspect_ratio=config.get("aspect_ratio", "4:3"),
target_fps=config.getint("target_fps", 15),
target_fps_idle=config.getint("target_fps_idle", 5),
location=config.get("location", "printer"),
service=config.get("service", "mjpegstreamer"),
stream_url=config.get("stream_url"),
snapshot_url=config.get("snapshot_url", ""),
flip_horizontal=config.getboolean("flip_horizontal", False),
flip_vertical=config.getboolean("flip_vertical", False),
rotation=config.getint("rotation", 0),
source="config",
uid=str(uuid.uuid5(ns, f"moonraker.webcam.{name}"))
)
except server.error as err: except server.error as err:
raise config.error(str(err)) from err raise config.error(str(err)) from err
@classmethod @classmethod
def from_web_request( def from_web_request(
cls, server: Server, web_request: WebRequest cls, server: Server, web_request: WebRequest, uid: str
) -> WebCam: ) -> WebCam:
webcam: Dict[str, Any] = {} name = web_request.get_str("name")
webcam["name"] = web_request.get_str("name") return cls(
webcam["enabled"] = web_request.get_boolean("enabled", True) server,
webcam["icon"] = web_request.get_str("icon", "mdiWebcam") name=name,
webcam["aspect_ratio"] = web_request.get_str("aspect_ratio", "4:3") enabled=web_request.get_boolean("enabled", True),
webcam["location"] = web_request.get_str("location", "printer") icon=web_request.get_str("icon", "mdiWebcam"),
webcam["service"] = web_request.get_str("service", "mjpegstreamer") aspect_ratio=web_request.get_str("aspect_ratio", "4:3"),
webcam["target_fps"] = web_request.get_int("target_fps", 15) target_fps=web_request.get_int("target_fps", 15),
webcam["target_fps_idle"] = web_request.get_int("target_fps_idle", 5) target_fps_idle=web_request.get_int("target_fps_idle", 5),
webcam["stream_url"] = web_request.get_str("stream_url") location=web_request.get_str("location", "printer"),
webcam["snapshot_url"] = web_request.get_str("snapshot_url", "") service=web_request.get_str("service", "mjpegstreamer"),
webcam["flip_horizontal"] = web_request.get_boolean("flip_horizontal", False) stream_url=web_request.get_str("stream_url"),
webcam["flip_vertical"] = web_request.get_boolean("flip_vertical", False) snapshot_url=web_request.get_str("snapshot_url", ""),
webcam["rotation"] = web_request.get_int("rotation", 0) flip_horizontal=web_request.get_boolean("flip_horizontal", False),
webcam["extra_data"] = web_request.get("extra_data", {}) flip_vertical=web_request.get_boolean("flip_vertical", False),
webcam["source"] = "database" rotation=web_request.get_int("rotation", 0),
return cls(server, **webcam) source="database",
extra_data=web_request.get("extra_data", {}),
uid=uid
)
@classmethod @classmethod
def from_database(cls, server: Server, cam_data: Dict[str, Any]) -> WebCam: def from_database(cls, server: Server, cam_data: Dict[str, Any]) -> WebCam:
webcam: Dict[str, Any] = {} return cls(
webcam["name"] = str(cam_data["name"]) server,
webcam["enabled"] = bool(cam_data.get("enabled", True)) name=str(cam_data["name"]),
webcam["icon"] = str(cam_data.get("icon", "mdiWebcam")) enabled=bool(cam_data.get("enabled", True)),
webcam["aspect_ratio"] = str(cam_data.get("aspectRatio", "4:3")) icon=str(cam_data.get("icon", "mdiWebcam")),
webcam["location"] = str(cam_data.get("location", "printer")) aspect_ratio=str(cam_data.get("aspectRatio", "4:3")),
webcam["service"] = str(cam_data.get("service", "mjpegstreamer")) target_fps=int(cam_data.get("targetFps", 15)),
webcam["target_fps"] = int(cam_data.get("targetFps", 15)) target_fps_idle=int(cam_data.get("targetFpsIdle", 5)),
webcam["target_fps_idle"] = int(cam_data.get("targetFpsIdle", 5)) location=str(cam_data.get("location", "printer")),
webcam["stream_url"] = str(cam_data.get("urlStream", "")) service=str(cam_data.get("service", "mjpegstreamer")),
webcam["snapshot_url"] = str(cam_data.get("urlSnapshot", "")) stream_url=str(cam_data.get("urlStream", "")),
webcam["flip_horizontal"] = bool(cam_data.get("flipX", False)) snapshot_url=str(cam_data.get("urlSnapshot", "")),
webcam["flip_vertical"] = bool(cam_data.get("flipY", False)) flip_horizontal=bool(cam_data.get("flipX", False)),
webcam["rotation"] = int(cam_data.get("rotation", webcam.get("rotate", 0))) flip_vertical=bool(cam_data.get("flipY", False)),
webcam["extra_data"] = cam_data.get("extra_data", {}) rotation=int(cam_data.get("rotation", cam_data.get("rotate", 0))),
webcam["source"] = "database" source="database",
return cls(server, **webcam) extra_data=cam_data.get("extra_data", {}),
uid=cam_data["uid"]
)
def load_component(config: ConfigHelper) -> WebcamManager: def load_component(config: ConfigHelper) -> WebcamManager:
return WebcamManager(config) return WebcamManager(config)