update_manager: fix defaults for base configuration

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-06-27 10:52:34 -04:00
parent 07544718fa
commit a1e786fd73
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
3 changed files with 28 additions and 37 deletions

View File

@ -123,12 +123,6 @@ class AppDeploy(BaseDeploy):
f"Extension {self.name} managed services: {self.managed_services}"
)
@staticmethod
def _is_git_repo(app_path: Union[str, pathlib.Path]) -> bool:
if isinstance(app_path, str):
app_path = pathlib.Path(app_path).expanduser()
return app_path.joinpath('.git').exists()
def _configure_path(self, config: ConfigHelper) -> None:
self.path = pathlib.Path(config.get('path')).expanduser().resolve()
self._verify_path(config, 'path', self.path, check_file=False)

View File

@ -13,7 +13,8 @@ from ...utils import source_info
from typing import (
TYPE_CHECKING,
Dict,
Optional
Optional,
Union
)
if TYPE_CHECKING:
@ -45,19 +46,19 @@ BASE_CONFIG: Dict[str, Dict[str, str]] = {
}
}
def get_app_type(app_path: Optional[pathlib.Path] = None) -> str:
def get_app_type(app_path: Union[str, pathlib.Path]) -> str:
if isinstance(app_path, str):
app_path = pathlib.Path(app_path).expanduser()
# None type will perform checks on Moonraker
if source_info.is_git_repo(app_path):
return "git_repo"
else:
return "zip"
return "none"
def get_base_configuration(config: ConfigHelper, channel: str) -> ConfigHelper:
def get_base_configuration(config: ConfigHelper) -> ConfigHelper:
server = config.get_server()
base_cfg = copy.deepcopy(BASE_CONFIG)
base_cfg["moonraker"]["channel"] = channel
base_cfg["moonraker"]["type"] = get_app_type()
base_cfg["klipper"]["channel"] = "beta" if channel == "stable" else channel
base_cfg["moonraker"]["type"] = get_app_type(source_info.source_path())
db: MoonrakerDatabase = server.lookup_component('database')
base_cfg["klipper"]["path"] = db.get_item(
"moonraker", "update_manager.klipper_path", KLIPPER_DEFAULT_PATH
@ -65,6 +66,9 @@ def get_base_configuration(config: ConfigHelper, channel: str) -> ConfigHelper:
base_cfg["klipper"]["env"] = db.get_item(
"moonraker", "update_manager.klipper_exec", KLIPPER_DEFAULT_EXEC
).result()
klipper_path = pathlib.Path(base_cfg["klipper"]["path"])
base_cfg["klipper"]["type"] = get_app_type(klipper_path)
base_cfg["klipper"]["type"] = get_app_type(base_cfg["klipper"]["path"])
if config.has_option("channel"):
channel = config.get("channel")
base_cfg["moonraker"]["channel"] = channel
base_cfg["klipper"]["channel"] = channel
return config.read_supplemental_dict(base_cfg)

View File

@ -26,6 +26,7 @@ from .zip_deploy import ZipDeploy
# Annotation imports
from typing import (
TYPE_CHECKING,
TypeVar,
Any,
Awaitable,
Callable,
@ -54,37 +55,28 @@ if TYPE_CHECKING:
from dbus_next import Variant
from dbus_next.aio import ProxyInterface
JsonType = Union[List[Any], Dict[str, Any]]
_T = TypeVar("_T")
# Check To see if Updates are necessary each hour
UPDATE_REFRESH_INTERVAL = 3600.
# Perform auto refresh no later than 4am
MAX_UPDATE_HOUR = 4
def get_deploy_class(app_path: str) -> Type:
if AppDeploy._is_git_repo(app_path):
return GitDeploy
else:
return ZipDeploy
def get_deploy_class(type: str, default: _T) -> Union[Type[BaseDeploy], _T]:
_deployers = {
"web": WebClientDeploy,
"git_repo": GitDeploy,
"zip": ZipDeploy
}
return _deployers.get(type, default)
class UpdateManager:
def __init__(self, config: ConfigHelper) -> None:
_deployers = {
"web": WebClientDeploy,
"git_repo": GitDeploy,
"zip": ZipDeploy
}
self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.kconn: KlippyConnection
self.kconn = self.server.lookup_component("klippy_connection")
self.channel = config.get('channel', "dev")
if self.channel not in ["dev", "beta"]:
raise config.error(
f"Unsupported channel '{self.channel}' in section"
" [update_manager]")
self.app_config = base_config.get_base_configuration(
config, self.channel
)
self.app_config = base_config.get_base_configuration(config)
auto_refresh_enabled = config.getboolean('enable_auto_refresh', False)
self.cmd_helper = CommandHelper(config, self.get_updaters)
self.updaters: Dict[str, BaseDeploy] = {}
@ -92,14 +84,14 @@ class UpdateManager:
self.updaters['system'] = PackageDeploy(config, self.cmd_helper)
mcfg = self.app_config["moonraker"]
kcfg = self.app_config["klipper"]
mclass = get_deploy_class(mcfg.get("path"))
mclass = get_deploy_class(mcfg.get("type"), BaseDeploy)
self.updaters['moonraker'] = mclass(mcfg, self.cmd_helper)
kclass = BaseDeploy
if (
os.path.exists(kcfg.get("path")) and
os.path.exists(kcfg.get("env"))
):
kclass = get_deploy_class(kcfg.get("path"))
kclass = get_deploy_class(kcfg.get("type"), BaseDeploy)
self.updaters['klipper'] = kclass(kcfg, self.cmd_helper)
# TODO: The below check may be removed when invalid config options
@ -123,7 +115,7 @@ class UpdateManager:
continue
try:
client_type = cfg.get("type")
deployer = _deployers.get(client_type, None)
deployer = get_deploy_class(client_type, None)
if deployer is None:
self.server.add_warning(
f"Invalid type '{client_type}' for section [{section}]")
@ -218,8 +210,9 @@ class UpdateManager:
kcfg = self.app_config["klipper"]
kcfg.set_option("path", kpath)
kcfg.set_option("env", executable)
kcfg.set_option("type", base_config.get_app_type(kpath))
need_notification = not isinstance(kupdater, AppDeploy)
kclass = get_deploy_class(kpath)
kclass = get_deploy_class(kpath, BaseDeploy)
self.updaters['klipper'] = kclass(kcfg, self.cmd_helper)
coro = self._update_klipper_repo(need_notification)
self.event_loop.create_task(coro)