machine: add video peripheral API request

Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-01-06 11:45:22 -05:00
parent 522b4df989
commit 6f4a0480f3
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
2 changed files with 61 additions and 1 deletions

View File

@ -21,7 +21,7 @@ import tempfile
import getpass
import configparser
from ..confighelper import FileSourceWrapper
from ..utils import source_info, cansocket, sysfs_devs
from ..utils import source_info, cansocket, sysfs_devs, load_system_module
from ..utils import json_wrapper as jsonw
from ..common import RequestType
@ -167,6 +167,9 @@ class Machine:
self.server.register_endpoint(
"/machine/peripherals/canbus", RequestType.GET, self._handle_can_query
)
self.server.register_endpoint(
"/machine/peripherals/video", RequestType.GET, self._handle_video_request
)
self.server.register_notification("machine:service_state_changed")
self.server.register_notification("machine:sudo_alert")
@ -186,6 +189,7 @@ class Machine:
iwgetbin = "iwgetid"
self.iwgetid_cmd = shell_cmd.build_shell_command(iwgetbin)
self.init_evt = asyncio.Event()
self.libcam = self._try_import_libcamera()
def _init_allowed_services(self) -> None:
app_args = self.server.get_app_args()
@ -222,6 +226,23 @@ class Machine:
sys_info_msg += f"\n {svc}"
self.server.add_log_rollover_item('system_info', sys_info_msg, log=log)
def _try_import_libcamera(self) -> Any:
try:
libcam = load_system_module("libcamera")
cmgr = libcam.CameraManager.singleton()
self.server.add_log_rollover_item(
"libcamera",
f"Found libcamera Python module, version: {cmgr.version}"
)
return libcam
except Exception:
if self.server.is_verbose_enabled():
logging.exception("Failed to import libcamera")
self.server.add_log_rollover_item(
"libcamera", "Module libcamera unavailble, import failed"
)
return None
@property
def public_ip(self) -> str:
return self._public_ip
@ -433,6 +454,9 @@ class Machine:
"can_uuids": await self.query_can_uuids(interface)
}
async def _handle_video_request(self, web_request: WebRequest) -> Dict[str, Any]:
return await self.detect_video_devices()
def get_system_info(self) -> Dict[str, Any]:
return self.system_info
@ -879,6 +903,41 @@ class Machine:
cansock.close()
return uuids
async def detect_video_devices(self) -> Dict[str, List[Dict[str, Any]]]:
async with self.periph_lock:
eventloop = self.server.get_event_loop()
v4l2_devs = await eventloop.run_in_thread(sysfs_devs.find_video_devices)
libcam_devs = await eventloop.run_in_thread(self.get_libcamera_devices)
return {
"v4l2_devices": v4l2_devs,
"libcamera_devices": libcam_devs
}
def get_libcamera_devices(self) -> List[Dict[str, Any]]:
libcam = self.libcam
libcam_devs: List[Dict[str, Any]] = []
if libcam is not None:
cm = libcam.CameraManager.singleton()
for cam in cm.cameras:
device: Dict[str, Any] = {"libcamera_id": cam.id}
props_by_name = {cid.name: val for cid, val in cam.properties.items()}
device["model"] = props_by_name.get("Model")
modes: List[Dict[str, Any]] = []
cam_config = cam.generate_configuration([libcam.StreamRole.Raw])
for stream_cfg in cam_config:
formats = stream_cfg.formats
for pix_fmt in formats.pixel_formats:
cur_mode: Dict[str, Any] = {"format": str(pix_fmt)}
resolutions: List[str] = []
for size in formats.sizes(pix_fmt):
resolutions.append(str(size))
cur_mode["resolutions"] = resolutions
modes.append(cur_mode)
device["modes"] = modes
libcam_devs.append(device)
return libcam_devs
class BaseProvider:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()

View File

@ -40,6 +40,7 @@ if TYPE_CHECKING:
SYS_MOD_PATHS = glob.glob("/usr/lib/python3*/dist-packages")
SYS_MOD_PATHS += glob.glob("/usr/lib/python3*/site-packages")
SYS_MOD_PATHS += glob.glob("/usr/lib/*-linux-gnu/python3*/site-packages")
IPAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
class ServerError(Exception):