authorization: use ES256 algorithm for JWT signatures

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-05-23 09:08:49 -04:00
parent ce7f659a32
commit 8a3b885eca
1 changed files with 60 additions and 16 deletions

View File

@ -19,6 +19,13 @@ import logging
from jose import jwt
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import HTTPError
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import (
PrivateFormat,
Encoding,
NoEncryption,
load_pem_private_key,
)
# Annotation imports
from typing import (
@ -30,6 +37,7 @@ from typing import (
Union,
Dict,
List,
cast,
)
if TYPE_CHECKING:
from confighelper import ConfigHelper
@ -42,6 +50,8 @@ if TYPE_CHECKING:
IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
OneshotToken = Tuple[IPAddr, Optional[Dict[str, Any]], object]
ECPrivateKey = ec.EllipticCurvePrivateKeyWithSerialization
ONESHOT_TIMEOUT = 5
TRUSTED_CONNECTION_TIMEOUT = 3600
PRUNE_CHECK_TIME = 300 * 1000
@ -52,7 +62,7 @@ TRUSTED_USER = "_TRUSTED_USER_"
RESERVED_USERS = [API_USER, TRUSTED_USER]
JWT_EXP_TIME = datetime.timedelta(hours=1)
JWT_HEADER = {
'alg': "HS256",
'alg': "ES256",
'typ': "JWT"
}
@ -74,11 +84,25 @@ class Authorization:
}
else:
self.api_key = api_user['api_key']
host_name, port = self.server.get_host_info()
self.issuer = f"http://{host_name}:{port}"
self.public_keys: Dict[str, ec.EllipticCurvePublicKey] = {}
for username, user_info in list(self.users.items()):
if username == API_USER:
continue
if 'jwt_secret' in user_info:
try:
priv_key = self._load_private_key(user_info['jwt_secret'])
except self.server.error:
logging.info("Invalid key found for user, removing")
user_info.pop('jwt_secret', None)
self.users[username] = user_info
continue
self.public_keys[username] = priv_key.public_key()
self.trusted_users: Dict[IPAddr, Any] = {}
self.oneshot_tokens: Dict[str, OneshotToken] = {}
self.permitted_paths: Set[str] = set()
host_name, port = self.server.get_host_info()
self.issuer = f"http://{host_name}:{port}"
# Get allowed cors domains
self.cors_domains: List[str] = []
@ -189,6 +213,7 @@ class Authorization:
raise self.server.error(
f"Invalid log out request for user {username}")
self.users.pop(f"{username}.jwt_secret", None)
self.public_keys.pop(username, None)
return {
"username": username,
"action": "user_logged_out"
@ -200,8 +225,11 @@ class Authorization:
refresh_token: str = web_request.get_str('refresh_token')
user_info = self._decode_jwt(refresh_token, token_type="refresh")
username: str = user_info['username']
secret = user_info['jwt_secret']
token = self._generate_jwt(username, secret)
secret: Optional[str] = user_info.get('jwt_secret', None)
if secret is None:
raise self.server.error("User not logged in", 401)
private_key = self._load_private_key(secret)
token = self._generate_jwt(username, private_key)
return {
'username': username,
'token': token,
@ -306,14 +334,19 @@ class Authorization:
action = "user_logged_in"
if hashed_pass != user_info['password']:
raise self.server.error("Invalid Password")
jwt_secret: Optional[str] = user_info.get('jwt_secret', None)
if jwt_secret is None:
jwt_secret = secrets.token_bytes(32).hex()
user_info['jwt_secret'] = jwt_secret
jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None)
if jwt_secret_hex is None:
private_key = ec.generate_private_key(ec.SECP256R1())
serialized: bytes = cast(ECPrivateKey, private_key).private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
user_info['jwt_secret'] = serialized.hex()
self.users[username] = user_info
token = self._generate_jwt(username, jwt_secret)
else:
private_key = self._load_private_key(jwt_secret_hex)
self.public_keys[username] = private_key.public_key()
token = self._generate_jwt(username, private_key)
refresh_token = self._generate_jwt(
username, jwt_secret, token_type="refresh",
username, private_key, token_type="refresh",
exp_time=datetime.timedelta(days=self.login_timeout))
if create:
IOLoop.current().call_later(
@ -342,6 +375,7 @@ class Authorization:
if user_info is None:
raise self.server.error(f"No registered user: {username}")
del self.users[username]
self.public_keys.pop(username, None)
IOLoop.current().call_later(
.005, self.server.send_event,
"authorization:user_deleted",
@ -353,7 +387,7 @@ class Authorization:
def _generate_jwt(self,
username: str,
secret: str,
secret: ec.EllipticCurvePrivateKey,
token_type: str = "access",
exp_time: datetime.timedelta = JWT_EXP_TIME
) -> str:
@ -366,7 +400,8 @@ class Authorization:
'username': username,
'token_type': token_type
}
return jwt.encode(payload, secret, headers=JWT_HEADER)
return jwt.encode(payload, secret, algorithm="ES256",
headers=JWT_HEADER)
def _decode_jwt(self,
token: str,
@ -386,14 +421,23 @@ class Authorization:
if user_info is None:
raise self.server.error(
f"Invalid JWT, no registered user {username}", 401)
jwt_secret: Optional[str] = user_info.get('jwt_secret', None)
if jwt_secret is None:
public_key = self.public_keys.get(username, None)
if public_key is None:
raise self.server.error(
f"Invalid JWT, user {username} not logged in", 401)
jwt.decode(token, jwt_secret, algorithms=['HS256'],
jwt.decode(token, [public_key], algorithms=['ES256'],
audience="Moonraker")
return user_info
def _load_private_key(self, secret: str) -> ec.EllipticCurvePrivateKey:
try:
key = load_pem_private_key(bytes.fromhex(secret), None)
except Exception:
raise self.server.error(
"Error decoding private key, user data may"
" be corrupt", 500) from None
return key
def _prune_conn_handler(self) -> None:
cur_time = time.time()
for ip, user_info in list(self.trusted_users.items()):