From b0fda218f1e775f91a75d3879a4dd1f150650b5c Mon Sep 17 00:00:00 2001 From: Arksine Date: Tue, 29 Dec 2020 17:42:27 -0500 Subject: [PATCH] update_manager: manage GitHub rate limits Track rate limit attributes and reject requests when the user IP has reached their limit. Use conditional API requests to reduce the number of requests that count against the limit. Signed-off-by: Eric Callahan --- moonraker/plugins/update_manager.py | 148 +++++++++++++++++++++++----- 1 file changed, 125 insertions(+), 23 deletions(-) diff --git a/moonraker/plugins/update_manager.py b/moonraker/plugins/update_manager.py index f88a066..849fed6 100644 --- a/moonraker/plugins/update_manager.py +++ b/moonraker/plugins/update_manager.py @@ -12,9 +12,10 @@ import shutil import zipfile import io import asyncio +import time import tornado.gen from tornado.ioloop import IOLoop -from tornado.httpclient import AsyncHTTPClient, HTTPRequest +from tornado.httpclient import AsyncHTTPClient from tornado.locks import Event MOONRAKER_PATH = os.path.normpath(os.path.join( @@ -67,6 +68,12 @@ class UpdateManager: self.updaters['client'] = ClientUpdater( self, client_repo, client_path) + # GitHub API Rate Limit Tracking + self.gh_rate_limit = None + self.gh_limit_remaining = None + self.gh_limit_reset_time = None + self.gh_init_evt = Event() + self.server.register_endpoint( "/machine/update/moonraker", ["POST"], self._handle_update_request) @@ -86,6 +93,8 @@ class UpdateManager: # Register Ready Event self.server.register_event_handler( "server:klippy_ready", self._set_klipper_repo) + # Initialize GitHub API Rate Limits + IOLoop.current().spawn_callback(self._init_api_rate_limit) async def _set_klipper_repo(self): kinfo = self.server.get_klippy_info() @@ -126,6 +135,9 @@ class UpdateManager: vinfo[name] = updater.get_update_status() return { 'version_info': vinfo, + 'github_rate_limit': self.gh_rate_limit, + 'github_requests_remaining': self.gh_limit_remaining, + 'github_limit_reset_time': self.gh_limit_reset_time, 'busy': self.current_update is not None} async def execute_cmd(self, cmd, timeout=10., notify=False, retries=1): @@ -144,29 +156,113 @@ class UpdateManager: scmd = shell_command.build_shell_command(cmd, None) return await scmd.run_with_response(timeout) - async def github_request(self, url, is_download=False): - cto = rto = 5. - content_type = "application/vnd.github.v3+json" - if is_download: - content_type = "application/zip" - rto = 120. - request = HTTPRequest(url, headers={"Accept": content_type}, - connect_timeout=cto, request_timeout=rto) - retries = 5 - while True: + async def _init_api_rate_limit(self): + url = "https://api.github.com/rate_limit" + while 1: try: - resp = await self.http_client.fetch(request) - except Exception as e: + resp = await self.github_api_request(url, is_init=True) + core = resp['resources']['core'] + self.gh_rate_limit = core['limit'] + self.gh_limit_remaining = core['remaining'] + self.gh_limit_reset_time = core['reset'] + except Exception: + logging.exception("Error Initializing GitHub API Rate Limit") + await tornado.gen.sleep(30.) + else: + reset_time = time.ctime(self.gh_limit_reset_time) + logging.info( + "GitHub API Rate Limit Initialized\n" + f"Rate Limit: {self.gh_rate_limit}\n" + f"Rate Limit Remaining: {self.gh_limit_remaining}\n" + f"Rate Limit Reset Time: {reset_time}, " + f"Seconds Since Epoch: {self.gh_limit_reset_time}") + break + self.gh_init_evt.set() + + async def github_api_request(self, url, etag=None, is_init=False): + if not is_init: + timeout = time.time() + 30. + try: + await self.gh_init_evt.wait(timeout) + except Exception: + raise self.server.error( + "Timeout while waiting for GitHub " + "API Rate Limit initialization") + if self.gh_limit_remaining == 0: + curtime = time.time() + if curtime < self.gh_limit_reset_time: + raise self.server.error( + f"GitHub Rate Limit Reached\nRequest: {url}\n" + f"Limit Reset Time: {time.ctime(self.gh_limit_remaining)}") + headers = {"Accept": "application/vnd.github.v3+json"} + if etag is not None: + headers['If-None-Match'] = etag + retries = 5 + while retries: + try: + resp = await self.http_client.fetch( + url, headers=headers, connect_timeout=5., + request_timeout=5., raise_error=False) + except Exception: + retries -= 1 + msg = f"Error Processing GitHub API request: {url}" + if not retries: + raise self.server.error(msg) + logging.exception(msg) + await tornado.gen.sleep(1.) + continue + etag = resp.headers.get('etag', None) + if etag is not None: + if etag[:2] == "W/": + etag = etag[2:] + logging.info( + "GitHub API Request Processed\n" + f"URL: {url}\n" + f"Response Code: {resp.code}\n" + f"Response Reason: {resp.reason}\n" + f"ETag: {etag}") + if resp.code == 403: + raise self.server.error( + f"Forbidden GitHub Request: {resp.reason}") + elif resp.code == 304: + logging.info(f"Github Request not Modified: {url}") + return None + if resp.code != 200: retries -= 1 if not retries: - raise - logging.exception(f"Github request error, retrying: {e}") + raise self.server.error( + f"Github Request failed: {resp.code} {resp.reason}") + logging.info( + f"Github request error, {retries} retries remaining") + await tornado.gen.sleep(1.) continue - if is_download: - return resp.body + # Update rate limit on return success + if 'X-Ratelimit-Limit' in resp.headers and not is_init: + self.gh_rate_limit = int(resp.headers['X-Ratelimit-Limit']) + self.gh_limit_remaining = int( + resp.headers['X-Ratelimit-Remaining']) + self.gh_limit_reset_time = float( + resp.headers['X-Ratelimit-Reset']) decoded = json.loads(resp.body) + decoded['etag'] = etag return decoded + async def http_download_request(self, url): + retries = 5 + while retries: + try: + resp = await self.http_client.fetch( + url, headers={"Accept": "application/zip"}, + connect_timeout=5., request_timeout=120.) + except Exception: + retries -= 1 + logging.exception("Error Processing Download") + if not retries: + raise + await tornado.gen.sleep(1.) + continue + return resp.body + def notify_update_response(self, resp, is_complete=False): resp = resp.strip() if isinstance(resp, bytes): @@ -504,13 +600,14 @@ class PackageUpdater: class ClientUpdater: def __init__(self, umgr, repo, path): + self.umgr = umgr self.server = umgr.server - self.github_request = umgr.github_request self.notify_update_response = umgr.notify_update_response self.repo = repo.strip().strip("/") self.name = self.repo.split("/")[-1] self.path = path self.version = self.remote_version = self.dl_url = "?" + self.etag = None self.init_evt = Event() self._get_local_version() logging.info(f"\nInitializing Client Updater: '{self.name}'," @@ -539,16 +636,21 @@ class ClientUpdater: # Remote state url = f"https://api.github.com/repos/{self.repo}/releases/latest" try: - result = await self.github_request(url) + result = await self.umgr.github_api_request(url, etag=self.etag) except Exception: logging.exception(f"Client {self.repo}: Github Request Error") result = {} + if result is None: + # No change, update not necessary + return None + self.etag = result.get('etag', None) self.remote_version = result.get('name', "?") release_assets = result.get('assets', [{}])[0] self.dl_url = release_assets.get('browser_download_url', "?") logging.info( - f"Github client Info Received: {self.name}, " - f"version: {self.remote_version} " + f"Github client Info Received:\nRepo: {self.name}\n" + f"Local Version: {self.version}\n" + f"Remote Version: {self.remote_version}\n" f"url: {self.dl_url}") self.init_evt.set() @@ -568,7 +670,7 @@ class ClientUpdater: shutil.rmtree(self.path) os.mkdir(self.path) self.notify_update_response(f"Downloading Client: {self.name}") - archive = await self.github_request(self.dl_url, is_download=True) + archive = await self.umgr.http_download_request(self.dl_url) with zipfile.ZipFile(io.BytesIO(archive)) as zf: zf.extractall(self.path) self.version = self.remote_version @@ -576,7 +678,7 @@ class ClientUpdater: if not os.path.exists(version_path): with open(version_path, "w") as f: f.write(self.version) - self.notify_update_response(f"Client Updated Finished: {self.name}", + self.notify_update_response(f"Client Update Finished: {self.name}", is_complete=True) def get_update_status(self):