machine: add support for peripheral queries

Implement endpoints to query the following:
- Serial Devices (including Hardware UART)
- USB Devices using lsusb
- Klipper CAN Node UUIDs

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-12-16 15:04:54 -05:00
parent 0fb997285b
commit eb1599fa07
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 101 additions and 1 deletions

View File

@ -21,7 +21,7 @@ import tempfile
import getpass
import configparser
from ..confighelper import FileSourceWrapper
from ..utils import source_info
from ..utils import source_info, cansocket, sysfs_devs
from ..utils import json_wrapper as jsonw
from ..common import RequestType
@ -44,6 +44,7 @@ if TYPE_CHECKING:
from ..common import WebRequest
from .application import MoonrakerApp
from .klippy_connection import KlippyConnection
from .http_client import HttpClient
from .shell_command import ShellCommandFactory as SCMDComp
from .database import MoonrakerDatabase
from .file_manager.file_manager import FileManager
@ -84,6 +85,7 @@ SERVICE_PROPERTIES = [
"ExecStart", "WorkingDirectory", "FragmentPath", "Description",
"User"
]
USB_IDS_URL = "http://www.linux-usb.org/usb.ids"
class Machine:
def __init__(self, config: ConfigHelper) -> None:
@ -97,6 +99,7 @@ class Machine:
self.inside_container = False
self.moonraker_service_info: Dict[str, Any] = {}
self.sudo_req_lock = asyncio.Lock()
self.periph_lock = asyncio.Lock()
self._sudo_password: Optional[str] = None
sudo_template = config.gettemplate("sudo_password", None)
if sudo_template is not None:
@ -155,6 +158,15 @@ class Machine:
self.server.register_endpoint(
"/machine/sudo/password", RequestType.POST, self._set_sudo_password
)
self.server.register_endpoint(
"/machine/peripherals/serial", RequestType.GET, self._handle_serial_request
)
self.server.register_endpoint(
"/machine/peripherals/usb", RequestType.GET, self._handle_usb_request
)
self.server.register_endpoint(
"/machine/peripherals/canbus", RequestType.GET, self._handle_can_query
)
self.server.register_notification("machine:service_state_changed")
self.server.register_notification("machine:sudo_alert")
@ -251,6 +263,7 @@ class Machine:
pass
async def component_init(self) -> None:
await self.update_usb_ids()
await self.validator.validation_init()
await self.sys_provider.initialize()
if not self.inside_container:
@ -404,6 +417,22 @@ class Machine:
"request_messages": self.sudo_request_messages
}
async def _handle_serial_request(self, web_request: WebRequest) -> Dict[str, Any]:
return {
"serial_devices": await self.detect_serial_devices()
}
async def _handle_usb_request(self, web_request: WebRequest) -> Dict[str, Any]:
return {
"usb_devices": await self.detect_usb_devices()
}
async def _handle_can_query(self, web_request: WebRequest) -> Dict[str, Any]:
interface = web_request.get_str("interface", "can0")
return {
"can_uuids": await self.query_can_uuids(interface)
}
def get_system_info(self) -> Dict[str, Any]:
return self.system_info
@ -779,6 +808,77 @@ class Machine:
msg += f"\n{key}: {val}"
self.server.add_log_rollover_item(name, msg)
async def update_usb_ids(self, force: bool = False) -> None:
async with self.periph_lock:
db: MoonrakerDatabase = self.server.lookup_component("database")
client: HttpClient = self.server.lookup_component("http_client")
dpath = pathlib.Path(self.server.get_app_arg("data_path"))
usb_ids_path = pathlib.Path(dpath).joinpath("misc/usb.ids")
if usb_ids_path.is_file() and not force:
return
usb_id_req_info: Dict[str, str]
usb_id_req_info = await db.get_item("moonraker", "usb_id_req_info", {})
etag: Optional[str] = usb_id_req_info.pop("etag", None)
last_modified: Optional[str] = usb_id_req_info.pop("last_modified", None)
headers = {"Accept": "text/plain"}
if etag is not None and usb_ids_path.is_file():
headers["If-None-Match"] = etag
if last_modified is not None and usb_ids_path.is_file():
headers["If-Modified-Since"] = last_modified
resp = await client.get(
USB_IDS_URL, headers, enable_cache=False
)
if resp.has_error():
logging.info("Failed to retrieve usb.ids file")
return
if resp.status_code == 304:
logging.info("USB IDs file up to date")
return
# Save etag and modified headers
if resp.etag is not None:
usb_id_req_info["etag"] = resp.etag
if resp.last_modified is not None:
usb_id_req_info["last_modifed"] = resp.last_modified
await db.insert_item("moonraker", "usb_id_req_info", usb_id_req_info)
# Write file
logging.info("Writing usb.ids file...")
eventloop = self.server.get_event_loop()
await eventloop.run_in_thread(usb_ids_path.write_bytes, resp.content)
async def detect_serial_devices(self) -> List[Dict[str, Any]]:
async with self.periph_lock:
eventloop = self.server.get_event_loop()
return await eventloop.run_in_thread(sysfs_devs.find_serial_devices)
async def detect_usb_devices(self) -> List[Dict[str, Any]]:
async with self.periph_lock:
eventloop = self.server.get_event_loop()
return await eventloop.run_in_thread(self._do_usb_detect)
def _do_usb_detect(self) -> List[Dict[str, Any]]:
data_path = pathlib.Path(self.server.get_app_args()["data_path"])
usb_id_path = data_path.joinpath("misc/usb.ids")
usb_id_data = sysfs_devs.UsbIdData(usb_id_path)
dev_list = sysfs_devs.find_usb_devices()
for usb_dev_info in dev_list:
cls_ids: List[str] = usb_dev_info.pop("class_ids", None)
class_info = usb_id_data.get_class_info(*cls_ids)
usb_dev_info.update(class_info)
prod_info = usb_id_data.get_product_info(
usb_dev_info["vendor_id"], usb_dev_info["product_id"]
)
for field, desc in prod_info.items():
if usb_dev_info.get(field) is None:
usb_dev_info[field] = desc
return dev_list
async def query_can_uuids(self, interface: str) -> List[Dict[str, Any]]:
async with self.periph_lock:
cansock = cansocket.CanSocket(interface)
uuids = await cansocket.query_klipper_uuids(cansock)
cansock.close()
return uuids
class BaseProvider:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()