sysfs_devs: add v4l2 video campture device detection
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
eb1599fa07
commit
522b4df989
|
@ -14,7 +14,7 @@ This module contains of Python port of the macros avaialble in
|
|||
"""
|
||||
|
||||
if TYPE_CHECKING:
|
||||
IOCParamSize = Union[int, str, Type[ctypes._SimpleCData]]
|
||||
IOCParamSize = Union[int, str, Type[ctypes._CData]]
|
||||
|
||||
_IOC_NRBITS = 8
|
||||
_IOC_TYPEBITS = 8
|
||||
|
|
|
@ -4,7 +4,12 @@
|
|||
#
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import fcntl
|
||||
import ctypes
|
||||
import pathlib
|
||||
from ..common import ExtendedFlag
|
||||
from . import ioctl_macros
|
||||
from typing import (
|
||||
Dict,
|
||||
List,
|
||||
|
@ -18,6 +23,9 @@ USB_DEVICE_PATH = "/sys/bus/usb/devices"
|
|||
TTY_PATH = "/sys/class/tty"
|
||||
SER_BYPTH_PATH = "/dev/serial/by-path"
|
||||
SER_BYID_PATH = "/dev/serial/by-id"
|
||||
V4L_DEVICE_PATH = "/sys/class/video4linux"
|
||||
V4L_BYPTH_PATH = "/dev/v4l/by-path"
|
||||
V4L_BYID_PATH = "/dev/v4l/by-id"
|
||||
|
||||
OPTIONAL_USB_INFO = ["manufacturer", "product", "serial"]
|
||||
NULL_DESCRIPTIONS = [
|
||||
|
@ -27,6 +35,19 @@ NULL_DESCRIPTIONS = [
|
|||
def read_item(parent: pathlib.Path, filename: str) -> str:
|
||||
return parent.joinpath(filename).read_text().strip()
|
||||
|
||||
def find_usb_folder(usb_path: pathlib.Path) -> Optional[str]:
|
||||
# Find the sysfs usb folder from a child folder
|
||||
while usb_path.is_dir() and usb_path.name:
|
||||
dnum_file = usb_path.joinpath("devnum")
|
||||
bnum_file = usb_path.joinpath("busnum")
|
||||
if not dnum_file.is_file() or not bnum_file.is_file():
|
||||
usb_path = usb_path.parent
|
||||
continue
|
||||
devnum = int(dnum_file.read_text().strip())
|
||||
busnum = int(bnum_file.read_text().strip())
|
||||
return f"{busnum}:{devnum}"
|
||||
return None
|
||||
|
||||
class UsbIdData:
|
||||
_usb_info_cache: Dict[str, str] = {
|
||||
"DI:1d50": "OpenMoko, Inc",
|
||||
|
@ -204,17 +225,7 @@ def find_serial_devices() -> List[Dict[str, Any]]:
|
|||
device_info["device_type"] = "hardware_uart"
|
||||
else:
|
||||
usb_path = device_folder.resolve()
|
||||
usb_location: Optional[str] = None
|
||||
while usb_path.is_dir() and usb_path.name:
|
||||
dnum_file = usb_path.joinpath("devnum")
|
||||
bnum_file = usb_path.joinpath("busnum")
|
||||
if not dnum_file.is_file() or not bnum_file.is_file():
|
||||
usb_path = usb_path.parent
|
||||
continue
|
||||
devnum = int(dnum_file.read_text().strip())
|
||||
busnum = int(bnum_file.read_text().strip())
|
||||
usb_location = f"{busnum}:{devnum}"
|
||||
break
|
||||
usb_location: Optional[str] = find_usb_folder(usb_path)
|
||||
device_info["path_by_hardware"] = usb_devs_by_path.get(device_name)
|
||||
device_info["path_by_id"] = usb_devs_by_id.get(device_name)
|
||||
device_info["usb_location"] = None
|
||||
|
@ -224,3 +235,122 @@ def find_serial_devices() -> List[Dict[str, Any]]:
|
|||
|
||||
serial_devs.append(device_info)
|
||||
return serial_devs
|
||||
|
||||
class struct_v4l2_capability(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("driver", ctypes.c_char * 16),
|
||||
("card", ctypes.c_char * 32),
|
||||
("bus_info", ctypes.c_char * 32),
|
||||
("version", ctypes.c_uint32),
|
||||
("capabilities", ctypes.c_uint32),
|
||||
("device_caps", ctypes.c_uint32),
|
||||
("reserved", ctypes.c_uint32 * 3),
|
||||
]
|
||||
|
||||
class V4L2Capability(ExtendedFlag):
|
||||
VIDEO_CAPTURE = 0x00000001 # noqa: E221
|
||||
VIDEO_OUTPUT = 0x00000002 # noqa: E221
|
||||
VIDEO_OVERLAY = 0x00000004 # noqa: E221
|
||||
VBI_CAPTURE = 0x00000010 # noqa: E221
|
||||
VBI_OUTPUT = 0x00000020 # noqa: E221
|
||||
SLICED_VBI_CAPTURE = 0x00000040 # noqa: E221
|
||||
SLICED_VBI_OUTPUT = 0x00000080 # noqa: E221
|
||||
RDS_CAPTURE = 0x00000100 # noqa: E221
|
||||
VIDEO_OUTPUT_OVERLAY = 0x00000200
|
||||
HW_FREQ_SEEK = 0x00000400 # noqa: E221
|
||||
RDS_OUTPUT = 0x00000800 # noqa: E221
|
||||
VIDEO_CAPTURE_MPLANE = 0x00001000
|
||||
VIDEO_OUTPUT_MPLANE = 0x00002000 # noqa: E221
|
||||
VIDEO_M2M_MPLANE = 0x00004000 # noqa: E221
|
||||
VIDEO_M2M = 0x00008000 # noqa: E221
|
||||
TUNER = 0x00010000 # noqa: E221
|
||||
AUDIO = 0x00020000 # noqa: E221
|
||||
RADIO = 0x00040000 # noqa: E221
|
||||
MODULATOR = 0x00080000 # noqa: E221
|
||||
SDR_CAPTURE = 0x00100000 # noqa: E221
|
||||
EXT_PIX_FORMAT = 0x00200000 # noqa: E221
|
||||
SDR_OUTPUT = 0x00400000 # noqa: E221
|
||||
META_CAPTURE = 0x00800000 # noqa: E221
|
||||
READWRITE = 0x01000000 # noqa: E221
|
||||
STREAMING = 0x04000000 # noqa: E221
|
||||
META_OUTPUT = 0x08000000 # noqa: E221
|
||||
TOUCH = 0x10000000 # noqa: E221
|
||||
IO_MC = 0x20000000 # noqa: E221
|
||||
SET_DEVICE_CAPS = 0x80000000 # noqa: E221
|
||||
|
||||
|
||||
V4L2_QUERYCAP = ioctl_macros.IOR(ord("V"), 0, struct_v4l2_capability)
|
||||
|
||||
def find_video_devices() -> List[Dict[str, Any]]:
|
||||
v4lpath = pathlib.Path(V4L_DEVICE_PATH)
|
||||
if not v4lpath.is_dir():
|
||||
return []
|
||||
v4l_by_path_dir = pathlib.Path(V4L_BYPTH_PATH)
|
||||
v4l_by_id_dir = pathlib.Path(V4L_BYID_PATH)
|
||||
dev_root_folder = pathlib.Path("/dev")
|
||||
v4l_devs_by_path: Dict[str, str] = {}
|
||||
v4l_devs_by_id: Dict[str, str] = {}
|
||||
if v4l_by_path_dir.is_dir():
|
||||
v4l_devs_by_path = {
|
||||
dev.resolve().name: str(dev) for dev in v4l_by_path_dir.iterdir()
|
||||
}
|
||||
if v4l_by_id_dir.is_dir():
|
||||
v4l_devs_by_id = {
|
||||
dev.resolve().name: str(dev) for dev in v4l_by_id_dir.iterdir()
|
||||
}
|
||||
v4l_devices: List[Dict[str, Any]] = []
|
||||
for v4ldev_path in v4lpath.iterdir():
|
||||
devfs_name = v4ldev_path.name
|
||||
devfs_path = dev_root_folder.joinpath(devfs_name)
|
||||
# The video4linux sysfs implmentation provides limited device
|
||||
# info. Use the VIDEOC_QUERYCAPS ioctl to retreive extended
|
||||
# information about the v4l2 device.
|
||||
fd: int = -1
|
||||
try:
|
||||
fd = os.open(str(devfs_path), os.O_RDONLY | os.O_NONBLOCK)
|
||||
cap_info = struct_v4l2_capability()
|
||||
fcntl.ioctl(fd, V4L2_QUERYCAP, cap_info)
|
||||
except Exception:
|
||||
continue
|
||||
finally:
|
||||
if fd != -1:
|
||||
os.close(fd)
|
||||
capabilities = V4L2Capability(cap_info.device_caps)
|
||||
if not capabilities & V4L2Capability.VIDEO_CAPTURE:
|
||||
# Skip devices that do not capture video
|
||||
continue
|
||||
ver_tuple = tuple(
|
||||
[str((cap_info.version >> (i)) & 0xFF) for i in range(16, -1, -8)]
|
||||
)
|
||||
video_device: Dict[str, Any] = {
|
||||
"device_name": devfs_name,
|
||||
"device_path": str(devfs_path),
|
||||
"camera_name": cap_info.card.decode(),
|
||||
"driver_name": cap_info.driver.decode(),
|
||||
"hardware_bus": cap_info.bus_info.decode(),
|
||||
"capabilities": [cap.name for cap in capabilities],
|
||||
"version": ".".join(ver_tuple),
|
||||
"path_by_hardware": v4l_devs_by_path.get(devfs_name),
|
||||
"path_by_id": v4l_devs_by_id.get(devfs_name),
|
||||
"alt_name": None,
|
||||
"usb_location": None,
|
||||
}
|
||||
name_file = v4ldev_path.joinpath("name")
|
||||
if name_file.is_file():
|
||||
video_device["alt_name"] = read_item(v4ldev_path, "name")
|
||||
device_path = v4ldev_path.joinpath("device")
|
||||
if device_path.is_dir():
|
||||
usb_location = find_usb_folder(device_path.resolve())
|
||||
if usb_location is not None:
|
||||
video_device["usb_location"] = usb_location
|
||||
v4l_devices.append(video_device)
|
||||
|
||||
def idx_sorter(item: Dict[str, Any]) -> int:
|
||||
try:
|
||||
return int(item["device_name"][5:])
|
||||
except ValueError:
|
||||
return -1
|
||||
# Sort by string first, then index
|
||||
v4l_devices.sort(key=lambda item: item["device_name"])
|
||||
v4l_devices.sort(key=idx_sorter)
|
||||
return v4l_devices
|
||||
|
|
Loading…
Reference in New Issue