announcements: initial implementation

This adds support for announcements retreived via RSS feed from
the "moonlight" GitHub repo.  Announcements may also be procedurally
generated internally.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-03-02 17:57:07 -05:00
parent 4a6732f13a
commit b1cc766576
No known key found for this signature in database
GPG Key ID: 7027245FBBDDF59A
1 changed files with 388 additions and 0 deletions

View File

@ -0,0 +1,388 @@
# Support for Moonraker/Klipper/Client announcements
#
# Copyright (C) 2022 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license
from __future__ import annotations
import datetime
from importlib.resources import path
import pathlib
import asyncio
import logging
import email.utils
import xml.etree.ElementTree as etree
from typing import (
TYPE_CHECKING,
Awaitable,
List,
Dict,
Any,
Optional,
Union
)
if TYPE_CHECKING:
from confighelper import ConfigHelper
from websockets import WebRequest
from http_client import HttpClient
from components.database import MoonrakerDatabase
MOONLIGHT_URL = "https://arksine.github.io/moonlight"
UPDATE_CHECK_TIME = 1800.
etree.register_namespace("moonlight", MOONLIGHT_URL)
class Announcements:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.entry_mgr = EntryManager(config)
self.eventloop = self.server.get_event_loop()
self.update_timer = self.eventloop.register_timer(
self._handle_update_timer
)
self.request_lock = asyncio.Lock()
dev_path = config.get("dev_path", None)
if dev_path is not None:
res_path = pathlib.Path(dev_path).expanduser().resolve()
if not res_path.is_dir():
raise config.error(
f"[announcements] - No folder at path '{dev_path}'")
logging.info(f"Annoucments dev path enabled - {res_path}")
self.subscriptions: Dict[str, RssFeed] = {
"moonraker": RssFeed(config, "moonraker", self.entry_mgr),
"klipper": RssFeed(config, "klipper", self.entry_mgr)
}
sub_list: List[str] = config.getlist("subscriptions", [])
for sub in sub_list:
sub = sub.lower()
if sub in self.subscriptions:
continue
self.subscriptions[sub] = RssFeed(config, sub, self.entry_mgr)
self.server.register_endpoint(
"/server/announcements/list", ["GET"],
self._list_announcements
)
self.server.register_endpoint(
"/server/announcements/dismiss", ["POST"],
self._handle_dismiss_request
)
self.server.register_endpoint(
"/server/announcements/update", ["POST"],
self._handle_update_request
)
self.server.register_notification(
"announcements:dismissed", "announcement_dismissed"
)
self.server.register_notification(
"announcements:entries_updated", "announcement_update"
)
async def component_init(self) -> None:
async with self.request_lock:
await self.entry_mgr.initialize()
for sub in self.subscriptions.values():
await sub.initialize()
self.update_timer.start()
async def _handle_update_timer(self, eventtime: float) -> float:
changed = False
entries: List[Dict[str, Any]] = []
async with self.request_lock:
for sub in self.subscriptions.values():
ret = await sub.update_entries()
changed |= ret
if changed:
entries = await self.entry_mgr.list_entries()
self.server.send_event(
"announcements:entries_updated", {"entries": entries}
)
return eventtime + UPDATE_CHECK_TIME
async def _handle_dismiss_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
async with self.request_lock:
entry_id: str = web_request.get_str("entry_id")
await self.entry_mgr.dismiss_entry(entry_id)
return {
"entry_id": entry_id
}
async def _list_announcements(
self, web_request: WebRequest
) -> Dict[str, Any]:
async with self.request_lock:
incl_dsm = web_request.get_boolean("include_dismissed", True)
entries = await self.entry_mgr.list_entries(incl_dsm)
return {
"entries": entries
}
async def _handle_update_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
subs: Optional[Union[str, List[str]]]
subs = web_request.get("subscriptions", None)
if isinstance(subs, str):
subs = [sub.strip() for sub in subs.split(",") if sub.strip()]
elif subs is None:
subs = list(self.subscriptions.keys())
for sub in subs:
if sub not in self.subscriptions:
raise self.server.error(f"No subscription for {sub}")
async with self.request_lock:
changed = False
for sub in subs:
ret = await self.subscriptions[sub].update_entries()
changed |= ret
entries = await self.entry_mgr.list_entries()
if changed:
self.eventloop.delay_callback(
.05, self.server.send_event,
"announcements:entries_updated",
{"entries": entries})
return {
"entries": entries,
"modified": changed
}
def add_internal_announcement(
self, title: str, desc: str, url: str, priority: str, feed: str
) -> Dict[str, Any]:
date = datetime.datetime.utcnow()
entry_id: str = f"{feed}/{date.isoformat(timespec='seconds')}"
entry = {
"entry_id": entry_id,
"url": url,
"title": title,
"description": desc,
"priority": priority,
"date": date.timestamp(),
"dismissed": False,
"source": "internal",
"feed": feed
}
self.entry_mgr.add_entry(entry)
return entry
async def remove_internal_announcment(self, entry_id: str) -> None:
ret = await self.entry_mgr.remove_entry(entry_id)
if ret is not None:
entries = await self.entry_mgr.list_entries()
self.server.send_event(
"announcements:entries_updated", {"entries": entries}
)
class EntryManager:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
database: MoonrakerDatabase
database = self.server.lookup_component("database")
database.register_local_namespace("announcements")
self.announce_db = database.wrap_namespace("announcements")
self.entry_id_map: Dict[str, str] = {}
self.next_key = 0
async def initialize(self) -> None:
last_key = ""
for key, entry in await self.announce_db.items():
last_key = key
aid = entry["entry_id"]
self.entry_id_map[aid] = key
if last_key:
self.next_key = int(last_key, 16) + 1
async def list_entries(
self, include_dismissed: bool = True
) -> List[Dict[str, Any]]:
vals: List[Dict[str, Any]] = await self.announce_db.values()
vals = sorted(vals, key=lambda x: x["date"], reverse=True)
if include_dismissed:
return vals
return [val for val in vals if not val["dismissed"]]
def has_entry(self, entry_id: str) -> bool:
return entry_id in self.entry_id_map
def add_entry(self, entry: Dict[str, Any]) -> Awaitable[None]:
aid = entry["entry_id"]
key = f"{self.next_key:06X}"
self.next_key += 1
self.entry_id_map[aid] = key
return self.announce_db.insert(key, entry)
def remove_entry(self, entry_id: str) -> Awaitable[Any]:
key = self.entry_id_map.pop(entry_id, None)
if key is None:
raise self.server.error(f"No key matching entry id: {entry_id}")
return self.announce_db.pop(key, None)
async def dismiss_entry(self, entry_id: str) -> None:
key = self.entry_id_map.get(entry_id)
if key is None:
raise self.server.error(f"No key matching entry id: {entry_id}")
is_dismissed = await self.announce_db[f"{key}.dismissed"]
if is_dismissed:
return
await self.announce_db.insert(f"{key}.dismissed", True)
eventloop = self.server.get_event_loop()
eventloop.delay_callback(
.05, self.server.send_event, "announcements:dismissed",
{"entry_id": entry_id}
)
def prune_by_prefix(self, prefix: str, valid_ids: List[str]) -> bool:
del_keys: List[str] = []
for entry_id in list(self.entry_id_map.keys()):
if not entry_id.startswith(prefix) or entry_id in valid_ids:
continue
# Entry is no longer valid and should be removed
key = self.entry_id_map.pop(entry_id, None)
if key is not None:
del_keys.append(key)
if del_keys:
self.announce_db.delete_batch(del_keys)
return True
return False
class RssFeed:
def __init__(
self, config: ConfigHelper, name: str, entry_mgr: EntryManager
) -> None:
self.server = config.get_server()
self.name = name
self.entry_mgr = entry_mgr
self.client: HttpClient = self.server.lookup_component("http_client")
database: MoonrakerDatabase
database = self.server.lookup_component("database")
self.moon_db = database.wrap_namespace("moonraker")
self.xml_file = f"{self.name}.xml"
self.asset_url = f"{MOONLIGHT_URL}/assets/{self.xml_file}"
self.warned: bool = False
self.last_modified: int = 0
self.dev_xml_path: Optional[pathlib.Path] = None
dev_mode = config.getboolean("dev_mode", False)
if dev_mode:
res_dir = pathlib.Path(__file__).parent.parent.parent.resolve()
res_path = res_dir.joinpath(".devel/announcement_xml")
self.dev_xml_path = res_path.joinpath(self.xml_file)
async def initialize(self) -> None:
etag: Optional[str] = await self.moon_db.get(
f"announcements.{self.name}.etag", None)
if etag is not None:
self.client.register_cached_url(self.asset_url, etag)
async def update_entries(self) -> bool:
if self.dev_xml_path is None:
xml_data = await self._fetch_moonlight()
else:
xml_data = await self._fetch_local_folder()
if not xml_data:
return False
return self._parse_xml(xml_data)
async def _fetch_moonlight(self) -> str:
headers = {"Accept": "application/xml"}
resp = await self.client.get(self.asset_url, headers)
if resp.has_error():
msg = f"Failed to update subscription '{self.name}': {resp.error}"
logging.info(msg)
if not self.warned:
self.warned = True
self.server.add_warning(msg)
return ""
if resp.status_code == 304:
logging.debug(f"Content at {self.xml_file} not modified")
return ""
# update etag
if "etag" in resp.headers:
etag = resp.headers["etag"]
self.moon_db[f"announcements.{self.name}.etag"] = etag
return resp.text
async def _fetch_local_folder(self) -> str:
if self.dev_xml_path is None:
return ""
if not self.dev_xml_path.is_file():
msg = f"No file at path {self.dev_xml_path}"
if not self.warned:
self.warned = True
self.server.add_warning(msg)
return ""
mtime = self.dev_xml_path.stat().st_mtime_ns
if mtime <= self.last_modified:
logging.debug(f"Content at {self.xml_file} not modified")
return ""
try:
eventloop = self.server.get_event_loop()
xml_data = await eventloop.run_in_thread(
self.dev_xml_path.read_text)
except Exception:
msg = f"Unable read xml file {self.dev_xml_path}"
if not self.warned:
self.warned = True
self.server.add_warning(msg)
return ""
self.last_modified = mtime
return xml_data
def _parse_xml(self, xml_data: str) -> bool:
root = etree.fromstring(xml_data)
channel = root.find("channel")
if channel is None:
root_str = etree.tostring(root, encoding="unicode")
logging.debug(f"Feed {self.name}: no channel found\n{root_str}")
return False
# extract prefix
prefix = channel.findtext("title", "").lower()
if not prefix:
logging.info(f"Feed {self.name}: No prefix found")
items = channel.findall("item")
valid_ids: List[str] = []
changed: bool = False
for item in items:
guid = item.findtext("guid")
if guid is None:
item_str = etree.tostring(item, encoding="unicode")
logging.debug(f"Feed {self.name}: Invalid Item\n{item_str}")
continue
if not prefix:
# fall back to first guid prefix
prefix = "/".join(guid.split("/")[:2])
elif not guid.startswith(prefix):
logging.debug(
f"Feed {self.name}: Guid {guid} is not "
f"prefixed with {prefix}")
valid_ids.append(guid)
if self.entry_mgr.has_entry(guid):
continue
try:
rfc_date = item.findtext("pubDate", "")
dt = email.utils.parsedate_to_datetime(rfc_date)
except Exception:
dt = datetime.datetime.utcnow()
entry: Dict[str, Any] = {
"entry_id": guid,
"url": item.findtext("link"),
"title": item.findtext("title"),
"description": item.findtext("description"),
"priority": item.findtext("category"),
"date": dt.timestamp(),
"dismissed": False,
"source": "moonlight",
"feed": self.name.capitalize()
}
changed = True
self.entry_mgr.add_entry(entry)
logging.debug(f"Feed {self.name}: found entries {valid_ids}")
if prefix:
pruned = self.entry_mgr.prune_by_prefix(prefix, valid_ids)
changed = changed or pruned
return changed
def load_component(config: ConfigHelper) -> Announcements:
return Announcements(config)