proc_stats: replace PeriodicCallback

Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-12-09 07:37:34 -05:00
parent 5dadc2067d
commit df674ae476
1 changed files with 23 additions and 25 deletions

View File

@ -12,7 +12,6 @@ import os
import pathlib
import logging
from collections import deque
from tornado.ioloop import PeriodicCallback
# Annotation imports
from typing import (
@ -33,7 +32,7 @@ VC_GEN_CMD_FILE = "/usr/bin/vcgencmd"
STATM_FILE_PATH = "/proc/self/smaps_rollup"
NET_DEV_PATH = "/proc/net/dev"
TEMPERATURE_PATH = "/sys/class/thermal/thermal_zone0/temp"
STAT_UPDATE_TIME_MS = 1000
STAT_UPDATE_TIME = 1.
REPORT_QUEUE_SIZE = 30
THROTTLE_CHECK_INTERVAL = 10
WATCHDOG_REFRESH_TIME = 2.
@ -56,8 +55,8 @@ class ProcStats:
self.event_loop = self.server.get_event_loop()
self.machine: Machine = self.server.load_component(config, 'machine')
self.watchdog = Watchdog(self)
self.stat_update_cb = PeriodicCallback(
self._handle_stat_update, STAT_UPDATE_TIME_MS) # type: ignore
self.stat_update_timer = self.event_loop.register_timer(
self._handle_stat_update)
self.vcgencmd: Optional[shell_command.ShellCommand] = None
if os.path.exists(VC_GEN_CMD_FILE):
logging.info("Detected 'vcgencmd', throttle checking enabled")
@ -85,7 +84,7 @@ class ProcStats:
self.last_throttled: int = 0
self.update_sequence: int = 0
self.last_net_stats: Dict[str, Dict[str, Any]] = {}
self.stat_update_cb.start()
self.stat_update_timer.start()
self.watchdog.start()
async def _handle_stat_request(self,
@ -117,8 +116,8 @@ class ProcStats:
ts = await self._check_throttled_state()
logging.info(f"Throttled Flags: {' '.join(ts['flags'])}")
async def _handle_stat_update(self) -> None:
update_time = time.time()
async def _handle_stat_update(self, eventtime: float) -> float:
update_time = eventtime
proc_time = time.process_time()
time_diff = update_time - self.last_update_time
usage = round((proc_time - self.last_proc_time) / time_diff * 100, 2)
@ -135,7 +134,7 @@ class ProcStats:
net[dev]['bandwidth'] = bytes_sec
self.last_net_stats = net
result = {
'time': update_time,
'time': time.time(),
'cpu_usage': usage,
'memory': mem,
'mem_units': mem_units
@ -165,6 +164,7 @@ class ProcStats:
self.total_throttled |= cur_throttled
await self.machine.parse_network_interfaces()
await self.machine.update_service_status()
return eventtime + STAT_UPDATE_TIME
async def _check_throttled_state(self) -> Dict[str, Any]:
async with self.throttle_check_lock:
@ -240,39 +240,37 @@ class ProcStats:
logging.info(msg)
def close(self) -> None:
self.stat_update_cb.stop()
self.stat_update_timer.stop()
self.watchdog.stop()
class Watchdog:
def __init__(self, proc_stats: ProcStats) -> None:
self.evt_loop = asyncio.get_event_loop()
self.proc_stats = proc_stats
self.event_loop = proc_stats.event_loop
self.last_watch_time: float = 0.
self.wdcb_handle: Optional[asyncio.Handle] = None
self.watchdog_timer = self.event_loop.register_timer(
self._watchdog_callback
)
def _watchdog_callback(self) -> None:
cur_time = self.evt_loop.time()
time_diff = cur_time - self.last_watch_time
def _watchdog_callback(self, eventtime: float) -> float:
time_diff = eventtime - self.last_watch_time
if time_diff > REPORT_BLOCKED_TIME:
logging.info(
f"EVENT LOOP BLOCKED: {round(time_diff, 2)} seconds")
# delay the stat logging so we capture the CPU percentage after
# the next cycle
self.evt_loop.call_later(.2, self.proc_stats.log_last_stats, 5)
self.last_watch_time = cur_time
self.wdcb_handle = self.evt_loop.call_later(
WATCHDOG_REFRESH_TIME, self._watchdog_callback)
self.event_loop.delay_callback(
.2, self.proc_stats.log_last_stats, 5)
self.last_watch_time = eventtime
return eventtime + WATCHDOG_REFRESH_TIME
def start(self):
if self.wdcb_handle is None:
self.last_watch_time = self.evt_loop.time()
self.wdcb_handle = self.evt_loop.call_soon(
self._watchdog_callback)
if not self.watchdog_timer.is_running():
self.last_watch_time = self.event_loop.get_loop_time()
self.watchdog_timer.start()
def stop(self):
if self.wdcb_handle is not None:
self.wdcb_handle.cancel()
self.wdcb_handle = None
self.watchdog_timer.stop()
def load_component(config: ConfigHelper) -> ProcStats:
return ProcStats(config)