From 522b4df9898ff6790ccae79203aff2343ebe8ff3 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Fri, 5 Jan 2024 06:30:37 -0500 Subject: [PATCH] sysfs_devs: add v4l2 video campture device detection Signed-off-by: Eric Callahan --- moonraker/utils/ioctl_macros.py | 2 +- moonraker/utils/sysfs_devs.py | 152 +++++++++++++++++++++++++++++--- 2 files changed, 142 insertions(+), 12 deletions(-) diff --git a/moonraker/utils/ioctl_macros.py b/moonraker/utils/ioctl_macros.py index 148e909..08ccbb7 100644 --- a/moonraker/utils/ioctl_macros.py +++ b/moonraker/utils/ioctl_macros.py @@ -14,7 +14,7 @@ This module contains of Python port of the macros avaialble in """ if TYPE_CHECKING: - IOCParamSize = Union[int, str, Type[ctypes._SimpleCData]] + IOCParamSize = Union[int, str, Type[ctypes._CData]] _IOC_NRBITS = 8 _IOC_TYPEBITS = 8 diff --git a/moonraker/utils/sysfs_devs.py b/moonraker/utils/sysfs_devs.py index 4a3f316..b701715 100644 --- a/moonraker/utils/sysfs_devs.py +++ b/moonraker/utils/sysfs_devs.py @@ -4,7 +4,12 @@ # # This file may be distributed under the terms of the GNU GPLv3 license from __future__ import annotations +import os +import fcntl +import ctypes import pathlib +from ..common import ExtendedFlag +from . import ioctl_macros from typing import ( Dict, List, @@ -18,6 +23,9 @@ USB_DEVICE_PATH = "/sys/bus/usb/devices" TTY_PATH = "/sys/class/tty" SER_BYPTH_PATH = "/dev/serial/by-path" SER_BYID_PATH = "/dev/serial/by-id" +V4L_DEVICE_PATH = "/sys/class/video4linux" +V4L_BYPTH_PATH = "/dev/v4l/by-path" +V4L_BYID_PATH = "/dev/v4l/by-id" OPTIONAL_USB_INFO = ["manufacturer", "product", "serial"] NULL_DESCRIPTIONS = [ @@ -27,6 +35,19 @@ NULL_DESCRIPTIONS = [ def read_item(parent: pathlib.Path, filename: str) -> str: return parent.joinpath(filename).read_text().strip() +def find_usb_folder(usb_path: pathlib.Path) -> Optional[str]: + # Find the sysfs usb folder from a child folder + while usb_path.is_dir() and usb_path.name: + dnum_file = usb_path.joinpath("devnum") + bnum_file = usb_path.joinpath("busnum") + if not dnum_file.is_file() or not bnum_file.is_file(): + usb_path = usb_path.parent + continue + devnum = int(dnum_file.read_text().strip()) + busnum = int(bnum_file.read_text().strip()) + return f"{busnum}:{devnum}" + return None + class UsbIdData: _usb_info_cache: Dict[str, str] = { "DI:1d50": "OpenMoko, Inc", @@ -204,17 +225,7 @@ def find_serial_devices() -> List[Dict[str, Any]]: device_info["device_type"] = "hardware_uart" else: usb_path = device_folder.resolve() - usb_location: Optional[str] = None - while usb_path.is_dir() and usb_path.name: - dnum_file = usb_path.joinpath("devnum") - bnum_file = usb_path.joinpath("busnum") - if not dnum_file.is_file() or not bnum_file.is_file(): - usb_path = usb_path.parent - continue - devnum = int(dnum_file.read_text().strip()) - busnum = int(bnum_file.read_text().strip()) - usb_location = f"{busnum}:{devnum}" - break + usb_location: Optional[str] = find_usb_folder(usb_path) device_info["path_by_hardware"] = usb_devs_by_path.get(device_name) device_info["path_by_id"] = usb_devs_by_id.get(device_name) device_info["usb_location"] = None @@ -224,3 +235,122 @@ def find_serial_devices() -> List[Dict[str, Any]]: serial_devs.append(device_info) return serial_devs + +class struct_v4l2_capability(ctypes.Structure): + _fields_ = [ + ("driver", ctypes.c_char * 16), + ("card", ctypes.c_char * 32), + ("bus_info", ctypes.c_char * 32), + ("version", ctypes.c_uint32), + ("capabilities", ctypes.c_uint32), + ("device_caps", ctypes.c_uint32), + ("reserved", ctypes.c_uint32 * 3), + ] + +class V4L2Capability(ExtendedFlag): + VIDEO_CAPTURE = 0x00000001 # noqa: E221 + VIDEO_OUTPUT = 0x00000002 # noqa: E221 + VIDEO_OVERLAY = 0x00000004 # noqa: E221 + VBI_CAPTURE = 0x00000010 # noqa: E221 + VBI_OUTPUT = 0x00000020 # noqa: E221 + SLICED_VBI_CAPTURE = 0x00000040 # noqa: E221 + SLICED_VBI_OUTPUT = 0x00000080 # noqa: E221 + RDS_CAPTURE = 0x00000100 # noqa: E221 + VIDEO_OUTPUT_OVERLAY = 0x00000200 + HW_FREQ_SEEK = 0x00000400 # noqa: E221 + RDS_OUTPUT = 0x00000800 # noqa: E221 + VIDEO_CAPTURE_MPLANE = 0x00001000 + VIDEO_OUTPUT_MPLANE = 0x00002000 # noqa: E221 + VIDEO_M2M_MPLANE = 0x00004000 # noqa: E221 + VIDEO_M2M = 0x00008000 # noqa: E221 + TUNER = 0x00010000 # noqa: E221 + AUDIO = 0x00020000 # noqa: E221 + RADIO = 0x00040000 # noqa: E221 + MODULATOR = 0x00080000 # noqa: E221 + SDR_CAPTURE = 0x00100000 # noqa: E221 + EXT_PIX_FORMAT = 0x00200000 # noqa: E221 + SDR_OUTPUT = 0x00400000 # noqa: E221 + META_CAPTURE = 0x00800000 # noqa: E221 + READWRITE = 0x01000000 # noqa: E221 + STREAMING = 0x04000000 # noqa: E221 + META_OUTPUT = 0x08000000 # noqa: E221 + TOUCH = 0x10000000 # noqa: E221 + IO_MC = 0x20000000 # noqa: E221 + SET_DEVICE_CAPS = 0x80000000 # noqa: E221 + + +V4L2_QUERYCAP = ioctl_macros.IOR(ord("V"), 0, struct_v4l2_capability) + +def find_video_devices() -> List[Dict[str, Any]]: + v4lpath = pathlib.Path(V4L_DEVICE_PATH) + if not v4lpath.is_dir(): + return [] + v4l_by_path_dir = pathlib.Path(V4L_BYPTH_PATH) + v4l_by_id_dir = pathlib.Path(V4L_BYID_PATH) + dev_root_folder = pathlib.Path("/dev") + v4l_devs_by_path: Dict[str, str] = {} + v4l_devs_by_id: Dict[str, str] = {} + if v4l_by_path_dir.is_dir(): + v4l_devs_by_path = { + dev.resolve().name: str(dev) for dev in v4l_by_path_dir.iterdir() + } + if v4l_by_id_dir.is_dir(): + v4l_devs_by_id = { + dev.resolve().name: str(dev) for dev in v4l_by_id_dir.iterdir() + } + v4l_devices: List[Dict[str, Any]] = [] + for v4ldev_path in v4lpath.iterdir(): + devfs_name = v4ldev_path.name + devfs_path = dev_root_folder.joinpath(devfs_name) + # The video4linux sysfs implmentation provides limited device + # info. Use the VIDEOC_QUERYCAPS ioctl to retreive extended + # information about the v4l2 device. + fd: int = -1 + try: + fd = os.open(str(devfs_path), os.O_RDONLY | os.O_NONBLOCK) + cap_info = struct_v4l2_capability() + fcntl.ioctl(fd, V4L2_QUERYCAP, cap_info) + except Exception: + continue + finally: + if fd != -1: + os.close(fd) + capabilities = V4L2Capability(cap_info.device_caps) + if not capabilities & V4L2Capability.VIDEO_CAPTURE: + # Skip devices that do not capture video + continue + ver_tuple = tuple( + [str((cap_info.version >> (i)) & 0xFF) for i in range(16, -1, -8)] + ) + video_device: Dict[str, Any] = { + "device_name": devfs_name, + "device_path": str(devfs_path), + "camera_name": cap_info.card.decode(), + "driver_name": cap_info.driver.decode(), + "hardware_bus": cap_info.bus_info.decode(), + "capabilities": [cap.name for cap in capabilities], + "version": ".".join(ver_tuple), + "path_by_hardware": v4l_devs_by_path.get(devfs_name), + "path_by_id": v4l_devs_by_id.get(devfs_name), + "alt_name": None, + "usb_location": None, + } + name_file = v4ldev_path.joinpath("name") + if name_file.is_file(): + video_device["alt_name"] = read_item(v4ldev_path, "name") + device_path = v4ldev_path.joinpath("device") + if device_path.is_dir(): + usb_location = find_usb_folder(device_path.resolve()) + if usb_location is not None: + video_device["usb_location"] = usb_location + v4l_devices.append(video_device) + + def idx_sorter(item: Dict[str, Any]) -> int: + try: + return int(item["device_name"][5:]) + except ValueError: + return -1 + # Sort by string first, then index + v4l_devices.sort(key=lambda item: item["device_name"]) + v4l_devices.sort(key=idx_sorter) + return v4l_devices