update_manager: add support for PackageKit updates

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-01-22 12:02:12 -05:00 committed by Eric Callahan
parent 744f14d6f9
commit 0e6fbb12b0
1 changed files with 393 additions and 14 deletions

View File

@ -15,7 +15,9 @@ import shutil
import zipfile
import time
import tempfile
import re
from tornado.httpclient import AsyncHTTPClient
from thirdparty.packagekit import enums as PkEnum
from .base_deploy import BaseDeploy
from .app_deploy import AppDeploy
from .git_deploy import GitDeploy
@ -25,6 +27,7 @@ from .zip_deploy import ZipDeploy
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Optional,
Tuple,
Type,
@ -42,7 +45,11 @@ if TYPE_CHECKING:
from components.shell_command import ShellCommandFactory as SCMDComp
from components.database import MoonrakerDatabase as DBComp
from components.database import NamespaceWrapper
from components.dbus_manager import DbusManager
from components.machine import Machine
from eventloop import FlexTimer
from dbus_next import Variant
from dbus_next.aio import ProxyInterface
JsonType = Union[List[Any], Dict[str, Any]]
MOONRAKER_PATH = os.path.normpath(os.path.join(
@ -176,9 +183,10 @@ class UpdateManager:
async def component_init(self) -> None:
async with self.cmd_request_lock:
for updater in list(self.updaters.values()):
if isinstance(updater, PackageDeploy):
await updater.initialize()
if updater.needs_refresh():
ret = updater.refresh()
await ret
await updater.refresh()
if self.refresh_timer is not None:
self.refresh_timer.start(delay=UPDATE_REFRESH_INTERVAL)
@ -771,15 +779,60 @@ class PackageDeploy(BaseDeploy):
cmd_helper: CommandHelper
) -> None:
super().__init__(config, cmd_helper, "system", "", "")
self.provider = AptCliProvider(config, cmd_helper)
cmd_helper.set_package_updater(self)
storage = self._load_storage()
self.use_packagekit = config.getboolean("enable_packagekit", True)
self.available_packages: List[str] = storage.get('packages', [])
self.refresh_evt: Optional[asyncio.Event] = None
self.mutex: asyncio.Lock = asyncio.Lock()
async def initialize(self) -> None:
provider: BasePackageProvider
try_fallback = True
if self.use_packagekit:
try:
provider = PackageKitProvider(self.cmd_helper)
await provider.initialize()
except Exception:
pass
else:
logging.info("PackageDeploy: Using PackageKit Provider")
try_fallback = False
if try_fallback:
# Check to see of the apt command is available
fallback = await self._get_fallback_provider()
if fallback is None:
provider = BasePackageProvider(self.cmd_helper)
machine: Machine = self.server.lookup_component("machine")
dist_info = machine.get_system_info()['distribution']
dist_id: str = dist_info['id'].lower()
self.server.add_warning(
"Unable to initialize System Update Provider for "
f"distribution: {dist_id}")
else:
logging.info("PackageDeploy: Using APT CLI Provider")
provider = fallback
self.provider = provider
async def _get_fallback_provider(self) -> Optional[BasePackageProvider]:
# Currently only the API Fallback provider is available
shell_cmd: SCMDComp
shell_cmd = self.server.lookup_component("shell_command")
cmd = shell_cmd.build_shell_command("sh -c 'command -v apt'")
try:
ret = await cmd.run_with_response()
except shell_cmd.error:
return None
# APT Command found should be available
logging.debug(f"APT package manager detected: {ret.encode()}")
provider = AptCliProvider(self.cmd_helper)
try:
await provider.initialize()
except Exception:
return None
return provider
async def refresh(self) -> None:
# TODO: Use python-apt python lib rather than command line for updates
if self.refresh_evt is not None:
self.refresh_evt.wait()
return
@ -829,7 +882,7 @@ class PackageDeploy(BaseDeploy):
curtime = time.time()
if force or curtime > self.last_refresh_time + 3600.:
# Don't update if a request was done within the last hour
await self.provider.refresh_packages()
await self.provider.refresh_packages(notify)
async def install_packages(self,
package_list: List[str],
@ -844,27 +897,27 @@ class PackageDeploy(BaseDeploy):
}
class BasePackageProvider:
def __init__(self,
config: ConfigHelper,
cmd_helper: CommandHelper
) -> None:
self.server = config.get_server()
def __init__(self, cmd_helper: CommandHelper) -> None:
self.server = cmd_helper.get_server()
self.cmd_helper = cmd_helper
async def initialize(self) -> None:
pass
async def refresh_packages(self, notify: bool = False) -> None:
raise NotImplementedError("Children must implement refresh_packages()")
raise self.server.error("Cannot refresh packages, no provider set")
async def get_packages(self) -> List[str]:
raise NotImplementedError("Children must implement getpackages()")
raise self.server.error("Cannot retrieve packages, no provider set")
async def install_packages(self,
package_list: List[str],
**kwargs
) -> None:
raise NotImplementedError("Children must implement install_packages()")
raise self.server.error("Cannot install packages, no provider set")
async def upgrade_system(self) -> None:
raise NotImplementedError("Children must implement upgrade_system()")
raise self.server.error("Cannot upgrade packages, no provider set")
class AptCliProvider(BasePackageProvider):
APT_CMD = "sudo DEBIAN_FRONTEND=noninteractive apt-get"
@ -900,6 +953,332 @@ class AptCliProvider(BasePackageProvider):
f"{self.APT_CMD} upgrade --yes", timeout=3600.,
notify=True)
class PackageKitProvider(BasePackageProvider):
def __init__(self, cmd_helper: CommandHelper) -> None:
super().__init__(cmd_helper)
dbus_mgr: DbusManager = self.server.lookup_component("dbus_manager")
self.dbus_mgr = dbus_mgr
self.pkgkit: Optional[ProxyInterface] = None
async def initialize(self) -> None:
if not self.dbus_mgr.is_connected():
raise self.server.error("DBus Connection Not available")
# Check for PolicyKit permissions
await self.dbus_mgr.check_permission(
"org.freedesktop.packagekit.system-sources-refresh",
"The Update Manager will fail to fetch package updates")
await self.dbus_mgr.check_permission(
"org.freedesktop.packagekit.package-install",
"The Update Manager will fail to install packages")
await self.dbus_mgr.check_permission(
"org.freedesktop.packagekit.system-update",
"The Update Manager will fail to update packages"
)
# Fetch the PackageKit DBus Inteface
self.pkgkit = await self.dbus_mgr.get_interface(
"org.freedesktop.PackageKit",
"/org/freedesktop/PackageKit",
"org.freedesktop.PackageKit")
async def refresh_packages(self, notify: bool = False) -> None:
await self.run_transaction("refresh_cache", False, notify=notify)
async def get_packages(self) -> List[str]:
flags = PkEnum.Filter.NONE
pkgs = await self.run_transaction("get_updates", flags.value)
pkg_ids = [info['package_id'] for info in pkgs if 'package_id' in info]
return [pkg_id.split(";")[0] for pkg_id in pkg_ids]
async def install_packages(self,
package_list: List[str],
**kwargs
) -> None:
notify: bool = kwargs.get('notify', False)
await self.refresh_packages(notify=notify)
flags = PkEnum.Filter.NEWEST | PkEnum.Filter.NOT_INSTALLED | \
PkEnum.Filter.BASENAME
pkgs = await self.run_transaction("resolve", flags.value, package_list)
pkg_ids = [info['package_id'] for info in pkgs if 'package_id' in info]
if pkg_ids:
tflag = PkEnum.TransactionFlag.ONLY_TRUSTED
await self.run_transaction("install_packages", tflag.value,
pkg_ids, notify=notify)
async def upgrade_system(self) -> None:
# Get Updates, Install Packages
flags = PkEnum.Filter.NONE
pkgs = await self.run_transaction("get_updates", flags.value)
pkg_ids = [info['package_id'] for info in pkgs if 'package_id' in info]
if pkg_ids:
tflag = PkEnum.TransactionFlag.ONLY_TRUSTED
await self.run_transaction("update_packages", tflag.value,
pkg_ids, notify=True)
def create_transaction(self) -> PackageKitTransaction:
if self.pkgkit is None:
raise self.server.error("PackageKit Interface Not Available")
return PackageKitTransaction(self.dbus_mgr, self.pkgkit,
self.cmd_helper)
async def run_transaction(self,
method: str,
*args,
notify: bool = False
) -> Any:
transaction = self.create_transaction()
return await transaction.run(method, *args, notify=notify)
class PackageKitTransaction:
GET_PKG_ROLES = (
PkEnum.Role.RESOLVE | PkEnum.Role.GET_PACKAGES |
PkEnum.Role.GET_UPDATES
)
QUERY_ROLES = GET_PKG_ROLES | PkEnum.Role.GET_REPO_LIST
PROGRESS_STATUS = (
PkEnum.Status.RUNNING | PkEnum.Status.INSTALL |
PkEnum.Status.UPDATE
)
def __init__(self,
dbus_mgr: DbusManager,
pkgkit: ProxyInterface,
cmd_helper: CommandHelper
) -> None:
self.server = cmd_helper.get_server()
self.eventloop = self.server.get_event_loop()
self.cmd_helper = cmd_helper
self.dbus_mgr = dbus_mgr
self.pkgkit = pkgkit
# Transaction Properties
self.notify = False
self._status = PkEnum.Status.UNKNOWN
self._role = PkEnum.Role.UNKNOWN
self._tflags = PkEnum.TransactionFlag.NONE
self._percentage = 101
self._dl_remaining = 0
self.speed = 0
self.elapsed_time = 0
self.remaining_time = 0
self.caller_active = False
self.allow_cancel = True
self.uid = 0
# Transaction data tracking
self.tfut: Optional[asyncio.Future] = None
self.last_progress_notify_time: float = 0.
self.result: List[Dict[str, Any]] = []
self.err_msg: str = ""
def run(self,
method: str,
*args,
notify: bool = False
) -> Awaitable:
if self.tfut is not None:
raise self.server.error(
"PackageKit transaction can only be used once")
self.notify = notify
self.tfut = self.eventloop.create_future()
coro = self._start_transaction(method, *args)
self.eventloop.create_task(coro)
return self.tfut
async def _start_transaction(self,
method: str,
*args
) -> None:
assert self.tfut is not None
try:
# Create Transaction
tid = await self.pkgkit.call_create_transaction() # type: ignore
transaction, props = await self.dbus_mgr.get_interfaces(
"org.freedesktop.PackageKit", tid,
["org.freedesktop.PackageKit.Transaction",
"org.freedesktop.DBus.Properties"])
# Set interface callbacks
transaction.on_package(self._on_package_signal) # type: ignore
transaction.on_repo_detail( # type: ignore
self._on_repo_detail_signal)
transaction.on_item_progress( # type: ignore
self._on_item_progress_signal)
transaction.on_error_code(self._on_error_signal) # type: ignore
transaction.on_finished(self._on_finished_signal) # type: ignore
props.on_properties_changed( # type: ignore
self._on_properties_changed)
# Run method
logging.debug(f"PackageKit: Running transaction call_{method}")
func = getattr(transaction, f"call_{method}")
await func(*args)
except Exception as e:
self.tfut.set_exception(e)
def _on_package_signal(self,
info_code: int,
package_id: str,
summary: str
) -> None:
info = PkEnum.Info.from_index(info_code)
if self._role in self.GET_PKG_ROLES:
pkg_data = {
'package_id': package_id,
'info': info.desc,
'summary': summary
}
self.result.append(pkg_data)
else:
self._notify_package(info, package_id)
def _on_repo_detail_signal(self,
repo_id: str,
description: str,
enabled: bool
) -> None:
if self._role == PkEnum.Role.GET_REPO_LIST:
repo_data = {
"repo_id": repo_id,
"description": description,
"enabled": enabled
}
self.result.append(repo_data)
else:
self._notify_repo(repo_id, description)
def _on_item_progress_signal(self,
item_id: str,
status_code: int,
percent_complete: int
) -> None:
status = PkEnum.Status.from_index(status_code)
# NOTE: This signal doesn't seem to fire predictably,
# nor does it seem to provide a consistent "percent complete"
# parameter.
# logging.debug(
# f"Role {self._role.name}: Item Progress Signal Received\n"
# f"Item ID: {item_id}\n"
# f"Percent Complete: {percent_complete}\n"
# f"Status: {status.desc}")
def _on_error_signal(self,
error_code: int,
details: str
) -> None:
err = PkEnum.Error.from_index(error_code)
self.err_msg = f"{err.name}: {details}"
def _on_finished_signal(self, exit_code: int, run_time: int) -> None:
if self.tfut is None:
return
ext = PkEnum.Exit.from_index(exit_code)
secs = run_time / 1000.
if ext == PkEnum.Exit.SUCCESS:
self.tfut.set_result(self.result)
else:
err = self.err_msg or ext.desc
server = self.cmd_helper.get_server()
self.tfut.set_exception(server.error(err))
msg = f"Transaction {self._role.desc}: Exit {ext.desc}, " \
f"Run time: {secs:.2f} seconds"
if self.notify:
self.cmd_helper.notify_update_response(msg)
logging.debug(msg)
def _on_properties_changed(self,
iface_name: str,
changed_props: Dict[str, Variant],
invalid_props: Dict[str, Variant]
) -> None:
for name, var in changed_props.items():
formatted = re.sub(r"(\w)([A-Z])", r"\g<1>_\g<2>", name).lower()
setattr(self, formatted, var.value)
def _notify_package(self, info: PkEnum.Info, package_id: str) -> None:
if self.notify:
if info == PkEnum.Info.FINISHED:
return
pkg_parts = package_id.split(";")
msg = f"{info.desc}: {pkg_parts[0]} ({pkg_parts[1]})"
self.cmd_helper.notify_update_response(msg)
def _notify_repo(self, repo_id: str, description: str) -> None:
if self.notify:
if not repo_id.strip():
repo_id = description
# TODO: May want to eliminate dups
msg = f"GET: {repo_id}"
self.cmd_helper.notify_update_response(msg)
def _notify_progress(self) -> None:
if self.notify and self._percentage <= 100:
msg = f"{self._status.desc}...{self._percentage}%"
if self._status == PkEnum.Status.DOWNLOAD and self._dl_remaining:
if self._dl_remaining < 1024:
msg += f", Remaining: {self._dl_remaining} B"
elif self._dl_remaining < 1048576:
msg += f", Remaining: {self._dl_remaining // 1024} KiB"
else:
msg += f", Remaining: {self._dl_remaining // 1048576} MiB"
if self.speed:
speed = self.speed // 8
if speed < 1024:
msg += f", Speed: {speed} B/s"
elif speed < 1048576:
msg += f", Speed: {speed // 1024} KiB/s"
else:
msg += f", Speed: {speed // 1048576} MiB/s"
self.cmd_helper.notify_update_response(msg)
@property
def role(self) -> PkEnum.Role:
return self._role
@role.setter
def role(self, role_code: int) -> None:
self._role = PkEnum.Role.from_index(role_code)
if self._role in self.QUERY_ROLES:
# Never Notify Queries
self.notify = False
if self.notify:
msg = f"Transaction {self._role.desc} started..."
self.cmd_helper.notify_update_response(msg)
logging.debug(f"PackageKit: Current Role: {self._role.desc}")
@property
def status(self) -> PkEnum.Status:
return self._status
@status.setter
def status(self, status_code: int) -> None:
self._status = PkEnum.Status.from_index(status_code)
self._percentage = 101
self.speed = 0
logging.debug(f"PackageKit: Current Status: {self._status.desc}")
@property
def transaction_flags(self) -> PkEnum.TransactionFlag:
return self._tflags
@transaction_flags.setter
def transaction_flags(self, bits: int) -> None:
self._tflags = PkEnum.TransactionFlag(bits)
@property
def percentage(self) -> int:
return self._percentage
@percentage.setter
def percentage(self, percent: int) -> None:
self._percentage = percent
if self._status in self.PROGRESS_STATUS:
self._notify_progress()
@property
def download_size_remaining(self) -> int:
return self._dl_remaining
@download_size_remaining.setter
def download_size_remaining(self, bytes_remaining: int) -> None:
self._dl_remaining = bytes_remaining
self._notify_progress()
class WebClientDeploy(BaseDeploy):
def __init__(self,
config: ConfigHelper,