klippy_apis: allow subscription requests from transports

The default behavior of the subscribe API shares all subscription
requests.  API Transports require their own subscription.  Add
a method to facilitate this request.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-11-26 12:23:39 -05:00
parent bfeb096f31
commit eed759e111
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
3 changed files with 36 additions and 30 deletions

View File

@ -100,10 +100,11 @@ class KlippyAPI(APITransport):
self,
method: str,
params: Dict[str, Any],
default: Any = Sentinel.MISSING
default: Any = Sentinel.MISSING,
transport: Optional[APITransport] = None
) -> Any:
try:
req = WebRequest(method, params, transport=self)
req = WebRequest(method, params, transport=transport or self)
result = await self.klippy.request(req)
except self.server.error:
if default is Sentinel.MISSING:
@ -227,6 +228,7 @@ class KlippyAPI(APITransport):
callback: Optional[SubCallback] = None,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, Dict[str, Any]]:
# The host transport shares subscriptions amongst all components
for obj, items in objects.items():
if obj in self.host_subscription:
prev = self.host_subscription[obj]
@ -237,9 +239,8 @@ class KlippyAPI(APITransport):
self.host_subscription[obj] = uitems
else:
self.host_subscription[obj] = items
params = {'objects': dict(self.host_subscription)}
result = await self._send_klippy_request(
SUBSCRIPTION_ENDPOINT, params, default)
params = {"objects": dict(self.host_subscription)}
result = await self._send_klippy_request(SUBSCRIPTION_ENDPOINT, params, default)
if isinstance(result, dict) and "status" in result:
if callback is not None:
self.subscription_callbacks.append(callback)
@ -248,6 +249,22 @@ class KlippyAPI(APITransport):
return default
raise self.server.error("Invalid response received from Klippy", 500)
async def subscribe_from_transport(
self,
objects: Mapping[str, Optional[List[str]]],
transport: APITransport,
default: Union[Sentinel, _T] = Sentinel.MISSING,
) -> Union[_T, Dict[str, Any]]:
params = {"objects": dict(objects)}
result = await self._send_klippy_request(
SUBSCRIPTION_ENDPOINT, params, default, transport
)
if isinstance(result, dict) and "status" in result:
return result["status"]
if default is not Sentinel.MISSING:
return default
raise self.server.error("Invalid response received from Klippy", 500)
async def subscribe_gcode_output(self) -> str:
template = {'response_template':
{'method': "process_gcode_response"}}

View File

@ -37,8 +37,8 @@ from typing import (
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..klippy_connection import KlippyConnection as Klippy
from ..common import JsonRPC, APIDefinition
from .klippy_apis import KlippyAPI
FlexCallback = Callable[[bytes], Optional[Coroutine]]
RPCCallback = Callable[..., Coroutine]
@ -251,7 +251,6 @@ class MQTTClient(APITransport):
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.eventloop = self.server.get_event_loop()
self.klippy: Klippy = self.server.lookup_component("klippy_connection")
self.address: str = config.get('address')
self.port: int = config.getint('port', 1883)
user = config.gettemplate('username', None)
@ -321,13 +320,13 @@ class MQTTClient(APITransport):
self.klipper_status_topic = f"{self.instance_name}/klipper/status"
self.klipper_state_prefix = f"{self.instance_name}/klipper/state"
self.moonraker_status_topic = f"{self.instance_name}/moonraker/status"
status_cfg: Dict[str, Any] = config.getdict("status_objects", {},
allow_empty_fields=True)
self.status_objs: Dict[str, Any] = {}
status_cfg: Dict[str, str] = config.getdict(
"status_objects", {}, allow_empty_fields=True
)
self.status_objs: Dict[str, Optional[List[str]]] = {}
for key, val in status_cfg.items():
if val is not None:
self.status_objs[key] = [v.strip() for v in val.split(',')
if v.strip()]
self.status_objs[key] = [v.strip() for v in val.split(',') if v.strip()]
else:
self.status_objs[key] = None
if status_cfg:
@ -368,13 +367,10 @@ class MQTTClient(APITransport):
async def _handle_klippy_started(self, state: KlippyState) -> None:
if self.status_objs:
args = {'objects': self.status_objs}
try:
await self.klippy.request(
WebRequest("objects/subscribe", args, transport=self)
)
except self.server.error:
pass
kapi: KlippyAPI = self.server.lookup_component("klippy_apis")
await kapi.subscribe_from_transport(
self.status_objs, self, default=None,
)
def _on_message(self,
client: str,

View File

@ -17,7 +17,7 @@ import logging.handlers
import tempfile
from queue import SimpleQueue
from ..loghelper import LocalQueueHandler
from ..common import APITransport, WebRequest, JobEvent, KlippyState
from ..common import APITransport, JobEvent, KlippyState
from ..utils import json_wrapper as jsonw
from typing import (
@ -580,16 +580,9 @@ class SimplyPrint(APITransport):
if not sub_objs:
return
# Create our own subscription rather than use the host sub
args = {'objects': sub_objs}
klippy: KlippyConnection
klippy = self.server.lookup_component("klippy_connection")
try:
resp: Dict[str, Dict[str, Any]] = await klippy.request(
WebRequest("objects/subscribe", args, transport=self)
)
status: Dict[str, Any] = resp.get("status", {})
except self.server.error:
status = {}
status: Dict[str, Any] = await self.klippy_apis.subscribe_from_transport(
sub_objs, self, default={}
)
if status:
logging.debug(f"SimplyPrint: Got Initial Status: {status}")
self.printer_status = status