update_manager: create a CommandHelper class

This class handles running shell commands and sending http API requests.  Each updater class  shares an instance of the command helper rather than the UpdateManager instance.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-03-07 20:07:12 -05:00
parent 22b3a953c5
commit 9632465a78
1 changed files with 140 additions and 117 deletions

View File

@ -39,20 +39,18 @@ class UpdateManager:
self.server = config.get_server()
self.config = config
self.config.read_supplemental_config(SUPPLEMENTAL_CFG_PATH)
self.repo_debug = config.getboolean('enable_repo_debug', False)
auto_refresh_enabled = config.getboolean('enable_auto_refresh', False)
self.distro = config.get('distro', "debian").lower()
if self.distro not in SUPPORTED_DISTROS:
raise config.error(f"Unsupported distro: {self.distro}")
if self.repo_debug:
logging.warn("UPDATE MANAGER: REPO DEBUG ENABLED")
self.cmd_helper = CommandHelper(config)
env = sys.executable
mooncfg = self.config[f"update_manager static {self.distro} moonraker"]
self.updaters = {
"system": PackageUpdater(self),
"moonraker": GitUpdater(self, mooncfg, MOONRAKER_PATH, env)
"system": PackageUpdater(self.cmd_helper),
"moonraker": GitUpdater(mooncfg, self.cmd_helper,
MOONRAKER_PATH, env)
}
self.current_update = None
# TODO: Check for client config in [update_manager]. This is
# deprecated and will be removed.
client_repo = config.get("client_repo", None)
@ -60,7 +58,8 @@ class UpdateManager:
client_path = config.get("client_path")
name = client_repo.split("/")[-1]
self.updaters[name] = WebUpdater(
self, {'repo': client_repo, 'path': client_path})
{'repo': client_repo, 'path': client_path},
self.cmd_helper)
client_sections = self.config.get_prefix_sections(
"update_manager client")
for section in client_sections:
@ -71,18 +70,13 @@ class UpdateManager:
% (name,))
client_type = cfg.get("type")
if client_type == "git_repo":
self.updaters[name] = GitUpdater(self, cfg)
self.updaters[name] = GitUpdater(cfg, self.cmd_helper)
elif client_type == "web":
self.updaters[name] = WebUpdater(self, cfg)
self.updaters[name] = WebUpdater(cfg, self.cmd_helper)
else:
raise config.error("Invalid type '%s' for section [%s]"
% (client_type, section))
# GitHub API Rate Limit Tracking
self.gh_rate_limit = None
self.gh_limit_remaining = None
self.gh_limit_reset_time = None
self.gh_init_evt = Event()
self.cmd_request_lock = Lock()
self.is_refreshing = False
@ -94,9 +88,6 @@ class UpdateManager:
self._handle_auto_refresh, UPDATE_REFRESH_INTERVAL_MS)
self.refresh_cb.start()
AsyncHTTPClient.configure(None, defaults=dict(user_agent="Moonraker"))
self.http_client = AsyncHTTPClient()
self.server.register_endpoint(
"/machine/update/moonraker", ["POST"],
self._handle_update_request)
@ -124,7 +115,7 @@ class UpdateManager:
async def _initalize_updaters(self, initial_updaters):
self.is_refreshing = True
await self._init_api_rate_limit()
await self.cmd_helper.init_api_rate_limit()
for updater in initial_updaters:
if isinstance(updater, PackageUpdater):
ret = updater.refresh(False)
@ -147,7 +138,7 @@ class UpdateManager:
# Current Klipper Updater is valid
return
kcfg = self.config[f"update_manager static {self.distro} klipper"]
self.updaters['klipper'] = GitUpdater(self, kcfg, kpath, env)
self.updaters['klipper'] = GitUpdater(kcfg, self.cmd_helper, kpath, env)
await self.updaters['klipper'].refresh()
async def _check_klippy_printing(self):
@ -188,12 +179,9 @@ class UpdateManager:
return
finally:
self.is_refreshing = False
uinfo = {
'version_info': vinfo,
'github_rate_limit': self.gh_rate_limit,
'github_requests_remaining': self.gh_limit_remaining,
'github_limit_reset_time': self.gh_limit_reset_time,
'busy': self.current_update is not None}
uinfo = self.cmd_helper.get_rate_limit_stats()
uinfo['version_info'] = vinfo
uinfo['busy'] = self.cmd_helper.is_update_busy()
self.server.send_event("update_manager:update_refreshed", uinfo)
async def _handle_update_request(self, web_request):
@ -203,22 +191,23 @@ class UpdateManager:
if app == "client":
app = web_request.get('name')
inc_deps = web_request.get_boolean('include_deps', False)
if self.current_update is not None and \
self.current_update[0] == app:
if self.cmd_helper.is_app_updating(app):
return f"Object {app} is currently being updated"
updater = self.updaters.get(app, None)
if updater is None:
raise self.server.error(f"Updater {app} not available")
async with self.cmd_request_lock:
self.current_update = (app, id(web_request))
self.cmd_helper.set_update_info(app, id(web_request))
try:
await updater.update(inc_deps)
except Exception as e:
self.notify_update_response(f"Error updating {app}")
self.notify_update_response(str(e), is_complete=True)
self.cmd_helper.notify_update_response(
f"Error updating {app}")
self.cmd_helper.notify_update_response(
str(e), is_complete=True)
raise
finally:
self.current_update = None
self.cmd_helper.clear_update_info()
return "ok"
async def _handle_status_request(self, web_request):
@ -226,7 +215,7 @@ class UpdateManager:
# Don't refresh if a print is currently in progress or
# if an update is in progress. Just return the current
# state
if self.current_update is not None or \
if self.cmd_helper.is_update_busy() or \
await self._check_klippy_printing():
check_refresh = False
need_refresh = False
@ -252,42 +241,64 @@ class UpdateManager:
if check_refresh:
self.is_refreshing = False
self.cmd_request_lock.release()
ret = self.cmd_helper.get_rate_limit_stats()
ret['version_info'] = vinfo
ret['busy'] = self.cmd_helper.is_update_busy()
return ret
def close(self):
self.cmd_helper.close()
if self.refresh_cb is not None:
self.refresh_cb.stop()
class CommandHelper:
def __init__(self, config):
self.server = config.get_server()
self.debug_enabled = config.getboolean('enable_repo_debug', False)
if self.debug_enabled:
logging.warn("UPDATE MANAGER: REPO DEBUG ENABLED")
shell_command = self.server.lookup_plugin('shell_command')
self.build_shell_command = shell_command.build_shell_command
AsyncHTTPClient.configure(None, defaults=dict(user_agent="Moonraker"))
self.http_client = AsyncHTTPClient()
# GitHub API Rate Limit Tracking
self.gh_rate_limit = None
self.gh_limit_remaining = None
self.gh_limit_reset_time = None
self.gh_init_evt = Event()
# Update In Progress Tracking
self.cur_update_app = self.cur_update_id = None
def get_server(self):
return self.server
def is_debug_enabled(self):
return self.debug_enabled
def set_update_info(self, app, uid):
self.cur_update_app = app
self.cur_update_id = uid
def clear_update_info(self):
self.cur_update_app = self.cur_update_id = None
def is_app_updating(self, app_name):
return self.cur_update_app == app_name
def is_update_busy(self):
return self.cur_update_app is not None
def get_rate_limit_stats(self):
return {
'version_info': vinfo,
'github_rate_limit': self.gh_rate_limit,
'github_requests_remaining': self.gh_limit_remaining,
'github_limit_reset_time': self.gh_limit_reset_time,
'busy': self.current_update is not None}
}
async def execute_cmd(self, cmd, timeout=10., notify=False,
retries=1, env=None):
shell_command = self.server.lookup_plugin('shell_command')
if env is not None:
os_env = dict(os.environ)
os_env.update(env)
env = os_env
cb = self.notify_update_response if notify else None
scmd = shell_command.build_shell_command(cmd, callback=cb, env=env)
while retries:
if await scmd.run(timeout=timeout, verbose=notify):
break
retries -= 1
if not retries:
raise self.server.error("Shell Command Error")
async def execute_cmd_with_response(self, cmd, timeout=10., env=None):
shell_command = self.server.lookup_plugin('shell_command')
if env is not None:
os_env = dict(os.environ)
os_env.update(env)
env = os_env
scmd = shell_command.build_shell_command(cmd, None, env=env)
result = await scmd.run_with_response(timeout, retries=5)
if result is None:
raise self.server.error(f"Error Running Command: {cmd}")
return result
async def _init_api_rate_limit(self):
async def init_api_rate_limit(self):
url = "https://api.github.com/rate_limit"
while 1:
try:
@ -310,6 +321,24 @@ class UpdateManager:
break
self.gh_init_evt.set()
async def run_cmd(self, cmd, timeout=10., notify=False,
retries=1, env=None):
cb = self.notify_update_response if notify else None
scmd = self.build_shell_command(cmd, callback=cb, env=env)
while retries:
if await scmd.run(timeout=timeout):
break
retries -= 1
if not retries:
raise self.server.error("Shell Command Error")
async def run_cmd_with_response(self, cmd, timeout=10., env=None):
scmd = self.build_shell_command(cmd, None, env=env)
result = await scmd.run_with_response(timeout, retries=5)
if result is None:
raise self.server.error(f"Error Running Command: {cmd}")
return result
async def github_api_request(self, url, etag=None, is_init=False):
if not is_init:
timeout = time.time() + 30.
@ -404,27 +433,19 @@ class UpdateManager:
resp = resp.decode()
notification = {
'message': resp,
'application': None,
'proc_id': None,
'application': self.cur_update_app,
'proc_id': self.cur_update_id,
'complete': is_complete}
if self.current_update is not None:
notification['application'] = self.current_update[0]
notification['proc_id'] = self.current_update[1]
self.server.send_event(
"update_manager:update_response", notification)
def close(self):
self.http_client.close()
if self.refresh_cb is not None:
self.refresh_cb.stop()
class GitUpdater:
def __init__(self, umgr, config, path=None, env=None):
self.server = umgr.server
self.execute_cmd = umgr.execute_cmd
self.execute_cmd_with_response = umgr.execute_cmd_with_response
self.notify_update_response = umgr.notify_update_response
def __init__(self, config, cmd_helper, path=None, env=None):
self.server = cmd_helper.get_server()
self.cmd_helper = cmd_helper
self.name = config.get_name().split()[-1]
self.owner = "?"
self.repo_path = path
@ -474,7 +495,7 @@ class GitUpdater:
self.remote_version = self.remote_hash = "?"
self.init_evt = Event()
self.refresh_condition = None
self.debug = umgr.repo_debug
self.debug = self.cmd_helper.is_debug_enabled()
self.remote = "origin"
self.branch = "master"
self.is_valid = self.is_dirty = self.detached = False
@ -513,7 +534,7 @@ class GitUpdater:
def _notify_status(self, msg, is_complete=False):
log_msg = f"Repo {self.name}: {msg}"
logging.debug(log_msg)
self.notify_update_response(log_msg, is_complete)
self.cmd_helper.notify_update_response(log_msg, is_complete)
async def check_initialized(self, timeout=None):
if self.init_evt.is_set():
@ -541,7 +562,7 @@ class GitUpdater:
self.cur_hash = self.branch = self.remote = "?"
self.version = self.remote_version = self.owner = "?"
try:
blist = await self.execute_cmd_with_response(
blist = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} branch --list")
if blist.startswith("fatal:"):
self._log_info(f"Invalid git repo at path '{self.repo_path}'")
@ -563,7 +584,7 @@ class GitUpdater:
self.detached = True
else:
self.branch = branch.strip()
self.remote = await self.execute_cmd_with_response(
self.remote = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} config --get"
f" branch.{self.branch}.remote")
if need_fetch:
@ -571,20 +592,20 @@ class GitUpdater:
'GIT_HTTP_LOW_SPEED_LIMIT': "1000",
'GIT_HTTP_LOW_SPEED_TIME ': "15"
}
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"git -C {self.repo_path} fetch {self.remote} --prune -q",
timeout=20., retries=3, env=env)
remote_url = await self.execute_cmd_with_response(
remote_url = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} remote get-url {self.remote}")
cur_hash = await self.execute_cmd_with_response(
cur_hash = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} rev-parse HEAD")
remote_hash = await self.execute_cmd_with_response(
remote_hash = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} rev-parse "
f"{self.remote}/{self.branch}")
repo_version = await self.execute_cmd_with_response(
repo_version = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} describe --always "
"--tags --long --dirty")
remote_version = await self.execute_cmd_with_response(
remote_version = await self.cmd_helper.run_cmd_with_response(
f"git -C {self.repo_path} describe {self.remote}/{self.branch}"
" --always --tags --long")
except Exception:
@ -652,14 +673,14 @@ class GitUpdater:
'GIT_HTTP_LOW_SPEED_TIME ': "15"
}
if self.detached:
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"git -C {self.repo_path} fetch {self.remote} -q",
timeout=20., retries=3, env=env)
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"git -C {self.repo_path} checkout"
f" {self.remote}/{self.branch} -q")
else:
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"git -C {self.repo_path} pull -q", timeout=20.,
retries=3, env=env)
except Exception:
@ -707,9 +728,9 @@ class GitUpdater:
self._notify_status("Installing system dependencies...")
# Install packages with apt-get
try:
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"{APT_CMD} update", timeout=300., notify=True)
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"{APT_CMD} install --yes {pkgs}", timeout=3600.,
notify=True)
except Exception:
@ -727,7 +748,7 @@ class GitUpdater:
if os.path.exists(env_path):
shutil.rmtree(env_path)
try:
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"virtualenv {self.venv_args} {env_path}", timeout=300.)
except Exception:
self._log_exc(f"Error creating virtualenv")
@ -741,7 +762,7 @@ class GitUpdater:
pip = os.path.join(bin_dir, "pip")
self._notify_status("Updating python packages...")
try:
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"{pip} install -r {reqs}", timeout=1200., notify=True,
retries=3)
except Exception:
@ -773,7 +794,8 @@ class GitUpdater:
async def restart_service(self):
self._notify_status("Restarting Service...")
try:
await self.execute_cmd(f"sudo systemctl restart {self.name}")
await self.cmd_helper.run_cmd(
f"sudo systemctl restart {self.name}")
except Exception:
raise self._log_exc("Error restarting service")
@ -789,15 +811,14 @@ class GitUpdater:
'is_dirty': self.is_dirty,
'is_valid': self.is_valid,
'detached': self.detached,
'debug_enabled': self.debug}
'debug_enabled': self.debug
}
class PackageUpdater:
def __init__(self, umgr):
self.server = umgr.server
self.execute_cmd = umgr.execute_cmd
self.execute_cmd_with_response = umgr.execute_cmd_with_response
self.notify_update_response = umgr.notify_update_response
def __init__(self, cmd_helper):
self.server = cmd_helper.get_server()
self.cmd_helper = cmd_helper
self.available_packages = []
self.init_evt = Event()
self.refresh_condition = None
@ -811,9 +832,9 @@ class PackageUpdater:
return
try:
if fetch_packages:
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"{APT_CMD} update", timeout=300., retries=3)
res = await self.execute_cmd_with_response(
res = await self.cmd_helper.run_cmd_with_response(
"apt list --upgradable", timeout=60.)
pkg_list = [p.strip() for p in res.split("\n") if p.strip()]
if pkg_list:
@ -841,17 +862,17 @@ class PackageUpdater:
await self.check_initialized(20.)
if self.refresh_condition is not None:
self.refresh_condition.wait()
self.notify_update_response("Updating packages...")
self.cmd_helper.notify_update_response("Updating packages...")
try:
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"{APT_CMD} update", timeout=300., notify=True)
await self.execute_cmd(
await self.cmd_helper.run_cmd(
f"{APT_CMD} upgrade --yes", timeout=3600., notify=True)
except Exception:
raise self.server.error("Error updating system packages")
self.available_packages = []
self.notify_update_response("Package update finished...",
is_complete=True)
self.cmd_helper.notify_update_response("Package update finished...",
is_complete=True)
def get_update_status(self):
return {
@ -860,10 +881,9 @@ class PackageUpdater:
}
class WebUpdater:
def __init__(self, umgr, config):
self.umgr = umgr
self.server = umgr.server
self.notify_update_response = umgr.notify_update_response
def __init__(self, config, cmd_helper):
self.server = cmd_helper.get_server()
self.cmd_helper = cmd_helper
self.repo = config.get('repo').strip().strip("/")
self.owner, self.name = self.repo.split("/", 1)
if hasattr(config, "get_name"):
@ -922,7 +942,8 @@ class WebUpdater:
# Remote state
url = f"https://api.github.com/repos/{self.repo}/releases/latest"
try:
result = await self.umgr.github_api_request(url, etag=self.etag)
result = await self.cmd_helper.github_api_request(
url, etag=self.etag)
except Exception:
logging.exception(f"Client {self.repo}: Github Request Error")
result = {}
@ -955,8 +976,9 @@ class WebUpdater:
if self.version == self.remote_version:
# Already up to date
return
self.notify_update_response(f"Downloading Client: {self.name}")
archive = await self.umgr.http_download_request(self.dl_url)
self.cmd_helper.notify_update_response(
f"Downloading Client: {self.name}")
archive = await self.cmd_helper.http_download_request(self.dl_url)
with tempfile.TemporaryDirectory(
suffix=self.name, prefix="client") as tempdir:
if os.path.isdir(self.path):
@ -964,7 +986,8 @@ class WebUpdater:
for fname in os.listdir(self.path):
src_path = os.path.join(self.path, fname)
if fname in self.persistent_files:
dest_dir = os.path.dirname(os.path.join(tempdir, fname))
dest_dir = os.path.dirname(
os.path.join(tempdir, fname))
os.makedirs(dest_dir, exist_ok=True)
shutil.move(src_path, dest_dir)
shutil.rmtree(self.path)
@ -982,8 +1005,8 @@ class WebUpdater:
if not os.path.exists(version_path):
with open(version_path, "w") as f:
f.write(self.version)
self.notify_update_response(f"Client Update Finished: {self.name}",
is_complete=True)
self.cmd_helper.notify_update_response(
f"Client Update Finished: {self.name}", is_complete=True)
def get_update_status(self):
return {