From 470f1af475feb0985f7104860f0d88f374afea31 Mon Sep 17 00:00:00 2001 From: Arksine Date: Tue, 28 Jul 2020 10:54:48 -0400 Subject: [PATCH] authorization: refactor trusted ip checks to use ipaddress module This adds support for both IPv4 and IPv6 authorization. Signed-off-by: Eric Callahan --- moonraker/authorization.py | 56 ++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/moonraker/authorization.py b/moonraker/authorization.py index 0007f28..8d73dc1 100644 --- a/moonraker/authorization.py +++ b/moonraker/authorization.py @@ -7,6 +7,7 @@ import base64 import uuid import os import time +import ipaddress import logging import tornado from tornado.ioloop import IOLoop, PeriodicCallback @@ -31,17 +32,34 @@ class Authorization: def load_config(self, config): self.auth_enabled = config.get("require_auth", self.auth_enabled) - self.trusted_ips = config.get("trusted_ips", self.trusted_ips) - self.trusted_ranges = config.get("trusted_ranges", self.trusted_ranges) + # reset trusted ips + self.trusted_ips = [] + trusted_ips = config.get("trusted_ips", []) + for ip in trusted_ips: + try: + ip_addr = ipaddress.ip_address(ip) + except ValueError: + logging.info("IP address <%s> invalid" % (ip)) + continue + self.trusted_ips.append(ip_addr) + # reset trusted ranges + self.trusted_ranges = [] + trusted_ranges = config.get("trusted_ranges", []) + for ip_range in trusted_ranges: + try: + ip_net = ipaddress.ip_network(ip_range) + except ValueError: + logging.info("IP range <%s> invalid" % (ip_range)) + continue + self.trusted_ranges.append(ip_net) self._reset_trusted_connections() + t_clients = [str(ip) for ip in self.trusted_ips] + \ + [str(rng) for rng in self.trusted_ranges] logging.info( "Authorization Configuration Loaded\n" "Auth Enabled: %s\n" - "Trusted IPs:\n%s\n" - "Trusted IP Ranges:\n%s" % - (self.auth_enabled, - ('\n').join(self.trusted_ips), - ('\n').join(self.trusted_ranges))) + "Trusted Clients:\n%s" % + (self.auth_enabled, "\n".join(t_clients))) def register_handlers(self, app): # Register Authorization Endpoints @@ -81,14 +99,21 @@ class Authorization: def _reset_trusted_connections(self): valid_conns = {} for ip, access_time in self.trusted_connections.items(): - if ip in self.trusted_ips or \ - ip[:ip.rfind('.')] in self.trusted_ranges: + if self._check_authorized_ip(ip): valid_conns[ip] = access_time else: logging.info( "Connection [%s] no longer trusted, removing" % (ip)) self.trusted_connections = valid_conns + def _check_authorized_ip(self, ip): + if ip in self.trusted_ips: + return True + for rng in self.trusted_ranges: + if ip in rng: + return True + return False + def _prune_conn_handler(self): cur_time = time.time() expired_conns = [] @@ -118,11 +143,9 @@ class Authorization: if ip in self.trusted_connections: self.trusted_connections[ip] = time.time() return True - elif ip in self.trusted_ips or \ - ip[:ip.rfind('.')] in self.trusted_ranges: + elif self._check_authorized_ip(ip): logging.info( - "Trusted Connection Detected, IP: %s" - % (ip)) + "Trusted Connection Detected, IP: %s" % (ip)) self.trusted_connections[ip] = time.time() return True return False @@ -141,7 +164,12 @@ class Authorization: return True # Check if IP is trusted - ip = request.remote_ip + try: + ip = ipaddress.ip_address(request.remote_ip) + except ValueError: + logging.exception( + "Unable to Create IP Address %s" % (request.remote_ip)) + ip = None if self._check_trusted_connection(ip): return True