confighelper: support config modification
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
a776f1f6dc
commit
a4f9fbe298
|
@ -10,16 +10,21 @@ import os
|
|||
import hashlib
|
||||
import pathlib
|
||||
import re
|
||||
import threading
|
||||
import copy
|
||||
import logging
|
||||
from io import StringIO
|
||||
from utils import SentinelClass
|
||||
|
||||
# Annotation imports
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
IO,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
TypeVar,
|
||||
Union,
|
||||
|
@ -53,7 +58,7 @@ class ConfigHelper:
|
|||
) -> None:
|
||||
self.server = server
|
||||
self.source = config_source
|
||||
self.config = config_source.config
|
||||
self.config = config_source.get_parser()
|
||||
self.section = section
|
||||
self.fallback_section: Optional[str] = fallback_section
|
||||
self.parsed = parsed
|
||||
|
@ -65,6 +70,9 @@ class ConfigHelper:
|
|||
def get_server(self) -> Server:
|
||||
return self.server
|
||||
|
||||
def get_source(self) -> ConfigSourceWrapper:
|
||||
return self.source
|
||||
|
||||
def __getitem__(self, key: str) -> ConfigHelper:
|
||||
return self.getsection(key)
|
||||
|
||||
|
@ -75,7 +83,7 @@ class ConfigHelper:
|
|||
return self.config.has_option(self.section, option)
|
||||
|
||||
def set_option(self, option: str, value: str) -> None:
|
||||
self.config[self.section][option] = value
|
||||
self.source.set_option(self.section, option, value)
|
||||
|
||||
def get_name(self) -> str:
|
||||
return self.section
|
||||
|
@ -401,14 +409,14 @@ class ConfigHelper:
|
|||
def read_supplemental_dict(self, obj: Dict[str, Any]) -> ConfigHelper:
|
||||
if not obj:
|
||||
raise ConfigError(f"Cannot ready Empty Dict")
|
||||
source = ConfigSourceWrapper()
|
||||
source = DictSourceWrapper()
|
||||
source.read_dict(obj)
|
||||
sections = source.config.sections()
|
||||
return ConfigHelper(self.server, source, sections[0], {})
|
||||
|
||||
def read_supplemental_config(self, file_name: str) -> ConfigHelper:
|
||||
fpath = pathlib.Path(file_name).expanduser().resolve()
|
||||
source = ConfigSourceWrapper()
|
||||
source = FileSourceWrapper(self.server)
|
||||
source.read_file(fpath)
|
||||
sections = source.config.sections()
|
||||
return ConfigHelper(self.server, source, sections[0], {})
|
||||
|
@ -420,15 +428,13 @@ class ConfigHelper:
|
|||
return dict(self.parsed)
|
||||
|
||||
def get_orig_config(self) -> Dict[str, Dict[str, str]]:
|
||||
return {
|
||||
key: dict(val) for key, val in self.config.items()
|
||||
}
|
||||
return self.source.as_dict()
|
||||
|
||||
def get_file_sections(self) -> Dict[str, List[str]]:
|
||||
return self.source.get_file_sections(self.section)
|
||||
return self.source.get_file_sections()
|
||||
|
||||
def get_config_files(self) -> List[str]:
|
||||
return [str(f) for f in self.source.files]
|
||||
return [str(f) for f in self.source.get_files()]
|
||||
|
||||
def validate_config(self) -> None:
|
||||
for sect in self.config.sections():
|
||||
|
@ -457,7 +463,7 @@ class ConfigHelper:
|
|||
try:
|
||||
if backup.exists():
|
||||
cfg_mtime: int = 0
|
||||
for cfg in self.source.files:
|
||||
for cfg in self.source.get_files():
|
||||
cfg_mtime = max(cfg_mtime, cfg.stat().st_mtime_ns)
|
||||
backup_mtime = backup.stat().st_mtime_ns
|
||||
if backup_mtime >= cfg_mtime:
|
||||
|
@ -473,15 +479,386 @@ class ConfigHelper:
|
|||
backup_fp.close()
|
||||
|
||||
class ConfigSourceWrapper:
|
||||
def __init__(self):
|
||||
self.config = configparser.ConfigParser(interpolation=None)
|
||||
|
||||
def get_parser(self):
|
||||
return self.config
|
||||
|
||||
def as_dict(self) -> Dict[str, Dict[str, str]]:
|
||||
return {key: dict(val) for key, val in self.config.items()}
|
||||
|
||||
def write_to_string(self) -> str:
|
||||
sio = StringIO()
|
||||
self.config.write(sio)
|
||||
val = sio.getvalue()
|
||||
sio.close()
|
||||
return val
|
||||
|
||||
def get_files(self) -> List[pathlib.Path]:
|
||||
return []
|
||||
|
||||
def set_option(self, section: str, option: str, value: str) -> None:
|
||||
self.config.set(section, option, value)
|
||||
|
||||
def remove_option(self, section: str, option: str) -> None:
|
||||
self.config.remove_option(section, option)
|
||||
|
||||
def add_section(self, section: str) -> None:
|
||||
self.config.add_section(section)
|
||||
|
||||
def remove_section(self, section: str) -> None:
|
||||
self.config.remove_section(section)
|
||||
|
||||
def get_file_sections(self) -> Dict[str, List[str]]:
|
||||
return {}
|
||||
|
||||
def find_config_file(
|
||||
self, section: str, option: Optional[str] = None
|
||||
) -> Optional[pathlib.Path]:
|
||||
return None
|
||||
|
||||
class DictSourceWrapper(ConfigSourceWrapper):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def read_dict(self, cfg: Dict[str, Any]) -> None:
|
||||
try:
|
||||
self.config.read_dict(cfg)
|
||||
except Exception as e:
|
||||
raise ConfigError("Error Reading config as dict") from e
|
||||
|
||||
class FileSourceWrapper(ConfigSourceWrapper):
|
||||
section_r = re.compile(r"\s*\[([^]]+)\]")
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.config = configparser.ConfigParser(interpolation=None)
|
||||
def __init__(self, server: Server) -> None:
|
||||
super().__init__()
|
||||
self.server = server
|
||||
self.files: List[pathlib.Path] = []
|
||||
self.raw_config_data: List[str] = []
|
||||
self.updates_pending: Set[int] = set()
|
||||
self.file_section_map: Dict[str, List[int]] = {}
|
||||
self.file_option_map: Dict[Tuple[str, str], List[int]] = {}
|
||||
self.save_lock = threading.Lock()
|
||||
self.backup: Dict[str, Any] = {}
|
||||
|
||||
def get_file_sections(self, section: str) -> Dict[str, List[str]]:
|
||||
def get_files(self) -> List[pathlib.Path]:
|
||||
return self.files
|
||||
|
||||
def is_in_transaction(self) -> bool:
|
||||
return (
|
||||
len(self.updates_pending) > 0 or
|
||||
self.save_lock.locked()
|
||||
)
|
||||
|
||||
def backup_source(self) -> None:
|
||||
self.backup = {
|
||||
"raw_data": list(self.raw_config_data),
|
||||
"section_map": copy.deepcopy(self.file_section_map),
|
||||
"option_map": copy.deepcopy(self.file_option_map),
|
||||
"config": self.write_to_string()
|
||||
}
|
||||
|
||||
def _acquire_save_lock(self) -> None:
|
||||
if not self.files:
|
||||
raise ConfigError(
|
||||
"Can only modify file backed configurations"
|
||||
)
|
||||
if not self.save_lock.acquire(blocking=False):
|
||||
raise ConfigError("Configuration locked, cannot modify")
|
||||
|
||||
def set_option(self, section: str, option: str, value: str) -> None:
|
||||
self._acquire_save_lock()
|
||||
try:
|
||||
value = value.strip()
|
||||
try:
|
||||
if (self.config.get(section, option).strip() == value):
|
||||
return
|
||||
except (configparser.NoSectionError, configparser.NoOptionError):
|
||||
pass
|
||||
file_idx: int = 0
|
||||
has_sec = has_opt = False
|
||||
if (section, option) in self.file_option_map:
|
||||
file_idx = self.file_option_map[(section, option)][0]
|
||||
has_sec = has_opt = True
|
||||
elif section in self.file_section_map:
|
||||
file_idx = self.file_section_map[section][0]
|
||||
has_sec = True
|
||||
buf = self.raw_config_data[file_idx].splitlines()
|
||||
new_opt_list = [f"{option}: {value}"]
|
||||
if "\n" in value:
|
||||
vals = [f" {v}" for v in value.split("\n")]
|
||||
new_opt_list = [f"{option}:"] + vals
|
||||
sec_info = self._find_section_info(section, buf, raise_error=False)
|
||||
if sec_info:
|
||||
options: Dict[str, Any] = sec_info["options"]
|
||||
indent: int = sec_info["indent"]
|
||||
opt_start: int = sec_info["end"]
|
||||
opt_end: int = sec_info["end"]
|
||||
opt_info: Optional[Dict[str, Any]] = options.get(option)
|
||||
if opt_info is not None:
|
||||
indent = opt_info["indent"]
|
||||
opt_start = opt_info["start"]
|
||||
opt_end = opt_info["end"]
|
||||
elif options:
|
||||
# match indentation of last option in section
|
||||
last_opt = list(options.values())[-1]
|
||||
indent = last_opt["indent"]
|
||||
if indent:
|
||||
padding = " " * indent
|
||||
new_opt_list = [f"{padding}{v}" for v in new_opt_list]
|
||||
buf[opt_start:] = new_opt_list + buf[opt_end:]
|
||||
else:
|
||||
# Append new section to the end of the file
|
||||
new_opt_list.insert(0, f"[{section}]")
|
||||
if buf and buf[-1].strip() != "":
|
||||
new_opt_list.insert(0, "")
|
||||
buf.extend(new_opt_list)
|
||||
buf.append("")
|
||||
updated_cfg = "\n".join(buf)
|
||||
# test changes to the configuration
|
||||
test_parser = configparser.ConfigParser(interpolation=None)
|
||||
try:
|
||||
test_parser.read_string(updated_cfg)
|
||||
if not test_parser.has_option(section, option):
|
||||
raise ConfigError("Option not added")
|
||||
except Exception as e:
|
||||
raise ConfigError(
|
||||
f"Failed to set option '{option}' in section "
|
||||
f"[{section}], file: {self.files[file_idx]}"
|
||||
) from e
|
||||
# Update local configuration/tracking
|
||||
self.raw_config_data[file_idx] = updated_cfg
|
||||
self.updates_pending.add(file_idx)
|
||||
if not has_sec:
|
||||
self.file_section_map[section] = [file_idx]
|
||||
if not has_opt:
|
||||
self.file_option_map[(section, option)] = [file_idx]
|
||||
if not self.config.has_section(section):
|
||||
self.config.add_section(section)
|
||||
self.config.set(section, option, value)
|
||||
finally:
|
||||
self.save_lock.release()
|
||||
|
||||
def remove_option(self, section: str, option: str) -> None:
|
||||
self._acquire_save_lock()
|
||||
try:
|
||||
key = (section, option)
|
||||
if key not in self.file_option_map:
|
||||
return
|
||||
pending: List[Tuple[int, str]] = []
|
||||
file_indices = self.file_option_map[key]
|
||||
for idx in file_indices:
|
||||
buf = self.raw_config_data[idx].splitlines()
|
||||
try:
|
||||
sec_info = self._find_section_info(section, buf)
|
||||
opt_info = sec_info["options"][option]
|
||||
start = opt_info["start"]
|
||||
end = opt_info["end"]
|
||||
if (
|
||||
end < len(buf) and
|
||||
not buf[start-1].strip()
|
||||
and not buf[end].strip()
|
||||
):
|
||||
end += 1
|
||||
buf[start:] = buf[end:]
|
||||
buf.append("")
|
||||
updated_cfg = "\n".join(buf)
|
||||
test_parser = configparser.ConfigParser(interpolation=None)
|
||||
test_parser.read_string(updated_cfg)
|
||||
if test_parser.has_option(section, option):
|
||||
raise ConfigError("Option still exists")
|
||||
pending.append((idx, updated_cfg))
|
||||
except Exception as e:
|
||||
raise ConfigError(
|
||||
f"Failed to remove option '{option}' from section "
|
||||
f"[{section}], file: {self.files[idx]}"
|
||||
) from e
|
||||
# Update configuration/tracking
|
||||
for (idx, data) in pending:
|
||||
self.updates_pending.add(idx)
|
||||
self.raw_config_data[idx] = data
|
||||
del self.file_option_map[key]
|
||||
self.config.remove_option(section, option)
|
||||
finally:
|
||||
self.save_lock.release()
|
||||
|
||||
def add_section(self, section: str) -> None:
|
||||
self._acquire_save_lock()
|
||||
try:
|
||||
if section in self.file_section_map:
|
||||
return
|
||||
# add section to end of primary file
|
||||
buf = self.raw_config_data[0].splitlines()
|
||||
if buf and buf[-1].strip() != "":
|
||||
buf.append("")
|
||||
buf.extend([f"[{section}]", ""])
|
||||
updated_cfg = "\n".join(buf)
|
||||
try:
|
||||
test_parser = configparser.ConfigParser(interpolation=None)
|
||||
test_parser.read_string(updated_cfg)
|
||||
if not test_parser.has_section(section):
|
||||
raise ConfigError("Section not added")
|
||||
except Exception as e:
|
||||
raise ConfigError(
|
||||
f"Failed to add section [{section}], file: {self.files[0]}"
|
||||
) from e
|
||||
self.updates_pending.add(0)
|
||||
self.file_section_map[section] = [0]
|
||||
self.raw_config_data[0] = updated_cfg
|
||||
self.config.add_section(section)
|
||||
finally:
|
||||
self.save_lock.release()
|
||||
|
||||
def remove_section(self, section: str) -> None:
|
||||
self._acquire_save_lock()
|
||||
try:
|
||||
if section not in self.file_section_map:
|
||||
return
|
||||
pending: List[Tuple[int, str]] = []
|
||||
file_indices = self.file_section_map[section]
|
||||
for idx in file_indices:
|
||||
buf = self.raw_config_data[idx].splitlines()
|
||||
try:
|
||||
sec_info = self._find_section_info(section, buf)
|
||||
start = sec_info["start"]
|
||||
end = sec_info["end"]
|
||||
if (
|
||||
end < len(buf) and
|
||||
not buf[start-1].strip()
|
||||
and not buf[end].strip()
|
||||
):
|
||||
end += 1
|
||||
buf[start:] = buf[end:]
|
||||
buf.append("")
|
||||
updated_cfg = "\n".join(buf)
|
||||
test_parser = configparser.ConfigParser(interpolation=None)
|
||||
test_parser.read_string(updated_cfg)
|
||||
if test_parser.has_section(section):
|
||||
raise ConfigError("Section still exists")
|
||||
pending.append((idx, updated_cfg))
|
||||
except Exception as e:
|
||||
raise ConfigError(
|
||||
f"Failed to remove section [{section}], "
|
||||
f"file: {self.files[0]}"
|
||||
) from e
|
||||
for (idx, data) in pending:
|
||||
self.updates_pending.add(idx)
|
||||
self.raw_config_data[idx] = data
|
||||
del self.file_section_map[section]
|
||||
self.config.remove_section(section)
|
||||
finally:
|
||||
self.save_lock.release()
|
||||
|
||||
def save(self) -> Awaitable[bool]:
|
||||
eventloop = self.server.get_event_loop()
|
||||
if self.server.is_running():
|
||||
fut = eventloop.run_in_thread(self._do_save)
|
||||
else:
|
||||
fut = eventloop.create_future()
|
||||
fut.set_result(self._do_save())
|
||||
return fut
|
||||
|
||||
def _do_save(self) -> bool:
|
||||
with self.save_lock:
|
||||
self.backup.clear()
|
||||
if not self.updates_pending:
|
||||
return False
|
||||
for idx in self.updates_pending:
|
||||
fpath = self.files[idx]
|
||||
fpath.write_text(
|
||||
self.raw_config_data[idx], encoding="utf-8"
|
||||
)
|
||||
self.updates_pending.clear()
|
||||
return True
|
||||
|
||||
def cancel(self):
|
||||
self._acquire_save_lock()
|
||||
try:
|
||||
if not self.backup or not self.updates_pending:
|
||||
self.backup.clear()
|
||||
return
|
||||
self.raw_config_data = self.backup["raw_data"]
|
||||
self.file_option_map = self.backup["option_map"]
|
||||
self.file_section_map = self.backup["section_map"]
|
||||
self.config.clear()
|
||||
self.config.read_string(self.backup["config"])
|
||||
self.updates_pending.clear()
|
||||
self.backup.clear()
|
||||
finally:
|
||||
self.save_lock.release()
|
||||
|
||||
def revert(self) -> Awaitable[bool]:
|
||||
eventloop = self.server.get_event_loop()
|
||||
if self.server.is_running():
|
||||
fut = eventloop.run_in_thread(self._do_revert)
|
||||
else:
|
||||
fut = eventloop.create_future()
|
||||
fut.set_result(self._do_revert())
|
||||
return fut
|
||||
|
||||
def _do_revert(self) -> bool:
|
||||
with self.save_lock:
|
||||
if not self.updates_pending:
|
||||
return False
|
||||
self.backup.clear()
|
||||
entry = self.files[0]
|
||||
self.read_file(entry)
|
||||
return True
|
||||
|
||||
def _find_section_info(
|
||||
self, section: str, file_data: List[str], raise_error: bool = True
|
||||
) -> Dict[str, Any]:
|
||||
options: Dict[str, Dict[str, Any]] = {}
|
||||
result: Dict[str, Any] = {
|
||||
"indent": -1,
|
||||
"start": -1,
|
||||
"end": -1,
|
||||
"options": options
|
||||
}
|
||||
last_option: str = ""
|
||||
opt_indent = -1
|
||||
for idx, line in enumerate(file_data):
|
||||
if not line.strip() or line.lstrip()[0] in "#;":
|
||||
# skip empty lines, whitespace, and comments
|
||||
continue
|
||||
line = line.expandtabs()
|
||||
line_indent = len(line) - len(line.strip())
|
||||
if opt_indent != -1 and line_indent > opt_indent:
|
||||
if last_option:
|
||||
options[last_option]["end"] = idx + 1
|
||||
# Continuation of an option
|
||||
if result["start"] != -1:
|
||||
result["end"] = idx + 1
|
||||
continue
|
||||
sec_match = self.section_r.match(line)
|
||||
if sec_match is not None:
|
||||
opt_indent = -1
|
||||
if result["start"] != -1:
|
||||
break
|
||||
cursec = sec_match.group(1)
|
||||
if section == cursec:
|
||||
result["indent"] = line_indent
|
||||
result["start"] = idx
|
||||
result["end"] = idx + 1
|
||||
else:
|
||||
# This is an option
|
||||
opt_indent = line_indent
|
||||
if result["start"] != -1:
|
||||
result["end"] = idx + 1
|
||||
last_option = re.split(r"[:=]", line, 1)[0].strip()
|
||||
options[last_option] = {
|
||||
"indent": line_indent,
|
||||
"start": idx,
|
||||
"end": idx + 1
|
||||
}
|
||||
if result["start"] != -1:
|
||||
return result
|
||||
if raise_error:
|
||||
raise ConfigError(f"Unable to find section [{section}]")
|
||||
return {}
|
||||
|
||||
def get_file_sections(self) -> Dict[str, List[str]]:
|
||||
sections_by_file: Dict[str, List[str]] = {
|
||||
str(fname): [] for fname in self.files
|
||||
}
|
||||
|
@ -524,20 +901,19 @@ class ConfigSourceWrapper:
|
|||
visited.append(cur_stat)
|
||||
self.files.append(file_path)
|
||||
file_index = len(self.files) - 1
|
||||
lines = file_path.read_text().splitlines()
|
||||
cfg_data = file_path.read_text(encoding="utf-8")
|
||||
self.raw_config_data.append(cfg_data)
|
||||
lines = cfg_data.splitlines()
|
||||
last_section = ""
|
||||
opt_indent = -1
|
||||
for line in lines:
|
||||
if not line.strip():
|
||||
# ignore a line that contains only whitespace
|
||||
if not line.strip() or line.lstrip()[0] in "#;":
|
||||
# ignore lines that contain only whitespace/comments
|
||||
continue
|
||||
line = line.expandtabs(tabsize=4)
|
||||
# Remove inline comments
|
||||
for prefix in "#;":
|
||||
icmt = line.find(prefix)
|
||||
if icmt >= 0 and line.lstrip()[0] == prefix:
|
||||
# This line is a comment, ignore it
|
||||
continue
|
||||
if icmt > 0 and line[icmt-1] != "\\":
|
||||
# inline comment, remove it
|
||||
line = line[:icmt]
|
||||
|
@ -578,15 +954,21 @@ class ConfigSourceWrapper:
|
|||
continue
|
||||
else:
|
||||
last_section = section
|
||||
fsm = self.file_section_map
|
||||
fsm.setdefault(section, []).insert(0, file_index)
|
||||
if section not in self.file_section_map:
|
||||
self.file_section_map[section] = []
|
||||
elif file_index in self.file_section_map[section]:
|
||||
self.file_section_map[section].remove(file_index)
|
||||
self.file_section_map[section].insert(0, file_index)
|
||||
else:
|
||||
# This line must specify an option
|
||||
opt_indent = line_indent
|
||||
option = re.split(r"[:=]", line, 1)[0].strip()
|
||||
key = (last_section, option)
|
||||
fom = self.file_option_map
|
||||
fom.setdefault(key, []).insert(0, file_index)
|
||||
if key not in self.file_option_map:
|
||||
self.file_option_map[key] = []
|
||||
elif file_index in self.file_option_map[key]:
|
||||
self.file_option_map[key].remove(file_index)
|
||||
self.file_option_map[key].insert(0, file_index)
|
||||
buffer.append(line)
|
||||
self._write_buffer(buffer, file_path)
|
||||
except ConfigError:
|
||||
|
@ -602,22 +984,24 @@ class ConfigSourceWrapper:
|
|||
raise ConfigError(f"Error Reading Config: '{file_path}'") from e
|
||||
|
||||
def read_file(self, main_conf: pathlib.Path) -> None:
|
||||
self.config.clear()
|
||||
self.files.clear()
|
||||
self.raw_config_data.clear()
|
||||
self.updates_pending.clear()
|
||||
self.file_section_map.clear()
|
||||
self.file_option_map.clear()
|
||||
self._parse_file(main_conf, [])
|
||||
|
||||
def read_dict(self, cfg: Dict[str, Any]) -> None:
|
||||
try:
|
||||
self.config.read_dict(cfg)
|
||||
except Exception as e:
|
||||
raise ConfigError("Error Reading config as dict") from e
|
||||
size = sum([len(rawcfg) for rawcfg in self.raw_config_data])
|
||||
logging.info(
|
||||
f"Configuration File '{main_conf}' parsed, total size: {size} B"
|
||||
)
|
||||
|
||||
|
||||
def get_configuration(
|
||||
server: Server, app_args: Dict[str, Any]
|
||||
) -> ConfigHelper:
|
||||
start_path = pathlib.Path(app_args['config_file']).expanduser().resolve()
|
||||
source = ConfigSourceWrapper()
|
||||
source = FileSourceWrapper(server)
|
||||
source.read_file(start_path)
|
||||
if not source.config.has_section('server'):
|
||||
raise ConfigError("No section [server] in config")
|
||||
|
|
Loading…
Reference in New Issue