confighelper: add support for getlist and getdict
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
c0ae10bef6
commit
0b25350ebc
|
@ -16,10 +16,12 @@ from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
IO,
|
IO,
|
||||||
Optional,
|
Optional,
|
||||||
|
Tuple,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
Union,
|
Union,
|
||||||
Dict,
|
Dict,
|
||||||
List,
|
List,
|
||||||
|
Type,
|
||||||
)
|
)
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from moonraker import Server
|
from moonraker import Server
|
||||||
|
@ -168,6 +170,110 @@ class ConfigHelper:
|
||||||
self.config.getfloat, option, default,
|
self.config.getfloat, option, default,
|
||||||
above, below, minval, maxval)
|
above, below, minval, maxval)
|
||||||
|
|
||||||
|
def getlists(self,
|
||||||
|
option: str,
|
||||||
|
default: Union[SentinelClass, _T] = SENTINEL,
|
||||||
|
list_type: Type = str,
|
||||||
|
separators: Tuple[str, ...] = ('\n',),
|
||||||
|
count: Optional[Tuple[Optional[int], ...]] = None
|
||||||
|
) -> Union[List[Any], _T]:
|
||||||
|
if count is not None and len(count) != len(separators):
|
||||||
|
raise ConfigError(
|
||||||
|
f"Option '{option}' in section "
|
||||||
|
f"[{self.section}]: length of 'count' argument must ",
|
||||||
|
"match length of 'separators' argument")
|
||||||
|
else:
|
||||||
|
count = tuple(None for _ in range(len(separators)))
|
||||||
|
|
||||||
|
def list_parser(value: str,
|
||||||
|
ltype: Type,
|
||||||
|
seps: Tuple[str, ...],
|
||||||
|
expected_cnt: Tuple[Optional[int], ...]
|
||||||
|
) -> List[Any]:
|
||||||
|
sep = seps[0]
|
||||||
|
seps = seps[1:]
|
||||||
|
cnt = expected_cnt[0]
|
||||||
|
expected_cnt = expected_cnt[1:]
|
||||||
|
ret: List[Any] = []
|
||||||
|
if seps:
|
||||||
|
sub_lists = [val.strip() for val in value.split(sep)
|
||||||
|
if val.strip()]
|
||||||
|
for sub_list in sub_lists:
|
||||||
|
ret.append(list_parser(sub_list, ltype, seps,
|
||||||
|
expected_cnt))
|
||||||
|
else:
|
||||||
|
ret = [ltype(val.strip()) for val in value.split(sep)
|
||||||
|
if val.strip()]
|
||||||
|
if cnt is not None and len(ret) != cnt:
|
||||||
|
raise ConfigError(
|
||||||
|
f"List length mismatch, expected {cnt}, "
|
||||||
|
f"parsed {len(ret)}")
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def getlist_wrapper(sec: str, opt: str) -> List[Any]:
|
||||||
|
val = self.config.get(sec, opt)
|
||||||
|
assert count is not None
|
||||||
|
return list_parser(val, list_type, separators, count)
|
||||||
|
|
||||||
|
return self._get_option(getlist_wrapper, option, default)
|
||||||
|
|
||||||
|
|
||||||
|
def getlist(self,
|
||||||
|
option: str,
|
||||||
|
default: Union[SentinelClass, _T] = SENTINEL,
|
||||||
|
separator: str = '\n',
|
||||||
|
count: Optional[int] = None
|
||||||
|
) -> Union[List[str], _T]:
|
||||||
|
return self.getlists(option, default, str, (separator,), (count,))
|
||||||
|
|
||||||
|
def getintlist(self,
|
||||||
|
option: str,
|
||||||
|
default: Union[SentinelClass, _T] = SENTINEL,
|
||||||
|
separator: str = '\n',
|
||||||
|
count: Optional[int] = None
|
||||||
|
) -> Union[List[int], _T]:
|
||||||
|
return self.getlists(option, default, int, (separator,), (count,))
|
||||||
|
|
||||||
|
def getfloatlist(self,
|
||||||
|
option: str,
|
||||||
|
default: Union[SentinelClass, _T] = SENTINEL,
|
||||||
|
separator: str = '\n',
|
||||||
|
count: Optional[int] = None
|
||||||
|
) -> Union[List[float], _T]:
|
||||||
|
return self.getlists(option, default, float, (separator,), (count,))
|
||||||
|
|
||||||
|
def getdict(self,
|
||||||
|
option: str,
|
||||||
|
default: Union[SentinelClass, _T] = SENTINEL,
|
||||||
|
separators: Tuple[str, str] = ('\n', '='),
|
||||||
|
dict_type: Type = str,
|
||||||
|
allow_empty_fields: bool = False
|
||||||
|
) -> Union[Dict[str, Any], _T]:
|
||||||
|
if len(separators) != 2:
|
||||||
|
raise ConfigError(
|
||||||
|
"The `separators` argument of getdict() must be a Tuple"
|
||||||
|
"of length of 2")
|
||||||
|
|
||||||
|
def getdict_wrapper(sec: str, opt: str) -> Dict[str, Any]:
|
||||||
|
val = self.config.get(sec, opt)
|
||||||
|
ret: Dict[str, Any] = {}
|
||||||
|
for line in val.split(separators[0]):
|
||||||
|
line = line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
parts = line.split(separators[1], 1)
|
||||||
|
if len(parts) == 1:
|
||||||
|
if allow_empty_fields:
|
||||||
|
ret[parts[0].strip()] = None
|
||||||
|
else:
|
||||||
|
raise ConfigError(
|
||||||
|
f"Failed to parse dictionary field, {line}")
|
||||||
|
else:
|
||||||
|
ret[parts[0].strip()] = dict_type(parts[1].strip())
|
||||||
|
return ret
|
||||||
|
|
||||||
|
return self._get_option(getdict_wrapper, option, default)
|
||||||
|
|
||||||
def read_supplemental_config(self, file_name: str) -> ConfigHelper:
|
def read_supplemental_config(self, file_name: str) -> ConfigHelper:
|
||||||
cfg_file_path = os.path.normpath(os.path.expanduser(file_name))
|
cfg_file_path = os.path.normpath(os.path.expanduser(file_name))
|
||||||
if not os.path.isfile(cfg_file_path):
|
if not os.path.isfile(cfg_file_path):
|
||||||
|
|
Loading…
Reference in New Issue