moonraker/tests/mocks/mock_gpio.py

194 lines
6.2 KiB
Python

from __future__ import annotations
import os
import logging
from typing import Dict, Optional, List, Tuple
class GpioException(Exception):
pass
class MockGpiod:
LINE_REQ_DIR_OUT = 3
LINE_REQ_EV_BOTH_EDGES = 6
LINE_REQ_FLAG_ACTIVE_LOW = 1 << 2
LINE_REQ_FLAG_BIAS_DISABLE = 1 << 3
LINE_REQ_FLAG_BIAS_PULL_DOWN = 1 << 4
LINE_REQ_FLAG_BIAS_PULL_UP = 1 << 5
def __init__(self, version: str = "1.2") -> None:
self.version = version
self.Chip = MockChipWrapper(self)
self.LineEvent = MockLineEvent
self.chips: Dict[str, MockChip] = {}
def version_string(self) -> str:
return self.version
def version_tuple(self) -> Tuple[int, ...]:
return tuple([int(v) for v in self.version.split(".")])
def get_chip(self, chip_name) -> Optional[MockChip]:
return self.chips.get(chip_name, None)
def add_chip(self, chip: MockChip):
self.chips[chip.name] = chip
def pop_chip(self, name: str):
self.chips.pop(name, None)
def find_line(self, chip_id: str, pin_id: str) -> MockLine:
if chip_id not in self.chips:
raise GpioException(f"Unable to find chip {chip_id}")
return self.chips[chip_id].find_line(pin_id)
class MockChipWrapper:
OPEN_BY_NAME = 2
def __init__(self, gpiod: MockGpiod) -> None:
self.mock_gpiod = gpiod
def __call__(self, chip_name: str, flags: int) -> MockChip:
if chip_name in self.mock_gpiod.chips:
return self.mock_gpiod.chips[chip_name]
chip = MockChip(chip_name, flags, self.mock_gpiod)
self.mock_gpiod.add_chip(chip)
return chip
class MockChip:
def __init__(self,
chip_name: str,
flags: int,
mock_gpiod: MockGpiod
) -> None:
self.name = chip_name
self.flags = flags
self.mock_gpiod = mock_gpiod
self.requested_lines: Dict[str, MockLine] = {}
def get_line(self, pin_id: str) -> MockLine:
if pin_id in self.requested_lines:
raise GpioException(f"Line {pin_id} already reserved")
line = MockLine(self, pin_id, self.mock_gpiod)
self.requested_lines[pin_id] = line
return line
def find_line(self, pin_id: str) -> MockLine:
if pin_id not in self.requested_lines:
raise GpioException(f"Unable to find line {pin_id}")
return self.requested_lines[pin_id]
def pop_line(self, name: str) -> None:
self.requested_lines.pop(name, None)
def close(self) -> None:
for line in list(self.requested_lines.values()):
line.release()
self.requested_lines = {}
self.mock_gpiod.pop_chip(self.name)
class MockLine:
def __init__(self,
chip: MockChip,
name: str,
mock_gpiod: MockGpiod
) -> None:
self.mock_gpiod = mock_gpiod
self.chip = chip
self.name = name
self.consumer_name: str = ""
self.is_event = False
self.invert = False
self.value = 0
self.read_pipe: Optional[int] = None
self.write_pipe: Optional[int] = None
self.bias = "not_configured"
def request(self,
consumer: str,
type: int,
flags: int = 0,
default_vals: Optional[List[int]] = None,
default_val: Optional[int] = None
) -> None:
self.consumer_name = consumer
version = self.mock_gpiod.version_tuple()
if type == MockGpiod.LINE_REQ_DIR_OUT:
self.is_event = False
if default_vals is not None:
if version > (1, 2):
logging.warn("default_vals is deprecated in gpiod 1.3+")
self.value = default_vals[0]
elif default_val is not None:
if version < (1, 3):
raise GpioException(
"default_val not available in gpiod < 1.3")
self.value = default_val
elif type == MockGpiod.LINE_REQ_EV_BOTH_EDGES:
self.is_event = True
if version >= (1, 5):
if flags & MockGpiod.LINE_REQ_FLAG_BIAS_DISABLE:
self.bias = "disabled"
elif flags & MockGpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN:
self.bias = "pulldown"
elif flags & MockGpiod.LINE_REQ_FLAG_BIAS_PULL_UP:
self.bias = "pullup"
self.read_pipe, self.write_pipe = os.pipe2(os.O_NONBLOCK)
else:
raise GpioException("Unsupported GPIO Type")
if flags & MockGpiod.LINE_REQ_FLAG_ACTIVE_LOW:
self.invert = True
def release(self) -> None:
if self.read_pipe is not None:
try:
os.close(self.read_pipe)
except Exception:
pass
if self.write_pipe is not None:
try:
os.close(self.write_pipe)
except Exception:
pass
self.chip.pop_line(self.name)
def set_value(self, value: int) -> None:
if self.is_event:
raise GpioException("Cannot set the value for an input pin")
self.value = int(not not value)
def get_value(self) -> int:
return self.value
def event_read(self) -> MockLineEvent:
if self.read_pipe is None:
raise GpioException
try:
data = os.read(self.read_pipe, 64)
except Exception:
pass
else:
value = int(not not data[-1])
self.value = value
return MockLineEvent(self.value)
def event_get_fd(self) -> int:
if self.read_pipe is None:
raise GpioException("Event not configured")
return self.read_pipe
def simulate_line_event(self, value: int) -> None:
if self.write_pipe is None:
raise GpioException("Event not configured")
val = bytes([int(not not value)])
try:
os.write(self.write_pipe, val)
except Exception:
pass
class MockLineEvent:
RISING_EDGE = 1
FALLING_EDGE = 2
def __init__(self, value: int) -> None:
if value == 1:
self.type = self.RISING_EDGE
else:
self.type = self.FALLING_EDGE