file_manager: Restrict full access to "gcodes" and "config" paths

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-08-05 07:19:27 -04:00
parent 72bba79830
commit 43d54f078e
1 changed files with 19 additions and 12 deletions

View File

@ -13,6 +13,7 @@ from tornado.ioloop import IOLoop
from tornado.locks import Lock from tornado.locks import Lock
VALID_GCODE_EXTS = ['gcode', 'g', 'gco'] VALID_GCODE_EXTS = ['gcode', 'g', 'gco']
FULL_ACCESS_ROOTS = ["gcodes", "config"]
METADATA_SCRIPT = os.path.join( METADATA_SCRIPT = os.path.join(
os.path.dirname(__file__), "../../scripts/extract_metadata.py") os.path.dirname(__file__), "../../scripts/extract_metadata.py")
@ -106,14 +107,14 @@ class FileManager:
if method == 'GET': if method == 'GET':
# Get list of files and subdirectories for this target # Get list of files and subdirectories for this target
return self._list_directory(dir_path) return self._list_directory(dir_path)
elif method == 'POST' and base in ["gcodes", "config"]: elif method == 'POST' and base in FULL_ACCESS_ROOTS:
# Create a new directory # Create a new directory
try: try:
os.mkdir(dir_path) os.mkdir(dir_path)
except Exception as e: except Exception as e:
raise self.server.error(str(e)) raise self.server.error(str(e))
self.notify_filelist_changed(url_path, "add_directory", base) self.notify_filelist_changed(url_path, "add_directory", base)
elif method == 'DELETE' and base in ["gcodes", "config"]: elif method == 'DELETE' and base in FULL_ACCESS_ROOTS:
# Remove a directory # Remove a directory
if directory.strip("/") == base: if directory.strip("/") == base:
raise self.server.error( raise self.server.error(
@ -183,10 +184,9 @@ class FileManager:
"File move/copy request missing destination") "File move/copy request missing destination")
source_base, src_url_path, source_path = self._convert_path(source) source_base, src_url_path, source_path = self._convert_path(source)
dest_base, dst_url_path, dest_path = self._convert_path(destination) dest_base, dst_url_path, dest_path = self._convert_path(destination)
if source_base != dest_base or source_base not in ["gcodes", "config"]: if dest_base not in FULL_ACCESS_ROOTS:
raise self.server.error( raise self.server.error(
"Unsupported root directory: source=%s base=%s" % "Destination path is read-only: %s" % (dest_base))
(source_base, dest_base))
if not os.path.exists(source_path): if not os.path.exists(source_path):
raise self.server.error("File %s does not exist" % (source_path)) raise self.server.error("File %s does not exist" % (source_path))
# make sure the destination is not in use # make sure the destination is not in use
@ -194,6 +194,10 @@ class FileManager:
await self._handle_operation_check(dest_path) await self._handle_operation_check(dest_path)
action = "" action = ""
if path == "/server/files/move": if path == "/server/files/move":
if source_base not in FULL_ACCESS_ROOTS:
raise self.server.error(
"Source path is read-only, cannot move: %s"
% (source_base))
# if moving the file, make sure the source is not in use # if moving the file, make sure the source is not in use
await self._handle_operation_check(source_path) await self._handle_operation_check(source_path)
try: try:
@ -211,7 +215,8 @@ class FileManager:
raise self.server.error(str(e)) raise self.server.error(str(e))
action = "file_copy" action = "file_copy"
self.notify_filelist_changed( self.notify_filelist_changed(
dst_url_path, action, dest_base, {'prev_file': src_url_path}) dst_url_path, action, dest_base,
{'prev_file': src_url_path, 'prev_root': source_base})
return "ok" return "ok"
def _list_directory(self, path): def _list_directory(self, path):
@ -309,15 +314,17 @@ class FileManager:
root = self._get_argument(request, 'root', "gcodes") root = self._get_argument(request, 'root', "gcodes")
if root == "gcodes": if root == "gcodes":
result = await self._do_gcode_upload(request) result = await self._do_gcode_upload(request)
else: elif root in FULL_ACCESS_ROOTS:
result = self._do_standard_upload(request, root) result = self._do_standard_upload(request, root)
else:
raise self.server.error("Invalid root request: %s" % (root))
return result return result
async def _do_gcode_upload(self, request): async def _do_gcode_upload(self, request):
start_print = print_ongoing = False start_print = print_ongoing = False
base_path = self.file_paths.get("gcodes", "") base_path = self.file_paths.get("gcodes", "")
if not base_path: if not base_path:
raise self.server.error(400, "Gcodes root not available") raise self.server.error("Gcodes root not available")
start_print = self._get_argument(request, 'print', "false") == "true" start_print = self._get_argument(request, 'print', "false") == "true"
upload = self._get_upload_info(request, base_path) upload = self._get_upload_info(request, base_path)
# Verify that the operation can be done if attempting to upload a gcode # Verify that the operation can be done if attempting to upload a gcode
@ -327,7 +334,7 @@ class FileManager:
except self.server.error as e: except self.server.error as e:
if e.status_code == 403: if e.status_code == 403:
raise self.server.error( raise self.server.error(
403, "File is loaded, upload not permitted") "File is loaded, upload not permitted", 403)
else: else:
# Couldn't reach Klippy, so it should be safe # Couldn't reach Klippy, so it should be safe
# to permit the upload but not start # to permit the upload but not start
@ -368,11 +375,11 @@ class FileManager:
# fetch the upload from the request # fetch the upload from the request
if len(request.files) != 1: if len(request.files) != 1:
raise self.server.error( raise self.server.error(
400, "Bad Request, can only process a single file upload") "Bad Request, can only process a single file upload")
f_list = list(request.files.values())[0] f_list = list(request.files.values())[0]
if len(f_list) != 1: if len(f_list) != 1:
raise self.server.error( raise self.server.error(
400, "Bad Request, can only process a single file upload") "Bad Request, can only process a single file upload")
upload = f_list[0] upload = f_list[0]
if os.path.isfile(base_path): if os.path.isfile(base_path):
filename = os.path.basename(base_path) filename = os.path.basename(base_path)
@ -400,7 +407,7 @@ class FileManager:
with open(upload['full_path'], 'wb') as fh: with open(upload['full_path'], 'wb') as fh:
fh.write(upload['body']) fh.write(upload['body'])
except Exception: except Exception:
raise self.server.error(500, "Unable to save file") raise self.server.error("Unable to save file", 500)
def get_file_list(self, format_list=False, base='gcodes'): def get_file_list(self, format_list=False, base='gcodes'):
try: try: