proc_stats: add annotations

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-05-14 19:30:39 -04:00
parent c977948c2c
commit 12246029ef
1 changed files with 41 additions and 21 deletions

View File

@ -3,6 +3,8 @@
# Copyright (C) 2021 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import time
import re
import os
@ -12,6 +14,20 @@ from collections import deque
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.locks import Lock
# Annotation imports
from typing import (
TYPE_CHECKING,
Deque,
Any,
Tuple,
Optional,
Dict,
)
if TYPE_CHECKING:
from confighelper import ConfigHelper
from websockets import WebRequest
from . import shell_command
VC_GEN_CMD_FILE = "/usr/bin/vcgencmd"
STATM_FILE_PATH = "/proc/self/smaps_rollup"
TEMPERATURE_PATH = "/sys/class/thermal/thermal_zone0/temp"
@ -31,16 +47,17 @@ THROTTLED_FLAGS = {
}
class ProcStats:
def __init__(self, config):
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.ioloop = IOLoop.current()
self.stat_update_cb = PeriodicCallback(
self._handle_stat_update, STAT_UPDATE_TIME_MS)
self.vcgencmd = None
self._handle_stat_update, STAT_UPDATE_TIME_MS) # type: ignore
self.vcgencmd: Optional[shell_command.ShellCommand] = None
if os.path.exists(VC_GEN_CMD_FILE):
logging.info("Detected 'vcgencmd', throttle checking enabled")
shell_command = self.server.load_component(config, "shell_command")
self.vcgencmd = shell_command.build_shell_command(
shell_cmd: shell_command.ShellCommandFactory
shell_cmd = self.server.load_component(config, "shell_command")
self.vcgencmd = shell_cmd.build_shell_command(
"vcgencmd get_throttled")
self.server.register_notification("proc_stats:cpu_throttled")
else:
@ -53,16 +70,19 @@ class ProcStats:
self.server.register_event_handler(
"server:klippy_shutdown", self._handle_shutdown)
self.server.register_notification("proc_stats:proc_stat_update")
self.proc_stat_queue = deque(maxlen=30)
self.proc_stat_queue: Deque[Dict[str, Any]] = deque(maxlen=30)
self.last_update_time = time.time()
self.last_proc_time = time.process_time()
self.throttle_check_lock = Lock()
self.total_throttled = self.last_throttled = 0
self.update_sequence = 0
self.total_throttled: int = 0
self.last_throttled: int = 0
self.update_sequence: int = 0
self.stat_update_cb.start()
async def _handle_stat_request(self, web_request):
ts = None
async def _handle_stat_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
ts: Optional[Dict[str, Any]] = None
if self.vcgencmd is not None:
ts = await self._check_throttled_state()
return {
@ -71,7 +91,7 @@ class ProcStats:
'cpu_temp': self._get_cpu_temperature()
}
async def _handle_shutdown(self):
async def _handle_shutdown(self) -> None:
msg = "\nMoonraker System Usage Statistics:"
for stats in self.proc_stat_queue:
msg += f"\n{self._format_stats(stats)}"
@ -81,7 +101,7 @@ class ProcStats:
ts = await self._check_throttled_state()
logging.info(f"Throttled Flags: {' '.join(ts['flags'])}")
async def _handle_stat_update(self):
async def _handle_stat_update(self) -> None:
update_time = time.time()
proc_time = time.process_time()
time_diff = update_time - self.last_update_time
@ -115,8 +135,9 @@ class ProcStats:
self.last_throttled = cur_throttled
self.total_throttled |= cur_throttled
async def _check_throttled_state(self):
async def _check_throttled_state(self) -> Dict[str, Any]:
async with self.throttle_check_lock:
assert self.vcgencmd is not None
try:
resp = await self.vcgencmd.run_with_response(
timeout=.5, log_complete=False)
@ -129,7 +150,7 @@ class ProcStats:
flags.append(desc)
return {'bits': ts, 'flags': flags}
def _get_memory_usage(self):
def _get_memory_usage(self) -> Tuple[Optional[int], Optional[str]]:
try:
mem_data = self.smaps.read_text()
rss_match = re.search(r"Rss:\s+(\d+)\s+(\w+)", mem_data)
@ -141,24 +162,23 @@ class ProcStats:
return None, None
return mem, units
def _get_cpu_temperature(self):
def _get_cpu_temperature(self) -> Optional[float]:
temp = None
if self.temp_file.exists():
try:
temp = int(self.temp_file.read_text().strip())
temp = temp / 1000.
res = int(self.temp_file.read_text().strip())
temp = res / 1000.
except Exception:
return None
return temp
def _format_stats(self, stats):
def _format_stats(self, stats: Dict[str, Any]) -> str:
return f"System Time: {stats['time']:2f}, " \
f"Usage: {stats['cpu_usage']}%, " \
f"Memory: {stats['memory']} {stats['mem_units']}"
def close(self):
def close(self) -> None:
self.stat_update_cb.stop()
def load_component(config):
def load_component(config: ConfigHelper) -> ProcStats:
return ProcStats(config)