proc_stats: read system files in another thread

This should prevent the event loop from getting blocked by a system call.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-07-08 19:58:18 -04:00
parent e20c12619b
commit fb9206aa8f
1 changed files with 62 additions and 9 deletions

View File

@ -5,12 +5,14 @@
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations from __future__ import annotations
import asyncio
import time import time
import re import re
import os import os
import pathlib import pathlib
import logging import logging
from collections import deque from collections import deque
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop, PeriodicCallback from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.locks import Lock from tornado.locks import Lock
@ -34,7 +36,8 @@ TEMPERATURE_PATH = "/sys/class/thermal/thermal_zone0/temp"
STAT_UPDATE_TIME_MS = 1000 STAT_UPDATE_TIME_MS = 1000
REPORT_QUEUE_SIZE = 30 REPORT_QUEUE_SIZE = 30
THROTTLE_CHECK_INTERVAL = 10 THROTTLE_CHECK_INTERVAL = 10
REPORT_BLOCKED_TIME = 5. WATCHDOG_REFRESH_TIME = 2.
REPORT_BLOCKED_TIME = 4.
THROTTLED_FLAGS = { THROTTLED_FLAGS = {
1: "Under-Voltage Detected", 1: "Under-Voltage Detected",
@ -51,6 +54,7 @@ class ProcStats:
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.ioloop = IOLoop.current() self.ioloop = IOLoop.current()
self.watchdog = Watchdog(self)
self.stat_update_cb = PeriodicCallback( self.stat_update_cb = PeriodicCallback(
self._handle_stat_update, STAT_UPDATE_TIME_MS) # type: ignore self._handle_stat_update, STAT_UPDATE_TIME_MS) # type: ignore
self.vcgencmd: Optional[shell_command.ShellCommand] = None self.vcgencmd: Optional[shell_command.ShellCommand] = None
@ -79,6 +83,7 @@ class ProcStats:
self.last_throttled: int = 0 self.last_throttled: int = 0
self.update_sequence: int = 0 self.update_sequence: int = 0
self.stat_update_cb.start() self.stat_update_cb.start()
self.watchdog.start()
async def _handle_stat_request(self, async def _handle_stat_request(self,
web_request: WebRequest web_request: WebRequest
@ -86,17 +91,23 @@ class ProcStats:
ts: Optional[Dict[str, Any]] = None ts: Optional[Dict[str, Any]] = None
if self.vcgencmd is not None: if self.vcgencmd is not None:
ts = await self._check_throttled_state() ts = await self._check_throttled_state()
with ThreadPoolExecutor(max_workers=1) as tpe:
cpu_temp = await self.ioloop.run_in_executor(
tpe, self._get_cpu_temperature)
return { return {
'moonraker_stats': list(self.proc_stat_queue), 'moonraker_stats': list(self.proc_stat_queue),
'throttled_state': ts, 'throttled_state': ts,
'cpu_temp': self._get_cpu_temperature() 'cpu_temp': cpu_temp
} }
async def _handle_shutdown(self) -> None: async def _handle_shutdown(self) -> None:
msg = "\nMoonraker System Usage Statistics:" msg = "\nMoonraker System Usage Statistics:"
for stats in self.proc_stat_queue: for stats in self.proc_stat_queue:
msg += f"\n{self._format_stats(stats)}" msg += f"\n{self._format_stats(stats)}"
msg += f"\nCPU Temperature: {self._get_cpu_temperature()}" with ThreadPoolExecutor(max_workers=1) as tpe:
cpu_temp = await self.ioloop.run_in_executor(
tpe, self._get_cpu_temperature)
msg += f"\nCPU Temperature: {cpu_temp}"
logging.info(msg) logging.info(msg)
if self.vcgencmd is not None: if self.vcgencmd is not None:
ts = await self._check_throttled_state() ts = await self._check_throttled_state()
@ -107,12 +118,9 @@ class ProcStats:
proc_time = time.process_time() proc_time = time.process_time()
time_diff = update_time - self.last_update_time time_diff = update_time - self.last_update_time
usage = round((proc_time - self.last_proc_time) / time_diff * 100, 2) usage = round((proc_time - self.last_proc_time) / time_diff * 100, 2)
if time_diff > REPORT_BLOCKED_TIME: with ThreadPoolExecutor(max_workers=1) as tpe:
logging.info( cpu_temp, mem, mem_units = await self.ioloop.run_in_executor(
f"EVENT LOOP BLOCKED: {round(time_diff, 2)} seconds, " tpe, self._read_system_files)
f"Moonraker Process Usage: {usage}%")
mem, mem_units = self._get_memory_usage()
cpu_temp = self._get_cpu_temperature()
result = { result = {
"time": update_time, "time": update_time,
"cpu_usage": usage, "cpu_usage": usage,
@ -155,6 +163,11 @@ class ProcStats:
flags.append(desc) flags.append(desc)
return {'bits': ts, 'flags': flags} return {'bits': ts, 'flags': flags}
def _read_system_files(self) -> Tuple:
mem, units = self._get_memory_usage()
temp = self._get_cpu_temperature()
return temp, mem, units
def _get_memory_usage(self) -> Tuple[Optional[int], Optional[str]]: def _get_memory_usage(self) -> Tuple[Optional[int], Optional[str]]:
try: try:
mem_data = self.smaps.read_text() mem_data = self.smaps.read_text()
@ -182,8 +195,48 @@ class ProcStats:
f"Usage: {stats['cpu_usage']}%, " \ f"Usage: {stats['cpu_usage']}%, " \
f"Memory: {stats['memory']} {stats['mem_units']}" f"Memory: {stats['memory']} {stats['mem_units']}"
def log_last_stats(self, count: int = 1):
count = min(len(self.proc_stat_queue), count)
msg = ""
for stats in self.proc_stat_queue[-count:]:
msg += f"\n{self._format_stats(stats)}"
logging.info(msg)
def close(self) -> None: def close(self) -> None:
self.stat_update_cb.stop() self.stat_update_cb.stop()
self.watchdog.stop()
class Watchdog:
def __init__(self, proc_stats: ProcStats) -> None:
self.evt_loop = asyncio.get_event_loop()
self.proc_stats = proc_stats
self.last_watch_time: float = 0.
self.wdcb_handle: Optional[asyncio.Handle] = None
def _watchdog_callback(self) -> None:
cur_time = self.evt_loop.time()
time_diff = cur_time - self.last_watch_time
if time_diff > REPORT_BLOCKED_TIME:
logging.info(
f"EVENT LOOP BLOCKED: {round(time_diff, 2)} seconds")
# delay the stat logging so we capture the CPU percentage after
# the next cycle
self.wdcb_handle = self.evt_loop.call_later(
.2, self.proc_stats.log_last_stats(5))
self.last_watch_time = cur_time
self.wdcb_handle = self.evt_loop.call_later(
WATCHDOG_REFRESH_TIME, self._watchdog_callback)
def start(self):
if self.wdcb_handle is None:
self.last_watch_time = self.evt_loop.time()
self.wdcb_handle = self.evt_loop.call_soon(
self._watchdog_callback)
def stop(self):
if self.wdcb_handle is not None:
self.wdcb_handle.cancel()
self.wdcb_handle = None
def load_component(config: ConfigHelper) -> ProcStats: def load_component(config: ConfigHelper) -> ProcStats:
return ProcStats(config) return ProcStats(config)