From 0b25350ebcb0e2e9965c7525d98dc7a2d6e52510 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sat, 20 Nov 2021 08:44:36 -0500 Subject: [PATCH] confighelper: add support for getlist and getdict Signed-off-by: Eric Callahan --- moonraker/confighelper.py | 106 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/moonraker/confighelper.py b/moonraker/confighelper.py index 2cd60f3..e44035a 100644 --- a/moonraker/confighelper.py +++ b/moonraker/confighelper.py @@ -16,10 +16,12 @@ from typing import ( Callable, IO, Optional, + Tuple, TypeVar, Union, Dict, List, + Type, ) if TYPE_CHECKING: from moonraker import Server @@ -168,6 +170,110 @@ class ConfigHelper: self.config.getfloat, option, default, above, below, minval, maxval) + def getlists(self, + option: str, + default: Union[SentinelClass, _T] = SENTINEL, + list_type: Type = str, + separators: Tuple[str, ...] = ('\n',), + count: Optional[Tuple[Optional[int], ...]] = None + ) -> Union[List[Any], _T]: + if count is not None and len(count) != len(separators): + raise ConfigError( + f"Option '{option}' in section " + f"[{self.section}]: length of 'count' argument must ", + "match length of 'separators' argument") + else: + count = tuple(None for _ in range(len(separators))) + + def list_parser(value: str, + ltype: Type, + seps: Tuple[str, ...], + expected_cnt: Tuple[Optional[int], ...] + ) -> List[Any]: + sep = seps[0] + seps = seps[1:] + cnt = expected_cnt[0] + expected_cnt = expected_cnt[1:] + ret: List[Any] = [] + if seps: + sub_lists = [val.strip() for val in value.split(sep) + if val.strip()] + for sub_list in sub_lists: + ret.append(list_parser(sub_list, ltype, seps, + expected_cnt)) + else: + ret = [ltype(val.strip()) for val in value.split(sep) + if val.strip()] + if cnt is not None and len(ret) != cnt: + raise ConfigError( + f"List length mismatch, expected {cnt}, " + f"parsed {len(ret)}") + return ret + + def getlist_wrapper(sec: str, opt: str) -> List[Any]: + val = self.config.get(sec, opt) + assert count is not None + return list_parser(val, list_type, separators, count) + + return self._get_option(getlist_wrapper, option, default) + + + def getlist(self, + option: str, + default: Union[SentinelClass, _T] = SENTINEL, + separator: str = '\n', + count: Optional[int] = None + ) -> Union[List[str], _T]: + return self.getlists(option, default, str, (separator,), (count,)) + + def getintlist(self, + option: str, + default: Union[SentinelClass, _T] = SENTINEL, + separator: str = '\n', + count: Optional[int] = None + ) -> Union[List[int], _T]: + return self.getlists(option, default, int, (separator,), (count,)) + + def getfloatlist(self, + option: str, + default: Union[SentinelClass, _T] = SENTINEL, + separator: str = '\n', + count: Optional[int] = None + ) -> Union[List[float], _T]: + return self.getlists(option, default, float, (separator,), (count,)) + + def getdict(self, + option: str, + default: Union[SentinelClass, _T] = SENTINEL, + separators: Tuple[str, str] = ('\n', '='), + dict_type: Type = str, + allow_empty_fields: bool = False + ) -> Union[Dict[str, Any], _T]: + if len(separators) != 2: + raise ConfigError( + "The `separators` argument of getdict() must be a Tuple" + "of length of 2") + + def getdict_wrapper(sec: str, opt: str) -> Dict[str, Any]: + val = self.config.get(sec, opt) + ret: Dict[str, Any] = {} + for line in val.split(separators[0]): + line = line.strip() + if not line: + continue + parts = line.split(separators[1], 1) + if len(parts) == 1: + if allow_empty_fields: + ret[parts[0].strip()] = None + else: + raise ConfigError( + f"Failed to parse dictionary field, {line}") + else: + ret[parts[0].strip()] = dict_type(parts[1].strip()) + return ret + + return self._get_option(getdict_wrapper, option, default) + def read_supplemental_config(self, file_name: str) -> ConfigHelper: cfg_file_path = os.path.normpath(os.path.expanduser(file_name)) if not os.path.isfile(cfg_file_path):