authorization: support fqdns as "trusted_clients"

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-04-29 19:16:57 -04:00
parent 497423ddc2
commit 46e1d7b66b
1 changed files with 21 additions and 12 deletions

View File

@ -14,6 +14,7 @@ import datetime
import ipaddress import ipaddress
import json import json
import re import re
import socket
import logging import logging
from tornado.ioloop import IOLoop, PeriodicCallback from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import HTTPError from tornado.web import HTTPError
@ -80,29 +81,34 @@ class Authorization:
# Get Trusted Clients # Get Trusted Clients
self.trusted_ips = [] self.trusted_ips = []
self.trusted_ranges = [] self.trusted_ranges = []
self.trusted_domains = []
trusted_clients = config.get('trusted_clients', "") trusted_clients = config.get('trusted_clients', "")
trusted_clients = [c.strip() for c in trusted_clients.split('\n') trusted_clients = [c.strip() for c in trusted_clients.split('\n')
if c.strip()] if c.strip()]
for ip in trusted_clients: for val in trusted_clients:
# Check IP address # Check IP address
try: try:
tc = ipaddress.ip_address(ip) tc = ipaddress.ip_address(val)
except ValueError: except ValueError:
tc = None pass
if tc is None:
# Check ip network
try:
tc = ipaddress.ip_network(ip)
except ValueError:
raise ServerError(
f"Invalid option in trusted_clients: {ip}")
self.trusted_ranges.append(tc)
else: else:
self.trusted_ips.append(tc) self.trusted_ips.append(tc)
continue
# Check ip network
try:
tc = ipaddress.ip_network(val)
except ValueError:
pass
else:
self.trusted_ranges.append(tc)
continue
# Check hostname
self.trusted_domains.append(val.lower())
t_clients = "\n".join( t_clients = "\n".join(
[str(ip) for ip in self.trusted_ips] + [str(ip) for ip in self.trusted_ips] +
[str(rng) for rng in self.trusted_ranges]) [str(rng) for rng in self.trusted_ranges] +
self.trusted_domains)
c_domains = "\n".join(self.cors_domains) c_domains = "\n".join(self.cors_domains)
logging.info( logging.info(
@ -376,6 +382,9 @@ class Authorization:
for rng in self.trusted_ranges: for rng in self.trusted_ranges:
if ip in rng: if ip in rng:
return True return True
fqdn = socket.getfqdn(str(ip)).lower()
if fqdn in self.trusted_domains:
return True
return False return False
def _check_trusted_connection(self, ip): def _check_trusted_connection(self, ip):