file_manager: improve notification sync

Most requests do not require waiting for inotify to process the
file event before exiting.  Rework the NotifySyncLock to make this
optional.  Replace the write mutex with the sync lock itself,
a guarantee that the synchronization state is cleaned up when
a request is complete.

This implementation avoids potential deadlocks or long wait times
when inotify is not enabled on a file system.

Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-15 14:26:06 -05:00
parent 87dba2f2e2
commit 9f08aad6f6
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 229 additions and 126 deletions

View File

@ -77,10 +77,10 @@ class FileManager:
"systemd", self.datapath.joinpath("systemd"), False
)
self.gcode_metadata = MetadataStorage(config, db)
self.inotify_handler = INotifyHandler(config, self,
self.gcode_metadata)
self.write_mutex = asyncio.Lock()
self.notify_sync_lock: Optional[NotifySyncLock] = None
self.sync_lock = NotifySyncLock(config)
self.inotify_handler = INotifyHandler(
config, self, self.gcode_metadata, self.sync_lock
)
self.fixed_path_args: Dict[str, Any] = {}
self.queue_gcodes: bool = config.getboolean('queue_gcode_uploads',
False)
@ -345,12 +345,6 @@ class FileManager:
def upload_queue_enabled(self) -> bool:
return self.queue_gcodes
def sync_inotify_event(self, path: str) -> Optional[NotifySyncLock]:
if self.notify_sync_lock is None or \
not self.notify_sync_lock.check_need_sync(path):
return None
return self.notify_sync_lock
async def _handle_filelist_request(self,
web_request: WebRequest
) -> List[Dict[str, Any]]:
@ -413,13 +407,16 @@ class FileManager:
# Get list of files and subdirectories for this target
dir_info = self._list_directory(dir_path, root, is_extended)
return dir_info
async with self.write_mutex:
async with self.sync_lock:
self.check_reserved_path(dir_path, True)
rel_dir = self.get_relative_path(root, dir_path)
result = {
'item': {'path': directory, 'root': root},
'action': "create_dir"}
'item': {'path': rel_dir, 'root': root},
'action': "create_dir"
}
if action == 'POST' and root in self.full_access_roots:
# Create a new directory
self.sync_lock.setup("create_dir", dir_path)
try:
os.mkdir(dir_path)
except Exception as e:
@ -433,21 +430,17 @@ class FileManager:
if not os.path.isdir(dir_path):
raise self.server.error(
f"Directory does not exist ({directory})")
self.sync_lock.setup("delete_dir", dir_path)
force = web_request.get_boolean('force', False)
if force:
# Make sure that the directory does not contain a file
# loaded by the virtual_sdcard
self._handle_operation_check(dir_path)
self.notify_sync_lock = NotifySyncLock(dir_path)
try:
await self.event_loop.run_in_thread(
shutil.rmtree, dir_path)
except Exception:
self.notify_sync_lock.cancel()
self.notify_sync_lock = None
raise
await self.notify_sync_lock.wait(30.)
self.notify_sync_lock = None
else:
try:
os.rmdir(dir_path)
@ -509,7 +502,7 @@ class FileManager:
f"Destination path is read-only: {dest_root}")
self.check_reserved_path(source_path, False)
self.check_reserved_path(dest_path, True)
async with self.write_mutex:
async with self.sync_lock:
result: Dict[str, Any] = {'item': {'root': dest_root}}
if not os.path.exists(source_path):
raise self.server.error(f"File {source_path} does not exist")
@ -524,7 +517,7 @@ class FileManager:
self._handle_operation_check(source_path)
op_func: Callable[..., str] = shutil.move
result['source_item'] = {
'path': source,
'path': self.get_relative_path(source_root, source_path),
'root': source_root
}
result['action'] = "move_dir" if os.path.isdir(source_path) \
@ -535,25 +528,28 @@ class FileManager:
op_func = shutil.copytree
else:
result['action'] = "create_file"
source_base = os.path.basename(source_path)
if (
os.path.isfile(dest_path) or
os.path.isfile(os.path.join(dest_path, source_base))
):
result['action'] = "modify_file"
op_func = shutil.copy2
self.notify_sync_lock = NotifySyncLock(dest_path)
self.sync_lock.setup(result["action"], dest_path, move_copy=True)
try:
full_dest = await self.event_loop.run_in_thread(
op_func, source_path, dest_path)
if dest_root == "gcodes":
await self.sync_lock.wait_inotify_event(full_dest)
except Exception as e:
self.notify_sync_lock.cancel()
self.notify_sync_lock = None
raise self.server.error(str(e))
self.notify_sync_lock.update_dest(full_dest)
await self.notify_sync_lock.wait(600.)
self.notify_sync_lock = None
raise self.server.error(str(e)) from e
result['item']['path'] = self.get_relative_path(dest_root, full_dest)
return result
async def _handle_zip_files(
self, web_request: WebRequest
) -> Dict[str, Any]:
async with self.write_mutex:
async with self.sync_lock:
store_only = web_request.get_boolean("store_only", False)
suffix = time.strftime("%Y%m%d-%H%M%S", time.localtime())
dest: str = web_request.get_str(
@ -585,9 +581,11 @@ class FileManager:
raise self.server.error(
"At least one file or directory must be specified"
)
self.sync_lock.setup("create_file", dest_path)
await self.event_loop.run_in_thread(
self._zip_files, items, dest_path, store_only
)
rel_dest = dest_path.relative_to(self.file_paths[dest_root])
return {
"destination": {"root": dest_root, "path": str(rel_dest)},
@ -722,10 +720,11 @@ class FileManager:
form_args: Dict[str, Any]
) -> Dict[str, Any]:
# lookup root file path
async with self.write_mutex:
async with self.sync_lock:
try:
upload_info = self._parse_upload_args(form_args)
self.check_reserved_path(upload_info["dest_path"], True)
self.sync_lock.setup("create_file", upload_info["dest_path"])
root = upload_info['root']
if root not in self.full_access_roots:
raise self.server.error(f"Invalid root request: {root}")
@ -787,9 +786,9 @@ class FileManager:
'ext': f_ext
}
async def _finish_gcode_upload(self,
upload_info: Dict[str, Any]
) -> Dict[str, Any]:
async def _finish_gcode_upload(
self, upload_info: Dict[str, Any]
) -> Dict[str, Any]:
# Verify that the operation can be done if attempting to upload a gcode
can_start: bool = False
try:
@ -799,7 +798,6 @@ class FileManager:
if e.status_code == 403:
raise self.server.error(
"File is loaded, upload not permitted", 403)
self.notify_sync_lock = NotifySyncLock(upload_info['dest_path'])
finfo = await self._process_uploaded_file(upload_info)
await self.gcode_metadata.parse_metadata(
upload_info['filename'], finfo).wait()
@ -820,9 +818,6 @@ class FileManager:
await job_queue.queue_job(
upload_info['filename'], check_exists=False)
queued = True
await self.notify_sync_lock.wait(300.)
self.notify_sync_lock = None
return {
'item': {
'path': upload_info['filename'],
@ -833,13 +828,10 @@ class FileManager:
'action': "create_file"
}
async def _finish_standard_upload(self,
upload_info: Dict[str, Any]
) -> Dict[str, Any]:
self.notify_sync_lock = NotifySyncLock(upload_info['dest_path'])
async def _finish_standard_upload(
self, upload_info: Dict[str, Any]
) -> Dict[str, Any]:
await self._process_uploaded_file(upload_info)
await self.notify_sync_lock.wait(5.)
self.notify_sync_lock = None
return {
'item': {
'path': upload_info['filename'],
@ -976,7 +968,7 @@ class FileManager:
return await self.delete_file(file_path)
async def delete_file(self, path: str) -> Dict[str, Any]:
async with self.write_mutex:
async with self.sync_lock:
root, full_path = self._convert_request_path(path)
self.check_reserved_path(full_path, True)
filename = self.get_relative_path(root, full_path)
@ -990,6 +982,7 @@ class FileManager:
except self.server.error as e:
if e.status_code == 403:
raise
self.sync_lock.setup("delete_file", full_path)
os.remove(full_path)
return {
'item': {'path': filename, 'root': root},
@ -999,6 +992,156 @@ class FileManager:
self.inotify_handler.close()
class NotifySyncLock(asyncio.Lock):
def __init__(self, config: ConfigHelper) -> None:
super().__init__()
self.server = config.get_server()
self.action: str = ""
self.dest_path: Optional[pathlib.Path] = None
self.check_pending = False
self.move_copy_fut: Optional[asyncio.Future] = None
self.sync_waiters: List[asyncio.Future] = []
self.pending_paths: Set[pathlib.Path] = set()
self.acquired_paths: Set[pathlib.Path] = set()
def setup(
self, action: str, path: StrOrPath, move_copy: bool = False
) -> None:
if not self.locked():
raise self.server.error(
"Cannot call setup unless the lock has been acquired"
)
# Called by a file manager request. Sets the destination path to sync
# with the inotify handler.
if self.dest_path is not None:
logging.debug(
"NotifySync Error: Setup requested while a path is still pending"
)
self.finish()
if isinstance(path, str):
path = pathlib.Path(path)
self.dest_path = path
self.action = action
self.check_pending = move_copy
async def wait_inotify_event(self, current_path: StrOrPath) -> None:
# Called by a file manager move copy request to wait for metadata
# analysis to complete. We need to be careful here to avoid a deadlock
# or a long wait time when inotify isn't available.
if not self.check_pending:
return
if isinstance(current_path, str):
current_path = pathlib.Path(current_path)
self.dest_path = current_path
if current_path in self.acquired_paths:
# Notifcation has been recieved, no need to wait
return
self.move_copy_fut = self.server.get_event_loop().create_future()
mcfut = self.move_copy_fut
has_pending = current_path in self.pending_paths
timeout = 1200. if has_pending else 1.
for _ in range(5):
try:
await asyncio.wait_for(asyncio.shield(mcfut), timeout)
except asyncio.TimeoutError:
if timeout > 2.:
break
has_pending = current_path in self.pending_paths
timeout = 1200. if has_pending else 1.
else:
break
else:
logging.info(
f"Failed to receive an inotify event, dest path: {current_path}"
)
self.move_copy_fut = None
def finish(self) -> None:
# Called by a file manager request upon completion. The inotify handler
# can now emit the websocket notification
for waiter in self.sync_waiters:
if not waiter.done():
waiter.set_result((self.action, self.dest_path))
self.sync_waiters.clear()
self.dest_path = None
self.action = ""
self.pending_paths.clear()
self.acquired_paths.clear()
if self.move_copy_fut is not None and not self.move_copy_fut.done():
self.move_copy_fut.set_exception(
self.server.error("Move/Copy Interrupted by call to finish")
)
self.move_copy_fut = None
self.check_pending = False
def add_pending_path(self, action: str, pending_path: StrOrPath) -> None:
# Called by the inotify handler whenever a create or move event
# is detected. This is only necessary to track for move/copy actions,
# since we don't get the final destination until the request is complete.
if (
not self.check_pending or
self.dest_path is None or
action != self.action
):
return
if isinstance(pending_path, str):
pending_path = pathlib.Path(pending_path)
if self.dest_path in [pending_path, pending_path.parent]:
self.pending_paths.add(pending_path)
def check_in_request(
self, action: str, inotify_path: StrOrPath
) -> Optional[asyncio.Future]:
# Called by the inotify handler to check if request synchronization
# is necessary. If so, this method will return a future the inotify
# handler can await.
if self.dest_path is None:
return None
if isinstance(inotify_path, str):
inotify_path = pathlib.Path(inotify_path)
waiter: Optional[asyncio.Future] = None
if self.check_pending:
# The final path of move/copy requests aren't known until the request
# complete. It may be the destination path recieved from the request
# or it may be a child as of that path.
if self.move_copy_fut is not None:
# Request is complete, metadata analysis pending. We can explicitly
# check for a path match
if self.dest_path == inotify_path:
if not self.move_copy_fut.done():
self.move_copy_fut.set_result(None)
waiter = self.server.get_event_loop().create_future()
elif self.dest_path in [inotify_path, inotify_path.parent]:
# Request is still processing. This might be the notification for
# the request, it will be checked when the move/copy request awaits
self.acquired_paths.add(inotify_path)
waiter = self.server.get_event_loop().create_future()
elif self.dest_path == inotify_path:
waiter = self.server.get_event_loop().create_future()
if waiter is not None:
self._check_action(action, inotify_path)
self.sync_waiters.append(waiter)
return waiter
def _check_action(self, action: str, path: StrOrPath) -> bool:
# We aren't going to set a hard filter on the sync action, however
# we will log mismatches as they shouldn't occur
if action != self.action:
logging.info(
f"\nInotify action mismatch:\n"
f"Expected action: {self.action}, Inotify action: {action}\n"
f"Requested path: {self.dest_path}\n"
f"Inotify path: {path}\n"
f"Is move/copy: {self.check_pending}"
)
return False
return True
def release(self) -> None:
super().release()
self.finish()
INOTIFY_BUNDLE_TIME = .25
INOTIFY_MOVE_TIME = 1.
@ -1278,76 +1421,20 @@ class InotifyRootNode(InotifyNode):
def is_processing(self) -> bool:
return self.is_processing_metadata
class NotifySyncLock:
def __init__(self, dest_path: str) -> None:
self.wait_fut: Optional[asyncio.Future] = None
self.sync_event = asyncio.Event()
self.dest_path = dest_path
self.notified_paths: Set[str] = set()
self.finished: bool = False
def update_dest(self, dest_path: str) -> None:
self.dest_path = dest_path
def check_need_sync(self, path: str) -> bool:
return self.dest_path in [path, os.path.dirname(path)] \
and not self.finished
async def wait(self, timeout: Optional[float] = None) -> None:
if self.finished or self.wait_fut is not None:
# Can only wait once
return
if self.dest_path not in self.notified_paths:
self.wait_fut = asyncio.Future()
if timeout is None:
await self.wait_fut
else:
try:
await asyncio.wait_for(self.wait_fut, timeout)
except asyncio.TimeoutError:
pass
self.sync_event.set()
self.finished = True
async def sync(self, path, timeout: Optional[float] = None) -> None:
if not self.check_need_sync(path):
return
self.notified_paths.add(path)
if (
self.wait_fut is not None and
not self.wait_fut.done() and
self.dest_path == path
):
self.wait_fut.set_result(None)
# Transfer control to waiter
try:
await asyncio.wait_for(self.sync_event.wait(), timeout)
except Exception:
pass
else:
# Sleep an additional 5ms to give HTTP requests a chance to
# return prior to a notification
await asyncio.sleep(.005)
def cancel(self) -> None:
if self.finished:
return
if self.wait_fut is not None and not self.wait_fut.done():
self.wait_fut.set_result(None)
self.sync_event.set()
self.finished = True
class INotifyHandler:
def __init__(self,
config: ConfigHelper,
file_manager: FileManager,
gcode_metadata: MetadataStorage
) -> None:
def __init__(
self,
config: ConfigHelper,
file_manager: FileManager,
gcode_metadata: MetadataStorage,
sync_lock: NotifySyncLock
) -> None:
self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.enable_warn = config.getboolean("enable_inotify_warnings", True)
self.file_manager = file_manager
self.gcode_metadata = gcode_metadata
self.sync_lock = sync_lock
self.inotify = INotify(nonblocking=True)
self.event_loop.add_reader(
self.inotify.fileno(), self._handle_inotify_read)
@ -1581,7 +1668,9 @@ class INotifyHandler:
return
root = node.get_root()
node_path = node.get_path()
full_path = os.path.join(node_path, evt.name)
if evt.mask & iFlags.CREATE:
self.sync_lock.add_pending_path("create_dir", full_path)
logging.debug(f"Inotify directory create: {root}, "
f"{node_path}, {evt.name}")
node.create_child_node(evt.name)
@ -1598,6 +1687,7 @@ class INotifyHandler:
f"{node_path}, {evt.name}")
moved_evt = self.pending_moves.pop(evt.cookie, None)
if moved_evt is not None:
self.sync_lock.add_pending_path("move_dir", full_path)
# Moved from a currently watched directory
prev_parent, child_name, hdl = moved_evt
hdl.cancel()
@ -1606,6 +1696,7 @@ class INotifyHandler:
# Moved from an unwatched directory, for our
# purposes this is the same as creating a
# directory
self.sync_lock.add_pending_path("create_dir", full_path)
node.create_child_node(evt.name)
async def _process_file_event(self,
@ -1619,6 +1710,7 @@ class INotifyHandler:
if evt.mask & iFlags.CREATE:
logging.debug(f"Inotify file create: {root}, "
f"{node_path}, {evt.name}")
self.sync_lock.add_pending_path("create_file", file_path)
node.schedule_file_event(evt.name, "create_file")
if os.path.islink(file_path):
logging.debug(f"Inotify symlink create: {file_path}")
@ -1643,6 +1735,8 @@ class INotifyHandler:
can_notify = not node.is_processing()
if moved_evt is not None:
# Moved from a currently watched directory
if can_notify:
self.sync_lock.add_pending_path("move_file", file_path)
prev_parent, prev_name, hdl = moved_evt
hdl.cancel()
prev_root = prev_parent.get_root()
@ -1658,6 +1752,8 @@ class INotifyHandler:
"move_file", root, file_path,
prev_root, prev_path)
else:
if can_notify:
self.sync_lock.add_pending_path("create_file", file_path)
if root == "gcodes":
mevt = self.parse_gcode_metadata(file_path)
await mevt.wait()
@ -1668,6 +1764,7 @@ class INotifyHandler:
logging.debug("Metadata is processing, suppressing move "
f"notification: {file_path}")
elif evt.mask & iFlags.MODIFY:
self.sync_lock.add_pending_path("modify_file", file_path)
node.schedule_file_event(evt.name, "modify_file")
elif evt.mask & iFlags.CLOSE_WRITE:
logging.debug(f"Inotify writable file closed: {file_path}")
@ -1682,18 +1779,25 @@ class INotifyHandler:
source_path: Optional[str] = None
) -> None:
rel_path = self.file_manager.get_relative_path(root, full_path)
sync_fut = self.sync_lock.check_in_request(action, full_path)
file_info: Dict[str, Any] = {'size': 0, 'modified': 0}
is_valid = True
if os.path.exists(full_path):
try:
file_info = self.file_manager.get_path_info(full_path, root)
except Exception:
is_valid = False
logging.debug(
f"Invalid Filelist Notification Request, root: {root}, "
f"path: {full_path} - Failed to get path info")
return
elif action not in ["delete_file", "delete_dir"]:
is_valid = False
logging.debug(
f"Invalid Filelist Notification Request, root: {root}, "
f"path: {full_path} - Action {action} received for file "
"that does not exit"
)
return
ext = os.path.splitext(rel_path)[-1].lower()
if (
is_valid and
root == "gcodes" and
ext in VALID_GCODE_EXTS and
action == "create_file"
@ -1702,9 +1806,8 @@ class INotifyHandler:
if file_info == prev_info:
logging.debug("Ignoring duplicate 'create_file' "
f"notification: {rel_path}")
is_valid = False
else:
self.create_gcode_notifications[rel_path] = dict(file_info)
return
self.create_gcode_notifications[rel_path] = dict(file_info)
elif rel_path in self.create_gcode_notifications:
del self.create_gcode_notifications[rel_path]
file_info['path'] = rel_path
@ -1714,24 +1817,21 @@ class INotifyHandler:
src_rel_path = self.file_manager.get_relative_path(
source_root, source_path)
result['source_item'] = {'path': src_rel_path, 'root': source_root}
sync_lock = self.file_manager.sync_inotify_event(full_path)
if sync_lock is not None:
if sync_fut is not None:
# Delay this notification so that it occurs after an item
logging.debug(f"Syncing notification: {full_path}")
self.event_loop.register_callback(
self._sync_with_request, result,
sync_lock.sync(full_path), is_valid)
elif is_valid:
self._sync_with_request, result, sync_fut
)
else:
self.server.send_event("file_manager:filelist_changed", result)
async def _sync_with_request(self,
result: Dict[str, Any],
sync_fut: Coroutine,
is_valid: bool
) -> None:
async def _sync_with_request(
self, result: Dict[str, Any], sync_fut: asyncio.Future
) -> None:
await sync_fut
if is_valid:
self.server.send_event("file_manager:filelist_changed", result)
await asyncio.sleep(.005)
self.server.send_event("file_manager:filelist_changed", result)
def close(self) -> None:
self.event_loop.remove_reader(self.inotify.fileno())
@ -1821,6 +1921,9 @@ class MetadataStorage:
self.metadata[key] = val
self.mddb[key] = val
def is_processing(self) -> bool:
return len(self.pending_requests) > 0
def _has_valid_data(self,
fname: str,
path_info: Dict[str, Any]