diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 00121ab..c0a2570 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -77,6 +77,8 @@ class Authorization: self.force_logins = config.getboolean('force_logins', False) self.default_source = config.get('default_source', "moonraker").lower() self.enable_api_key = config.getboolean('enable_api_key', True) + self.max_logins = config.getint("max_login_attempts", None, above=0) + self.failed_logins: Dict[IPAddr, int] = {} if self.default_source not in AUTH_SOURCES: raise config.error( "[authorization]: option 'default_source' - Invalid " @@ -279,7 +281,23 @@ class Authorization: return self.get_oneshot_token(ip, user_info) async def _handle_login(self, web_request: WebRequest) -> Dict[str, Any]: - return await self._login_jwt_user(web_request) + ip = web_request.get_ip_address() + if ip is not None and self.check_logins_maxed(ip): + raise HTTPError( + 401, "Unauthorized, Maximum Login Attempts Reached" + ) + try: + ret = await self._login_jwt_user(web_request) + except asyncio.CancelledError: + raise + except Exception: + if ip is not None: + failed = self.failed_logins.get(ip, 0) + self.failed_logins[ip] = failed + 1 + raise + if ip is not None: + self.failed_logins.pop(ip, None) + return ret async def _handle_logout(self, web_request: WebRequest) -> Dict[str, str]: user_info = web_request.get_current_user() @@ -586,6 +604,17 @@ class Authorization: raise self.server.error("Unknown user", 401) return user_info + def validate_jwt(self, token: str) -> Dict[str, Any]: + try: + user_info = self.decode_jwt(token) + except Exception as e: + if isinstance(e, self.server.error): + raise + raise self.server.error( + f"Failed to decode JWT: {e}", 401 + ) from e + return user_info + def _load_private_key(self, secret: str) -> Signer: try: key = Signer(bytes.fromhex(secret)) @@ -707,6 +736,16 @@ class Authorization: else: return None + def check_logins_maxed(self, ip_addr: Union[str, IPAddr]) -> bool: + if self.max_logins is None: + return False + if isinstance(ip_addr, str): + try: + ip_addr = ipaddress.ip_address(ip_addr) # type: ignore + except ValueError: + return False + return self.failed_logins.get(ip_addr, 0) >= self.max_logins + def check_authorized(self, request: HTTPServerRequest ) -> Optional[Dict[str, Any]]: @@ -744,7 +783,7 @@ class Authorization: # If the force_logins option is enabled and at least one # user is created this is an unauthorized request if self.force_logins and len(self.users) > 1: - raise HTTPError(401, "Unauthorized") + raise HTTPError(401, "Unauthorized, Force Logins Enabled") # Check if IP is trusted trusted_user = self._check_trusted_connection(ip) diff --git a/moonraker/websockets.py b/moonraker/websockets.py index bb34d7d..171d551 100644 --- a/moonraker/websockets.py +++ b/moonraker/websockets.py @@ -576,14 +576,7 @@ class BaseSocketClient(Subscribable): if auth is None: return if token is not None: - try: - user_info = auth.decode_jwt(token) - except self.server.error: - raise - except Exception as e: - raise self.server.error( - f"Failed to decode JWT: {e}", 401 - ) from e + user_info = auth.validate_jwt(token) self.user_info = user_info elif not auth.is_path_permitted(path): raise self.server.error("Unauthorized", 401)