authorization: Add configparser support

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-08-05 20:49:56 -04:00
parent de462b1d0f
commit e5cb27f5b7
1 changed files with 32 additions and 42 deletions

View File

@ -11,48 +11,44 @@ import ipaddress
import logging import logging
import tornado import tornado
from tornado.ioloop import IOLoop, PeriodicCallback from tornado.ioloop import IOLoop, PeriodicCallback
from utils import ServerError
TOKEN_TIMEOUT = 5 TOKEN_TIMEOUT = 5
CONNECTION_TIMEOUT = 3600 CONNECTION_TIMEOUT = 3600
PRUNE_CHECK_TIME = 300 * 1000 PRUNE_CHECK_TIME = 300 * 1000
class Authorization: class Authorization:
def __init__(self, api_key_file): def __init__(self, config):
self.api_key_loc = os.path.expanduser(api_key_file) api_key_file = config.get('api_key_file', "~/.moonraker_api_key")
self.api_key_file = os.path.expanduser(api_key_file)
self.api_key = self._read_api_key() self.api_key = self._read_api_key()
self.auth_enabled = True self.auth_enabled = config.getboolean('enabled', True)
self.trusted_ips = []
self.trusted_ranges = []
self.trusted_connections = {} self.trusted_connections = {}
self.access_tokens = {} self.access_tokens = {}
self.prune_handler = PeriodicCallback( # Get Trusted Clients
self._prune_conn_handler, PRUNE_CHECK_TIME)
self.prune_handler.start()
def load_config(self, config):
self.auth_enabled = config.get("require_auth", self.auth_enabled)
# reset trusted ips
self.trusted_ips = [] self.trusted_ips = []
trusted_ips = config.get("trusted_ips", [])
for ip in trusted_ips:
try:
ip_addr = ipaddress.ip_address(ip)
except ValueError:
logging.info("IP address <%s> invalid" % (ip))
continue
self.trusted_ips.append(ip_addr)
# reset trusted ranges
self.trusted_ranges = [] self.trusted_ranges = []
trusted_ranges = config.get("trusted_ranges", []) trusted_clients = config.get('trusted_clients', "")
for ip_range in trusted_ranges: trusted_clients = [c.strip() for c in trusted_clients.split('\n')
if c.strip()]
for ip in trusted_clients:
# Check IP address
try: try:
ip_net = ipaddress.ip_network(ip_range) tc = ipaddress.ip_address(ip)
except ValueError: except ValueError:
logging.info("IP range <%s> invalid" % (ip_range)) tc = None
continue if tc is None:
self.trusted_ranges.append(ip_net) # Check ip network
self._reset_trusted_connections() try:
tc = ipaddress.ip_network(ip)
except ValueError:
raise ServerError(
"Invalid option in trusted_clients: %s" % (ip))
self.trusted_ranges.append(tc)
else:
self.trusted_ips.append(tc)
t_clients = [str(ip) for ip in self.trusted_ips] + \ t_clients = [str(ip) for ip in self.trusted_ips] + \
[str(rng) for rng in self.trusted_ranges] [str(rng) for rng in self.trusted_ranges]
logging.info( logging.info(
@ -61,6 +57,10 @@ class Authorization:
"Trusted Clients:\n%s" % "Trusted Clients:\n%s" %
(self.auth_enabled, "\n".join(t_clients))) (self.auth_enabled, "\n".join(t_clients)))
self.prune_handler = PeriodicCallback(
self._prune_conn_handler, PRUNE_CHECK_TIME)
self.prune_handler.start()
def register_handlers(self, app): def register_handlers(self, app):
# Register Authorization Endpoints # Register Authorization Endpoints
app.register_local_handler( app.register_local_handler(
@ -79,33 +79,23 @@ class Authorization:
return self.get_access_token() return self.get_access_token()
def _read_api_key(self): def _read_api_key(self):
if os.path.exists(self.api_key_loc): if os.path.exists(self.api_key_file):
with open(self.api_key_loc, 'r') as f: with open(self.api_key_file, 'r') as f:
api_key = f.read() api_key = f.read()
return api_key return api_key
# API Key file doesn't exist. Generate # API Key file doesn't exist. Generate
# a new api key and create the file. # a new api key and create the file.
logging.info( logging.info(
"No API Key file found, creating new one at:\n%s" "No API Key file found, creating new one at:\n%s"
% (self.api_key_loc)) % (self.api_key_file))
return self._create_api_key() return self._create_api_key()
def _create_api_key(self): def _create_api_key(self):
api_key = uuid.uuid4().hex api_key = uuid.uuid4().hex
with open(self.api_key_loc, 'w') as f: with open(self.api_key_file, 'w') as f:
f.write(api_key) f.write(api_key)
return api_key return api_key
def _reset_trusted_connections(self):
valid_conns = {}
for ip, access_time in self.trusted_connections.items():
if self._check_authorized_ip(ip):
valid_conns[ip] = access_time
else:
logging.info(
"Connection [%s] no longer trusted, removing" % (ip))
self.trusted_connections = valid_conns
def _check_authorized_ip(self, ip): def _check_authorized_ip(self, ip):
if ip in self.trusted_ips: if ip in self.trusted_ips:
return True return True