file_manager: update delete_file method

This method is now the primary means of deleting files, as it includes checks to make sure that the delete is allowed.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-09-03 20:24:29 -04:00
parent 059f5d6a73
commit 7078d5c980
3 changed files with 31 additions and 30 deletions

View File

@ -187,7 +187,7 @@ class MoonrakerApp:
logging.info(msg)
def register_static_file_handler(self, pattern, file_path,
can_delete=False, op_check_cb=None):
can_delete=False):
if pattern[0] != "/":
pattern = "/server/files/" + pattern
if os.path.isfile(file_path):
@ -205,7 +205,7 @@ class MoonrakerApp:
methods.append('DELETE')
params = {
'server': self.server, 'auth': self.auth,
'path': file_path, 'methods': methods, 'op_check_cb': op_check_cb}
'path': file_path, 'methods': methods}
self.mutable_router.add_handler(pattern, FileRequestHandler, params)
def register_upload_handler(self, pattern):
@ -322,11 +322,10 @@ class LocalRequestHandler(AuthorizedRequestHandler):
class FileRequestHandler(AuthorizedFileHandler):
def initialize(self, server, auth, path, methods,
op_check_cb=None, default_filename=None):
default_filename=None):
super(FileRequestHandler, self).initialize(
server, auth, path, default_filename)
self.methods = methods
self.op_check_cb = op_check_cb
def set_extra_headers(self, path):
# The call below shold never return an empty string,
@ -340,26 +339,17 @@ class FileRequestHandler(AuthorizedFileHandler):
if 'DELETE' not in self.methods:
raise tornado.web.HTTPError(405)
# Use the same method Tornado uses to validate the path
self.path = self.parse_url_path(path)
del path # make sure we don't refer to path instead of self.path again
absolute_path = self.get_absolute_path(self.root, self.path)
self.absolute_path = self.validate_absolute_path(
self.root, absolute_path)
if self.op_check_cb is not None:
try:
await self.op_check_cb(self.absolute_path)
except ServerError as e:
if e.status_code == 403:
raise tornado.web.HTTPError(
403, "File is loaded, DELETE not permitted")
os.remove(self.absolute_path)
base = self.request.path.lstrip("/").split("/")[2]
filename = self.path.lstrip("/")
path = self.request.path.lstrip("/").split("/", 3)[-1]
filename = path.split("/", 1)[-1]
file_manager = self.server.lookup_plugin('file_manager')
file_manager.notify_filelist_changed('delete_file', filename, base)
try:
file_manager.delete_file(path)
except self.server.error as e:
if e.status_code == 403:
raise tornado.web.HTTPError(
403, "File is loaded, DELETE not permitted")
else:
raise tornado.web.HTTPError(400, str(e))
self.finish({'result': filename})
class FileUploadHandler(AuthorizedRequestHandler):

View File

@ -22,6 +22,7 @@ class FileManager:
self.server = config.get_server()
self.file_paths = {}
self.file_lists = {}
self.mutable_paths = set()
self.gcode_metadata = MetadataStorage(self.server)
self.fixed_path_args = {}
@ -70,9 +71,6 @@ class FileManager:
self.server.register_static_file_handler("klippy.log", log_path)
def register_directory(self, base, path, can_delete=False):
op_check_cb = None
if base == 'gcodes':
op_check_cb = self._handle_operation_check
if path is None:
return False
home = os.path.expanduser('~')
@ -86,8 +84,10 @@ class FileManager:
return False
if path != self.file_paths.get(base, ""):
self.file_paths[base] = path
if can_delete:
self.mutable_paths.add(base)
self.server.register_static_file_handler(
base, path, can_delete=can_delete, op_check_cb=op_check_cb)
base, path, can_delete=can_delete)
try:
self._update_file_list(base=base)
except Exception:
@ -441,16 +441,27 @@ class FileManager:
return simple_list
return flist
def delete_file(self, path):
async def delete_file(self, path):
parts = path.split("/", 1)
root = parts[0]
filename = parts[1]
if root not in self.file_paths or len(parts) != 2:
raise self.server.error(f"Invalid file path: {path}")
if root not in self.mutable_paths:
raise self.server.error(
f"Path not mutable, Cannot delete file: {path}")
root_path = self.file_paths[root]
full_path = os.path.join(root_path, parts[1])
full_path = os.path.join(root_path, filename)
if not os.path.isfile(full_path):
raise self.server.error(f"Invalid file path: {path}")
if root == "gcodes":
try:
await self._handle_operation_check(full_path)
except self.server.error as e:
if e.status_code == 403:
raise
os.remove(full_path)
self.notify_filelist_changed('delete_file', filename, root)
def notify_filelist_changed(self, action, fname, base, source_item={}):
self._update_file_list(base, do_notify=True)

View File

@ -666,7 +666,7 @@ class PanelDue:
if not path.startswith("gcodes/"):
path = "gcodes/" + path
self.file_manager.delete_file(path)
await self.file_manager.delete_file(path)
async def _run_paneldue_M36(self, arg_p=None):
response = {}