update_manager: use temporary file storage for iids
The default behavior of systemd is to remove shared memory files when the user that created them logs out. This breaks instance ID tracking for most installations. While its possible to overcome this by changing the user type or the logind configuration, these solutions required elevated privileges. The simple solution is to store iids in a temporary file, with access protected by a flock. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
683d93a894
commit
379a26600c
|
@ -10,6 +10,7 @@ import os
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import pathlib
|
||||||
from .common import AppType, get_base_configuration, get_app_type
|
from .common import AppType, get_base_configuration, get_app_type
|
||||||
from .base_deploy import BaseDeploy
|
from .base_deploy import BaseDeploy
|
||||||
from .app_deploy import AppDeploy
|
from .app_deploy import AppDeploy
|
||||||
|
@ -17,6 +18,7 @@ from .git_deploy import GitDeploy
|
||||||
from .zip_deploy import ZipDeploy
|
from .zip_deploy import ZipDeploy
|
||||||
from .system_deploy import PackageDeploy
|
from .system_deploy import PackageDeploy
|
||||||
from ...common import RequestType
|
from ...common import RequestType
|
||||||
|
from ...utils.filelock import AsyncExclusiveFileLock, LockTimeout
|
||||||
|
|
||||||
# Annotation imports
|
# Annotation imports
|
||||||
from typing import (
|
from typing import (
|
||||||
|
@ -179,7 +181,7 @@ class UpdateManager:
|
||||||
return self.updaters
|
return self.updaters
|
||||||
|
|
||||||
async def component_init(self) -> None:
|
async def component_init(self) -> None:
|
||||||
self.instance_tracker.set_instance_id()
|
await self.instance_tracker.set_instance_id()
|
||||||
# Prune stale data from the database
|
# Prune stale data from the database
|
||||||
umdb = self.cmd_helper.get_umdb()
|
umdb = self.cmd_helper.get_umdb()
|
||||||
db_keys = await umdb.keys()
|
db_keys = await umdb.keys()
|
||||||
|
@ -497,7 +499,7 @@ class UpdateManager:
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
if self.refresh_timer is not None:
|
if self.refresh_timer is not None:
|
||||||
self.refresh_timer.stop()
|
self.refresh_timer.stop()
|
||||||
self.instance_tracker.close()
|
await self.instance_tracker.close()
|
||||||
for updater in self.updaters.values():
|
for updater in self.updaters.values():
|
||||||
ret = updater.close()
|
ret = updater.close()
|
||||||
if ret is not None:
|
if ret is not None:
|
||||||
|
@ -671,88 +673,63 @@ class CommandHelper:
|
||||||
class InstanceTracker:
|
class InstanceTracker:
|
||||||
def __init__(self, server: Server) -> None:
|
def __init__(self, server: Server) -> None:
|
||||||
self.server = server
|
self.server = server
|
||||||
self.inst_id = b""
|
self.inst_id = ""
|
||||||
self.shm = self._try_open_shm()
|
tmpdir = pathlib.Path(tempfile.gettempdir())
|
||||||
|
self.inst_file_path = tmpdir.joinpath("moonraker_instance_ids")
|
||||||
|
|
||||||
def _try_open_shm(self) -> Any:
|
def get_instance_id(self) -> str:
|
||||||
prev_mask = os.umask(0)
|
|
||||||
try:
|
|
||||||
from multiprocessing.shared_memory import SharedMemory
|
|
||||||
setattr(SharedMemory, "_mode", 438)
|
|
||||||
try:
|
|
||||||
return SharedMemory("moonraker_instance_ids", True, 4096)
|
|
||||||
except FileExistsError:
|
|
||||||
return SharedMemory("moonraker_instance_ids")
|
|
||||||
except Exception as e:
|
|
||||||
self.server.add_log_rollover_item(
|
|
||||||
"um_multi_instance_msg",
|
|
||||||
"Failed to open shared memory, update_manager instance tracking "
|
|
||||||
f"disabled.\n{e.__class__.__name__}: {e}"
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
finally:
|
|
||||||
os.umask(prev_mask)
|
|
||||||
|
|
||||||
def get_instance_id(self) -> bytes:
|
|
||||||
machine: Machine = self.server.lookup_component("machine")
|
machine: Machine = self.server.lookup_component("machine")
|
||||||
cur_name = "".join(machine.unit_name.split())
|
cur_name = "".join(machine.unit_name.split())
|
||||||
cur_uuid: str = self.server.get_app_args()["instance_uuid"]
|
cur_uuid: str = self.server.get_app_args()["instance_uuid"]
|
||||||
pid = os.getpid()
|
pid = os.getpid()
|
||||||
return f"{cur_name}:{cur_uuid}:{pid}".encode(errors="ignore")
|
return f"{cur_name}:{cur_uuid}:{pid}"
|
||||||
|
|
||||||
def _read_instance_ids(self) -> List[bytes]:
|
async def _read_instance_ids(self) -> List[str]:
|
||||||
if self.shm is not None:
|
if not self.inst_file_path.exists():
|
||||||
try:
|
|
||||||
data = bytearray(self.shm.buf)
|
|
||||||
idx = data.find(b"\x00")
|
|
||||||
if idx > 1:
|
|
||||||
return bytes(data[:idx]).strip().splitlines()
|
|
||||||
except Exception:
|
|
||||||
logging.exception("Failed to Read Shared Memory")
|
|
||||||
return []
|
return []
|
||||||
|
eventloop = self.server.get_event_loop()
|
||||||
|
id_data = await eventloop.run_in_thread(self.inst_file_path.read_text)
|
||||||
|
return [iid.strip() for iid in id_data.strip().splitlines() if iid.strip()]
|
||||||
|
|
||||||
def set_instance_id(self) -> None:
|
async def set_instance_id(self) -> None:
|
||||||
if self.shm is None:
|
try:
|
||||||
return
|
async with AsyncExclusiveFileLock(self.inst_file_path, 2.):
|
||||||
self.inst_id = self.get_instance_id()
|
self.inst_id = self.get_instance_id()
|
||||||
iids = self._read_instance_ids()
|
iids = await self._read_instance_ids()
|
||||||
if self.inst_id not in iids:
|
if self.inst_id not in iids:
|
||||||
iids.append(self.inst_id)
|
iids.append(self.inst_id)
|
||||||
|
iid_string = "\n".join(iids)
|
||||||
if len(iids) > 1:
|
if len(iids) > 1:
|
||||||
id_str = "\n".join([iid.decode(errors="ignore") for iid in iids])
|
|
||||||
self.server.add_log_rollover_item(
|
self.server.add_log_rollover_item(
|
||||||
"um_multi_instance_msg",
|
"um_multi_instance_msg",
|
||||||
"Multiple instances of Moonraker have the update manager enabled."
|
"Multiple instances of Moonraker have the update "
|
||||||
f"\n{id_str}"
|
f"manager enabled.\n{iid_string}"
|
||||||
)
|
)
|
||||||
encoded_ids = b"\n".join(iids) + b"\x00"
|
eventloop = self.server.get_event_loop()
|
||||||
if len(encoded_ids) > self.shm.size:
|
await eventloop.run_in_thread(
|
||||||
iid = self.inst_id.decode(errors="ignore")
|
self.inst_file_path.write_text, iid_string
|
||||||
logging.info(f"Not enough storage in shared memory for id {iid}")
|
)
|
||||||
return
|
except LockTimeout as e:
|
||||||
try:
|
logging.info(str(e))
|
||||||
buf: memoryview = self.shm.buf
|
|
||||||
buf[:len(encoded_ids)] = encoded_ids
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.exception("Failed to Write Shared Memory")
|
logging.exception("Failed to set instance id")
|
||||||
|
|
||||||
def close(self) -> None:
|
async def close(self) -> None:
|
||||||
if self.shm is None:
|
try:
|
||||||
return
|
async with AsyncExclusiveFileLock(self.inst_file_path, 2.):
|
||||||
# Remove current id and clean up shared memory
|
# Remove current id
|
||||||
iids = self._read_instance_ids()
|
iids = await self._read_instance_ids()
|
||||||
if self.inst_id in iids:
|
if self.inst_id in iids:
|
||||||
iids.remove(self.inst_id)
|
iids.remove(self.inst_id)
|
||||||
try:
|
iid_string = "\n".join(iids)
|
||||||
buf: memoryview = self.shm.buf
|
eventloop = self.server.get_event_loop()
|
||||||
null_len = min(self.shm.size, max(len(self.inst_id), 10))
|
await eventloop.run_in_thread(
|
||||||
data = b"\n".join(iids) + b"\x00" if iids else b"\x00" * null_len
|
self.inst_file_path.write_text, iid_string
|
||||||
buf[:len(data)] = data
|
)
|
||||||
self.shm.close()
|
except LockTimeout as e:
|
||||||
if not iids:
|
logging.info(str(e))
|
||||||
self.shm.unlink()
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.exception("Failed to write/close shared memory")
|
logging.exception("Failed to remove instance id")
|
||||||
|
|
||||||
|
|
||||||
def load_component(config: ConfigHelper) -> UpdateManager:
|
def load_component(config: ConfigHelper) -> UpdateManager:
|
||||||
|
|
Loading…
Reference in New Issue