diff --git a/tests/conftest.py b/tests/conftest.py index cb24a14..b0959e0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,6 +17,8 @@ from fixtures import KlippyProcess, HttpClient, WebsocketClient ASSETS = pathlib.Path(__file__).parent.joinpath("assets") +need_klippy_restart = pytest.StashKey[bool]() + def pytest_addoption(parser: pytest.Parser, pluginmanager): parser.addoption("--klipper-path", action="store", dest="klipper_path") parser.addoption("--klipper-exec", action="store", dest="klipper_exec") @@ -55,25 +57,17 @@ def event_loop() -> Iterator[asyncio.AbstractEventLoop]: yield loop loop.close() -@pytest.fixture(scope="class") -def path_args(request: pytest.FixtureRequest, - ssl_certs: Dict[str, pathlib.Path] - ) -> Iterator[Dict[str, pathlib.Path]]: - path_marker = request.node.get_closest_marker("run_paths") - paths = { - "moonraker_conf": "base_server.conf", - "secrets": "secrets.ini", - "printer_cfg": "base_printer.cfg" - } - if path_marker is not None: - paths.update(path_marker.kwargs) - moon_cfg_path = ASSETS.joinpath(f"moonraker/{paths['moonraker_conf']}") - secrets_path = ASSETS.joinpath(f"moonraker/{paths['secrets']}") - pcfg_path = ASSETS.joinpath(f"klipper/{paths['printer_cfg']}") + +@pytest.fixture(scope="session") +def session_args(ssl_certs: Dict[str, pathlib.Path] + ) -> Iterator[Dict[str, pathlib.Path]]: + mconf_asset = ASSETS.joinpath(f"moonraker/base_server.conf") + secrets_asset = ASSETS.joinpath(f"moonraker/secrets.ini") + pcfg_asset = ASSETS.joinpath(f"klipper/base_printer.cfg") with tempfile.TemporaryDirectory(prefix="moonraker-test") as tmpdir: tmp_path = pathlib.Path(tmpdir) - secrets_dest = tmp_path.joinpath(paths['secrets']) - shutil.copy(secrets_path, secrets_dest) + secrets_dest = tmp_path.joinpath("secrets.ini") + shutil.copy(secrets_asset, secrets_dest) cfg_path = tmp_path.joinpath("config") cfg_path.mkdir() log_path = tmp_path.joinpath("logs") @@ -83,6 +77,7 @@ def path_args(request: pytest.FixtureRequest, gcode_path = tmp_path.joinpath("gcode_files") gcode_path.mkdir() dest_paths = { + "temp_path": tmp_path, "asset_path": ASSETS, "config_path": cfg_path, "database_path": db_path, @@ -92,26 +87,22 @@ def path_args(request: pytest.FixtureRequest, "klippy_uds_path": tmp_path.joinpath("klippy_uds"), "klippy_pty_path": tmp_path.joinpath("klippy_pty"), "klipper.dict": ASSETS.joinpath("klipper/klipper.dict"), + "mconf_asset": mconf_asset, + "pcfg_asset": pcfg_asset, } dest_paths.update(ssl_certs) - if "moonraker_log" in paths: - dest_paths['moonraker.log'] = log_path.joinpath( - paths["moonraker_log"]) - moon_cfg_dest = cfg_path.joinpath("moonraker.conf") - interpolate_config(moon_cfg_path, moon_cfg_dest, dest_paths) - dest_paths['moonraker.conf'] = moon_cfg_dest + mconf_dest = cfg_path.joinpath("moonraker.conf") + dest_paths["moonraker.conf"] = mconf_dest + interpolate_config(mconf_asset, mconf_dest, dest_paths) pcfg_dest = cfg_path.joinpath("printer.cfg") - interpolate_config(pcfg_path, pcfg_dest, dest_paths) - dest_paths['printer.cfg'] = pcfg_dest - if "moonraker_bkp" in paths: - bkp_source = ASSETS.joinpath("moonraker/base_server.conf") - bkp_dest = cfg_path.joinpath(paths["moonraker_bkp"]) - interpolate_config(bkp_source, bkp_dest, dest_paths) + dest_paths["printer.cfg"] = pcfg_dest + interpolate_config(pcfg_asset, pcfg_dest, dest_paths) yield dest_paths -@pytest.fixture(scope="class") -def klippy(path_args: Dict[str, pathlib.Path], - pytestconfig: pytest.Config) -> Iterator[KlippyProcess]: +@pytest.fixture(scope="session") +def klippy_session(session_args: Dict[str, pathlib.Path], + pytestconfig: pytest.Config) -> Iterator[KlippyProcess]: + pytestconfig.stash[need_klippy_restart] = False kpath = pytestconfig.getoption('klipper_path', "~/klipper") kexec = pytestconfig.getoption('klipper_exec', None) if kexec is None: @@ -119,11 +110,81 @@ def klippy(path_args: Dict[str, pathlib.Path], exec = pathlib.Path(kexec).expanduser() klipper_path = pathlib.Path(kpath).expanduser() base_cmd = f"{exec} {klipper_path}/klippy/klippy.py " - kproc = KlippyProcess(base_cmd, path_args) + kproc = KlippyProcess(base_cmd, session_args) kproc.start() yield kproc kproc.stop() +@pytest.fixture(scope="class") +def klippy(klippy_session: KlippyProcess, + pytestconfig: pytest.Config): + if pytestconfig.stash[need_klippy_restart]: + pytestconfig.stash[need_klippy_restart] = False + klippy_session.restart() + return klippy_session + +@pytest.fixture(scope="class") +def path_args(request: pytest.FixtureRequest, + session_args: Dict[str, pathlib.Path], + pytestconfig: pytest.Config + ) -> Iterator[Dict[str, pathlib.Path]]: + path_marker = request.node.get_closest_marker("run_paths") + paths: Dict[str, Any] = { + "moonraker_conf": "base_server.conf", + "secrets": "secrets.ini", + "printer_cfg": "base_printer.cfg", + "klippy_uds": None + } + if path_marker is not None: + paths.update(path_marker.kwargs) + tmp_path = session_args["temp_path"] + cfg_path = session_args["config_path"] + mconf_dest = session_args["moonraker.conf"] + mconf_asset = ASSETS.joinpath(f"moonraker/{paths['moonraker_conf']}") + pcfg_asset = ASSETS.joinpath(f"klipper/{paths['printer_cfg']}") + last_uds = session_args["klippy_uds_path"] + if paths["klippy_uds"] is not None: + tmp_uds = tmp_path.joinpath(paths["klippy_uds"]) + session_args["klippy_uds_path"] = tmp_uds + if ( + not mconf_asset.samefile(session_args["mconf_asset"]) or + paths["klippy_uds"] is not None + ): + session_args['mconf_asset'] = mconf_asset + interpolate_config(mconf_asset, mconf_dest, session_args) + if not pcfg_asset.samefile(session_args["pcfg_asset"]): + pcfg_dest = session_args["printer.cfg"] + session_args["pcfg_asset"] = pcfg_asset + interpolate_config(pcfg_asset, pcfg_dest, session_args) + pytestconfig.stash[need_klippy_restart] = True + if paths["secrets"] != session_args["secrets_path"].name: + secrets_asset = ASSETS.joinpath(f"moonraker/{paths['secrets']}") + secrets_dest = tmp_path.joinpath(paths['secrets']) + shutil.copy(secrets_asset, secrets_dest) + session_args["secrets_path"] = secrets_dest + if "moonraker_log" in paths: + log_path = session_args["log_path"] + session_args['moonraker.log'] = log_path.joinpath( + paths["moonraker_log"]) + bkp_dest: pathlib.Path = cfg_path.joinpath(".moonraker.conf.bkp") + if "moonraker_bkp" in paths: + bkp_source = ASSETS.joinpath("moonraker/base_server.conf") + bkp_dest = cfg_path.joinpath(paths["moonraker_bkp"]) + interpolate_config(bkp_source, bkp_dest, session_args) + yield session_args + log = session_args.pop("moonraker.log", None) + if log is not None and log.is_file(): + log.unlink() + if bkp_dest.is_file(): + bkp_dest.unlink() + for item in session_args["database_path"].iterdir(): + if item.is_file(): + item.unlink() + session_args["klippy_uds_path"] = last_uds + if paths["klippy_uds"] is not None: + # restore the original uds path + interpolate_config(mconf_asset, mconf_dest, session_args) + @pytest.fixture(scope="class") def base_server(path_args: Dict[str, pathlib.Path], event_loop: asyncio.AbstractEventLoop diff --git a/tests/test_config.py b/tests/test_config.py index 6757169..3c279f2 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -476,7 +476,7 @@ class TestBackupConfig: def test_backup_config_success(self, path_args: Dict[str, pathlib.Path]): cfg_path = path_args["moonraker.conf"] - bkp_dest = cfg_path.parent.joinpath(".moonraker.conf.bkp") + bkp_dest = cfg_path.parent.joinpath(f".{cfg_path.name}.bkp") if bkp_dest.exists(): pytest.fail("Backup Already Exists") confighelper.backup_config(str(cfg_path)) @@ -484,7 +484,7 @@ class TestBackupConfig: def test_backup_skip(self, path_args: Dict[str, pathlib.Path]): cfg_path = path_args["moonraker.conf"] - bkp_dest = cfg_path.parent.joinpath(".moonraker.conf.bkp") + bkp_dest = cfg_path.parent.joinpath(f".{cfg_path.name}.bkp") if not bkp_dest.exists(): pytest.fail("Backup Not Present") stat = bkp_dest.stat() @@ -493,6 +493,6 @@ class TestBackupConfig: def test_find_backup(self, path_args: Dict[str, pathlib.Path]): cfg_path = path_args["moonraker.conf"] - bkp_dest = cfg_path.parent.joinpath(".moonraker.conf.bkp") + bkp_dest = cfg_path.parent.joinpath(f".{cfg_path.name}.bkp") bkp = confighelper.find_config_backup(str(cfg_path)) assert bkp == str(bkp_dest) diff --git a/tests/test_klippy_connection.py b/tests/test_klippy_connection.py index 1437315..4b6122f 100644 --- a/tests/test_klippy_connection.py +++ b/tests/test_klippy_connection.py @@ -57,34 +57,21 @@ async def test_klippy_shutdown(ready_server: Server, klippy: KlippyProcess): await asyncio.wait_for(fut, 2.) assert fut.result() == "shutdown" -@pytest.mark.asyncio -async def test_klippy_disconnect(ready_server: Server, klippy: KlippyProcess): - evtloop = ready_server.get_event_loop() - fut = evtloop.create_future() - - def on_disconnect(): - if not fut.done(): - fut.set_result("disconnect") - ready_server.register_event_handler("server:klippy_disconnect", - on_disconnect) - klippy.stop() - await asyncio.wait_for(fut, 2.) - assert fut.result() == "disconnect" - @pytest.mark.asyncio async def test_klippy_reconnect(ready_server: Server, klippy: KlippyProcess): evtloop = ready_server.get_event_loop() - fut = evtloop.create_future() - - def on_reconnect(): - if not fut.done(): - fut.set_result("test") - ready_server.register_event_handler("server:klippy_ready", - on_reconnect) - klippy.send_gcode("RESTART") - await asyncio.wait_for(fut, 4.) - assert fut.result() == "test" + futs = [evtloop.create_future() for _ in range(2)] + events = { + "server:klippy_disconnect": lambda: futs[0].set_result("disconnect"), + "server:klippy_ready": lambda: futs[1].set_result("ready") + } + for name, func in events.items(): + ready_server.register_event_handler(name, func) + klippy.restart() + ret = await asyncio.wait_for(asyncio.gather(*futs), 6.) + assert ret == ["disconnect", "ready"] +@pytest.mark.run_paths(klippy_uds="fake_uds") @pytest.mark.asyncio async def test_no_klippy_connection_error(full_server: Server): await full_server.start_server() @@ -152,6 +139,7 @@ async def test_wait_connect_fail(base_server: Server): ret = await base_server.klippy_connection.wait_connected() assert ret is False +@pytest.mark.run_paths(klippy_uds="fake_uds") @pytest.mark.asyncio async def test_no_uds(base_server: Server): attempts = [1, 2, 3] @@ -163,6 +151,7 @@ async def test_no_uds(base_server: Server): ret = await base_server.klippy_connection._do_connect() assert ret is False +@pytest.mark.run_paths(klippy_uds="fake_uds") @pytest.mark.asyncio async def test_no_uds_access(base_server: Server, path_args: Dict[str, pathlib.Path]):