1462 lines
55 KiB
Python
1462 lines
55 KiB
Python
from __future__ import annotations
|
|
from re import L
|
|
import pytest
|
|
import pytest_asyncio
|
|
import asyncio
|
|
import copy
|
|
from inspect import isawaitable
|
|
from moonraker.server import Server
|
|
from moonraker.utils import ServerError
|
|
from typing import TYPE_CHECKING, AsyncIterator, Dict, Any, Iterator
|
|
|
|
if TYPE_CHECKING:
|
|
from components.database import MoonrakerDatabase
|
|
from components.database import NamespaceWrapper
|
|
from fixtures import HttpClient, WebsocketClient
|
|
|
|
TEST_DB: Dict[str, Dict[str, Any]] = {
|
|
"automobiles": {
|
|
"chevy": {
|
|
"camaro": "silver",
|
|
"silverado": {
|
|
"1500": 3,
|
|
"2500": 1
|
|
}
|
|
},
|
|
"ford": {
|
|
"mustang": "red",
|
|
"f-series": {
|
|
"f150": [150, "black"],
|
|
"f350": {
|
|
"platinum": 10000,
|
|
}
|
|
}
|
|
}
|
|
},
|
|
"fruits": {
|
|
"apples": {
|
|
"granny_smith": 10,
|
|
"red_delicious": 8
|
|
},
|
|
"oranges": 50,
|
|
"bananas": True
|
|
},
|
|
"vegetables": {
|
|
"tomato": "nope"
|
|
},
|
|
"books": {
|
|
"fantasy": {
|
|
"lotr": "Gandalf"
|
|
},
|
|
"science_fiction": "dune"
|
|
},
|
|
"planets": {
|
|
"earth": {
|
|
"biosphere": True,
|
|
"color": "blue"
|
|
},
|
|
"venus": {
|
|
"hot": True
|
|
},
|
|
"mars": {
|
|
"color": "red"
|
|
},
|
|
"jupiter": {
|
|
"gas_giant": True,
|
|
"europa": {
|
|
"diameter": 3121.6
|
|
},
|
|
"io": "closest"
|
|
},
|
|
"saturn": {
|
|
"has_rings": True
|
|
},
|
|
"pluto": "Don't unplanet me!"
|
|
}
|
|
}
|
|
|
|
TEST_RECORD = {
|
|
"debian": {
|
|
"ubuntu": 10,
|
|
"mint": True
|
|
},
|
|
"arch": 100,
|
|
"redhat": {
|
|
"centos": False
|
|
}
|
|
}
|
|
|
|
TEST_OVERWRITE = {
|
|
"vegetables": {
|
|
"celery": "ranch",
|
|
"lettuce": 100,
|
|
"spinich": "popeye"
|
|
},
|
|
"oses": TEST_RECORD
|
|
}
|
|
|
|
@pytest_asyncio.fixture(scope="class")
|
|
async def base_db(base_server: Server) -> AsyncIterator[MoonrakerDatabase]:
|
|
db: MoonrakerDatabase = base_server.load_component(
|
|
base_server.config, "database")
|
|
for ns, record in TEST_DB.items():
|
|
for record_name, value in record.items():
|
|
db.insert_item(ns, record_name, value)
|
|
yield db
|
|
await db.close()
|
|
|
|
@pytest_asyncio.fixture(scope="class")
|
|
async def running_db(base_server: Server) -> AsyncIterator[MoonrakerDatabase]:
|
|
base_server.load_components()
|
|
db: MoonrakerDatabase = base_server.lookup_component("database")
|
|
for ns, record in TEST_DB.items():
|
|
for record_name, value in record.items():
|
|
db.insert_item(ns, record_name, value)
|
|
await base_server.server_init(False)
|
|
await base_server.start_server(False)
|
|
yield db
|
|
await base_server._stop_server("terminate")
|
|
|
|
# check_future() only resolves futures that are complete. This
|
|
# is done to test database behavior in __init__() methods, where
|
|
# it is not possible to await a result. We can't make this method
|
|
# async, as we need to check the future immediately. Using an
|
|
# async would cause it to be scheduled on the event loop, with
|
|
# a thread potentially resolving a future before we can check it.
|
|
def check_future(fut: asyncio.Future,
|
|
db: MoonrakerDatabase
|
|
) -> Any:
|
|
server = db.server
|
|
if server.is_running():
|
|
if fut.done():
|
|
pytest.fail("Future done while server running")
|
|
return fut
|
|
elif not fut.done():
|
|
pytest.fail("Future not ready before server start")
|
|
return fut.result()
|
|
|
|
@pytest.mark.asyncio
|
|
class BaseTest:
|
|
@pytest.fixture(scope="class")
|
|
def db(self, base_db):
|
|
return base_db
|
|
|
|
@pytest.mark.asyncio
|
|
class ThreadedTest:
|
|
@pytest.fixture(scope="class")
|
|
def db(self, running_db):
|
|
return running_db
|
|
|
|
class TestInstantiation:
|
|
@pytest.fixture(scope="class")
|
|
def db(self,
|
|
base_server: Server,
|
|
event_loop: asyncio.AbstractEventLoop
|
|
) -> Iterator[MoonrakerDatabase]:
|
|
db: MoonrakerDatabase
|
|
db = base_server.load_component(base_server.config, "database")
|
|
yield db
|
|
event_loop.run_until_complete(db.close())
|
|
|
|
def test_initial_state(self, db: MoonrakerDatabase):
|
|
mrdb = db.get_item("moonraker").result()
|
|
assert (
|
|
list(db.namespaces.keys()) == ["moonraker"] and
|
|
mrdb == {
|
|
"database_version": 1,
|
|
"database": {
|
|
"unsafe_shutdowns": 1
|
|
}
|
|
}
|
|
)
|
|
|
|
def test_wrap_invalid_namespace(self, db: MoonrakerDatabase):
|
|
expected = "Namespace 'invalid' not found"
|
|
with pytest.raises(ServerError, match=expected):
|
|
db.wrap_namespace("invalid")
|
|
|
|
def test_insert_record_nonetype(self, db: MoonrakerDatabase):
|
|
ret = db._insert_record("moonraker", "test_key", None)
|
|
assert ret is False
|
|
|
|
def test_encode_error(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError, match="Error encoding val"):
|
|
db._encode_value(set(["invalid_value"]))
|
|
|
|
def test_decode_error(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError, match="Error decoding value"):
|
|
db._decode_value(b"invalid")
|
|
|
|
class TestCoreServerLoaded:
|
|
@pytest.fixture(scope="class")
|
|
def db(self,
|
|
base_server: Server,
|
|
event_loop: asyncio.AbstractEventLoop
|
|
) -> Iterator[MoonrakerDatabase]:
|
|
base_server.load_components()
|
|
db: MoonrakerDatabase
|
|
db = base_server.lookup_component("database")
|
|
yield db
|
|
event_loop.run_until_complete(
|
|
base_server._stop_server("terminate"))
|
|
|
|
def test_core_state(self, db: MoonrakerDatabase):
|
|
mrdb = db.get_item("moonraker").result()
|
|
expected_ns = ["gcode_metadata", "moonraker"]
|
|
assert (
|
|
sorted(db.namespaces.keys()) == expected_ns and
|
|
mrdb == {
|
|
"database_version": 1,
|
|
"database": {
|
|
"protected_namespaces": expected_ns,
|
|
"unsafe_shutdowns": 1
|
|
},
|
|
"file_manager": {
|
|
"metadata_version": 3
|
|
}
|
|
}
|
|
)
|
|
|
|
@pytest.mark.run_paths(database="bare_db.cdb")
|
|
class TestCoreServerPreloaded(TestCoreServerLoaded):
|
|
def test_core_state(self, db: MoonrakerDatabase):
|
|
expected_ns = [
|
|
"moonraker", "gcode_metadata", "update_manager",
|
|
"authorized_users", "history"
|
|
]
|
|
mrdb = db.get_item("moonraker").result()
|
|
assert (
|
|
sorted(db.namespaces.keys()) == sorted(expected_ns) and
|
|
mrdb["database"]["unsafe_shutdowns"] == 2
|
|
)
|
|
|
|
class TestUnallowedMethods:
|
|
def test_register_error(self, running_db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
running_db.register_local_namespace("fruits")
|
|
|
|
def test_wrap_namespace(self, running_db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
running_db.wrap_namespace("fruits")
|
|
|
|
class TestInsertItem(BaseTest):
|
|
async def test_insert_record(self, db: MoonrakerDatabase):
|
|
db.insert_item("oses", "linux", TEST_RECORD)
|
|
fut = db.get_item("oses", "linux")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_RECORD
|
|
|
|
async def test_insert_nested(self, db: MoonrakerDatabase):
|
|
db.insert_item(
|
|
"oses", "windows.eleven.feburary.2022", "ok")
|
|
fut = db.get_item("oses", "windows.eleven.feburary.2022")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == "ok"
|
|
|
|
async def test_insert_nested_invalid_assign(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.insert_item("oses", "linux.arch.february", "2022")
|
|
await ret
|
|
|
|
async def test_insert_nested_reduce_failure(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.insert_item("oses", "linux.arch.february.2022", True)
|
|
await ret
|
|
|
|
async def test_overwrite_record(self, db: MoonrakerDatabase,
|
|
caplog: pytest.LogCaptureFixture):
|
|
db.insert_item("oses", "ios", 10)
|
|
db.insert_item("oses", ["ios", "15.3"], True)
|
|
fut = db.get_item("oses", "ios")
|
|
expected_log = (
|
|
"Warning: Key ios contains a value of type"
|
|
" <class 'int'>. Overwriting with an object."
|
|
)
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert (
|
|
result == {"15.3": True} and
|
|
expected_log in caplog.messages
|
|
)
|
|
|
|
class TestInsertItemThreaded(ThreadedTest, TestInsertItem):
|
|
pass
|
|
class TestGetItem(BaseTest):
|
|
async def test_get_record(self, db: MoonrakerDatabase):
|
|
fut = db.get_item("automobiles", "chevy")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB["automobiles"]["chevy"]
|
|
|
|
async def test_get_namespace(self, db: MoonrakerDatabase):
|
|
fut = db.get_item("fruits")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB["fruits"]
|
|
|
|
async def test_get_namespace_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.get_item("trains")
|
|
await ret
|
|
|
|
async def test_get_namespace_default(self, db: MoonrakerDatabase):
|
|
fut = db.get_item("trains", default={})
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {}
|
|
|
|
async def test_get_record_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.get_item("automobiles", "toyota")
|
|
await ret
|
|
|
|
async def test_get_record_default(self, db: MoonrakerDatabase):
|
|
fut = db.get_item("automobiles", "toyota", {})
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {}
|
|
|
|
async def test_get_key_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.get_item("automobiles", "chevy.equinox")
|
|
await ret
|
|
|
|
async def test_get_key_fail_default(self, db: MoonrakerDatabase):
|
|
fut = db.get_item("automobiles", "chevy.equinox", "suv")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == "suv"
|
|
|
|
async def test_get_nested(self, db: MoonrakerDatabase):
|
|
fut = db.get_item(
|
|
"automobiles", "ford.f-series.f350.platinum")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == 10000
|
|
|
|
async def test_get_nested_no_key(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.get_item("automobiles", "ford.f-series.f350.superduty")
|
|
await ret
|
|
|
|
async def test_get_nested_no_key_default(self, db: MoonrakerDatabase):
|
|
fut = db.get_item(
|
|
"automobiles", "ford.f-series.f350.superduty", "success")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == "success"
|
|
|
|
async def test_get_record_invalid_key(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.get_item("fruits", "apples..red_delicious")
|
|
await ret
|
|
|
|
async def test_get_record_invalid_key_type(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.get_item("vegetables", 100)
|
|
await ret
|
|
|
|
async def test_get_record_key_list(self, db: MoonrakerDatabase):
|
|
key = ["chevy", "silverado", "2500"]
|
|
fut = db.get_item("automobiles", key)
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == 1
|
|
|
|
class TestGetItemThreaded(ThreadedTest, TestGetItem):
|
|
pass
|
|
|
|
class TestUpdateItem(BaseTest):
|
|
async def test_update_record(self, db: MoonrakerDatabase):
|
|
update_val = {
|
|
"granny_smith": 1000,
|
|
"jazz": 10.8,
|
|
"gala": {"bland": True}
|
|
}
|
|
db.update_item("fruits", "apples", update_val)
|
|
fut = db.get_item("fruits", "apples")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {
|
|
"granny_smith": 1000,
|
|
"red_delicious": 8,
|
|
"jazz": 10.8,
|
|
"gala": {"bland": True}
|
|
}
|
|
|
|
async def test_update_nested(self, db: MoonrakerDatabase):
|
|
update_val = {"3500": {"color": "green"}, "2500": None}
|
|
db.update_item("automobiles", "chevy.silverado", update_val)
|
|
fut = db.get_item("automobiles", "chevy")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {
|
|
"camaro": "silver",
|
|
"silverado": {
|
|
"1500": 3,
|
|
"2500": None,
|
|
"3500": {"color": "green"}
|
|
}
|
|
}
|
|
|
|
async def test_update_replace_nested(self, db: MoonrakerDatabase):
|
|
db.update_item("fruits", "apples.gala.bland", "ok")
|
|
fut = db.get_item("fruits", "apples")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {
|
|
"granny_smith": 1000,
|
|
"red_delicious": 8,
|
|
"jazz": 10.8,
|
|
"gala": {"bland": "ok"}
|
|
}
|
|
|
|
async def test_update_replace_nested_dict(self, db: MoonrakerDatabase):
|
|
db.update_item("automobiles", "ford.f-series.f350", "tow")
|
|
fut = db.get_item("automobiles", "ford")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {
|
|
"mustang": "red",
|
|
"f-series": {
|
|
"f150": [150, "black"],
|
|
"f350": "tow"
|
|
}
|
|
}
|
|
|
|
async def test_update_namespace_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.update_item("pizza", "deepdish", {})
|
|
await ret
|
|
|
|
async def test_update_record_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.update_item("automobiles", "toyota", {})
|
|
await ret
|
|
|
|
async def test_update_replace_record_dict_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.update_item("fruits", "apples", None)
|
|
await ret
|
|
|
|
async def test_update_replace_record_dict(self, db: MoonrakerDatabase):
|
|
db.update_item("fruits", "apples", ["success"])
|
|
fut = db.get_item("fruits", "apples")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == ["success"]
|
|
|
|
async def test_update_key_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.update_item("automobiles", "ford.raptor", 10)
|
|
await ret
|
|
|
|
async def test_update_nested_key_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.update_item("automobiles", "ford.mustang.cobra", 10)
|
|
await ret
|
|
|
|
async def test_update_nested_key_not_found(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.update_item("automobiles", "chevy.corvette.z06", 10)
|
|
await ret
|
|
|
|
class TestUpdateItemThreaded(ThreadedTest, TestUpdateItem):
|
|
pass
|
|
|
|
|
|
class TestDeleteItem(BaseTest):
|
|
async def test_delete_nested_item(self, db: MoonrakerDatabase):
|
|
del_fut = db.delete_item(
|
|
"automobiles", "ford.f-series.f350.platinum")
|
|
del_result = check_future(del_fut, db)
|
|
if isawaitable(del_result):
|
|
del_result = await del_result
|
|
fut = db.get_item("automobiles", "ford")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert (
|
|
del_result == 10000 and
|
|
result == {
|
|
"mustang": "red",
|
|
"f-series": {
|
|
"f150": [150, "black"],
|
|
"f350": {}
|
|
}
|
|
}
|
|
)
|
|
|
|
async def test_delete_nested_dict(self, db: MoonrakerDatabase):
|
|
del_fut = db.delete_item(
|
|
"automobiles", "ford.f-series.f350")
|
|
del_result = check_future(del_fut, db)
|
|
if isawaitable(del_result):
|
|
del_result = await del_result
|
|
fut = db.get_item("automobiles", "ford")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert (
|
|
del_result == {} and
|
|
result == {
|
|
"mustang": "red",
|
|
"f-series": {
|
|
"f150": [150, "black"],
|
|
}
|
|
}
|
|
)
|
|
|
|
async def test_delete_fail(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.delete_item("fruits", "bananas.green")
|
|
await ret
|
|
|
|
async def test_delete_record(self, db: MoonrakerDatabase):
|
|
del_fut = db.delete_item("fruits", "bananas")
|
|
del_result = check_future(del_fut, db)
|
|
if isawaitable(del_result):
|
|
del_result = await del_result
|
|
fut = db.get_item("fruits", "bananas", None)
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert (
|
|
del_result is True and
|
|
result is None
|
|
)
|
|
|
|
async def test_delete_last_nested(self, db: MoonrakerDatabase):
|
|
del_fut = db.delete_item("books", "fantasy.lotr")
|
|
del_result = check_future(del_fut, db)
|
|
if isawaitable(del_result):
|
|
del_result = await del_result
|
|
fut = db.get_item("books", "fantasy", None)
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert del_result == "Gandalf" and result is None
|
|
|
|
async def test_drop_db(self, db: MoonrakerDatabase):
|
|
del_fut = db.delete_item("vegetables", "tomato",
|
|
drop_empty_db=True)
|
|
del_result = check_future(del_fut, db)
|
|
if isawaitable(del_result):
|
|
del_result = await del_result
|
|
fut = db.get_item("vegetables", default=None)
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert del_result == "nope" and result is None
|
|
|
|
class TestDeleteItemThreaded(ThreadedTest, TestDeleteItem):
|
|
pass
|
|
|
|
class TestInsertBatch(BaseTest):
|
|
async def test_insert_batch(self, db: MoonrakerDatabase):
|
|
db.insert_batch("batch_test", TEST_DB)
|
|
fut = db.get_item("batch_test")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB
|
|
|
|
async def test_insert_batch_overwrite(self, db: MoonrakerDatabase):
|
|
expected = copy.deepcopy(TEST_DB)
|
|
expected.update(TEST_OVERWRITE)
|
|
db.insert_batch("batch_test", TEST_OVERWRITE)
|
|
fut = db.get_item("batch_test")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
class TestInsertBatchThreaded(ThreadedTest, TestInsertBatch):
|
|
pass
|
|
|
|
class TestGetBatch(BaseTest):
|
|
async def test_get_batch(self, db: MoonrakerDatabase):
|
|
keys = ["apples", "oranges", "bananas"]
|
|
fut = db.get_batch("fruits", keys)
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB["fruits"]
|
|
|
|
async def test_get_batch_invalid_namespace(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.get_batch("invalid", ["no", "key"])
|
|
await fut
|
|
|
|
async def test_get_batch_invalid_keys(self, db: MoonrakerDatabase):
|
|
fut = db.get_batch("automobiles", ["chevy", "toyota", "dodge"])
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {"chevy": TEST_DB["automobiles"]["chevy"]}
|
|
|
|
async def test_get_batch_no_valid_keys(self, db: MoonrakerDatabase):
|
|
fut = db.get_batch("automobiles", ["toyota", "dodge"])
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {}
|
|
|
|
class TestGetBatchThreaded(ThreadedTest, TestGetBatch):
|
|
pass
|
|
|
|
class TestMoveBatch(BaseTest):
|
|
async def test_move_batch(self, db: MoonrakerDatabase):
|
|
source_keys = list(TEST_DB["fruits"].keys())
|
|
dest_keys = [f"super_{key}" for key in source_keys]
|
|
expected = {dk: TEST_DB["fruits"][sk] for dk, sk in
|
|
zip(dest_keys, source_keys)}
|
|
db.move_batch("fruits", source_keys, dest_keys)
|
|
fut = db.get_item("fruits")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_move_batch_invalid_namespace(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.move_batch(
|
|
"invalid_ns", ["super_banana", "super_apple"],
|
|
["banana", "apple"])
|
|
await fut
|
|
|
|
async def test_move_batch_invalid_key(self, db: MoonrakerDatabase):
|
|
source_keys = ["chevy", "toyota", "dodge"]
|
|
dest_keys = ["chevrolet", "lexus", "chrysler"]
|
|
expected = copy.deepcopy(TEST_DB["automobiles"])
|
|
expected["chevrolet"] = expected.pop("chevy")
|
|
db.move_batch("automobiles", source_keys, dest_keys)
|
|
fut = db.get_item("automobiles")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_move_batch_no_valid_keys(self, db: MoonrakerDatabase):
|
|
db.move_batch("vegetables", ["celery", "peas"],
|
|
["no_celery", "no_peas"])
|
|
fut = db.get_item("vegetables")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB["vegetables"]
|
|
|
|
async def test_move_batch_mismatch_key_length(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.move_batch("books", ["science_fiction"],
|
|
["science_fiction", "fantasy"])
|
|
await ret
|
|
|
|
class TestMoveBatchThreaded(ThreadedTest, TestMoveBatch):
|
|
pass
|
|
|
|
class TestDeleteBatch(BaseTest):
|
|
async def test_delete_batch(self, db: MoonrakerDatabase):
|
|
del_keys = ["mars", "venus", "pluto"]
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
for k in del_keys:
|
|
del expected[k]
|
|
db.delete_batch("planets", del_keys)
|
|
fut = db.get_item("planets")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_delete_batch_all_keys(self, db: MoonrakerDatabase):
|
|
del_keys = (TEST_DB["automobiles"].keys())
|
|
db.delete_batch("automobiles", del_keys)
|
|
fut = db.get_item("automobiles")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {}
|
|
|
|
async def test_delete_batch_invalid_namespace(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.delete_batch("invalid", ["no", "key"])
|
|
await ret
|
|
|
|
async def test_delete_batch_invalid_key(self, db: MoonrakerDatabase):
|
|
del_keys = ["science_fiction", "horror", "documentary"]
|
|
db.delete_batch("books", del_keys)
|
|
fut = db.get_item("books")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {"fantasy": {"lotr": "Gandalf"}}
|
|
|
|
async def test_delete_batch_no_valid_keys(self, db: MoonrakerDatabase):
|
|
del_keys = ["grapes", "peaches", "strawberries"]
|
|
db.delete_batch("fruits", del_keys)
|
|
fut = db.get_item("fruits")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB["fruits"]
|
|
|
|
class TestDeleteBatchThreaded(ThreadedTest, TestDeleteBatch):
|
|
pass
|
|
|
|
class TestUpdateNamespace(BaseTest):
|
|
async def test_update_namespace(self, db: MoonrakerDatabase):
|
|
update_val = {
|
|
"venus": {"hot": True},
|
|
"pluto": {"dwarf": True},
|
|
"uranus": "klignons",
|
|
"mercury": [1, 2, 3]
|
|
}
|
|
db.update_namespace("planets", update_val)
|
|
fut = db.get_item("planets")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
expected.update(update_val)
|
|
assert result == expected
|
|
|
|
async def test_update_namespace_invalid(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
ret = db.update_namespace("invalid", {"hello": False})
|
|
await ret
|
|
|
|
class TestUpdateNamespaceThreaded(ThreadedTest, TestUpdateNamespace):
|
|
pass
|
|
|
|
class TestClearNamespace(BaseTest):
|
|
async def test_clear_namespace(self, db: MoonrakerDatabase):
|
|
db.clear_namespace("fruits")
|
|
fut = db.get_item("fruits")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {}
|
|
|
|
async def test_clear_namespace_drop(self, db: MoonrakerDatabase):
|
|
fut = db.clear_namespace("books", drop_empty_db=True)
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert "books" not in db.namespaces
|
|
|
|
|
|
async def test_clear_namespace_invalid(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.clear_namespace("invalid")
|
|
await fut
|
|
|
|
class TestClearNamespaceThreaded(ThreadedTest, TestClearNamespace):
|
|
pass
|
|
|
|
class TestSyncNamespace(BaseTest):
|
|
async def test_sync_namespace(self, db: MoonrakerDatabase):
|
|
synced = copy.deepcopy(TEST_DB["planets"])
|
|
del synced["mars"]
|
|
del synced["pluto"]
|
|
synced.update({"mercury": "close", "neptune": "far",
|
|
"venus": "cloudy"})
|
|
db.sync_namespace("planets", synced)
|
|
fut = db.get_item("planets")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == synced
|
|
|
|
async def test_sync_no_overlap(self, db: MoonrakerDatabase):
|
|
synced = {"toyota": {"corolla": "car", "tundra": "truck"}}
|
|
db.sync_namespace("automobiles", synced)
|
|
fut = db.get_item("automobiles")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == synced
|
|
|
|
async def test_sync_no_remove(self, db: MoonrakerDatabase):
|
|
synced = copy.deepcopy(TEST_DB["fruits"])
|
|
synced.update({"cherries": "sweet", "berries": {"blue": "mild"}})
|
|
db.sync_namespace("fruits", synced)
|
|
fut = db.get_item("fruits")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == synced
|
|
|
|
async def test_sync_namespace_empty(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.sync_namespace("books", {})
|
|
await fut
|
|
|
|
async def test_sync_namespace_invalid(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.sync_namespace("invalid", {"no": "key"})
|
|
await fut
|
|
|
|
class TestSyncNamespaceThreaded(ThreadedTest, TestSyncNamespace):
|
|
pass
|
|
|
|
class TestNamespaceLength(BaseTest):
|
|
async def test_ns_length(self, db: MoonrakerDatabase):
|
|
expected = len(TEST_DB["planets"])
|
|
fut = db.ns_length("planets")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_ns_length_invalid(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.ns_length("invalid")
|
|
await fut
|
|
|
|
class TestNamespaceLengthThreaded(ThreadedTest, TestNamespaceLength):
|
|
pass
|
|
|
|
class TestNamespaceKeys(BaseTest):
|
|
async def test_ns_keys(self, db: MoonrakerDatabase):
|
|
expected = list(TEST_DB["planets"].keys())
|
|
fut = db.ns_keys("planets")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == sorted(expected)
|
|
|
|
async def test_ns_keys_invalid(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.ns_keys("invalid")
|
|
await fut
|
|
|
|
class TestNamespaceKeysThreaded(ThreadedTest, TestNamespaceKeys):
|
|
pass
|
|
|
|
class TestNamespaceValues(BaseTest):
|
|
async def test_ns_values(self, db: MoonrakerDatabase):
|
|
expected = [i[1] for i in sorted(TEST_DB["planets"].items(),
|
|
key=lambda d: d[0])]
|
|
fut = db.ns_values("planets")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_ns_values_invalid(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.ns_values("invalid")
|
|
await fut
|
|
|
|
class TestNamespaceValuesThreaded(ThreadedTest, TestNamespaceValues):
|
|
pass
|
|
|
|
class TestNamespaceItems(BaseTest):
|
|
async def test_ns_items(self, db: MoonrakerDatabase):
|
|
expected = sorted(TEST_DB["planets"].items(), key=lambda d: d[0])
|
|
fut = db.ns_items("planets")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_ns_items_invalid(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.ns_items("invalid")
|
|
await fut
|
|
|
|
class TestNamespaceItemsThreaded(ThreadedTest, TestNamespaceItems):
|
|
pass
|
|
|
|
class TestNamespaceContains(BaseTest):
|
|
async def test_ns_contains_record(self, db: MoonrakerDatabase):
|
|
fut = db.ns_contains("planets", "venus")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result is True
|
|
|
|
async def test_ns_not_contains_record(self, db: MoonrakerDatabase):
|
|
fut = db.ns_contains("planets", "mercury")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result is False
|
|
|
|
async def test_ns_contains_nested(self, db: MoonrakerDatabase):
|
|
fut = db.ns_contains("automobiles", "ford.f-series.f350.platinum")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result is True
|
|
|
|
async def test_ns_not_contains_nested(self, db: MoonrakerDatabase):
|
|
fut = db.ns_contains("automobiles", "ford.f-series.f250.fx4")
|
|
result = check_future(fut, db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result is False
|
|
|
|
async def test_ns_contains_invalid(self, db: MoonrakerDatabase):
|
|
with pytest.raises(ServerError):
|
|
fut = db.ns_contains("invalid", "nokey")
|
|
await fut
|
|
|
|
class TestNamespaceConainsThreaded(ThreadedTest, TestNamespaceContains):
|
|
pass
|
|
|
|
class WrapperTest(BaseTest):
|
|
@pytest.fixture(scope="class")
|
|
def wrapped(self,
|
|
request: pytest.FixtureRequest,
|
|
db: MoonrakerDatabase
|
|
) -> NamespaceWrapper:
|
|
parse = not request.cls.__name__.endswith("NoParse")
|
|
return db.wrap_namespace("planets", parse_keys=parse)
|
|
|
|
@pytest.mark.asyncio
|
|
class WrapperTestThreaded:
|
|
@pytest_asyncio.fixture(scope="class")
|
|
async def wrapped(self,
|
|
request: pytest.FixtureRequest,
|
|
base_server: Server
|
|
) -> AsyncIterator[MoonrakerDatabase]:
|
|
base_server.load_components()
|
|
db: MoonrakerDatabase = base_server.lookup_component("database")
|
|
for ns, record in TEST_DB.items():
|
|
for record_name, value in record.items():
|
|
db.insert_item(ns, record_name, value)
|
|
parse = not request.cls.__name__.endswith("NoParse")
|
|
wrapped = db.wrap_namespace("planets", parse_keys=parse)
|
|
await base_server.server_init(False)
|
|
await base_server.start_server(False)
|
|
yield wrapped
|
|
await base_server._stop_server("terminate")
|
|
|
|
async def test_asdict(self, wrapped: NamespaceWrapper):
|
|
with pytest.raises(ServerError):
|
|
wrapped.as_dict()
|
|
|
|
async def test_contains_magic(self, wrapped: NamespaceWrapper):
|
|
with pytest.raises(ServerError):
|
|
"earth" in wrapped
|
|
|
|
|
|
class TestNamespaceWrapper(WrapperTest):
|
|
async def test_wrapped_insert(self, wrapped: NamespaceWrapper):
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
expected["oses"] = TEST_RECORD
|
|
wrapped.insert("oses", TEST_RECORD)
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_wrapped_get(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.get("oses")
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_RECORD
|
|
|
|
async def test_wrapped_delete(self, wrapped: NamespaceWrapper):
|
|
del_fut = wrapped.delete("oses")
|
|
del_result = check_future(del_fut, wrapped.db)
|
|
if isawaitable(del_result):
|
|
del_result = await del_result
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert (
|
|
del_result == TEST_RECORD and
|
|
result == TEST_DB["planets"]
|
|
)
|
|
|
|
async def test_wrapped_nested_insert(self, wrapped: NamespaceWrapper):
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
if wrapped.parse_keys:
|
|
expected["oses"] = {"nested": TEST_RECORD}
|
|
else:
|
|
expected["oses.nested"] = TEST_RECORD
|
|
wrapped.insert("oses.nested", TEST_RECORD)
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_nested_get(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.get("oses.nested")
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_RECORD
|
|
|
|
async def test_wrapped_nested_delete(self, wrapped: NamespaceWrapper):
|
|
del_fut = wrapped.delete("oses.nested")
|
|
del_result = check_future(del_fut, wrapped.db)
|
|
if isawaitable(del_result):
|
|
del_result = await del_result
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert (
|
|
del_result == TEST_RECORD and
|
|
result == TEST_DB["planets"]
|
|
)
|
|
|
|
async def test_update_child(self, wrapped: NamespaceWrapper):
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
expected["pluto"] = {"type": "dwarf"}
|
|
wrapped.update_child("pluto", {"type": "dwarf"})
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_update_child_nested(self, wrapped: NamespaceWrapper):
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
expected["pluto"] = {"type": "planet!"}
|
|
wrapped.update_child("pluto.type", "planet!")
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_update(self, wrapped: NamespaceWrapper):
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
upval = {"pluto": "Don't unplanet me!", "caprica": {"bsg": True}}
|
|
expected.update(upval)
|
|
wrapped.update(upval)
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_sync(self, wrapped: NamespaceWrapper):
|
|
wrapped.sync(TEST_DB["planets"])
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB["planets"]
|
|
|
|
async def test_insert_batch(self, wrapped: NamespaceWrapper):
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
expected.update(TEST_RECORD)
|
|
wrapped.insert_batch(TEST_RECORD)
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_get_batch(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.get_batch(list(TEST_RECORD.keys()))
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_RECORD
|
|
|
|
async def test_move_batch(self, wrapped: NamespaceWrapper):
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
expected.update({f"{k}_moved": v for k, v in TEST_RECORD.items()})
|
|
source_keys = list(TEST_RECORD.keys())
|
|
dest_keys = [f"{key}_moved" for key in source_keys]
|
|
wrapped.move_batch(source_keys, dest_keys)
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_delete_batch(self, wrapped: NamespaceWrapper):
|
|
expected = {f"{k}_moved": v for k, v in TEST_RECORD.items()}
|
|
fut = wrapped.delete_batch(list(expected.keys()))
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_length(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.length()
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == len(TEST_DB["planets"])
|
|
|
|
async def test_asdict(self, wrapped: NamespaceWrapper):
|
|
assert wrapped.as_dict() == TEST_DB["planets"]
|
|
|
|
async def test_setitem_magic(self, wrapped: NamespaceWrapper):
|
|
expected = copy.deepcopy(TEST_DB["planets"])
|
|
expected["neptune"] = {"distance_from_sun": 2.8}
|
|
wrapped["neptune"] = {"distance_from_sun": 2.8}
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_getitem_magic(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped["neptune"]
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {"distance_from_sun": 2.8}
|
|
|
|
async def test_getitem_magic_error(self, wrapped: NamespaceWrapper):
|
|
with pytest.raises(ServerError):
|
|
fut = wrapped["orthos"]
|
|
await fut
|
|
|
|
async def test_del_magic(self, wrapped: NamespaceWrapper):
|
|
del wrapped["neptune"]
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB["planets"]
|
|
|
|
async def test_contains_magic(self, wrapped: NamespaceWrapper):
|
|
assert "earth" in wrapped
|
|
|
|
async def test_contains(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.contains("earth")
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result is True
|
|
|
|
async def test_contains_nested(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.contains("jupiter.io")
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result is wrapped.parse_keys
|
|
|
|
async def test_keys_method(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.keys()
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == sorted(TEST_DB["planets"].keys())
|
|
|
|
async def test_values_method(self, wrapped: NamespaceWrapper):
|
|
expected = [TEST_DB["planets"][key] for key in
|
|
sorted(TEST_DB["planets"].keys())]
|
|
fut = wrapped.values()
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_items_method(self, wrapped: NamespaceWrapper):
|
|
expected = sorted(TEST_DB["planets"].items(), key=lambda x: x[0])
|
|
fut = wrapped.items()
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == expected
|
|
|
|
async def test_pop(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.pop("jupiter")
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == TEST_DB["planets"]["jupiter"]
|
|
|
|
async def test_pop_error(self, wrapped: NamespaceWrapper):
|
|
with pytest.raises(ServerError):
|
|
fut = wrapped.pop("invalid_key")
|
|
await fut
|
|
|
|
async def test_pop_default(self, wrapped: NamespaceWrapper):
|
|
fut = wrapped.pop("invalid_key", "default_value")
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == "default_value"
|
|
|
|
async def test_clear(self, wrapped: NamespaceWrapper):
|
|
wrapped.clear()
|
|
fut = wrapped.db.get_item(wrapped.namespace)
|
|
result = check_future(fut, wrapped.db)
|
|
if isawaitable(result):
|
|
result = await result
|
|
assert result == {}
|
|
|
|
class TestNamespaceWrapperNoParse(TestNamespaceWrapper):
|
|
async def test_update_child_nested(self, wrapped: NamespaceWrapper):
|
|
with pytest.raises(ServerError):
|
|
ret = wrapped.update_child("pluto.type", "planet")
|
|
await ret
|
|
|
|
class TestNamespaceWrapperThreaded(WrapperTestThreaded, TestNamespaceWrapper):
|
|
pass
|
|
|
|
class TestNamespaceWrapperThreadedNoParse(
|
|
WrapperTestThreaded, TestNamespaceWrapperNoParse
|
|
):
|
|
pass
|
|
|
|
def endpoint_result(req_args: Dict[str, Any], expected: Any) -> Dict[str, Any]:
|
|
return {
|
|
"namespace": req_args["namespace"],
|
|
"key": req_args.get("key"),
|
|
"value": expected
|
|
}
|
|
|
|
@pytest.mark.asyncio
|
|
class EndpointTest:
|
|
@pytest_asyncio.fixture(scope="class", autouse=True)
|
|
async def server(self, base_server: Server) -> AsyncIterator[Server]:
|
|
base_server.load_components()
|
|
db: MoonrakerDatabase = base_server.lookup_component("database")
|
|
for ns, record in TEST_DB.items():
|
|
for record_name, value in record.items():
|
|
db.insert_item(ns, record_name, value)
|
|
db.register_local_namespace("planets")
|
|
db.register_local_namespace("fruits", forbidden=True)
|
|
await base_server.server_init(False)
|
|
await base_server.start_server(False)
|
|
yield base_server
|
|
await base_server._stop_server("terminate")
|
|
|
|
@pytest.fixture(scope="class")
|
|
def db(self, server: Server) -> MoonrakerDatabase:
|
|
return server.lookup_component("database")
|
|
|
|
class TestHttpEndpoints(EndpointTest):
|
|
async def test_list_dbs(self, http_client: HttpClient):
|
|
expected = list(TEST_DB.keys())
|
|
expected.remove("fruits")
|
|
expected.extend(["moonraker", "gcode_metadata"])
|
|
ret = await http_client.get("/server/database/list")
|
|
assert sorted(ret["result"]["namespaces"]) == sorted(expected)
|
|
|
|
async def test_get_namespace(self, http_client: HttpClient):
|
|
args = {"namespace": "automobiles"}
|
|
ret = await http_client.get("/server/database/item", args)
|
|
assert ret["result"] == endpoint_result(args, TEST_DB["automobiles"])
|
|
|
|
async def test_get_namespace_not_exist(self, http_client: HttpClient):
|
|
with pytest.raises(http_client.error, match="HTTP 404:"):
|
|
args = {"namespace": "cities"}
|
|
await http_client.get("/server/database/item", args)
|
|
|
|
async def test_get_item(self, http_client: HttpClient):
|
|
args = {"namespace": "automobiles", "key": "ford.mustang"}
|
|
ret = await http_client.get("/server/database/item", args)
|
|
assert ret["result"] == endpoint_result(args, "red")
|
|
|
|
async def test_get_item_not_exist(self, http_client: HttpClient):
|
|
with pytest.raises(http_client.error, match="HTTP 404:"):
|
|
args = {"namespace": "automobiles", "key": "ford.mustang.year"}
|
|
await http_client.get("/server/database/item", args)
|
|
|
|
async def test_get_item_protected_ns(self, http_client: HttpClient):
|
|
args = {"namespace": "planets", "key": "jupiter.gas_giant"}
|
|
ret = await http_client.get("/server/database/item", args)
|
|
assert ret["result"] == endpoint_result(args, True)
|
|
|
|
async def test_get_item_forbidden_ns(self, http_client: HttpClient):
|
|
with pytest.raises(http_client.error, match="HTTP 403:"):
|
|
args = {"namespace": "fruits", "key": "apples"}
|
|
await http_client.get("/server/database/item", args)
|
|
|
|
async def test_post_item(self, http_client: HttpClient,
|
|
db: MoonrakerDatabase):
|
|
args = {"namespace": "breakfast", "key": "cereal.sweet",
|
|
"value": "Count Chocula"}
|
|
ret = await http_client.post("/server/database/item", args)
|
|
check = await db.get_item("breakfast", "cereal.sweet")
|
|
assert ret["result"] == args and check == "Count Chocula"
|
|
|
|
async def test_post_item_no_key(self, http_client: HttpClient):
|
|
with pytest.raises(http_client.error, match="HTTP 400:"):
|
|
args = {"namespace": "breakfast", "value": "pancakes"}
|
|
await http_client.post("/server/database/item", args)
|
|
|
|
async def test_post_item_no_value(self, http_client: HttpClient):
|
|
with pytest.raises(http_client.error, match="HTTP 400:"):
|
|
args = {"namespace": "breakfast", "key": "pancakes"}
|
|
await http_client.post("/server/database/item", args)
|
|
|
|
async def test_post_item_protected(self, http_client: HttpClient):
|
|
with pytest.raises(http_client.error, match="HTTP 403:"):
|
|
args = {"namespace": "planets", "key": "jupiter.gas_giant",
|
|
"value": "biggest"}
|
|
await http_client.post("/server/database/item", args)
|
|
|
|
async def test_post_item_forbidden(self, http_client: HttpClient):
|
|
with pytest.raises(http_client.error, match="HTTP 403:"):
|
|
args = {"namespace": "fruits", "key": "cherries.color",
|
|
"value": "red"}
|
|
await http_client.post("/server/database/item", args)
|
|
|
|
async def test_delete_item(self, http_client: HttpClient,
|
|
db: MoonrakerDatabase):
|
|
args = {"namespace": "automobiles", "key": "ford.f-series.f150"}
|
|
ret = await http_client.delete("/server/database/item", args)
|
|
check = await db.get_item("automobiles", "ford.f-series")
|
|
assert (
|
|
ret["result"] == endpoint_result(args, [150, "black"])
|
|
and check == {"f350": {"platinum": 10000}}
|
|
)
|
|
|
|
async def test_delete_item_drop(self, http_client: HttpClient,
|
|
db: MoonrakerDatabase):
|
|
args = {"namespace": "vegetables", "key": "tomato"}
|
|
ret = await http_client.delete("/server/database/item", args)
|
|
assert (
|
|
ret["result"] == endpoint_result(args, "nope")
|
|
and "vegetables" not in db.namespaces
|
|
)
|
|
|
|
async def test_delete_item_not_found(self, http_client: HttpClient):
|
|
with pytest.raises(http_client.error, match="HTTP 404: Not Found"):
|
|
args = {"namespace": "automobiles", "key": "ford.pinto"}
|
|
await http_client.delete("/server/database/item", args)
|
|
|
|
class TestWebsocketEndpoints(EndpointTest):
|
|
async def test_list_dbs(self, websocket_client: WebsocketClient):
|
|
expected = list(TEST_DB.keys())
|
|
expected.remove("fruits")
|
|
expected.extend(["moonraker", "gcode_metadata"])
|
|
ret = await websocket_client.request("server.database.list")
|
|
assert sorted(ret["namespaces"]) == sorted(expected)
|
|
|
|
async def test_get_namespace(self, websocket_client: WebsocketClient):
|
|
args = {"namespace": "automobiles"}
|
|
ret = await websocket_client.request("server.database.get_item", args)
|
|
assert ret == endpoint_result(args, TEST_DB["automobiles"])
|
|
|
|
async def test_get_namespace_not_exist(self,
|
|
websocket_client: WebsocketClient):
|
|
expected = "Namespace 'cities' not found"
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "cities"}
|
|
await websocket_client.request("server.database.get_item", args)
|
|
|
|
async def test_get_item(self, websocket_client: WebsocketClient):
|
|
args = {"namespace": "automobiles", "key": "ford.mustang"}
|
|
ret = await websocket_client.request("server.database.get_item", args)
|
|
assert ret == endpoint_result(args, "red")
|
|
|
|
async def test_get_item_not_exist(self,
|
|
websocket_client: WebsocketClient):
|
|
expected = (
|
|
"Key 'ford.mustang.year' in namespace 'automobiles' not found"
|
|
)
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "automobiles", "key": "ford.mustang.year"}
|
|
await websocket_client.request("server.database.get_item", args)
|
|
|
|
async def test_get_item_protected_ns(self,
|
|
websocket_client: WebsocketClient):
|
|
args = {"namespace": "planets", "key": "jupiter.gas_giant"}
|
|
ret = await websocket_client.request("server.database.get_item", args)
|
|
assert ret == endpoint_result(args, True)
|
|
|
|
async def test_get_item_forbidden_ns(self,
|
|
websocket_client: WebsocketClient):
|
|
expected = "Read/Write access to namespace 'fruits' is forbidden"
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "fruits", "key": "apples"}
|
|
await websocket_client.request("server.database.get_item", args)
|
|
|
|
async def test_post_item(self, websocket_client: WebsocketClient,
|
|
db: MoonrakerDatabase):
|
|
args = {"namespace": "breakfast", "key": "cereal.sweet",
|
|
"value": "Count Chocula"}
|
|
ret = await websocket_client.request("server.database.post_item", args)
|
|
check = await db.get_item("breakfast", "cereal.sweet")
|
|
assert ret == args and check == "Count Chocula"
|
|
|
|
async def test_post_item_no_key(self, websocket_client: WebsocketClient):
|
|
expected = "No data for argument: key"
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "breakfast", "value": "pancakes"}
|
|
await websocket_client.request("server.database.post_item", args)
|
|
|
|
async def test_post_item_no_value(self, websocket_client: WebsocketClient):
|
|
expected = "No data for argument: value"
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "breakfast", "key": "pancakes"}
|
|
await websocket_client.request("server.database.post_item", args)
|
|
|
|
async def test_post_item_protected(self,
|
|
websocket_client: WebsocketClient):
|
|
expected = "Write access to namespace 'planets' is forbidden"
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "planets", "key": "jupiter.gas_giant",
|
|
"value": "biggest"}
|
|
await websocket_client.request("server.database.post_item", args)
|
|
|
|
async def test_post_item_forbidden(self,
|
|
websocket_client: WebsocketClient):
|
|
expected = "Read/Write access to namespace 'fruits' is forbidden"
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "fruits", "key": "cherries.color",
|
|
"value": "red"}
|
|
await websocket_client.request("server.database.post_item", args)
|
|
|
|
async def test_delete_item(self, websocket_client: WebsocketClient,
|
|
db: MoonrakerDatabase):
|
|
args = {"namespace": "automobiles", "key": "ford.f-series.f150"}
|
|
ret = await websocket_client.request(
|
|
"server.database.delete_item", args)
|
|
check = await db.get_item("automobiles", "ford.f-series")
|
|
assert (
|
|
ret == endpoint_result(args, [150, "black"])
|
|
and check == {"f350": {"platinum": 10000}}
|
|
)
|
|
|
|
async def test_delete_item_drop(self, websocket_client: WebsocketClient,
|
|
db: MoonrakerDatabase):
|
|
args = {"namespace": "vegetables", "key": "tomato"}
|
|
ret = await websocket_client.request(
|
|
"server.database.delete_item", args)
|
|
assert (
|
|
ret == endpoint_result(args, "nope")
|
|
and "vegetables" not in db.namespaces
|
|
)
|
|
|
|
async def test_delete_item_not_found(self,
|
|
websocket_client: WebsocketClient):
|
|
expected = "Key 'ford.pinto' in namespace 'automobiles' not found"
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "automobiles", "key": "ford.pinto"}
|
|
await websocket_client.request("server.database.delete_item", args)
|
|
|
|
async def test_invalid_key(self, websocket_client: WebsocketClient):
|
|
expected = "Value for argument 'key' is an invalid type"
|
|
with pytest.raises(websocket_client.error, match=expected):
|
|
args = {"namespace": "planets", "key": {"ford": "pinto"}}
|
|
await websocket_client.request("server.database.get_item", args)
|