261 lines
9.3 KiB
Python
261 lines
9.3 KiB
Python
from __future__ import annotations
|
|
import pytest
|
|
import asyncio
|
|
import pathlib
|
|
from typing import TYPE_CHECKING, Dict
|
|
from moonraker.server import ServerError
|
|
from moonraker.klippy_connection import KlippyRequest
|
|
from mocks import MockReader, MockWriter
|
|
|
|
if TYPE_CHECKING:
|
|
from server import Server
|
|
from conftest import KlippyProcess
|
|
|
|
@pytest.mark.usefixtures("klippy")
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_startup(full_server: Server):
|
|
evtloop = full_server.get_event_loop()
|
|
futs = [evtloop.create_future() for _ in range(3)]
|
|
events = {
|
|
"server:klippy_identified": lambda: futs[0].set_result("id"),
|
|
"server:klippy_started": lambda x: futs[1].set_result("started"),
|
|
"server:klippy_ready": lambda: futs[2].set_result("ready")
|
|
}
|
|
for name, func in events.items():
|
|
full_server.register_event_handler(name, func)
|
|
await full_server.start_server()
|
|
ret = await asyncio.wait_for(asyncio.gather(*futs), 4.)
|
|
assert (
|
|
ret == ["id", "started", "ready"] and
|
|
full_server.klippy_connection.is_connected()
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_gcode_response(ready_server: Server,
|
|
klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def on_gc_resp(resp: str):
|
|
if not fut.done():
|
|
fut.set_result(resp)
|
|
ready_server.register_event_handler("server:gcode_response", on_gc_resp)
|
|
klippy.send_gcode("M118 Moonraker Test")
|
|
await asyncio.wait_for(fut, 1.)
|
|
assert "Moonraker Test" in fut.result()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_shutdown(ready_server: Server, klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def on_shutdown():
|
|
if not fut.done():
|
|
fut.set_result("shutdown")
|
|
ready_server.register_event_handler("server:klippy_shutdown", on_shutdown)
|
|
klippy.send_gcode("M112")
|
|
await asyncio.wait_for(fut, 2.)
|
|
assert fut.result() == "shutdown"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_reconnect(ready_server: Server, klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
futs = [evtloop.create_future() for _ in range(2)]
|
|
events = {
|
|
"server:klippy_disconnect": lambda: futs[0].set_result("disconnect"),
|
|
"server:klippy_ready": lambda: futs[1].set_result("ready")
|
|
}
|
|
for name, func in events.items():
|
|
ready_server.register_event_handler(name, func)
|
|
klippy.restart()
|
|
ret = await asyncio.wait_for(asyncio.gather(*futs), 6.)
|
|
assert ret == ["disconnect", "ready"]
|
|
|
|
@pytest.mark.run_paths(klippy_uds="fake_uds")
|
|
@pytest.mark.asyncio
|
|
async def test_no_klippy_connection_error(full_server: Server):
|
|
await full_server.start_server()
|
|
with pytest.raises(ServerError):
|
|
kapis = full_server.klippy_connection.klippy_apis
|
|
await kapis.run_gcode("M115")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_update(ready_server: Server, klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def on_status_update(data):
|
|
if not fut.done():
|
|
fut.set_result(data)
|
|
ready_server.register_event_handler("server:status_update",
|
|
on_status_update)
|
|
kapis = ready_server.klippy_connection.klippy_apis
|
|
await kapis.subscribe_objects({"toolhead": None})
|
|
klippy.send_gcode("G28")
|
|
await asyncio.wait_for(fut, 2.)
|
|
assert isinstance(fut.result(), dict)
|
|
|
|
@pytest.mark.run_paths(printer_cfg="error_printer.cfg")
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_error(ready_server: Server):
|
|
kconn = ready_server.klippy_connection
|
|
assert kconn.state == "error"
|
|
|
|
@pytest.mark.run_paths(printer_cfg="missing_reqs.cfg")
|
|
@pytest.mark.asyncio
|
|
async def test_missing_reqs(ready_server: Server):
|
|
mreqs = sorted(ready_server.klippy_connection.missing_requirements)
|
|
expected = ["display_status", "pause_resume", "virtual_sdcard"]
|
|
assert mreqs == expected
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connection_close(full_server: Server):
|
|
await full_server.start_server()
|
|
# Test multiple close attempts, the second to enter
|
|
# should wait and exit
|
|
ret = full_server.klippy_connection.close(True)
|
|
ret2 = full_server.klippy_connection.close(True)
|
|
await asyncio.wait_for(asyncio.gather(ret, ret2), 4.)
|
|
kconn = full_server.klippy_connection
|
|
assert kconn.connection_task.cancelled()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_init_error(base_server: Server):
|
|
base_server.server_running = True
|
|
kconn = base_server.klippy_connection
|
|
|
|
def mock_is_connected():
|
|
return kconn.init_attempts < 3
|
|
kconn.is_connected = mock_is_connected
|
|
ret = await kconn._init_klippy_connection()
|
|
assert ret is False
|
|
|
|
def test_connect_fail(base_server: Server):
|
|
ret = base_server.klippy_connection.connect()
|
|
assert ret.result() is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wait_connect_fail(base_server: Server):
|
|
ret = await base_server.klippy_connection.wait_connected()
|
|
assert ret is False
|
|
|
|
@pytest.mark.run_paths(klippy_uds="fake_uds")
|
|
@pytest.mark.asyncio
|
|
async def test_no_uds(base_server: Server):
|
|
attempts = [1, 2, 3]
|
|
|
|
def mock_is_running():
|
|
attempts.pop(0)
|
|
return len(attempts) > 0
|
|
base_server.is_running = mock_is_running
|
|
ret = await base_server.klippy_connection._do_connect()
|
|
assert ret is False
|
|
|
|
@pytest.mark.run_paths(klippy_uds="fake_uds")
|
|
@pytest.mark.asyncio
|
|
async def test_no_uds_access(base_server: Server,
|
|
path_args: Dict[str, pathlib.Path]):
|
|
attempts = [1, 2, 3]
|
|
uds_path = path_args['klippy_uds_path']
|
|
uds_path.write_text("test")
|
|
uds_path.chmod(mode=222)
|
|
|
|
def mock_is_running():
|
|
attempts.pop(0)
|
|
return len(attempts) > 0
|
|
base_server.is_running = mock_is_running
|
|
ret = await base_server.klippy_connection._do_connect()
|
|
assert ret is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_write_not_connected(base_server: Server):
|
|
req = KlippyRequest("", {})
|
|
kconn = base_server.klippy_connection
|
|
await kconn._write_request(req)
|
|
assert isinstance(req.response, ServerError)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_write_error(base_server: Server):
|
|
req = KlippyRequest("", {})
|
|
kconn = base_server.klippy_connection
|
|
kconn.writer = MockWriter()
|
|
await kconn._write_request(req)
|
|
assert isinstance(req.response, ServerError)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_write_cancelled(base_server: Server):
|
|
req = KlippyRequest("", {})
|
|
kconn = base_server.klippy_connection
|
|
kconn.writer = MockWriter(wait_drain=True)
|
|
task = base_server.event_loop.create_task(kconn._write_request(req))
|
|
base_server.event_loop.delay_callback(.01, task.cancel)
|
|
with pytest.raises(asyncio.CancelledError):
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_error(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
mock_reader = MockReader("raise_error")
|
|
kconn = base_server.klippy_connection
|
|
await kconn._read_stream(mock_reader)
|
|
assert "Klippy Stream Read Error" == caplog.messages[-1]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_cancelled(base_server: Server):
|
|
mock_reader = MockReader("wait")
|
|
kconn = base_server.klippy_connection
|
|
task = base_server.event_loop.create_task(
|
|
kconn._read_stream(mock_reader))
|
|
base_server.event_loop.delay_callback(.01, task.cancel)
|
|
with pytest.raises(asyncio.CancelledError):
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_decode_error(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
mock_reader = MockReader()
|
|
kconn = base_server.klippy_connection
|
|
await kconn._read_stream(mock_reader)
|
|
assert "Error processing Klippy Host Response:" in caplog.messages[-1]
|
|
|
|
def test_process_unknown_method(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
cmd = {"method": "test_unknown"}
|
|
kconn = base_server.klippy_connection
|
|
kconn._process_command(cmd)
|
|
assert "Unknown method received: test_unknown" == caplog.messages[-1]
|
|
|
|
def test_process_unknown_request(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
cmd = {"id": 4543}
|
|
kconn = base_server.klippy_connection
|
|
kconn._process_command(cmd)
|
|
expected = f"No request matching request ID: 4543, response: {cmd}"
|
|
assert expected == caplog.messages[-1]
|
|
|
|
def test_process_invalid_request(base_server: Server):
|
|
req = KlippyRequest("", {})
|
|
kconn = base_server.klippy_connection
|
|
kconn.pending_requests[req.id] = req
|
|
cmd = {"id": req.id}
|
|
kconn._process_command(cmd)
|
|
assert isinstance(req.response, ServerError)
|
|
|
|
# TODO: This can probably go in a class with test apis
|
|
@pytest.mark.asyncio
|
|
async def test_call_remote_method(base_server: Server,
|
|
klippy: KlippyProcess):
|
|
fut = base_server.get_event_loop().create_future()
|
|
|
|
def method_test(result):
|
|
fut.set_result(result)
|
|
base_server.register_remote_method("moonraker_test", method_test)
|
|
base_server.load_components()
|
|
await base_server.server_init()
|
|
ret = base_server.klippy_connection.wait_connected()
|
|
await asyncio.wait_for(ret, 4.)
|
|
klippy.send_gcode("TEST_REMOTE_METHOD")
|
|
await fut
|
|
await base_server._stop_server("terminate")
|
|
assert fut.result() == "test"
|