file_manager: allow registration of full access directories

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-10-27 07:51:38 -04:00
parent 4c42b8d072
commit 734f295822
2 changed files with 24 additions and 16 deletions

View File

@ -44,7 +44,6 @@ if TYPE_CHECKING:
_T = TypeVar("_T")
VALID_GCODE_EXTS = ['.gcode', '.g', '.gco', '.ufp', '.nc']
FULL_ACCESS_ROOTS = ["gcodes", "config"]
METADATA_SCRIPT = os.path.abspath(os.path.join(
os.path.dirname(__file__), "metadata.py"))
WATCH_FLAGS = iFlags.CREATE | iFlags.DELETE | iFlags.MODIFY \
@ -55,6 +54,7 @@ class FileManager:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.full_access_roots: Set[str] = set()
self.file_paths: Dict[str, str] = {}
db: DBComp = self.server.load_component(config, "database")
gc_path: str = db.get_item(
@ -93,7 +93,8 @@ class FileManager:
# Register Klippy Configuration Path
config_path = config.get('config_path', None)
if config_path is not None:
ret = self.register_directory('config', config_path)
ret = self.register_directory('config', config_path,
full_access=True)
if not ret:
raise config.error(
"Option 'config_path' is not a valid directory")
@ -108,7 +109,7 @@ class FileManager:
# If gcode path is in the database, register it
if gc_path:
self.register_directory('gcodes', gc_path)
self.register_directory('gcodes', gc_path, full_access=True)
def _update_fixed_paths(self) -> None:
kinfo = self.server.get_klippy_info()
@ -138,7 +139,11 @@ class FileManager:
self.server.register_static_file_handler(
"klippy.log", log_path, force=True)
def register_directory(self, root: str, path: Optional[str]) -> bool:
def register_directory(self,
root: str,
path: Optional[str],
full_access: bool = False
) -> bool:
if path is None:
return False
path = os.path.abspath(os.path.expanduser(path))
@ -150,8 +155,9 @@ class FileManager:
"that the path exists and is not the file system root.")
return False
permissions = os.R_OK
if root in FULL_ACCESS_ROOTS:
if full_access:
permissions |= os.W_OK
self.full_access_roots.add(root)
if not os.access(path, permissions):
logging.info(
f"\nMoonraker does not have permission to access path "
@ -166,7 +172,7 @@ class FileManager:
moon_db["file_manager.gcode_path"] = path
# scan for metadata changes
self.gcode_metadata.update_gcode_path(path)
if root in FULL_ACCESS_ROOTS:
if full_access:
# Refresh the file list and add watches
self.inotify_handler.add_root_watch(root, path)
else:
@ -235,13 +241,13 @@ class FileManager:
result = {
'item': {'path': directory, 'root': root},
'action': "create_dir"}
if action == 'POST' and root in FULL_ACCESS_ROOTS:
if action == 'POST' and root in self.full_access_roots:
# Create a new directory
try:
os.mkdir(dir_path)
except Exception as e:
raise self.server.error(str(e))
elif action == 'DELETE' and root in FULL_ACCESS_ROOTS:
elif action == 'DELETE' and root in self.full_access_roots:
# Remove a directory
result['action'] = "delete_dir"
if directory.strip("/") == root:
@ -322,7 +328,7 @@ class FileManager:
ep = web_request.get_endpoint()
source_root, source_path = self._convert_request_path(source)
dest_root, dest_path = self._convert_request_path(destination)
if dest_root not in FULL_ACCESS_ROOTS:
if dest_root not in self.full_access_roots:
raise self.server.error(
f"Destination path is read-only: {dest_root}")
async with self.write_mutex:
@ -333,7 +339,7 @@ class FileManager:
if os.path.exists(dest_path):
await self._handle_operation_check(dest_path)
if ep == "/server/files/move":
if source_root not in FULL_ACCESS_ROOTS:
if source_root not in self.full_access_roots:
raise self.server.error(
f"Source path is read-only, cannot move: {source_root}")
# if moving the file, make sure the source is not in use
@ -406,7 +412,7 @@ class FileManager:
if (
(os.path.islink(path) and os.path.isfile(real_path)) or
not os.access(real_path, os.R_OK | os.W_OK) or
root not in FULL_ACCESS_ROOTS
root not in self.full_access_roots
):
permissions = "r"
return {
@ -431,7 +437,7 @@ class FileManager:
root = upload_info['root']
if root == "gcodes" and upload_info['ext'] in VALID_GCODE_EXTS:
result = await self._finish_gcode_upload(upload_info)
elif root in FULL_ACCESS_ROOTS:
elif root in self.full_access_roots:
result = await self._finish_standard_upload(upload_info)
else:
raise self.server.error(f"Invalid root request: {root}")
@ -677,7 +683,10 @@ class FileManager:
f"Path not available for DELETE: {path}", 405)
root = parts[0]
filename = parts[1]
if root not in self.file_paths or root not in FULL_ACCESS_ROOTS:
if (
root not in self.file_paths or
root not in self.full_access_roots
):
raise self.server.error(
f"Path not available for DELETE: {path}", 405)
root_path = self.file_paths[root]
@ -1027,8 +1036,6 @@ class INotifyHandler:
def add_root_watch(self, root: str, root_path: str) -> None:
if root not in FULL_ACCESS_ROOTS:
return
# remove all exisiting watches on root
if root in self.watched_roots:
old_root = self.watched_roots.pop(root)

View File

@ -441,7 +441,8 @@ class Server:
if vsd_path is not None:
file_manager: FileManager = self.lookup_component(
'file_manager')
file_manager.register_directory('gcodes', vsd_path)
file_manager.register_directory('gcodes', vsd_path,
full_access=True)
else:
logging.info(
"Configuration for [virtual_sdcard] not found,"