authorization: limit failed login attempts

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-11-26 06:16:44 -05:00
parent 9d6719ed31
commit eca4c7e438
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
2 changed files with 42 additions and 10 deletions

View File

@ -77,6 +77,8 @@ class Authorization:
self.force_logins = config.getboolean('force_logins', False)
self.default_source = config.get('default_source', "moonraker").lower()
self.enable_api_key = config.getboolean('enable_api_key', True)
self.max_logins = config.getint("max_login_attempts", None, above=0)
self.failed_logins: Dict[IPAddr, int] = {}
if self.default_source not in AUTH_SOURCES:
raise config.error(
"[authorization]: option 'default_source' - Invalid "
@ -279,7 +281,23 @@ class Authorization:
return self.get_oneshot_token(ip, user_info)
async def _handle_login(self, web_request: WebRequest) -> Dict[str, Any]:
return await self._login_jwt_user(web_request)
ip = web_request.get_ip_address()
if ip is not None and self.check_logins_maxed(ip):
raise HTTPError(
401, "Unauthorized, Maximum Login Attempts Reached"
)
try:
ret = await self._login_jwt_user(web_request)
except asyncio.CancelledError:
raise
except Exception:
if ip is not None:
failed = self.failed_logins.get(ip, 0)
self.failed_logins[ip] = failed + 1
raise
if ip is not None:
self.failed_logins.pop(ip, None)
return ret
async def _handle_logout(self, web_request: WebRequest) -> Dict[str, str]:
user_info = web_request.get_current_user()
@ -586,6 +604,17 @@ class Authorization:
raise self.server.error("Unknown user", 401)
return user_info
def validate_jwt(self, token: str) -> Dict[str, Any]:
try:
user_info = self.decode_jwt(token)
except Exception as e:
if isinstance(e, self.server.error):
raise
raise self.server.error(
f"Failed to decode JWT: {e}", 401
) from e
return user_info
def _load_private_key(self, secret: str) -> Signer:
try:
key = Signer(bytes.fromhex(secret))
@ -707,6 +736,16 @@ class Authorization:
else:
return None
def check_logins_maxed(self, ip_addr: Union[str, IPAddr]) -> bool:
if self.max_logins is None:
return False
if isinstance(ip_addr, str):
try:
ip_addr = ipaddress.ip_address(ip_addr) # type: ignore
except ValueError:
return False
return self.failed_logins.get(ip_addr, 0) >= self.max_logins
def check_authorized(self,
request: HTTPServerRequest
) -> Optional[Dict[str, Any]]:
@ -744,7 +783,7 @@ class Authorization:
# If the force_logins option is enabled and at least one
# user is created this is an unauthorized request
if self.force_logins and len(self.users) > 1:
raise HTTPError(401, "Unauthorized")
raise HTTPError(401, "Unauthorized, Force Logins Enabled")
# Check if IP is trusted
trusted_user = self._check_trusted_connection(ip)

View File

@ -576,14 +576,7 @@ class BaseSocketClient(Subscribable):
if auth is None:
return
if token is not None:
try:
user_info = auth.decode_jwt(token)
except self.server.error:
raise
except Exception as e:
raise self.server.error(
f"Failed to decode JWT: {e}", 401
) from e
user_info = auth.validate_jwt(token)
self.user_info = user_info
elif not auth.is_path_permitted(path):
raise self.server.error("Unauthorized", 401)