authorization: fix access.refresh regression

Allow expired JWTs for HTTP endpoints that do not require authentication.
This is technically an error by the client, as it should not provide
invalid JWTs for an endpoint, however Moonraker previously allowed
this as the token was not verified on unathenticated endpoints.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-01-20 06:21:36 -05:00
parent 66de18f9b6
commit a23187b4af
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 8 additions and 8 deletions

View File

@ -579,7 +579,7 @@ class Authorization:
return b".".join([jwt_msg, jwt_sig]).decode()
def decode_jwt(
self, token: str, token_type: str = "access"
self, token: str, token_type: str = "access", check_exp: bool = True
) -> Dict[str, Any]:
message, sig = token.rsplit('.', maxsplit=1)
enc_header, enc_payload = message.split('.')
@ -607,7 +607,7 @@ class Authorization:
raise self.server.error("Invalid JWT Issuer", 401)
if payload['aud'] != "Moonraker":
raise self.server.error("Invalid JWT Audience", 401)
if payload['exp'] < int(time.time()):
if check_exp and payload['exp'] < int(time.time()):
raise self.server.error("JWT Expired", 401)
# get user
@ -687,23 +687,23 @@ class Authorization:
self.oneshot_tokens[token] = (ip_addr, user, hdl)
return token
def _check_json_web_token(self,
request: HTTPServerRequest
) -> Optional[Dict[str, Any]]:
def _check_json_web_token(
self, request: HTTPServerRequest, required: bool = True
) -> Optional[Dict[str, Any]]:
auth_token: Optional[str] = request.headers.get("Authorization")
if auth_token is None:
auth_token = request.headers.get("X-Access-Token")
if auth_token is None:
qtoken = request.query_arguments.get('access_token', None)
if qtoken is not None:
auth_token = qtoken[-1].decode()
auth_token = qtoken[-1].decode(errors="ignore")
elif auth_token.startswith("Bearer "):
auth_token = auth_token[7:]
else:
return None
if auth_token:
try:
return self.decode_jwt(auth_token)
return self.decode_jwt(auth_token, check_exp=required)
except Exception:
logging.exception(f"JWT Decode Error {auth_token}")
raise HTTPError(401, "JWT Decode Error")
@ -768,7 +768,7 @@ class Authorization:
return None
# Check JSON Web Token
jwt_user = self._check_json_web_token(request)
jwt_user = self._check_json_web_token(request, auth_required)
if jwt_user is not None:
return jwt_user