From ad00e08d111156e3c863f3a7cfe7b6ff2aa823b9 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Tue, 5 Apr 2022 12:20:33 -0400 Subject: [PATCH] confighelper: add support for include directives Signed-off-by: Eric Callahan --- moonraker/confighelper.py | 92 ++++++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/moonraker/confighelper.py b/moonraker/confighelper.py index 9f28372..11c1493 100644 --- a/moonraker/confighelper.py +++ b/moonraker/confighelper.py @@ -11,6 +11,7 @@ import hashlib import shutil import filecmp import pathlib +import re import logging from utils import SentinelClass @@ -21,6 +22,7 @@ from typing import ( Callable, IO, Optional, + Set, Tuple, TypeVar, Union, @@ -49,6 +51,7 @@ class ConfigHelper: config: configparser.ConfigParser, section: str, parsed: Dict[str, Dict[str, ConfigVal]], + file_section_map: Dict[str, List[str]], fallback_section: Optional[str] = None ) -> None: self.server = server @@ -58,6 +61,7 @@ class ConfigHelper: self.parsed = parsed if self.section not in self.parsed: self.parsed[self.section] = {} + self.file_section_map = file_section_map self.sections = config.sections self.has_section = config.has_section @@ -79,6 +83,12 @@ class ConfigHelper: def get_name(self) -> str: return self.section + def get_file(self) -> Optional[pathlib.Path]: + for fname in reversed(self.file_section_map.keys()): + if self.section in self.file_section_map[fname]: + return pathlib.Path(fname) + return None + def get_options(self) -> Dict[str, str]: if self.section not in self.config: return {} @@ -101,7 +111,8 @@ class ConfigHelper: self, section: str, fallback: Optional[str] = None ) -> ConfigHelper: return ConfigHelper( - self.server, self.config, section, self.parsed, fallback + self.server, self.config, section, self.parsed, + self.file_section_map, fallback ) def _get_option(self, @@ -402,7 +413,7 @@ class ConfigHelper: except Exception: raise ConfigError("Error Reading Object") sections = sup_cfg.sections() - return ConfigHelper(self.server, sup_cfg, sections[0], {}) + return ConfigHelper(self.server, sup_cfg, sections[0], {}, {}) def read_supplemental_config(self, file_name: str) -> ConfigHelper: cfg_file_path = os.path.normpath(os.path.expanduser(file_name)) @@ -415,7 +426,7 @@ class ConfigHelper: except Exception: raise ConfigError(f"Error Reading Config: '{cfg_file_path}'") sections = sup_cfg.sections() - return ConfigHelper(self.server, sup_cfg, sections[0], {}) + return ConfigHelper(self.server, sup_cfg, sections[0], {}, {}) def write_config(self, file_obj: IO[str]) -> None: self.config.write(file_obj) @@ -442,26 +453,67 @@ class ConfigHelper: "failed to load. In the future this will result " "in a startup error.") -def get_configuration(server: Server, - app_args: Dict[str, Any] - ) -> ConfigHelper: - cfg_file_path: str = os.path.normpath(os.path.expanduser( - app_args['config_file'])) +def get_configuration( + server: Server, app_args: Dict[str, Any] +) -> ConfigHelper: config = configparser.ConfigParser(interpolation=None) - try: - config.read_file(open(cfg_file_path)) - except Exception as e: - if not os.path.isfile(cfg_file_path): - raise ConfigError( - f"Configuration File Not Found: '{cfg_file_path}''") from e - if not os.access(cfg_file_path, os.R_OK | os.W_OK): - raise ConfigError( - "Moonraker does not have Read/Write permission for " - f"config file at path '{cfg_file_path}'") from e - raise ConfigError(f"Error Reading Config: '{cfg_file_path}'") from e + section_map = parse_config_file(config, app_args) if not config.has_section('server'): raise ConfigError("No section [server] in config") - return ConfigHelper(server, config, 'server', {}) + return ConfigHelper(server, config, 'server', {}, section_map) + +def parse_config_file( + config: configparser.ConfigParser, app_args: Dict[str, Any] +) -> Dict[str, List[str]]: + start_path = pathlib.Path(app_args['config_file']).expanduser().resolve() + config_files: List[pathlib.Path] = [start_path] + visited_files: Set[Tuple[int, int]] = set() + file_sections: Dict[str, List[str]] = {} + while config_files: + config_path = config_files.pop(0) + try: + stat = config_path.stat() + visited = (stat.st_dev, stat.st_ino) + if visited in visited_files: + raise ConfigError("Recursive include directive detected") + visited_files.add(visited) + data = config_path.read_text() + config.read_string(data) + except Exception as e: + if not config_path.is_file(): + raise ConfigError( + f"Configuration File Not Found: '{config_path}''") from e + if not os.access(config_path, os.R_OK): + raise ConfigError( + "Moonraker does not have Read/Write permission for " + f"config file at path '{config_path}'") from e + raise ConfigError(f"Error Reading Config: '{config_path}'") from e + all_sections: List[str] = re.findall( + r"^\[([^]]+)\]\s*$", data, flags=re.MULTILINE + ) + file_sections[str(config_path)] = [ + sec for sec in all_sections if not sec.startswith("include") + ] + for sec in config.sections(): + if not sec.startswith("include"): + continue + str_path = sec[8:].strip() + if not str_path: + raise ConfigError( + f"Invalid include directive: [{sec}]" + ) + config.remove_section(sec) + if str_path[0] == "/": + path = pathlib.Path(str_path) + paths = sorted(path.parent.glob(path.name)) + else: + paths = sorted(config_path.parent.glob(str_path)) + if not paths: + raise ConfigError( + f"No files matching include directive [{sec}]" + ) + config_files.extend(paths) + return file_sections def backup_config(cfg_path: str) -> None: cfg = pathlib.Path(cfg_path).expanduser().resolve()