diff --git a/moonraker/confighelper.py b/moonraker/confighelper.py index e8a6a24..54a1c31 100644 --- a/moonraker/confighelper.py +++ b/moonraker/confighelper.py @@ -20,7 +20,6 @@ from typing import ( Callable, IO, Optional, - Set, Tuple, TypeVar, Union, @@ -47,22 +46,21 @@ class ConfigHelper: error = ConfigError def __init__(self, server: Server, - config: configparser.ConfigParser, + config_source: ConfigSourceWrapper, section: str, parsed: Dict[str, Dict[str, ConfigVal]], - file_section_map: Dict[str, List[str]], fallback_section: Optional[str] = None ) -> None: self.server = server - self.config = config + self.source = config_source + self.config = config_source.config self.section = section self.fallback_section: Optional[str] = fallback_section self.parsed = parsed if self.section not in self.parsed: self.parsed[self.section] = {} - self.file_section_map = file_section_map - self.sections = config.sections - self.has_section = config.has_section + self.sections = self.config.sections + self.has_section = self.config.has_section def get_server(self) -> Server: return self.server @@ -83,10 +81,7 @@ class ConfigHelper: return self.section def get_file(self) -> Optional[pathlib.Path]: - for fname in reversed(self.file_section_map.keys()): - if self.section in self.file_section_map[fname]: - return pathlib.Path(fname) - return None + return self.source.find_config_file(self.section) def get_options(self) -> Dict[str, str]: if self.section not in self.config: @@ -110,8 +105,7 @@ class ConfigHelper: self, section: str, fallback: Optional[str] = None ) -> ConfigHelper: return ConfigHelper( - self.server, self.config, section, self.parsed, - self.file_section_map, fallback + self.server, self.source, section, self.parsed, fallback ) def _get_option(self, @@ -406,26 +400,17 @@ class ConfigHelper: def read_supplemental_dict(self, obj: Dict[str, Any]) -> ConfigHelper: if not obj: raise ConfigError(f"Cannot ready Empty Dict") - try: - sup_cfg = configparser.ConfigParser(interpolation=None) - sup_cfg.read_dict(obj) - except Exception: - raise ConfigError("Error Reading Object") - sections = sup_cfg.sections() - return ConfigHelper(self.server, sup_cfg, sections[0], {}, {}) + source = ConfigSourceWrapper() + source.read_dict(obj) + sections = source.config.sections() + return ConfigHelper(self.server, source, sections[0], {}) def read_supplemental_config(self, file_name: str) -> ConfigHelper: - cfg_file_path = os.path.normpath(os.path.expanduser(file_name)) - if not os.path.isfile(cfg_file_path): - raise ConfigError( - f"Configuration File Not Found: '{cfg_file_path}''") - try: - sup_cfg = configparser.ConfigParser(interpolation=None) - sup_cfg.read_file(open(cfg_file_path)) - except Exception: - raise ConfigError(f"Error Reading Config: '{cfg_file_path}'") - sections = sup_cfg.sections() - return ConfigHelper(self.server, sup_cfg, sections[0], {}, {}) + fpath = pathlib.Path(file_name).expanduser().resolve() + source = ConfigSourceWrapper() + source.read_file(fpath) + sections = source.config.sections() + return ConfigHelper(self.server, source, sections[0], {}) def write_config(self, file_obj: IO[str]) -> None: self.config.write(file_obj) @@ -439,10 +424,10 @@ class ConfigHelper: } def get_file_sections(self) -> Dict[str, List[str]]: - return dict(self.file_section_map) + return self.source.get_file_sections(self.section) def get_config_files(self) -> List[str]: - return list(self.file_section_map.keys()) + return [str(f) for f in self.source.files] def validate_config(self) -> None: for sect in self.config.sections(): @@ -471,8 +456,7 @@ class ConfigHelper: try: if backup.exists(): cfg_mtime: int = 0 - for cfg_fname in set(self.file_section_map.keys()): - cfg = pathlib.Path(cfg_fname) + for cfg in self.source.files: cfg_mtime = max(cfg_mtime, cfg.stat().st_mtime_ns) backup_mtime = backup.stat().st_mtime_ns if backup_mtime >= cfg_mtime: @@ -487,67 +471,156 @@ class ConfigHelper: if backup_fp is not None: backup_fp.close() +class ConfigSourceWrapper: + section_r = re.compile(r"\s*\[([^]]+)\]") + + def __init__(self) -> None: + self.config = configparser.ConfigParser(interpolation=None) + self.files: List[pathlib.Path] = [] + self.file_section_map: Dict[str, List[int]] = {} + self.file_option_map: Dict[Tuple[str, str], List[int]] = {} + + def get_file_sections(self, section: str) -> Dict[str, List[str]]: + sections_by_file: Dict[str, List[str]] = { + str(fname): [] for fname in self.files + } + for section, idx_list in self.file_section_map.items(): + for idx in idx_list: + fname = str(self.files[idx]) + sections_by_file[fname].append(section) + return sections_by_file + + def find_config_file( + self, section: str, option: Optional[str] = None + ) -> Optional[pathlib.Path]: + idx: int = -1 + if option is not None: + key = (section, option) + if key in self.file_option_map: + idx = self.file_option_map[key][0] + elif section in self.file_section_map: + idx = self.file_section_map[section][0] + if idx == -1: + return None + return self.files[idx] + + def _write_buffer(self, buffer: List[str], fpath: pathlib.Path) -> None: + if not buffer: + return + self.config.read_string("\n".join(buffer), fpath.name) + + def _parse_file( + self, file_path: pathlib.Path, visited: List[Tuple[int, int]] + ) -> None: + buffer: List[str] = [] + try: + stat = file_path.stat() + cur_stat = (stat.st_dev, stat.st_ino) + if cur_stat in visited: + raise ConfigError( + f"Recursive include directive detected, {file_path}" + ) + visited.append(cur_stat) + self.files.append(file_path) + file_index = len(self.files) - 1 + lines = file_path.read_text().splitlines() + last_section = "" + opt_indent = -1 + for line in lines: + if not line.strip(): + # ignore a line that contains only whitespace + continue + line = line.expandtabs(tabsize=4) + # Remove inline comments + for prefix in "#;": + icmt = line.find(prefix) + if icmt >= 0 and line.lstrip()[0] == prefix: + # This line is a comment, ignore it + continue + if icmt > 0 and line[icmt-1] != "\\": + # inline comment, remove it + line = line[:icmt] + break + line_indent = len(line) - len(line.lstrip()) + if opt_indent != -1 and line_indent > opt_indent: + # Multi-line value, append to buffer and resume parsing + buffer.append(line) + continue + sect_match = self.section_r.match(line) + if sect_match is not None: + # Section detected + opt_indent = -1 + section = sect_match.group(1) + if section.startswith("include "): + inc_path = section[8:].strip() + if not inc_path: + raise ConfigError( + f"Invalid include directive: [{section}]" + ) + if inc_path[0] == "/": + new_path = pathlib.Path(inc_path).resolve() + paths = sorted(new_path.parent.glob(new_path.name)) + else: + paths = sorted(file_path.parent.glob(inc_path)) + if not paths: + raise ConfigError( + "No files matching include directive " + f"[{section}]" + ) + # Write out buffered data to the config before parsing + # included files + self._write_buffer(buffer, file_path) + buffer.clear() + for p in paths: + self._parse_file(p, visited) + # Don't add included sections to the configparser + continue + else: + last_section = section + fsm = self.file_section_map + fsm.setdefault(section, []).insert(0, file_index) + else: + # This line must specify an option + opt_indent = line_indent + option = re.split(r"[:=]", line, 1)[0].strip() + key = (last_section, option) + fom = self.file_option_map + fom.setdefault(key, []).insert(0, file_index) + buffer.append(line) + self._write_buffer(buffer, file_path) + except ConfigError: + raise + except Exception as e: + if not file_path.is_file(): + raise ConfigError( + f"Configuration File Not Found: '{file_path}''") from e + if not os.access(file_path, os.R_OK): + raise ConfigError( + "Moonraker does not have Read/Write permission for " + f"config file at path '{file_path}'") from e + raise ConfigError(f"Error Reading Config: '{file_path}'") from e + + def read_file(self, main_conf: pathlib.Path) -> None: + self.file_section_map.clear() + self.file_option_map.clear() + self._parse_file(main_conf, []) + + def read_dict(self, cfg: Dict[str, Any]) -> None: + try: + self.config.read_dict(cfg) + except Exception as e: + raise ConfigError("Error Reading config as dict") from e + + def get_configuration( server: Server, app_args: Dict[str, Any] ) -> ConfigHelper: - config = configparser.ConfigParser(interpolation=None) - section_map = parse_config_file(config, app_args) - if not config.has_section('server'): - raise ConfigError("No section [server] in config") - return ConfigHelper(server, config, 'server', {}, section_map) - -def parse_config_file( - config: configparser.ConfigParser, app_args: Dict[str, Any] -) -> Dict[str, List[str]]: start_path = pathlib.Path(app_args['config_file']).expanduser().resolve() - config_files: List[pathlib.Path] = [start_path] - visited_files: Set[Tuple[int, int]] = set() - file_sections: Dict[str, List[str]] = {} - while config_files: - config_path = config_files.pop(0) - try: - stat = config_path.stat() - visited = (stat.st_dev, stat.st_ino) - if visited in visited_files: - raise ConfigError("Recursive include directive detected") - visited_files.add(visited) - data = config_path.read_text() - config.read_string(data) - except Exception as e: - if not config_path.is_file(): - raise ConfigError( - f"Configuration File Not Found: '{config_path}''") from e - if not os.access(config_path, os.R_OK): - raise ConfigError( - "Moonraker does not have Read/Write permission for " - f"config file at path '{config_path}'") from e - raise ConfigError(f"Error Reading Config: '{config_path}'") from e - all_sections: List[str] = re.findall( - r"^\[([^]]+)\]\s*$", data, flags=re.MULTILINE - ) - file_sections[str(config_path)] = [ - sec for sec in all_sections if not sec.startswith("include") - ] - for sec in config.sections(): - if not sec.startswith("include"): - continue - str_path = sec[8:].strip() - if not str_path: - raise ConfigError( - f"Invalid include directive: [{sec}]" - ) - config.remove_section(sec) - if str_path[0] == "/": - path = pathlib.Path(str_path) - paths = sorted(path.parent.glob(path.name)) - else: - paths = sorted(config_path.parent.glob(str_path)) - if not paths: - raise ConfigError( - f"No files matching include directive [{sec}]" - ) - config_files.extend(paths) - return file_sections + source = ConfigSourceWrapper() + source.read_file(start_path) + if not source.config.has_section('server'): + raise ConfigError("No section [server] in config") + return ConfigHelper(server, source, 'server', {}) def find_config_backup(cfg_path: str) -> Optional[str]: cfg = pathlib.Path(cfg_path).expanduser().resolve()