authorization: refactor trusted ip checks to use ipaddress module

This adds support for both IPv4 and IPv6 authorization.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-07-28 10:54:48 -04:00
parent b4b1356bdd
commit 470f1af475
1 changed files with 42 additions and 14 deletions

View File

@ -7,6 +7,7 @@ import base64
import uuid
import os
import time
import ipaddress
import logging
import tornado
from tornado.ioloop import IOLoop, PeriodicCallback
@ -31,17 +32,34 @@ class Authorization:
def load_config(self, config):
self.auth_enabled = config.get("require_auth", self.auth_enabled)
self.trusted_ips = config.get("trusted_ips", self.trusted_ips)
self.trusted_ranges = config.get("trusted_ranges", self.trusted_ranges)
# reset trusted ips
self.trusted_ips = []
trusted_ips = config.get("trusted_ips", [])
for ip in trusted_ips:
try:
ip_addr = ipaddress.ip_address(ip)
except ValueError:
logging.info("IP address <%s> invalid" % (ip))
continue
self.trusted_ips.append(ip_addr)
# reset trusted ranges
self.trusted_ranges = []
trusted_ranges = config.get("trusted_ranges", [])
for ip_range in trusted_ranges:
try:
ip_net = ipaddress.ip_network(ip_range)
except ValueError:
logging.info("IP range <%s> invalid" % (ip_range))
continue
self.trusted_ranges.append(ip_net)
self._reset_trusted_connections()
t_clients = [str(ip) for ip in self.trusted_ips] + \
[str(rng) for rng in self.trusted_ranges]
logging.info(
"Authorization Configuration Loaded\n"
"Auth Enabled: %s\n"
"Trusted IPs:\n%s\n"
"Trusted IP Ranges:\n%s" %
(self.auth_enabled,
('\n').join(self.trusted_ips),
('\n').join(self.trusted_ranges)))
"Trusted Clients:\n%s" %
(self.auth_enabled, "\n".join(t_clients)))
def register_handlers(self, app):
# Register Authorization Endpoints
@ -81,14 +99,21 @@ class Authorization:
def _reset_trusted_connections(self):
valid_conns = {}
for ip, access_time in self.trusted_connections.items():
if ip in self.trusted_ips or \
ip[:ip.rfind('.')] in self.trusted_ranges:
if self._check_authorized_ip(ip):
valid_conns[ip] = access_time
else:
logging.info(
"Connection [%s] no longer trusted, removing" % (ip))
self.trusted_connections = valid_conns
def _check_authorized_ip(self, ip):
if ip in self.trusted_ips:
return True
for rng in self.trusted_ranges:
if ip in rng:
return True
return False
def _prune_conn_handler(self):
cur_time = time.time()
expired_conns = []
@ -118,11 +143,9 @@ class Authorization:
if ip in self.trusted_connections:
self.trusted_connections[ip] = time.time()
return True
elif ip in self.trusted_ips or \
ip[:ip.rfind('.')] in self.trusted_ranges:
elif self._check_authorized_ip(ip):
logging.info(
"Trusted Connection Detected, IP: %s"
% (ip))
"Trusted Connection Detected, IP: %s" % (ip))
self.trusted_connections[ip] = time.time()
return True
return False
@ -141,7 +164,12 @@ class Authorization:
return True
# Check if IP is trusted
ip = request.remote_ip
try:
ip = ipaddress.ip_address(request.remote_ip)
except ValueError:
logging.exception(
"Unable to Create IP Address %s" % (request.remote_ip))
ip = None
if self._check_trusted_connection(ip):
return True