file_manager: don't send filelist notifications until metadata is processed

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-03-27 08:09:42 -04:00 committed by Eric Callahan
parent 9da74f7bc6
commit 5fe9f1a217
1 changed files with 57 additions and 30 deletions

View File

@ -9,6 +9,7 @@ import shutil
import logging import logging
import json import json
import tempfile import tempfile
import asyncio
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tornado.locks import Event from tornado.locks import Event
@ -396,9 +397,8 @@ class FileManager:
# Don't start if another print is currently in progress # Don't start if another print is currently in progress
start_print = start_print and not print_ongoing start_print = start_print and not print_ongoing
finfo = self._process_uploaded_file(upload_info) finfo = self._process_uploaded_file(upload_info)
evt = self.gcode_metadata.parse_metadata( await self.gcode_metadata.parse_metadata(
upload_info['filename'], finfo) upload_info['filename'], finfo).wait()
await evt.wait()
if start_print: if start_print:
# Make a Klippy Request to "Start Print" # Make a Klippy Request to "Start Print"
klippy_apis = self.server.lookup_component('klippy_apis') klippy_apis = self.server.lookup_component('klippy_apis')
@ -564,7 +564,7 @@ class InotifyNode:
self.pending_deleted_children = set() self.pending_deleted_children = set()
self.pending_file_events = {} self.pending_file_events = {}
def _finish_create_node(self): async def _finish_create_node(self):
# Finish a node's creation. All children that were created # Finish a node's creation. All children that were created
# with this node (ie: a directory is copied) are bundled into # with this node (ie: a directory is copied) are bundled into
# this notification. We also scan the node to extract metadata # this notification. We also scan the node to extract metadata
@ -575,7 +575,10 @@ class InotifyNode:
node_path = self.get_path() node_path = self.get_path()
root = self.get_root() root = self.get_root()
# Scan child nodes for unwatched directories and metadata # Scan child nodes for unwatched directories and metadata
self.scan_node(notify_created=False) mevts = self.scan_node(notify_created=False)
if mevts:
mfuts = [e.wait() for e in mevts]
await asyncio.gather(*mfuts)
self.ihdlr.log_nodes() self.ihdlr.log_nodes()
self.ihdlr.notify_filelist_changed( self.ihdlr.notify_filelist_changed(
"create_dir", root, node_path) "create_dir", root, node_path)
@ -603,7 +606,8 @@ class InotifyNode:
dir_path = self.get_path() dir_path = self.get_path()
st = os.stat(dir_path) st = os.stat(dir_path)
if st in visited_dirs: if st in visited_dirs:
return return []
metadata_events = []
visited_dirs.add((st.st_dev, st.st_ino)) visited_dirs.add((st.st_dev, st.st_ino))
for fname in os.listdir(dir_path): for fname in os.listdir(dir_path):
if fname[0] == ".": if fname[0] == ".":
@ -612,13 +616,16 @@ class InotifyNode:
ext = os.path.splitext(fname)[-1].lower() ext = os.path.splitext(fname)[-1].lower()
if os.path.isdir(item_path): if os.path.isdir(item_path):
new_child = self.create_child_node(fname, notify_created) new_child = self.create_child_node(fname, notify_created)
new_child.scan_node(notify_created, visited_dirs) metadata_events.extend(new_child.scan_node(
notify_created, visited_dirs))
elif os.path.isfile(item_path) and \ elif os.path.isfile(item_path) and \
self.get_root() == "gcodes" and \ self.get_root() == "gcodes" and \
ext in VALID_GCODE_EXTS: ext in VALID_GCODE_EXTS:
self.ihdlr.parse_gcode_metadata(item_path) mevt = self.ihdlr.parse_gcode_metadata(item_path)
metadata_events.append(mevt)
return metadata_events
def move_child_node(self, child_name, new_name, new_parent): async def move_child_node(self, child_name, new_name, new_parent):
child_node = self.pop_child_node(child_name) child_node = self.pop_child_node(child_name)
if child_node is None: if child_node is None:
logging.info(f"No child for node at path: {self.get_path()}") logging.info(f"No child for node at path: {self.get_path()}")
@ -632,7 +639,10 @@ class InotifyNode:
logging.debug(f"Moving node from '{prev_path}' to '{new_path}'") logging.debug(f"Moving node from '{prev_path}' to '{new_path}'")
# TODO: It is possible to "move" metadata rather # TODO: It is possible to "move" metadata rather
# than rescan. # than rescan.
child_node.scan_node(notify_created=False) mevts = child_node.scan_node(notify_created=False)
if mevts:
mfuts = [e.wait() for e in mevts]
await asyncio.gather(*mfuts)
self.ihdlr.notify_filelist_changed( self.ihdlr.notify_filelist_changed(
"move_dir", new_root, new_path, "move_dir", new_root, new_path,
prev_root, prev_path) prev_root, prev_path)
@ -645,7 +655,7 @@ class InotifyNode:
pending_node.stop_event("create_node") pending_node.stop_event("create_node")
self.pending_file_events[file_name] = evt_name self.pending_file_events[file_name] = evt_name
def complete_file_write(self, file_name): async def complete_file_write(self, file_name):
evt_name = self.pending_file_events.pop(file_name, None) evt_name = self.pending_file_events.pop(file_name, None)
if evt_name is None: if evt_name is None:
logging.info(f"Invalid file write event: {file_name}") logging.info(f"Invalid file write event: {file_name}")
@ -659,10 +669,11 @@ class InotifyNode:
file_path = os.path.join(self.get_path(), file_name) file_path = os.path.join(self.get_path(), file_name)
root = self.get_root() root = self.get_root()
if root == "gcodes": if root == "gcodes":
self.ihdlr.parse_gcode_metadata(file_path) mevt = self.ihdlr.parse_gcode_metadata(file_path)
if os.path.splitext(file_path)[1].lower() == ".ufp": if os.path.splitext(file_path)[1].lower() == ".ufp":
# don't notify .ufp files # don't notify .ufp files
return return
await mevt.wait()
self.ihdlr.notify_filelist_changed(evt_name, root, file_path) self.ihdlr.notify_filelist_changed(evt_name, root, file_path)
def add_child_node(self, node): def add_child_node(self, node):
@ -779,6 +790,9 @@ class INotifyHandler:
self.inotify.fileno(), self._handle_inotify_read, self.inotify.fileno(), self._handle_inotify_read,
IOLoop.READ | IOLoop.ERROR) IOLoop.READ | IOLoop.ERROR)
self.event_loop_busy = False
self.pending_inotify_events = []
self.watched_roots = {} self.watched_roots = {}
self.watched_nodes = {} self.watched_nodes = {}
self.pending_moves = {} self.pending_moves = {}
@ -838,13 +852,16 @@ class INotifyHandler:
logging.info( logging.info(
f"File at path '{file_path}' is not in the gcode path" f"File at path '{file_path}' is not in the gcode path"
", metadata extraction aborted") ", metadata extraction aborted")
return mevt = Event()
mevt.set()
return mevt
path_info = self.file_manager.get_path_info(file_path) path_info = self.file_manager.get_path_info(file_path)
ext = os.path.splitext(file_path)[-1].lower() ext = os.path.splitext(file_path)[-1].lower()
if ext == ".ufp": if ext == ".ufp":
rel_path = os.path.splitext(rel_path)[0] + ".gcode" rel_path = os.path.splitext(rel_path)[0] + ".gcode"
path_info['ufp_path'] = file_path path_info['ufp_path'] = file_path
self.gcode_metadata.parse_metadata(rel_path, path_info, notify=True) return self.gcode_metadata.parse_metadata(
rel_path, path_info, notify=True)
def _handle_move_timeout(self, cookie, is_dir): def _handle_move_timeout(self, cookie, is_dir):
if cookie not in self.pending_moves: if cookie not in self.pending_moves:
@ -887,13 +904,22 @@ class INotifyHandler:
f"not currently tracked: name: {evt.name}, " f"not currently tracked: name: {evt.name}, "
f"flags: {flags}") f"flags: {flags}")
continue continue
self.pending_inotify_events.append(evt)
if not self.event_loop_busy:
self.event_loop_busy = True
IOLoop.current().spawn_callback(self._process_inotify_events)
async def _process_inotify_events(self):
while self.pending_inotify_events:
evt = self.pending_inotify_events.pop(0)
node = self.watched_nodes[evt.wd] node = self.watched_nodes[evt.wd]
if evt.mask & iFlags.ISDIR: if evt.mask & iFlags.ISDIR:
self._process_dir_event(evt, node) await self._process_dir_event(evt, node)
else: else:
self._process_file_event(evt, node) await self._process_file_event(evt, node)
self.event_loop_busy = False
def _process_dir_event(self, evt, node): async def _process_dir_event(self, evt, node):
if evt.name and evt.name[0] == ".": if evt.name and evt.name[0] == ".":
# ignore changes to the hidden directories # ignore changes to the hidden directories
return return
@ -919,14 +945,14 @@ class INotifyHandler:
# Moved from a currently watched directory # Moved from a currently watched directory
prev_parent, child_name, hdl = moved_evt prev_parent, child_name, hdl = moved_evt
IOLoop.current().remove_timeout(hdl) IOLoop.current().remove_timeout(hdl)
prev_parent.move_child_node(child_name, evt.name, node) await prev_parent.move_child_node(child_name, evt.name, node)
else: else:
# Moved from an unwatched directory, for our # Moved from an unwatched directory, for our
# purposes this is the same as creating a # purposes this is the same as creating a
# directory # directory
node.create_child_node(evt.name) node.create_child_node(evt.name)
def _process_file_event(self, evt, node): async def _process_file_event(self, evt, node):
ext = os.path.splitext(evt.name)[-1].lower() ext = os.path.splitext(evt.name)[-1].lower()
root = node.get_root() root = node.get_root()
node_path = node.get_path() node_path = node.get_path()
@ -953,7 +979,8 @@ class INotifyHandler:
f"{node_path}, {evt.name}") f"{node_path}, {evt.name}")
file_path = os.path.join(node_path, evt.name) file_path = os.path.join(node_path, evt.name)
if root == "gcodes": if root == "gcodes":
self.parse_gcode_metadata(file_path) mevt = self.parse_gcode_metadata(file_path)
await mevt.wait()
moved_evt = self.pending_moves.pop(evt.cookie, None) moved_evt = self.pending_moves.pop(evt.cookie, None)
if moved_evt is not None: if moved_evt is not None:
# Moved from a currently watched directory # Moved from a currently watched directory
@ -973,7 +1000,7 @@ class INotifyHandler:
file_path = os.path.join(node_path, evt.name) file_path = os.path.join(node_path, evt.name)
logging.debug(f"Inotify writable file closed: {file_path}") logging.debug(f"Inotify writable file closed: {file_path}")
# Only process files that have been created or modified # Only process files that have been created or modified
node.complete_file_write(evt.name) await node.complete_file_write(evt.name)
def notify_filelist_changed(self, action, root, full_path, def notify_filelist_changed(self, action, root, full_path,
source_root=None, source_path=None): source_root=None, source_path=None):
@ -1079,25 +1106,25 @@ class MetadataStorage:
logging.debug(f"Error removing thumb at {thumb_path}") logging.debug(f"Error removing thumb at {thumb_path}")
def parse_metadata(self, fname, path_info, notify=False): def parse_metadata(self, fname, path_info, notify=False):
evt = Event() mevt = Event()
if fname in self.pending_requests or \ if fname in self.pending_requests or \
self._has_valid_data(fname, path_info): self._has_valid_data(fname, path_info):
# request already pending or not necessary # request already pending or not necessary
evt.set() mevt.set()
return evt return mevt
self.pending_requests[fname] = (path_info, notify, evt) self.pending_requests[fname] = (path_info, notify, mevt)
if self.busy: if self.busy:
return evt return mevt
self.busy = True self.busy = True
IOLoop.current().spawn_callback(self._process_metadata_update) IOLoop.current().spawn_callback(self._process_metadata_update)
return evt return mevt
async def _process_metadata_update(self): async def _process_metadata_update(self):
while self.pending_requests: while self.pending_requests:
fname, (path_info, notify, evt) = \ fname, (path_info, notify, mevt) = \
self.pending_requests.popitem() self.pending_requests.popitem()
if self._has_valid_data(fname, path_info): if self._has_valid_data(fname, path_info):
evt.set() mevt.set()
continue continue
ufp_path = path_info.get('ufp_path', None) ufp_path = path_info.get('ufp_path', None)
retries = 3 retries = 3
@ -1119,7 +1146,7 @@ class MetadataStorage:
} }
logging.info( logging.info(
f"Unable to extract medatadata from file: {fname}") f"Unable to extract medatadata from file: {fname}")
evt.set() mevt.set()
self.busy = False self.busy = False
async def _run_extract_metadata(self, filename, ufp_path, notify): async def _run_extract_metadata(self, filename, ufp_path, notify):