file_manager: replace references to ioloop with eventloop

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-07-10 17:17:10 -04:00
parent e4ecc1febe
commit 13a85fe9e0
1 changed files with 52 additions and 59 deletions

View File

@ -12,9 +12,6 @@ import logging
import json import json
import tempfile import tempfile
import asyncio import asyncio
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
from tornado.locks import Event, Lock, Condition
from inotify_simple import INotify from inotify_simple import INotify
from inotify_simple import flags as iFlags from inotify_simple import flags as iFlags
@ -57,6 +54,7 @@ WATCH_FLAGS = iFlags.CREATE | iFlags.DELETE | iFlags.MODIFY \
class FileManager: class FileManager:
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.file_paths: Dict[str, str] = {} self.file_paths: Dict[str, str] = {}
db: DBComp = self.server.load_component(config, "database") db: DBComp = self.server.load_component(config, "database")
gc_path: str = db.get_item( gc_path: str = db.get_item(
@ -64,7 +62,7 @@ class FileManager:
self.gcode_metadata = MetadataStorage(self.server, gc_path, db) self.gcode_metadata = MetadataStorage(self.server, gc_path, db)
self.inotify_handler = INotifyHandler(config, self, self.inotify_handler = INotifyHandler(config, self,
self.gcode_metadata) self.gcode_metadata)
self.write_mutex = Lock() self.write_mutex = asyncio.Lock()
self.notify_sync_lock: Optional[NotifySyncLock] = None self.notify_sync_lock: Optional[NotifySyncLock] = None
self.fixed_path_args: Dict[str, Any] = {} self.fixed_path_args: Dict[str, Any] = {}
@ -148,7 +146,7 @@ class FileManager:
path = os.path.realpath(path) path = os.path.realpath(path)
if not os.path.isdir(path) or path == "/": if not os.path.isdir(path) or path == "/":
logging.info( logging.info(
f"\nSupplied path ({path}) for ({root}) a valid. Make sure\n" f"\nSupplied path ({path}) for ({root}) is invalid. Make sure\n"
"that the path exists and is not the file system root.") "that the path exists and is not the file system root.")
return False return False
permissions = os.R_OK permissions = os.R_OK
@ -172,7 +170,7 @@ class FileManager:
# Refresh the file list and add watches # Refresh the file list and add watches
self.inotify_handler.add_root_watch(root, path) self.inotify_handler.add_root_watch(root, path)
else: else:
IOLoop.current().spawn_callback( self.event_loop.register_callback(
self.inotify_handler.notify_filelist_changed, self.inotify_handler.notify_filelist_changed,
"root_update", root, path) "root_update", root, path)
return True return True
@ -257,12 +255,10 @@ class FileManager:
# Make sure that the directory does not contain a file # Make sure that the directory does not contain a file
# loaded by the virtual_sdcard # loaded by the virtual_sdcard
await self._handle_operation_check(dir_path) await self._handle_operation_check(dir_path)
ioloop = IOLoop.current()
self.notify_sync_lock = NotifySyncLock(dir_path) self.notify_sync_lock = NotifySyncLock(dir_path)
try: try:
with ThreadPoolExecutor(max_workers=1) as tpe: await self.event_loop.run_in_thread(
await ioloop.run_in_executor( shutil.rmtree, dir_path)
tpe, shutil.rmtree, dir_path)
except Exception: except Exception:
self.notify_sync_lock.cancel() self.notify_sync_lock.cancel()
self.notify_sync_lock = None self.notify_sync_lock = None
@ -356,12 +352,10 @@ class FileManager:
else: else:
result['action'] = "create_file" result['action'] = "create_file"
op_func = shutil.copy2 op_func = shutil.copy2
ioloop = IOLoop.current()
self.notify_sync_lock = NotifySyncLock(dest_path) self.notify_sync_lock = NotifySyncLock(dest_path)
try: try:
with ThreadPoolExecutor(max_workers=1) as tpe: full_dest = await self.event_loop.run_in_thread(
full_dest = await ioloop.run_in_executor( op_func, source_path, dest_path)
tpe, op_func, source_path, dest_path)
except Exception as e: except Exception as e:
self.notify_sync_lock.cancel() self.notify_sync_lock.cancel()
self.notify_sync_lock = None self.notify_sync_lock = None
@ -411,10 +405,10 @@ class FileManager:
return path_info return path_info
def gen_temp_upload_path(self) -> str: def gen_temp_upload_path(self) -> str:
ioloop = IOLoop.current() loop_time = int(self.event_loop.get_loop_time())
return os.path.join( return os.path.join(
tempfile.gettempdir(), tempfile.gettempdir(),
f"moonraker.upload-{int(ioloop.time())}.mru") f"moonraker.upload-{loop_time}.mru")
async def finalize_upload(self, async def finalize_upload(self,
form_args: Dict[str, Any] form_args: Dict[str, Any]
@ -705,11 +699,12 @@ class InotifyNode:
name: str name: str
) -> None: ) -> None:
self.ihdlr = ihdlr self.ihdlr = ihdlr
self.event_loop = ihdlr.event_loop
self.name = name self.name = name
self.parent_node = parent self.parent_node = parent
self.child_nodes: Dict[str, InotifyNode] = {} self.child_nodes: Dict[str, InotifyNode] = {}
self.watch_desc = self.ihdlr.add_watch(self) self.watch_desc = self.ihdlr.add_watch(self)
self.pending_node_events: Dict[str, object] = {} self.pending_node_events: Dict[str, asyncio.Handle] = {}
self.pending_deleted_children: Set[Tuple[str, bool]] = set() self.pending_deleted_children: Set[Tuple[str, bool]] = set()
self.pending_file_events: Dict[str, str] = {} self.pending_file_events: Dict[str, str] = {}
@ -724,7 +719,7 @@ class InotifyNode:
node_path = self.get_path() node_path = self.get_path()
root = self.get_root() root = self.get_root()
# Scan child nodes for unwatched directories and metadata # Scan child nodes for unwatched directories and metadata
mevts: List[Event] = self.scan_node() mevts: List[asyncio.Event] = self.scan_node()
if mevts: if mevts:
mfuts = [e.wait() for e in mevts] mfuts = [e.wait() for e in mevts]
await asyncio.gather(*mfuts) await asyncio.gather(*mfuts)
@ -753,12 +748,12 @@ class InotifyNode:
def scan_node(self, def scan_node(self,
visited_dirs: Set[Tuple[int, int]] = set() visited_dirs: Set[Tuple[int, int]] = set()
) -> List[Event]: ) -> List[asyncio.Event]:
dir_path = self.get_path() dir_path = self.get_path()
st = os.stat(dir_path) st = os.stat(dir_path)
if st in visited_dirs: if st in visited_dirs:
return [] return []
metadata_events: List[Event] = [] metadata_events: List[asyncio.Event] = []
visited_dirs.add((st.st_dev, st.st_ino)) visited_dirs.add((st.st_dev, st.st_ino))
for fname in os.listdir(dir_path): for fname in os.listdir(dir_path):
if fname[0] == ".": if fname[0] == ".":
@ -885,34 +880,33 @@ class InotifyNode:
self.reset_event(evt_name, timeout) self.reset_event(evt_name, timeout)
return return
callback = getattr(self, f"_finish_{evt_name}") callback = getattr(self, f"_finish_{evt_name}")
hdl = IOLoop.current().call_later(timeout, callback) hdl = self.event_loop.delay_callback(timeout, callback)
self.pending_node_events[evt_name] = hdl self.pending_node_events[evt_name] = hdl
def reset_event(self, evt_name: str, timeout: float) -> None: def reset_event(self, evt_name: str, timeout: float) -> None:
if evt_name in self.pending_node_events: if evt_name in self.pending_node_events:
ioloop = IOLoop.current()
hdl = self.pending_node_events[evt_name] hdl = self.pending_node_events[evt_name]
ioloop.remove_timeout(hdl) hdl.cancel()
callback = getattr(self, f"_finish_{evt_name}") callback = getattr(self, f"_finish_{evt_name}")
hdl = ioloop.call_later(timeout, callback) hdl = self.event_loop.delay_callback(timeout, callback)
self.pending_node_events[evt_name] = hdl self.pending_node_events[evt_name] = hdl
def stop_event(self, evt_name: str) -> None: def stop_event(self, evt_name: str) -> None:
if evt_name in self.pending_node_events: if evt_name in self.pending_node_events:
hdl = self.pending_node_events[evt_name] hdl = self.pending_node_events[evt_name]
IOLoop.current().remove_timeout(hdl) hdl.cancel()
def remove_event(self, evt_name: str) -> None: def remove_event(self, evt_name: str) -> None:
hdl = self.pending_node_events.pop(evt_name, None) hdl = self.pending_node_events.pop(evt_name, None)
if hdl is not None: if hdl is not None:
IOLoop.current().remove_timeout(hdl) hdl.cancel()
def clear_events(self, include_children: bool = True) -> None: def clear_events(self, include_children: bool = True) -> None:
if include_children: if include_children:
for child in self.child_nodes.values(): for child in self.child_nodes.values():
child.clear_events(include_children) child.clear_events(include_children)
for hdl in self.pending_node_events.values(): for hdl in self.pending_node_events.values():
IOLoop.current().remove_timeout(hdl) hdl.cancel()
self.pending_node_events.clear() self.pending_node_events.clear()
self.pending_deleted_children.clear() self.pending_deleted_children.clear()
self.pending_file_events.clear() self.pending_file_events.clear()
@ -945,7 +939,7 @@ class InotifyRootNode(InotifyNode):
class NotifySyncLock: class NotifySyncLock:
def __init__(self, dest_path: str) -> None: def __init__(self, dest_path: str) -> None:
self.wait_fut: Optional[asyncio.Future] = None self.wait_fut: Optional[asyncio.Future] = None
self.sync_condition = Condition() self.sync_event = asyncio.Event()
self.dest_path = dest_path self.dest_path = dest_path
self.notified_paths: Set[str] = set() self.notified_paths: Set[str] = set()
self.finished: bool = False self.finished: bool = False
@ -970,7 +964,7 @@ class NotifySyncLock:
await asyncio.wait_for(self.wait_fut, timeout) await asyncio.wait_for(self.wait_fut, timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
self.sync_condition.notify_all() self.sync_event.set()
self.finished = True self.finished = True
async def sync(self, path, timeout: Optional[float] = None) -> None: async def sync(self, path, timeout: Optional[float] = None) -> None:
@ -980,10 +974,8 @@ class NotifySyncLock:
if self.wait_fut is not None and self.dest_path == path: if self.wait_fut is not None and self.dest_path == path:
self.wait_fut.set_result(None) self.wait_fut.set_result(None)
# Transfer control to waiter # Transfer control to waiter
if timeout is not None:
timeout = IOLoop.current().time() + timeout
try: try:
await self.sync_condition.wait(timeout) await asyncio.wait_for(self.sync_event.wait(), timeout)
except Exception: except Exception:
pass pass
else: else:
@ -996,7 +988,7 @@ class NotifySyncLock:
return return
if self.wait_fut is not None and not self.wait_fut.done(): if self.wait_fut is not None and not self.wait_fut.done():
self.wait_fut.set_result(None) self.wait_fut.set_result(None)
self.sync_condition.notify_all() self.sync_event.set()
self.finished = True self.finished = True
class INotifyHandler: class INotifyHandler:
@ -1006,21 +998,23 @@ class INotifyHandler:
gcode_metadata: MetadataStorage gcode_metadata: MetadataStorage
) -> None: ) -> None:
self.server = config.get_server() self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.debug_enabled = config['server'].getboolean( self.debug_enabled = config['server'].getboolean(
'enable_debug_logging', False) 'enable_debug_logging', False)
self.file_manager = file_manager self.file_manager = file_manager
self.gcode_metadata = gcode_metadata self.gcode_metadata = gcode_metadata
self.inotify = INotify(nonblocking=True) self.inotify = INotify(nonblocking=True)
IOLoop.current().add_handler( self.event_loop.add_reader(
self.inotify.fileno(), self._handle_inotify_read, self.inotify.fileno(), self._handle_inotify_read)
IOLoop.READ | IOLoop.ERROR)
self.event_loop_busy: bool = False self.node_loop_busy: bool = False
self.pending_inotify_events: List[InotifyEvent] = [] self.pending_inotify_events: List[InotifyEvent] = []
self.watched_roots: Dict[str, InotifyRootNode] = {} self.watched_roots: Dict[str, InotifyRootNode] = {}
self.watched_nodes: Dict[int, InotifyNode] = {} self.watched_nodes: Dict[int, InotifyNode] = {}
self.pending_moves: Dict[int, Tuple[InotifyNode, str, object]] = {} self.pending_moves: Dict[
int, Tuple[InotifyNode, str, asyncio.Handle]] = {}
def add_root_watch(self, root: str, root_path: str) -> None: def add_root_watch(self, root: str, root_path: str) -> None:
if root not in FULL_ACCESS_ROOTS: if root not in FULL_ACCESS_ROOTS:
@ -1034,11 +1028,11 @@ class INotifyHandler:
self.watched_roots[root] = root_node self.watched_roots[root] = root_node
mevts = root_node.scan_node() mevts = root_node.scan_node()
self.log_nodes() self.log_nodes()
IOLoop.current().spawn_callback( self.event_loop.register_callback(
self._notify_root_updated, mevts, root, root_path) self._notify_root_updated, mevts, root, root_path)
async def _notify_root_updated(self, async def _notify_root_updated(self,
mevts: List[Event], mevts: List[asyncio.Event],
root: str, root: str,
root_path: str root_path: str
) -> None: ) -> None:
@ -1119,14 +1113,14 @@ class INotifyHandler:
f"Watch: {wdesc}" f"Watch: {wdesc}"
logging.debug(debug_msg) logging.debug(debug_msg)
def parse_gcode_metadata(self, file_path: str) -> Event: def parse_gcode_metadata(self, file_path: str) -> asyncio.Event:
rel_path = self.file_manager.get_relative_path("gcodes", file_path) rel_path = self.file_manager.get_relative_path("gcodes", file_path)
try: try:
path_info = self.file_manager.get_path_info(file_path) path_info = self.file_manager.get_path_info(file_path)
except Exception: except Exception:
logging.exception( logging.exception(
f"Error retreiving path info for file {file_path}") f"Error retreiving path info for file {file_path}")
evt = Event() evt = asyncio.Event()
evt.set() evt.set()
return evt return evt
ext = os.path.splitext(file_path)[-1].lower() ext = os.path.splitext(file_path)[-1].lower()
@ -1159,15 +1153,12 @@ class INotifyHandler:
parent_node: InotifyNode, parent_node: InotifyNode,
is_dir: bool is_dir: bool
) -> None: ) -> None:
hdl = IOLoop.current().call_later( hdl = self.event_loop.delay_callback(
INOTIFY_MOVE_TIME, self._handle_move_timeout, INOTIFY_MOVE_TIME, self._handle_move_timeout,
evt.cookie, is_dir) evt.cookie, is_dir)
self.pending_moves[evt.cookie] = (parent_node, evt.name, hdl) self.pending_moves[evt.cookie] = (parent_node, evt.name, hdl)
def _handle_inotify_read(self, fd: int, events: int) -> None: def _handle_inotify_read(self) -> None:
if events & IOLoop.ERROR:
logging.info("INotify Read Error")
return
evt: InotifyEvent evt: InotifyEvent
for evt in self.inotify.read(timeout=0): for evt in self.inotify.read(timeout=0):
if evt.mask & iFlags.IGNORED: if evt.mask & iFlags.IGNORED:
@ -1180,9 +1171,9 @@ class INotifyHandler:
f"flags: {flags}") f"flags: {flags}")
continue continue
self.pending_inotify_events.append(evt) self.pending_inotify_events.append(evt)
if not self.event_loop_busy: if not self.node_loop_busy:
self.event_loop_busy = True self.node_loop_busy = True
IOLoop.current().spawn_callback(self._process_inotify_events) self.event_loop.register_callback(self._process_inotify_events)
async def _process_inotify_events(self) -> None: async def _process_inotify_events(self) -> None:
while self.pending_inotify_events: while self.pending_inotify_events:
@ -1192,7 +1183,7 @@ class INotifyHandler:
await self._process_dir_event(evt, node) await self._process_dir_event(evt, node)
else: else:
await self._process_file_event(evt, node) await self._process_file_event(evt, node)
self.event_loop_busy = False self.node_loop_busy = False
async def _process_dir_event(self, async def _process_dir_event(self,
evt: InotifyEvent, evt: InotifyEvent,
@ -1222,7 +1213,7 @@ class INotifyHandler:
if moved_evt is not None: if moved_evt is not None:
# Moved from a currently watched directory # Moved from a currently watched directory
prev_parent, child_name, hdl = moved_evt prev_parent, child_name, hdl = moved_evt
IOLoop.current().remove_timeout(hdl) hdl.cancel()
await prev_parent.move_child_node(child_name, evt.name, node) await prev_parent.move_child_node(child_name, evt.name, node)
else: else:
# Moved from an unwatched directory, for our # Moved from an unwatched directory, for our
@ -1260,7 +1251,7 @@ class INotifyHandler:
if moved_evt is not None: if moved_evt is not None:
# Moved from a currently watched directory # Moved from a currently watched directory
prev_parent, prev_name, hdl = moved_evt prev_parent, prev_name, hdl = moved_evt
IOLoop.current().remove_timeout(hdl) hdl.cancel()
prev_root = prev_parent.get_root() prev_root = prev_parent.get_root()
prev_path = os.path.join(prev_parent.get_path(), prev_name) prev_path = os.path.join(prev_parent.get_path(), prev_name)
move_success = await self.try_move_metadata( move_success = await self.try_move_metadata(
@ -1314,7 +1305,7 @@ class INotifyHandler:
if sync_lock is not None: if sync_lock is not None:
# Delay this notification so that it occurs after an item # Delay this notification so that it occurs after an item
logging.debug(f"Syncing notification: {full_path}") logging.debug(f"Syncing notification: {full_path}")
IOLoop.current().spawn_callback( self.event_loop.register_callback(
self._sync_with_request, result, self._sync_with_request, result,
sync_lock.sync(full_path), is_valid) sync_lock.sync(full_path), is_valid)
elif is_valid and self._check_need_notify(file_info): elif is_valid and self._check_need_notify(file_info):
@ -1337,7 +1328,7 @@ class INotifyHandler:
return True return True
def close(self) -> None: def close(self) -> None:
IOLoop.current().remove_handler(self.inotify.fileno()) self.event_loop.remove_reader(self.inotify.fileno())
for watch in self.watched_nodes.keys(): for watch in self.watched_nodes.keys():
try: try:
self.inotify.rm_watch(watch) self.inotify.rm_watch(watch)
@ -1368,7 +1359,8 @@ class MetadataStorage:
db.insert_item( db.insert_item(
"moonraker", "file_manager.metadata_version", "moonraker", "file_manager.metadata_version",
METADATA_VERSION) METADATA_VERSION)
self.pending_requests: Dict[str, Tuple[Dict[str, Any], Event]] = {} self.pending_requests: Dict[
str, Tuple[Dict[str, Any], asyncio.Event]] = {}
self.busy: bool = False self.busy: bool = False
if self.gc_path: if self.gc_path:
# Check for removed gcode files while moonraker was shutdown # Check for removed gcode files while moonraker was shutdown
@ -1477,8 +1469,8 @@ class MetadataStorage:
def parse_metadata(self, def parse_metadata(self,
fname: str, fname: str,
path_info: Dict[str, Any] path_info: Dict[str, Any]
) -> Event: ) -> asyncio.Event:
mevt = Event() mevt = asyncio.Event()
ext = os.path.splitext(fname)[1] ext = os.path.splitext(fname)[1]
if fname in self.pending_requests or \ if fname in self.pending_requests or \
ext not in VALID_GCODE_EXTS or \ ext not in VALID_GCODE_EXTS or \
@ -1490,7 +1482,8 @@ class MetadataStorage:
if self.busy: if self.busy:
return mevt return mevt
self.busy = True self.busy = True
IOLoop.current().spawn_callback(self._process_metadata_update) event_loop = self.server.get_event_loop()
event_loop.register_callback(self._process_metadata_update)
return mevt return mevt
async def _process_metadata_update(self) -> None: async def _process_metadata_update(self) -> None: