app: use datapath for ssl certificate storage

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-08-09 06:10:55 -04:00 committed by Eric Callahan
parent 1327602ec3
commit 4df6aba6c0
1 changed files with 27 additions and 13 deletions

View File

@ -176,9 +176,9 @@ class MoonrakerApp:
self.max_upload_size *= 1024 * 1024
# SSL config
self.cert_path: str = self._get_path_option(
self.cert_path: pathlib.Path = self._get_path_option(
config, 'ssl_certificate_path')
self.key_path: str = self._get_path_option(
self.key_path: pathlib.Path = self._get_path_option(
config, 'ssl_key_path')
# Set Up Websocket and Authorization Managers
@ -231,16 +231,30 @@ class MoonrakerApp:
self.server.register_component("internal_transport",
self.internal_transport)
def _get_path_option(self, config: ConfigHelper, option: str) -> str:
path: Optional[str] = config.get(option, None)
if path is None:
return ""
expanded = os.path.abspath(os.path.expanduser(path))
if not os.path.exists(expanded):
def _get_path_option(
self, config: ConfigHelper, option: str
) -> pathlib.Path:
path: Optional[str] = config.get(option, None, deprecate=True)
app_args = self.server.get_app_args()
data_path = app_args["data_path"]
alias = app_args["alias"]
certs_path = pathlib.Path(data_path).joinpath("certs")
if not certs_path.exists():
try:
certs_path.mkdir()
except Exception:
pass
ext = "key" if "key" in option else "cert"
item = certs_path.joinpath(f"{alias}.{ext}")
if item.exists() or path is None:
return item
item = pathlib.Path(path).expanduser().resolve()
if not item.exists():
raise self.server.error(
f"Invalid path for option '{option}', "
f"{path} does not exist")
return expanded
f"{path} does not exist"
)
return item
def listen(self, host: str, port: int, ssl_port: int) -> None:
if host.lower() == "all":
@ -248,7 +262,7 @@ class MoonrakerApp:
self.http_server = self.app.listen(
port, address=host, max_body_size=MAX_BODY_SIZE,
xheaders=True)
if os.path.exists(self.cert_path) and os.path.exists(self.key_path):
if self.cert_path.exists() and self.key_path.exists():
logging.info(f"Starting secure server on port {ssl_port}")
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_ctx.load_cert_chain(self.cert_path, self.key_path)
@ -629,7 +643,7 @@ class DynamicRequestHandler(AuthorizedRequestHandler):
assert callable(self.callback)
return await self.callback(
WebRequest(self.request.path, args, self.request.method,
conn=conn, ip_addr=self.request.remote_ip,
conn=conn, ip_addr=self.request.remote_ip or "",
user=self.current_user))
async def _do_remote_request(self,
@ -640,7 +654,7 @@ class DynamicRequestHandler(AuthorizedRequestHandler):
klippy: Klippy = self.server.lookup_component("klippy_connection")
return await klippy.request(
WebRequest(self.callback, args, conn=conn,
ip_addr=self.request.remote_ip,
ip_addr=self.request.remote_ip or "",
user=self.current_user))
async def _process_http_request(self) -> None: