confighelper: add support for include directives

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-04-05 12:20:33 -04:00
parent 2e4814978f
commit ad00e08d11
No known key found for this signature in database
GPG Key ID: 7027245FBBDDF59A
1 changed files with 72 additions and 20 deletions

View File

@ -11,6 +11,7 @@ import hashlib
import shutil import shutil
import filecmp import filecmp
import pathlib import pathlib
import re
import logging import logging
from utils import SentinelClass from utils import SentinelClass
@ -21,6 +22,7 @@ from typing import (
Callable, Callable,
IO, IO,
Optional, Optional,
Set,
Tuple, Tuple,
TypeVar, TypeVar,
Union, Union,
@ -49,6 +51,7 @@ class ConfigHelper:
config: configparser.ConfigParser, config: configparser.ConfigParser,
section: str, section: str,
parsed: Dict[str, Dict[str, ConfigVal]], parsed: Dict[str, Dict[str, ConfigVal]],
file_section_map: Dict[str, List[str]],
fallback_section: Optional[str] = None fallback_section: Optional[str] = None
) -> None: ) -> None:
self.server = server self.server = server
@ -58,6 +61,7 @@ class ConfigHelper:
self.parsed = parsed self.parsed = parsed
if self.section not in self.parsed: if self.section not in self.parsed:
self.parsed[self.section] = {} self.parsed[self.section] = {}
self.file_section_map = file_section_map
self.sections = config.sections self.sections = config.sections
self.has_section = config.has_section self.has_section = config.has_section
@ -79,6 +83,12 @@ class ConfigHelper:
def get_name(self) -> str: def get_name(self) -> str:
return self.section return self.section
def get_file(self) -> Optional[pathlib.Path]:
for fname in reversed(self.file_section_map.keys()):
if self.section in self.file_section_map[fname]:
return pathlib.Path(fname)
return None
def get_options(self) -> Dict[str, str]: def get_options(self) -> Dict[str, str]:
if self.section not in self.config: if self.section not in self.config:
return {} return {}
@ -101,7 +111,8 @@ class ConfigHelper:
self, section: str, fallback: Optional[str] = None self, section: str, fallback: Optional[str] = None
) -> ConfigHelper: ) -> ConfigHelper:
return ConfigHelper( return ConfigHelper(
self.server, self.config, section, self.parsed, fallback self.server, self.config, section, self.parsed,
self.file_section_map, fallback
) )
def _get_option(self, def _get_option(self,
@ -402,7 +413,7 @@ class ConfigHelper:
except Exception: except Exception:
raise ConfigError("Error Reading Object") raise ConfigError("Error Reading Object")
sections = sup_cfg.sections() sections = sup_cfg.sections()
return ConfigHelper(self.server, sup_cfg, sections[0], {}) return ConfigHelper(self.server, sup_cfg, sections[0], {}, {})
def read_supplemental_config(self, file_name: str) -> ConfigHelper: def read_supplemental_config(self, file_name: str) -> ConfigHelper:
cfg_file_path = os.path.normpath(os.path.expanduser(file_name)) cfg_file_path = os.path.normpath(os.path.expanduser(file_name))
@ -415,7 +426,7 @@ class ConfigHelper:
except Exception: except Exception:
raise ConfigError(f"Error Reading Config: '{cfg_file_path}'") raise ConfigError(f"Error Reading Config: '{cfg_file_path}'")
sections = sup_cfg.sections() sections = sup_cfg.sections()
return ConfigHelper(self.server, sup_cfg, sections[0], {}) return ConfigHelper(self.server, sup_cfg, sections[0], {}, {})
def write_config(self, file_obj: IO[str]) -> None: def write_config(self, file_obj: IO[str]) -> None:
self.config.write(file_obj) self.config.write(file_obj)
@ -442,26 +453,67 @@ class ConfigHelper:
"failed to load. In the future this will result " "failed to load. In the future this will result "
"in a startup error.") "in a startup error.")
def get_configuration(server: Server, def get_configuration(
app_args: Dict[str, Any] server: Server, app_args: Dict[str, Any]
) -> ConfigHelper: ) -> ConfigHelper:
cfg_file_path: str = os.path.normpath(os.path.expanduser(
app_args['config_file']))
config = configparser.ConfigParser(interpolation=None) config = configparser.ConfigParser(interpolation=None)
try: section_map = parse_config_file(config, app_args)
config.read_file(open(cfg_file_path))
except Exception as e:
if not os.path.isfile(cfg_file_path):
raise ConfigError(
f"Configuration File Not Found: '{cfg_file_path}''") from e
if not os.access(cfg_file_path, os.R_OK | os.W_OK):
raise ConfigError(
"Moonraker does not have Read/Write permission for "
f"config file at path '{cfg_file_path}'") from e
raise ConfigError(f"Error Reading Config: '{cfg_file_path}'") from e
if not config.has_section('server'): if not config.has_section('server'):
raise ConfigError("No section [server] in config") raise ConfigError("No section [server] in config")
return ConfigHelper(server, config, 'server', {}) return ConfigHelper(server, config, 'server', {}, section_map)
def parse_config_file(
config: configparser.ConfigParser, app_args: Dict[str, Any]
) -> Dict[str, List[str]]:
start_path = pathlib.Path(app_args['config_file']).expanduser().resolve()
config_files: List[pathlib.Path] = [start_path]
visited_files: Set[Tuple[int, int]] = set()
file_sections: Dict[str, List[str]] = {}
while config_files:
config_path = config_files.pop(0)
try:
stat = config_path.stat()
visited = (stat.st_dev, stat.st_ino)
if visited in visited_files:
raise ConfigError("Recursive include directive detected")
visited_files.add(visited)
data = config_path.read_text()
config.read_string(data)
except Exception as e:
if not config_path.is_file():
raise ConfigError(
f"Configuration File Not Found: '{config_path}''") from e
if not os.access(config_path, os.R_OK):
raise ConfigError(
"Moonraker does not have Read/Write permission for "
f"config file at path '{config_path}'") from e
raise ConfigError(f"Error Reading Config: '{config_path}'") from e
all_sections: List[str] = re.findall(
r"^\[([^]]+)\]\s*$", data, flags=re.MULTILINE
)
file_sections[str(config_path)] = [
sec for sec in all_sections if not sec.startswith("include")
]
for sec in config.sections():
if not sec.startswith("include"):
continue
str_path = sec[8:].strip()
if not str_path:
raise ConfigError(
f"Invalid include directive: [{sec}]"
)
config.remove_section(sec)
if str_path[0] == "/":
path = pathlib.Path(str_path)
paths = sorted(path.parent.glob(path.name))
else:
paths = sorted(config_path.parent.glob(str_path))
if not paths:
raise ConfigError(
f"No files matching include directive [{sec}]"
)
config_files.extend(paths)
return file_sections
def backup_config(cfg_path: str) -> None: def backup_config(cfg_path: str) -> None:
cfg = pathlib.Path(cfg_path).expanduser().resolve() cfg = pathlib.Path(cfg_path).expanduser().resolve()