diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 752e69e..05fb527 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -19,6 +19,13 @@ import logging from jose import jwt from tornado.ioloop import IOLoop, PeriodicCallback from tornado.web import HTTPError +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.serialization import ( + PrivateFormat, + Encoding, + NoEncryption, + load_pem_private_key, +) # Annotation imports from typing import ( @@ -30,6 +37,7 @@ from typing import ( Union, Dict, List, + cast, ) if TYPE_CHECKING: from confighelper import ConfigHelper @@ -42,6 +50,8 @@ if TYPE_CHECKING: IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network] OneshotToken = Tuple[IPAddr, Optional[Dict[str, Any]], object] +ECPrivateKey = ec.EllipticCurvePrivateKeyWithSerialization + ONESHOT_TIMEOUT = 5 TRUSTED_CONNECTION_TIMEOUT = 3600 PRUNE_CHECK_TIME = 300 * 1000 @@ -52,7 +62,7 @@ TRUSTED_USER = "_TRUSTED_USER_" RESERVED_USERS = [API_USER, TRUSTED_USER] JWT_EXP_TIME = datetime.timedelta(hours=1) JWT_HEADER = { - 'alg': "HS256", + 'alg': "ES256", 'typ': "JWT" } @@ -74,11 +84,25 @@ class Authorization: } else: self.api_key = api_user['api_key'] + host_name, port = self.server.get_host_info() + self.issuer = f"http://{host_name}:{port}" + self.public_keys: Dict[str, ec.EllipticCurvePublicKey] = {} + for username, user_info in list(self.users.items()): + if username == API_USER: + continue + if 'jwt_secret' in user_info: + try: + priv_key = self._load_private_key(user_info['jwt_secret']) + except self.server.error: + logging.info("Invalid key found for user, removing") + user_info.pop('jwt_secret', None) + self.users[username] = user_info + continue + self.public_keys[username] = priv_key.public_key() + self.trusted_users: Dict[IPAddr, Any] = {} self.oneshot_tokens: Dict[str, OneshotToken] = {} self.permitted_paths: Set[str] = set() - host_name, port = self.server.get_host_info() - self.issuer = f"http://{host_name}:{port}" # Get allowed cors domains self.cors_domains: List[str] = [] @@ -189,6 +213,7 @@ class Authorization: raise self.server.error( f"Invalid log out request for user {username}") self.users.pop(f"{username}.jwt_secret", None) + self.public_keys.pop(username, None) return { "username": username, "action": "user_logged_out" @@ -200,8 +225,11 @@ class Authorization: refresh_token: str = web_request.get_str('refresh_token') user_info = self._decode_jwt(refresh_token, token_type="refresh") username: str = user_info['username'] - secret = user_info['jwt_secret'] - token = self._generate_jwt(username, secret) + secret: Optional[str] = user_info.get('jwt_secret', None) + if secret is None: + raise self.server.error("User not logged in", 401) + private_key = self._load_private_key(secret) + token = self._generate_jwt(username, private_key) return { 'username': username, 'token': token, @@ -306,14 +334,19 @@ class Authorization: action = "user_logged_in" if hashed_pass != user_info['password']: raise self.server.error("Invalid Password") - jwt_secret: Optional[str] = user_info.get('jwt_secret', None) - if jwt_secret is None: - jwt_secret = secrets.token_bytes(32).hex() - user_info['jwt_secret'] = jwt_secret + jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) + if jwt_secret_hex is None: + private_key = ec.generate_private_key(ec.SECP256R1()) + serialized: bytes = cast(ECPrivateKey, private_key).private_bytes( + Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()) + user_info['jwt_secret'] = serialized.hex() self.users[username] = user_info - token = self._generate_jwt(username, jwt_secret) + else: + private_key = self._load_private_key(jwt_secret_hex) + self.public_keys[username] = private_key.public_key() + token = self._generate_jwt(username, private_key) refresh_token = self._generate_jwt( - username, jwt_secret, token_type="refresh", + username, private_key, token_type="refresh", exp_time=datetime.timedelta(days=self.login_timeout)) if create: IOLoop.current().call_later( @@ -342,6 +375,7 @@ class Authorization: if user_info is None: raise self.server.error(f"No registered user: {username}") del self.users[username] + self.public_keys.pop(username, None) IOLoop.current().call_later( .005, self.server.send_event, "authorization:user_deleted", @@ -353,7 +387,7 @@ class Authorization: def _generate_jwt(self, username: str, - secret: str, + secret: ec.EllipticCurvePrivateKey, token_type: str = "access", exp_time: datetime.timedelta = JWT_EXP_TIME ) -> str: @@ -366,7 +400,8 @@ class Authorization: 'username': username, 'token_type': token_type } - return jwt.encode(payload, secret, headers=JWT_HEADER) + return jwt.encode(payload, secret, algorithm="ES256", + headers=JWT_HEADER) def _decode_jwt(self, token: str, @@ -386,14 +421,23 @@ class Authorization: if user_info is None: raise self.server.error( f"Invalid JWT, no registered user {username}", 401) - jwt_secret: Optional[str] = user_info.get('jwt_secret', None) - if jwt_secret is None: + public_key = self.public_keys.get(username, None) + if public_key is None: raise self.server.error( f"Invalid JWT, user {username} not logged in", 401) - jwt.decode(token, jwt_secret, algorithms=['HS256'], + jwt.decode(token, [public_key], algorithms=['ES256'], audience="Moonraker") return user_info + def _load_private_key(self, secret: str) -> ec.EllipticCurvePrivateKey: + try: + key = load_pem_private_key(bytes.fromhex(secret), None) + except Exception: + raise self.server.error( + "Error decoding private key, user data may" + " be corrupt", 500) from None + return key + def _prune_conn_handler(self) -> None: cur_time = time.time() for ip, user_info in list(self.trusted_users.items()):