test: initial testing framework

This adds the framework for unit testing Moonraker via pytest.
Initally only moonraker.py, klippy_connection.py, and confighelper.py
have acceptable coverage.  Coverage for other modules will be added on
an incremental basis, when most of Moonraker's source is covered tests
will be conducted via GitHub actions.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-02-05 20:13:48 -05:00
parent 34f607c061
commit 5f9706f6be
22 changed files with 3310 additions and 0 deletions

13
pytest.ini Normal file
View File

@ -0,0 +1,13 @@
[pytest]
minversion = 7.0
pythonpath = moonraker
testpaths = tests
required_plugins =
pytest-asyncio>=0.17.2
pytest-timeout>=2.1.0
asyncio_mode = strict
timeout = 20
timeout_method = signal
markers =
run_paths
no_ws_connect

View File

@ -0,0 +1,403 @@
[mcu]
serial: /dev/serial/by-id/usb
[printer]
kinematics: cartesian
max_velocity: 300
max_accel: 1500
max_z_velocity: 15
max_z_accel: 200
[stepper_x]
microsteps: 16
step_pin: PC0
dir_pin: !PL0
enable_pin: !PA7
rotation_distance: 32
endstop_pin: tmc2130_stepper_x:virtual_endstop
position_endstop: 0
position_min: 0
position_max: 250
homing_speed: 50
homing_retract_dist: 0
[tmc2130 stepper_x]
cs_pin: PG0
interpolate: True
run_current: .281738
hold_current: .281738
sense_resistor: 0.220
diag1_pin: !PK2
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 2
driver_PWM_AMPL: 230
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[stepper_y]
microsteps: 16
step_pin: PC1
dir_pin: PL1
enable_pin: !PA6
rotation_distance: 32
endstop_pin: tmc2130_stepper_y:virtual_endstop
position_endstop: -4
position_max: 210
position_min: -4
homing_speed: 50
homing_retract_dist: 0
[tmc2130 stepper_y]
cs_pin: PG2
interpolate: True
run_current: .3480291
hold_current: .3480291
sense_resistor: 0.220
diag1_pin: !PK7
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 2
driver_PWM_AMPL: 235
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[stepper_z]
microsteps: 16
step_pin: PC2
dir_pin: !PL2
enable_pin: !PA5
rotation_distance: 8
endstop_pin: probe:z_virtual_endstop
position_max: 220
position_min: -2
homing_speed: 13.333
[tmc2130 stepper_z]
cs_pin: PK5
interpolate: True
run_current: .53033
hold_current: .53033
sense_resistor: 0.220
diag1_pin: !PK6
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 4
driver_PWM_AMPL: 200
driver_PWM_AUTOSCALE: True
driver_SGT: 4
[extruder]
microsteps: 8
step_pin: PC3
dir_pin: PL6
enable_pin: !PA4
rotation_distance: 6.53061216
full_steps_per_rotation: 400
nozzle_diameter: 0.4
filament_diameter: 1.750
max_extrude_cross_section: 50.0
# Allows to load filament and purge up to 500mm
max_extrude_only_distance: 500.0
max_extrude_only_velocity: 120.0
max_extrude_only_accel: 1250.0
heater_pin: PE5
sensor_type: ATC Semitec 104GT-2
sensor_pin: PF0
control: pid
pid_Kp: 16.13
pid_Ki: 1.1625
pid_Kd: 56.23
min_temp: 0
max_temp: 305
[tmc2130 extruder]
cs_pin: PK4
interpolate: True
run_current: 0.41432
hold_current: 0.3
sense_resistor: 0.220
diag1_pin: !PK3
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD:4
driver_PWM_AMPL: 240
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[heater_bed]
heater_pin: PG5
sensor_type: EPCOS 100K B57560G104F
sensor_pin: PF2
control: pid
pid_Kp: 126.13
pid_Ki: 4.3
pid_Kd: 924.76
min_temp: 0
max_temp: 125
[verify_heater heater_bed]
max_error: 240
check_gain_time: 120
[heater_fan nozzle_cooling_fan]
pin: PH5
heater: extruder
heater_temp: 50.0
[fan]
pin: PH3
[display]
lcd_type: hd44780
rs_pin: PD5
e_pin: PF7
d4_pin: PF5
d5_pin: PG4
d6_pin: PH7
d7_pin: PG3
encoder_pins: ^PJ1,^PJ2
click_pin: ^!PH6
[pause_resume]
[virtual_sdcard]
path: ${gcode_path}
[respond]
default_type: command
[probe]
pin: PB4
x_offset: 23
y_offset: 5
z_offset: 0.8
speed: 12.0
[bed_mesh]
speed: 140
horizontal_move_z: 2
mesh_min: 24, 6
mesh_max: 238, 210
probe_count: 7
mesh_pps: 2
fade_start: 1
fade_end: 10
fade_target: 0
move_check_distance: 15
algorithm: bicubic
bicubic_tension: .2
relative_reference_index: 24
faulty_region_1_min: 116.75, 41.81
faulty_region_1_max: 133.25, 78.81
faulty_region_2_min: 156.5, 99.31
faulty_region_2_max: 193.5, 115.81
faulty_region_3_min: 116.75, 136.21
faulty_region_3_max: 133.25, 173.31
[homing_override]
gcode:
G1 Z3 F600
G28 X0 Y0
G1 X131 Y108 F5000
G28 Z0
axes: Z
set_position_x: 0
set_position_y: 0
set_position_z: 0
[output_pin BEEPER_pin]
pin: PH2
pwm: True
value: 0
shutdown_value:0
cycle_time: 0.001
scale: 1000
[force_move]
enable_force_move: True
[idle_timeout]
gcode:
M104 S0
M84
[gcode_macro PAUSE]
rename_existing: BASE_PAUSE
gcode:
{% if not printer.pause_resume.is_paused %}
M600
{% endif %}
[gcode_macro M600]
variable_extr_temp: 0
gcode:
{% set X = params.X|default(100) %}
{% set Y = params.Y|default(100) %}
{% set Z = params.Z|default(100) %}
BASE_PAUSE
SET_GCODE_VARIABLE MACRO=M600 VARIABLE=extr_temp VALUE={printer.extruder.target}
G91
{% if printer.extruder.temperature|float > 180 %}
G1 E-.8 F2700
{% endif %}
G1 Z{Z}
G90
G1 X{X} Y{Y} F3000
[gcode_macro RESUME]
rename_existing: BASE_RESUME
gcode:
{% if printer.pause_resume.is_paused %}
{% if printer["gcode_macro M600"].extr_temp %}
M109 S{printer["gcode_macro M600"].extr_temp}
{% endif %}
BASE_RESUME
{% endif %}
[gcode_macro LOAD_FILAMENT]
gcode:
M117 Loading Filament...
G92 E0.0
G91
G1 E50 F400
G1 E25 F100
G90
G92 E0.0
M400
M117 Load Complete
UPDATE_DELAYED_GCODE ID=clear_display DURATION=5
[gcode_macro UNLOAD_FILAMENT]
gcode:
M117 Unloading Filament...
G92 E0.0
G91
G1 E-32 F5200
G1 E-10 F100
G1 E-38 F1000
G90
G92 E0.0
M400
M300 S300 P1000
M117 Remove Filament Now!
UPDATE_DELAYED_GCODE ID=clear_display DURATION=5
[gcode_macro G80]
gcode:
G28
BED_MESH_CALIBRATE
G1 X0 Y0 F4000
[gcode_macro G81]
gcode:
{% set S = params.S|default(0) %}
BED_MESH_OUTPUT CENTER_ZERO={S}
[gcode_macro M300]
gcode:
{% set S = params.S|default(1000) %}
{% set P = params.P|default(100) %}
SET_PIN PIN=BEEPER_pin VALUE={S}
G4 P{P}
SET_PIN PIN=BEEPER_pin VALUE=0
[gcode_macro PRINT_START]
gcode:
{% set MATERIAL = params.MATERIAL|default("Unknown") %}
{% set LAYER_HEIGHT = params.LAYER_HEIGHT|default(0) %}
M83
CLEAR_PAUSE
SET_IDLE_TIMEOUT TIMEOUT=600
SET_PRESSURE_ADVANCE ADVANCE=0
SET_GCODE_OFFSET Z=0
G90
M104 S170
M190 S{params.BTMP}
M109 S170
G80
M104 S{params.ETMP}
G1 X1 Y-3.0 Z20 F1000.0 ; go outside print area
M109 S{params.ETMP}
G1 Z.4
G92 E0.0
G91
G1 X60.0 E9.0 F1000.0 ; intro line
G1 X40.0 E12.5 F1000.0 ; intro line
G90
G92 E0.0
{% if MATERIAL != "PLA" %}
SET_VELOCITY_LIMIT SQUARE_CORNER_VELOCITY=1
{% endif %}
{% if LAYER_HEIGHT|float < 0.051 %}
M221 S100
{% else %}
M221 S95
{% endif %}
[gcode_macro PRINT_END]
gcode:
CLEAR_PAUSE
M400
BED_MESH_CLEAR
G92 E0.0
G91
{% if printer.gcode_move.gcode_position.x > 20 %}
{% if printer.gcode_move.gcode_position.y > 20 %}
G1 Z+1.00 X-20.0 Y-20.0 F20000 ;short quick move to disengage from print
{% else %}
G1 Z+1.00 X-20.0 F20000 ;short quick move to disengage from print
{% endif %}
{% elif printer.gcode_move.gcode_position.y > 20 %}
G1 Z+1.00 Y-20.0 F20000 ;short quick move to disengage from print
{% endif %}
G1 E-8.00 F500 ;retract additional filament to prevent oozing
G90
{% if printer.gcode_move.gcode_position.z < 100 %}
G0 Z100 F1500
{% elif printer.gcode_move.gcode_position.z < 190 %}
G91
G0 Z10 F1500
G90
{% endif %}
G0 X10 Y200 F6000
SET_GCODE_OFFSET Z=0 MOVE=1
TURN_OFF_HEATERS
SET_VELOCITY_LIMIT VELOCITY=300 SQUARE_CORNER_VELOCITY=5
M84
M107
M204 S3000
M221 S100
[gcode_macro CANCEL_PRINT]
rename_existing: BASE_CANCEL_PRINT
gcode:
PAUSE
SDCARD_RESET_FILE
PRINT_END
CLEAR_PAUSE
[gcode_macro TEST_REMOTE_METHOD]
gcode:
{action_call_remote_method(method="moonraker_test",
result="test")}

View File

@ -0,0 +1,403 @@
[mcu]
serial: /dev/serial/by-id/usb
printer]
kinematics: cartesian
max_velocity: 300
max_accel: 1500
max_z_velocity: 15
max_z_accel: 200
[stepper_x]
microsteps: 16
step_pin: PC0
dir_pin: !PL0
enable_pin: !PA7
rotation_distance: 32
endstop_pin: tmc2130_stepper_x:virtual_endstop
position_endstop: 0
position_min: 0
position_max: 250
homing_speed: 50
homing_retract_dist: 0
[tmc2130 stepper_x]
cs_pin: PG0
interpolate: True
run_current: .281738
hold_current: .281738
sense_resistor: 0.220
diag1_pin: !PK2
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 2
driver_PWM_AMPL: 230
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[stepper_y]
microsteps: 16
step_pin: PC1
dir_pin: PL1
enable_pin: !PA6
rotation_distance: 32
endstop_pin: tmc2130_stepper_y:virtual_endstop
position_endstop: -4
position_max: 210
position_min: -4
homing_speed: 50
homing_retract_dist: 0
[tmc2130 stepper_y]
cs_pin: PG2
interpolate: True
run_current: .3480291
hold_current: .3480291
sense_resistor: 0.220
diag1_pin: !PK7
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 2
driver_PWM_AMPL: 235
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[stepper_z]
microsteps: 16
step_pin: PC2
dir_pin: !PL2
enable_pin: !PA5
rotation_distance: 8
endstop_pin: probe:z_virtual_endstop
position_max: 220
position_min: -2
homing_speed: 13.333
[tmc2130 stepper_z]
cs_pin: PK5
interpolate: True
run_current: .53033
hold_current: .53033
sense_resistor: 0.220
diag1_pin: !PK6
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 4
driver_PWM_AMPL: 200
driver_PWM_AUTOSCALE: True
driver_SGT: 4
[extruder]
microsteps: 8
step_pin: PC3
dir_pin: PL6
enable_pin: !PA4
rotation_distance: 6.53061216
full_steps_per_rotation: 400
nozzle_diameter: 0.4
filament_diameter: 1.750
max_extrude_cross_section: 50.0
# Allows to load filament and purge up to 500mm
max_extrude_only_distance: 500.0
max_extrude_only_velocity: 120.0
max_extrude_only_accel: 1250.0
heater_pin: PE5
sensor_type: ATC Semitec 104GT-2
sensor_pin: PF0
control: pid
pid_Kp: 16.13
pid_Ki: 1.1625
pid_Kd: 56.23
min_temp: 0
max_temp: 305
[tmc2130 extruder]
cs_pin: PK4
interpolate: True
run_current: 0.41432
hold_current: 0.3
sense_resistor: 0.220
diag1_pin: !PK3
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD:4
driver_PWM_AMPL: 240
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[heater_bed]
heater_pin: PG5
sensor_type: EPCOS 100K B57560G104F
sensor_pin: PF2
control: pid
pid_Kp: 126.13
pid_Ki: 4.3
pid_Kd: 924.76
min_temp: 0
max_temp: 125
[verify_heater heater_bed]
max_error: 240
check_gain_time: 120
[heater_fan nozzle_cooling_fan]
pin: PH5
heater: extruder
heater_temp: 50.0
[fan]
pin: PH3
[display]
lcd_type: hd44780
rs_pin: PD5
e_pin: PF7
d4_pin: PF5
d5_pin: PG4
d6_pin: PH7
d7_pin: PG3
encoder_pins: ^PJ1,^PJ2
click_pin: ^!PH6
[pause_resume]
[virtual_sdcard]
path: ${gcode_path}
[respond]
default_type: command
[probe]
pin: PB4
x_offset: 23
y_offset: 5
z_offset: 0.8
speed: 12.0
[bed_mesh]
speed: 140
horizontal_move_z: 2
mesh_min: 24, 6
mesh_max: 238, 210
probe_count: 7
mesh_pps: 2
fade_start: 1
fade_end: 10
fade_target: 0
move_check_distance: 15
algorithm: bicubic
bicubic_tension: .2
relative_reference_index: 24
faulty_region_1_min: 116.75, 41.81
faulty_region_1_max: 133.25, 78.81
faulty_region_2_min: 156.5, 99.31
faulty_region_2_max: 193.5, 115.81
faulty_region_3_min: 116.75, 136.21
faulty_region_3_max: 133.25, 173.31
[homing_override]
gcode:
G1 Z3 F600
G28 X0 Y0
G1 X131 Y108 F5000
G28 Z0
axes: Z
set_position_x: 0
set_position_y: 0
set_position_z: 0
[output_pin BEEPER_pin]
pin: PH2
pwm: True
value: 0
shutdown_value:0
cycle_time: 0.001
scale: 1000
[force_move]
enable_force_move: True
[idle_timeout]
gcode:
M104 S0
M84
[gcode_macro PAUSE]
rename_existing: BASE_PAUSE
gcode:
{% if not printer.pause_resume.is_paused %}
M600
{% endif %}
[gcode_macro M600]
variable_extr_temp: 0
gcode:
{% set X = params.X|default(100) %}
{% set Y = params.Y|default(100) %}
{% set Z = params.Z|default(100) %}
BASE_PAUSE
SET_GCODE_VARIABLE MACRO=M600 VARIABLE=extr_temp VALUE={printer.extruder.target}
G91
{% if printer.extruder.temperature|float > 180 %}
G1 E-.8 F2700
{% endif %}
G1 Z{Z}
G90
G1 X{X} Y{Y} F3000
[gcode_macro RESUME]
rename_existing: BASE_RESUME
gcode:
{% if printer.pause_resume.is_paused %}
{% if printer["gcode_macro M600"].extr_temp %}
M109 S{printer["gcode_macro M600"].extr_temp}
{% endif %}
BASE_RESUME
{% endif %}
[gcode_macro LOAD_FILAMENT]
gcode:
M117 Loading Filament...
G92 E0.0
G91
G1 E50 F400
G1 E25 F100
G90
G92 E0.0
M400
M117 Load Complete
UPDATE_DELAYED_GCODE ID=clear_display DURATION=5
[gcode_macro UNLOAD_FILAMENT]
gcode:
M117 Unloading Filament...
G92 E0.0
G91
G1 E-32 F5200
G1 E-10 F100
G1 E-38 F1000
G90
G92 E0.0
M400
M300 S300 P1000
M117 Remove Filament Now!
UPDATE_DELAYED_GCODE ID=clear_display DURATION=5
[gcode_macro G80]
gcode:
G28
BED_MESH_CALIBRATE
G1 X0 Y0 F4000
[gcode_macro G81]
gcode:
{% set S = params.S|default(0) %}
BED_MESH_OUTPUT CENTER_ZERO={S}
[gcode_macro M300]
gcode:
{% set S = params.S|default(1000) %}
{% set P = params.P|default(100) %}
SET_PIN PIN=BEEPER_pin VALUE={S}
G4 P{P}
SET_PIN PIN=BEEPER_pin VALUE=0
[gcode_macro PRINT_START]
gcode:
{% set MATERIAL = params.MATERIAL|default("Unknown") %}
{% set LAYER_HEIGHT = params.LAYER_HEIGHT|default(0) %}
M83
CLEAR_PAUSE
SET_IDLE_TIMEOUT TIMEOUT=600
SET_PRESSURE_ADVANCE ADVANCE=0
SET_GCODE_OFFSET Z=0
G90
M104 S170
M190 S{params.BTMP}
M109 S170
G80
M104 S{params.ETMP}
G1 X1 Y-3.0 Z20 F1000.0 ; go outside print area
M109 S{params.ETMP}
G1 Z.4
G92 E0.0
G91
G1 X60.0 E9.0 F1000.0 ; intro line
G1 X40.0 E12.5 F1000.0 ; intro line
G90
G92 E0.0
{% if MATERIAL != "PLA" %}
SET_VELOCITY_LIMIT SQUARE_CORNER_VELOCITY=1
{% endif %}
{% if LAYER_HEIGHT|float < 0.051 %}
M221 S100
{% else %}
M221 S95
{% endif %}
[gcode_macro PRINT_END]
gcode:
CLEAR_PAUSE
M400
BED_MESH_CLEAR
G92 E0.0
G91
{% if printer.gcode_move.gcode_position.x > 20 %}
{% if printer.gcode_move.gcode_position.y > 20 %}
G1 Z+1.00 X-20.0 Y-20.0 F20000 ;short quick move to disengage from print
{% else %}
G1 Z+1.00 X-20.0 F20000 ;short quick move to disengage from print
{% endif %}
{% elif printer.gcode_move.gcode_position.y > 20 %}
G1 Z+1.00 Y-20.0 F20000 ;short quick move to disengage from print
{% endif %}
G1 E-8.00 F500 ;retract additional filament to prevent oozing
G90
{% if printer.gcode_move.gcode_position.z < 100 %}
G0 Z100 F1500
{% elif printer.gcode_move.gcode_position.z < 190 %}
G91
G0 Z10 F1500
G90
{% endif %}
G0 X10 Y200 F6000
SET_GCODE_OFFSET Z=0 MOVE=1
TURN_OFF_HEATERS
SET_VELOCITY_LIMIT VELOCITY=300 SQUARE_CORNER_VELOCITY=5
M84
M107
M204 S3000
M221 S100
[gcode_macro CANCEL_PRINT]
rename_existing: BASE_CANCEL_PRINT
gcode:
PAUSE
SDCARD_RESET_FILE
PRINT_END
CLEAR_PAUSE
[gcode_macro TEST_REMOTE_METHOD]
gcode:
{action_call_remote_method(method="moonraker_test",
result="test")}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,347 @@
[mcu]
serial: /dev/serial/by-id/usb
[printer]
kinematics: cartesian
max_velocity: 300
max_accel: 1500
max_z_velocity: 15
max_z_accel: 200
[stepper_x]
microsteps: 16
step_pin: PC0
dir_pin: !PL0
enable_pin: !PA7
rotation_distance: 32
endstop_pin: tmc2130_stepper_x:virtual_endstop
position_endstop: 0
position_min: 0
position_max: 250
homing_speed: 50
homing_retract_dist: 0
[tmc2130 stepper_x]
cs_pin: PG0
interpolate: True
run_current: .281738
hold_current: .281738
sense_resistor: 0.220
diag1_pin: !PK2
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 2
driver_PWM_AMPL: 230
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[stepper_y]
microsteps: 16
step_pin: PC1
dir_pin: PL1
enable_pin: !PA6
rotation_distance: 32
endstop_pin: tmc2130_stepper_y:virtual_endstop
position_endstop: -4
position_max: 210
position_min: -4
homing_speed: 50
homing_retract_dist: 0
[tmc2130 stepper_y]
cs_pin: PG2
interpolate: True
run_current: .3480291
hold_current: .3480291
sense_resistor: 0.220
diag1_pin: !PK7
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 2
driver_PWM_AMPL: 235
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[stepper_z]
microsteps: 16
step_pin: PC2
dir_pin: !PL2
enable_pin: !PA5
rotation_distance: 8
endstop_pin: probe:z_virtual_endstop
position_max: 220
position_min: -2
homing_speed: 13.333
[tmc2130 stepper_z]
cs_pin: PK5
interpolate: True
run_current: .53033
hold_current: .53033
sense_resistor: 0.220
diag1_pin: !PK6
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD: 4
driver_PWM_AMPL: 200
driver_PWM_AUTOSCALE: True
driver_SGT: 4
[extruder]
microsteps: 8
step_pin: PC3
dir_pin: PL6
enable_pin: !PA4
rotation_distance: 6.53061216
full_steps_per_rotation: 400
nozzle_diameter: 0.4
filament_diameter: 1.750
max_extrude_cross_section: 50.0
# Allows to load filament and purge up to 500mm
max_extrude_only_distance: 500.0
max_extrude_only_velocity: 120.0
max_extrude_only_accel: 1250.0
heater_pin: PE5
sensor_type: ATC Semitec 104GT-2
sensor_pin: PF0
control: pid
pid_Kp: 16.13
pid_Ki: 1.1625
pid_Kd: 56.23
min_temp: 0
max_temp: 305
[tmc2130 extruder]
cs_pin: PK4
interpolate: True
run_current: 0.41432
hold_current: 0.3
sense_resistor: 0.220
diag1_pin: !PK3
driver_IHOLDDELAY: 8
driver_TPOWERDOWN: 0
driver_TBL: 2
driver_TOFF: 3
driver_HEND: 1
driver_HSTRT: 5
driver_PWM_FREQ: 2
driver_PWM_GRAD:4
driver_PWM_AMPL: 240
driver_PWM_AUTOSCALE: True
driver_SGT: 3
[heater_bed]
heater_pin: PG5
sensor_type: EPCOS 100K B57560G104F
sensor_pin: PF2
control: pid
pid_Kp: 126.13
pid_Ki: 4.3
pid_Kd: 924.76
min_temp: 0
max_temp: 125
[verify_heater heater_bed]
max_error: 240
check_gain_time: 120
[heater_fan nozzle_cooling_fan]
pin: PH5
heater: extruder
heater_temp: 50.0
[fan]
pin: PH3
[respond]
default_type: command
[probe]
pin: PB4
x_offset: 23
y_offset: 5
z_offset: 0.8
speed: 12.0
[bed_mesh]
speed: 140
horizontal_move_z: 2
mesh_min: 24, 6
mesh_max: 238, 210
probe_count: 7
mesh_pps: 2
fade_start: 1
fade_end: 10
fade_target: 0
move_check_distance: 15
algorithm: bicubic
bicubic_tension: .2
relative_reference_index: 24
faulty_region_1_min: 116.75, 41.81
faulty_region_1_max: 133.25, 78.81
faulty_region_2_min: 156.5, 99.31
faulty_region_2_max: 193.5, 115.81
faulty_region_3_min: 116.75, 136.21
faulty_region_3_max: 133.25, 173.31
[homing_override]
gcode:
G1 Z3 F600
G28 X0 Y0
G1 X131 Y108 F5000
G28 Z0
axes: Z
set_position_x: 0
set_position_y: 0
set_position_z: 0
[output_pin BEEPER_pin]
pin: PH2
pwm: True
value: 0
shutdown_value:0
cycle_time: 0.001
scale: 1000
[force_move]
enable_force_move: True
[idle_timeout]
gcode:
M104 S0
M84
[gcode_macro LOAD_FILAMENT]
gcode:
M117 Loading Filament...
G92 E0.0
G91
G1 E50 F400
G1 E25 F100
G90
G92 E0.0
M400
M117 Load Complete
UPDATE_DELAYED_GCODE ID=clear_display DURATION=5
[gcode_macro UNLOAD_FILAMENT]
gcode:
M117 Unloading Filament...
G92 E0.0
G91
G1 E-32 F5200
G1 E-10 F100
G1 E-38 F1000
G90
G92 E0.0
M400
M300 S300 P1000
M117 Remove Filament Now!
UPDATE_DELAYED_GCODE ID=clear_display DURATION=5
[gcode_macro G80]
gcode:
G28
BED_MESH_CALIBRATE
G1 X0 Y0 F4000
[gcode_macro G81]
gcode:
{% set S = params.S|default(0) %}
BED_MESH_OUTPUT CENTER_ZERO={S}
[gcode_macro M300]
gcode:
{% set S = params.S|default(1000) %}
{% set P = params.P|default(100) %}
SET_PIN PIN=BEEPER_pin VALUE={S}
G4 P{P}
SET_PIN PIN=BEEPER_pin VALUE=0
[gcode_macro PRINT_START]
gcode:
{% set MATERIAL = params.MATERIAL|default("Unknown") %}
{% set LAYER_HEIGHT = params.LAYER_HEIGHT|default(0) %}
M83
CLEAR_PAUSE
SET_IDLE_TIMEOUT TIMEOUT=600
SET_PRESSURE_ADVANCE ADVANCE=0
SET_GCODE_OFFSET Z=0
G90
M104 S170
M190 S{params.BTMP}
M109 S170
G80
M104 S{params.ETMP}
G1 X1 Y-3.0 Z20 F1000.0 ; go outside print area
M109 S{params.ETMP}
G1 Z.4
G92 E0.0
G91
G1 X60.0 E9.0 F1000.0 ; intro line
G1 X40.0 E12.5 F1000.0 ; intro line
G90
G92 E0.0
{% if MATERIAL != "PLA" %}
SET_VELOCITY_LIMIT SQUARE_CORNER_VELOCITY=1
{% endif %}
{% if LAYER_HEIGHT|float < 0.051 %}
M221 S100
{% else %}
M221 S95
{% endif %}
[gcode_macro PRINT_END]
gcode:
CLEAR_PAUSE
M400
BED_MESH_CLEAR
G92 E0.0
G91
{% if printer.gcode_move.gcode_position.x > 20 %}
{% if printer.gcode_move.gcode_position.y > 20 %}
G1 Z+1.00 X-20.0 Y-20.0 F20000 ;short quick move to disengage from print
{% else %}
G1 Z+1.00 X-20.0 F20000 ;short quick move to disengage from print
{% endif %}
{% elif printer.gcode_move.gcode_position.y > 20 %}
G1 Z+1.00 Y-20.0 F20000 ;short quick move to disengage from print
{% endif %}
G1 E-8.00 F500 ;retract additional filament to prevent oozing
G90
{% if printer.gcode_move.gcode_position.z < 100 %}
G0 Z100 F1500
{% elif printer.gcode_move.gcode_position.z < 190 %}
G91
G0 Z10 F1500
G90
{% endif %}
G0 X10 Y200 F6000
SET_GCODE_OFFSET Z=0 MOVE=1
TURN_OFF_HEATERS
SET_VELOCITY_LIMIT VELOCITY=300 SQUARE_CORNER_VELOCITY=5
M84
M107
M204 S3000
M221 S100
[gcode_macro TEST_REMOTE_METHOD]
gcode:
{action_call_remote_method(method="moonraker_test",
result="test")}

View File

@ -0,0 +1,18 @@
[server]
host: 0.0.0.0
port: 7010
ssl_port: 7011
klippy_uds_address: ${klippy_uds_path}
[database]
database_path: ${database_path}
[machine]
provider: none
[file_manager]
config_path: ${config_path}
log_path: ${log_path}
[secrets]
secrets_path: ${secrets_path}

View File

@ -0,0 +1,20 @@
[server]
host: 0.0.0.0
port: 7010
ssl_port: 7011
ssl_certificate_path: ${ssl_certificate_path}
ssl_key_path: ${ssl_key_path}
klippy_uds_address: ${klippy_uds_path}
[database]
database_path: ${database_path}
[machine]
provider: none
[file_manager]
config_path: ${config_path}
log_path: ${log_path}
[secrets]
secrets_path: ${secrets_path}

View File

@ -0,0 +1,18 @@
[server]
host: 0.0.0.0
port: 7010
klippy_uds_address: ${klippy_uds_path}
# Syntax error
database]
database_path: ${database_path}
[machine]
provider: none
[file_manager]
config_path: ${config_path}
log_path: ${log_path}
[secrets]
secrets_path: ${secrets_path}

View File

@ -0,0 +1,3 @@
[mqtt_credentials]
username: mqttuser
password: mqttpass

View File

@ -0,0 +1,6 @@
{
"mqtt_credentials": {
"username": "mqttuser",
"password": "mqttpass"
}
}

View File

@ -0,0 +1,39 @@
[prefix_sec one]
[prefix_sec two]
[prefix_sec three]
[test_options]
test_int: 1
test_float: 3.5
test_bool: True
test_string: Hello World
test_list:
one
two
three
test_int_list: 1,2,3
test_float_list: 1.5,2.8,3.2
test_multi_list:
1,2,3
4,5,6
test_dict:
one=1
two=2
three=3
test_dict_empty_field:
one=test
two
three
test_template: {secrets.mqtt_credentials.username}
test_gpio: gpiochip0/gpio26
test_gpio_no_chip: gpio26
test_gpio_invert: !gpiochip0/gpio26
test_gpio_no_chip_invert: !gpio26
# The following four options should result in an error, cant
# pullup/pulldown an output pin
test_gpio_pullup: ^gpiochip0/gpio26
test_gpio_pullup_no_chip: ^gpio26
test_gpio_pulldown: ~gpiochip0/gpio26
test_gpio_pulldown_no_chip: ~gpio26

View File

@ -0,0 +1,22 @@
[server]
host: 0.0.0.0
port: 7010
klippy_uds_address: ${klippy_uds_path}
# Add an option that is not registered, should
# generate a warning
unknown_option: True
[machine]
provider: none
[database]
database_path: ${database_path}
[file_manager]
config_path: ${config_path}
log_path: ${log_path}
[secrets]
secrets_path: ${secrets_path}
[machine unparsed]

177
tests/conftest.py Normal file
View File

@ -0,0 +1,177 @@
from __future__ import annotations
import pytest
import pytest_asyncio
import asyncio
import shutil
import re
import pathlib
import sys
import shlex
import tempfile
import subprocess
from typing import Iterator, Dict, AsyncIterator, Any
from moonraker import Server
from eventloop import EventLoop
import utils
from fixtures import KlippyProcess, HttpClient, WebsocketClient
ASSETS = pathlib.Path(__file__).parent.joinpath("assets")
def pytest_addoption(parser: pytest.Parser, pluginmanager):
parser.addoption("--klipper-path", action="store", dest="klipper_path")
parser.addoption("--klipper-exec", action="store", dest="klipper_exec")
def interpolate_config(source_path: pathlib.Path,
dest_path: pathlib.Path,
keys: Dict[str, Any]
) -> None:
def interp(match):
return str(keys[match.group(1)])
sub_data = re.sub(r"\${([^}]+)}", interp, source_path.read_text())
dest_path.write_text(sub_data)
@pytest.fixture(scope="session", autouse=True)
def ssl_certs() -> Iterator[Dict[str, pathlib.Path]]:
with tempfile.TemporaryDirectory(prefix="moonraker-certs-") as tmpdir:
tmp_path = pathlib.Path(tmpdir)
cert_path = tmp_path.joinpath("certificate.pem")
key_path = tmp_path.joinpath("privkey.pem")
cmd = (
f"openssl req -newkey rsa:4096 -nodes -keyout {key_path} "
f"-x509 -days 365 -out {cert_path} -sha256 "
"-subj '/C=US/ST=NRW/L=Earth/O=Moonraker/OU=IT/"
"CN=www.moonraker-test.com/emailAddress=mail@moonraker-test.com'"
)
args = shlex.split(cmd)
subprocess.run(args, check=True)
yield {
"ssl_certificate_path": cert_path,
"ssl_key_path": key_path,
}
@pytest.fixture(scope="class")
def event_loop() -> Iterator[asyncio.AbstractEventLoop]:
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="class")
def path_args(request: pytest.FixtureRequest,
ssl_certs: Dict[str, pathlib.Path]
) -> Iterator[Dict[str, pathlib.Path]]:
path_marker = request.node.get_closest_marker("run_paths")
paths = {
"moonraker_conf": "base_server.conf",
"secrets": "secrets.ini",
"printer_cfg": "base_printer.cfg"
}
if path_marker is not None:
paths.update(path_marker.kwargs)
moon_cfg_path = ASSETS.joinpath(f"moonraker/{paths['moonraker_conf']}")
secrets_path = ASSETS.joinpath(f"moonraker/{paths['secrets']}")
pcfg_path = ASSETS.joinpath(f"klipper/{paths['printer_cfg']}")
with tempfile.TemporaryDirectory(prefix="moonraker-test") as tmpdir:
tmp_path = pathlib.Path(tmpdir)
secrets_dest = tmp_path.joinpath(paths['secrets'])
shutil.copy(secrets_path, secrets_dest)
cfg_path = tmp_path.joinpath("config")
cfg_path.mkdir()
log_path = tmp_path.joinpath("logs")
log_path.mkdir()
db_path = tmp_path.joinpath("database")
db_path.mkdir()
gcode_path = tmp_path.joinpath("gcode_files")
gcode_path.mkdir()
dest_paths = {
"asset_path": ASSETS,
"config_path": cfg_path,
"database_path": db_path,
"log_path": log_path,
"gcode_path": gcode_path,
"secrets_path": secrets_dest,
"klippy_uds_path": tmp_path.joinpath("klippy_uds"),
"klippy_pty_path": tmp_path.joinpath("klippy_pty"),
"klipper.dict": ASSETS.joinpath("klipper/klipper.dict"),
}
dest_paths.update(ssl_certs)
if "moonraker_log" in paths:
dest_paths['moonraker.log'] = log_path.joinpath(
paths["moonraker_log"])
moon_cfg_dest = cfg_path.joinpath("moonraker.conf")
interpolate_config(moon_cfg_path, moon_cfg_dest, dest_paths)
dest_paths['moonraker.conf'] = moon_cfg_dest
pcfg_dest = cfg_path.joinpath("printer.cfg")
interpolate_config(pcfg_path, pcfg_dest, dest_paths)
dest_paths['printer.cfg'] = pcfg_dest
if "moonraker_bkp" in paths:
bkp_source = ASSETS.joinpath("moonraker/base_server.conf")
bkp_dest = cfg_path.joinpath(paths["moonraker_bkp"])
interpolate_config(bkp_source, bkp_dest, dest_paths)
yield dest_paths
@pytest.fixture(scope="class")
def klippy(path_args: Dict[str, pathlib.Path],
pytestconfig: pytest.Config) -> Iterator[KlippyProcess]:
kpath = pytestconfig.getoption('klipper_path', "~/klipper")
kexec = pytestconfig.getoption('klipper_exec', None)
if kexec is None:
kexec = sys.executable
exec = pathlib.Path(kexec).expanduser()
klipper_path = pathlib.Path(kpath).expanduser()
base_cmd = f"{exec} {klipper_path}/klippy/klippy.py "
kproc = KlippyProcess(base_cmd, path_args)
kproc.start()
yield kproc
kproc.stop()
@pytest.fixture(scope="class")
def base_server(path_args: Dict[str, pathlib.Path],
event_loop: asyncio.AbstractEventLoop
) -> Iterator[Server]:
evtloop = EventLoop()
args = {
'config_file': str(path_args['moonraker.conf']),
'log_file': str(path_args.get("moonraker.log", "")),
'software_version': "moonraker-pytest"
}
ql = logger = None
if args["log_file"]:
ql, logger, warning = utils.setup_logging(args)
if warning:
args["log_warning"] = warning
yield Server(args, logger, evtloop)
if ql is not None:
ql.stop()
@pytest_asyncio.fixture(scope="class")
async def full_server(base_server: Server) -> AsyncIterator[Server]:
base_server.load_components()
ret = base_server.server_init(start_server=False)
await asyncio.wait_for(ret, 4.)
yield base_server
if base_server.event_loop.aioloop.is_running():
await base_server._stop_server(exit_reason="terminate")
@pytest_asyncio.fixture(scope="class")
async def ready_server(full_server: Server, klippy: KlippyProcess):
ret = full_server.start_server(connect_to_klippy=False)
await asyncio.wait_for(ret, 4.)
ret = full_server.klippy_connection.connect()
await asyncio.wait_for(ret, 4.)
yield full_server
@pytest_asyncio.fixture(scope="class")
async def http_client() -> AsyncIterator[HttpClient]:
client = HttpClient()
yield client
client.close()
@pytest_asyncio.fixture(scope="class")
async def websocket_client(request: pytest.FixtureRequest
) -> AsyncIterator[WebsocketClient]:
conn_marker = request.node.get_closest_marker("no_ws_connect")
client = WebsocketClient()
if conn_marker is None:
await client.connect()
yield client
client.close()

5
tests/fixtures/__init__.py vendored Normal file
View File

@ -0,0 +1,5 @@
from .klippy_process import KlippyProcess
from .http_client import HttpClient
from .websocket_client import WebsocketClient
__all__ = ("KlippyProcess", "HttpClient", "WebsocketClient")

77
tests/fixtures/http_client.py vendored Normal file
View File

@ -0,0 +1,77 @@
from __future__ import annotations
import json
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.httputil import HTTPHeaders
from tornado.escape import url_escape
from typing import Dict, Any, Optional
class HttpClient:
def __init__(self,
type: str = "http",
port: int = 7010
) -> None:
self.client = AsyncHTTPClient()
assert type in ["http", "https"]
self.prefix = f"{type}://127.0.0.1:{port}/"
self.last_response_headers: HTTPHeaders = HTTPHeaders()
def get_response_headers(self) -> HTTPHeaders:
return self.last_response_headers
async def _do_request(self,
method: str,
endpoint: str,
args: Dict[str, Any] = {},
headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
ep = "/".join([url_escape(part, plus=False) for part in
endpoint.lstrip("/").split("/")])
url = self.prefix + ep
method = method.upper()
body: Optional[str] = "" if method == "POST" else None
if args:
if method == "GET":
parts = []
for key, val in args.items():
if isinstance(val, list):
val = ",".join(val)
if val:
parts.append(f"{key}={val}")
else:
parts.append(key)
qs = url_escape("&".join(parts))
url += "?" + qs
else:
body = json.dumps(args)
if headers is None:
headers = {}
headers["Content-Type"] = "application/json"
request = HTTPRequest(url, method, headers, body=body,
request_timeout=2., connect_timeout=2.)
ret = await self.client.fetch(request)
self.last_response_headers = HTTPHeaders(ret.headers)
return json.loads(ret.body)
async def get(self,
endpoint: str,
args: Dict[str, Any] = {},
headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
return await self._do_request("GET", endpoint, args, headers)
async def post(self,
endpoint: str,
args: Dict[str, Any] = {},
headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
return await self._do_request("POST", endpoint, args, headers)
async def delete(self,
endpoint: str,
args: Dict[str, Any] = {},
headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
return await self._do_request("DELETE", endpoint, args, headers)
def close(self):
self.client.close()

81
tests/fixtures/klippy_process.py vendored Normal file
View File

@ -0,0 +1,81 @@
from __future__ import annotations
import pytest
import os
import subprocess
import time
import pathlib
import shlex
from typing import Dict, Optional
class KlippyProcess:
def __init__(self,
base_cmd: str,
path_args: Dict[str, pathlib.Path],
) -> None:
self.base_cmd = base_cmd
self.config_path = path_args['printer.cfg']
self.orig_config = self.config_path
self.dict_path = path_args["klipper.dict"]
self.pty_path = path_args["klippy_pty_path"]
self.uds_path = path_args["klippy_uds_path"]
self.proc: Optional[subprocess.Popen] = None
self.fd: int = -1
def start(self):
if self.proc is not None:
return
args = (
f"{self.config_path} -o /dev/null -d {self.dict_path} "
f"-a {self.uds_path} -I {self.pty_path}"
)
cmd = f"{self.base_cmd} {args}"
cmd_parts = shlex.split(cmd)
self.proc = subprocess.Popen(cmd_parts)
for _ in range(25):
if self.pty_path.exists():
try:
self.fd = os.open(
str(self.pty_path), os.O_RDWR | os.O_NONBLOCK)
except Exception:
pass
else:
break
time.sleep(.01)
else:
self.stop()
pytest.fail("Unable to start Klippy process")
return False
return True
def send_gcode(self, gcode: str) -> None:
if self.fd == -1:
return
try:
os.write(self.fd, f"{gcode}\n".encode())
except Exception:
pass
def restart(self):
self.stop()
self.start()
def stop(self):
if self.fd != -1:
os.close(self.fd)
self.fd = -1
if self.proc is not None:
self.proc.terminate()
try:
self.proc.wait(2.)
except subprocess.TimeoutExpired:
self.proc.kill()
self.proc = None
def get_paths(self) -> Dict[str, pathlib.Path]:
return {
"printer.cfg": self.config_path,
"klipper.dict": self.dict_path,
"klippy_uds_path": self.uds_path,
"klippy_pty_path": self.pty_path,
}

135
tests/fixtures/websocket_client.py vendored Normal file
View File

@ -0,0 +1,135 @@
from __future__ import annotations
import pytest
import json
import asyncio
import tornado.websocket
from typing import (
TYPE_CHECKING,
Union,
Tuple,
Callable,
Dict,
List,
Any,
Optional,
)
if TYPE_CHECKING:
from tornado.websocket import WebSocketClientConnection
class WebsocketError(Exception):
def __init__(self, code, *args: object) -> None:
super().__init__(*args)
self.code = code
class WebsocketClient:
def __init__(self,
type: str = "ws",
port: int = 7010
) -> None:
self.ws: Optional[WebSocketClientConnection] = None
self.pending_requests: Dict[int, asyncio.Future] = {}
self.notify_cbs: Dict[str, List[Callable[..., None]]] = {}
assert type in ["ws", "wss"]
self.url = f"{type}://127.0.0.1:{port}/websocket"
async def connect(self, token: Optional[str] = None) -> None:
url = self.url
if token is not None:
url += f"?token={token}"
self.ws = await tornado.websocket.websocket_connect(
url, connect_timeout=2.,
on_message_callback=self._on_message_received)
async def request(self,
remote_method: str,
args: Dict[str, Any] = {}
) -> Dict[str, Any]:
if self.ws is None:
pytest.fail("Websocket Not Connected")
loop = asyncio.get_running_loop()
fut = loop.create_future()
req, req_id = self._encode_request(remote_method, args)
self.pending_requests[req_id] = fut
await self.ws.write_message(req)
return await asyncio.wait_for(fut, 2.)
def _encode_request(self,
method: str,
args: Dict[str, Any]
) -> Tuple[str, int]:
request: Dict[str, Any] = {
'jsonrpc': "2.0",
'method': method,
}
if args:
request['params'] = args
req_id = id(request)
request["id"] = req_id
return json.dumps(request), req_id
def _on_message_received(self, message: Union[str, bytes, None]) -> None:
if isinstance(message, str):
self._decode_jsonrpc(message)
def _decode_jsonrpc(self, data: str) -> None:
try:
resp: Dict[str, Any] = json.loads(data)
except json.JSONDecodeError:
pytest.fail(f"Websocket JSON Decode Error: {data}")
header = resp.get('jsonrpc', "")
if header != "2.0":
# Invalid Json, set error if we can get the id
pytest.fail(f"Invalid jsonrpc header: {data}")
req_id: Optional[int] = resp.get("id")
method: Optional[str] = resp.get("method")
if method is not None:
if req_id is not None:
params = resp.get("params", [])
if not isinstance(params, list):
pytest.fail("jsonrpc notification params"
f"should always be a list: {data}")
if method in self.notify_cbs:
for func in self.notify_cbs[method]:
func(*params)
else:
# This is a request from the server (should not happen)
pytest.fail(f"Server should not request from client: {data}")
elif req_id is not None:
pending_fut = self.pending_requests.pop(req_id, None)
if pending_fut is None:
# No future pending for this response
return
# This is a response
if "result" in resp:
pending_fut.set_result(resp["result"])
elif "error" in resp:
err = resp["error"]
try:
code = err["code"]
msg = err["message"]
except Exception:
pytest.fail(f"Invalid jsonrpc error: {data}")
exc = WebsocketError(code, msg)
pending_fut.set_exception(exc)
else:
pytest.fail(
f"Invalid jsonrpc packet, no result or error: {data}")
else:
# Invalid json
pytest.fail(f"Invalid jsonrpc packet, no id: {data}")
def register_notify_callback(self, name: str, callback) -> None:
if name in self.notify_cbs:
self.notify_cbs[name].append(callback)
else:
self.notify_cbs[name][callback]
def close(self):
for fut in self.pending_requests.values():
if not fut.done():
fut.set_exception(WebsocketError(
0, "Closing Websocket Client"))
if self.ws is not None:
self.ws.close(1000, "Test Complete")

70
tests/mocks/__init__.py Normal file
View File

@ -0,0 +1,70 @@
from __future__ import annotations
import asyncio
from utils import ServerError
from .mock_gpio import MockGpiod
__all__ = ("MockReader", "MockWriter", "MockComponent", "MockWebsocket",
"MockGpiod")
class MockWriter:
def __init__(self, wait_drain: bool = False) -> None:
self.wait_drain = wait_drain
def write(self, data: str) -> None:
pass
async def drain(self) -> None:
if self.wait_drain:
evt = asyncio.Event()
await evt.wait()
else:
raise ServerError("TestError")
class MockReader:
def __init__(self, action: str = "") -> None:
self.action = action
self.eof = False
def at_eof(self) -> bool:
return self.eof
async def readuntil(self, stop: bytes) -> bytes:
if self.action == "wait":
evt = asyncio.Event()
await evt.wait()
return b""
elif self.action == "raise_error":
raise ServerError("TestError")
else:
self.eof = True
return b"NotJsonDecodable"
class MockComponent:
def __init__(self,
err_init: bool = False,
err_exit: bool = False,
err_close: bool = False
) -> None:
self.err_init = err_init
self.err_exit = err_exit
self.err_close = err_close
async def component_init(self):
if self.err_init:
raise ServerError("test")
async def on_exit(self):
if self.err_exit:
raise ServerError("test")
async def close(self):
if self.err_close:
raise ServerError("test")
class MockWebsocket:
def __init__(self, fut: asyncio.Future) -> None:
self.future = fut
def queue_message(self, data: str):
self.future.set_result(data)

193
tests/mocks/mock_gpio.py Normal file
View File

@ -0,0 +1,193 @@
from __future__ import annotations
import os
import logging
from typing import Dict, Optional, List, Tuple
class GpioException(Exception):
pass
class MockGpiod:
LINE_REQ_DIR_OUT = 3
LINE_REQ_EV_BOTH_EDGES = 6
LINE_REQ_FLAG_ACTIVE_LOW = 1 << 2
LINE_REQ_FLAG_BIAS_DISABLE = 1 << 3
LINE_REQ_FLAG_BIAS_PULL_DOWN = 1 << 4
LINE_REQ_FLAG_BIAS_PULL_UP = 1 << 5
def __init__(self, version: str = "1.2") -> None:
self.version = version
self.Chip = MockChipWrapper(self)
self.LineEvent = MockLineEvent
self.chips: Dict[str, MockChip] = {}
def version_string(self) -> str:
return self.version
def version_tuple(self) -> Tuple[int, ...]:
return tuple([int(v) for v in self.version.split(".")])
def get_chip(self, chip_name) -> Optional[MockChip]:
return self.chips.get(chip_name, None)
def add_chip(self, chip: MockChip):
self.chips[chip.name] = chip
def pop_chip(self, name: str):
self.chips.pop(name, None)
def find_line(self, chip_id: str, pin_id: str) -> MockLine:
if chip_id not in self.chips:
raise GpioException(f"Unable to find chip {chip_id}")
return self.chips[chip_id].find_line(pin_id)
class MockChipWrapper:
OPEN_BY_NAME = 2
def __init__(self, gpiod: MockGpiod) -> None:
self.mock_gpiod = gpiod
def __call__(self, chip_name: str, flags: int) -> MockChip:
if chip_name in self.mock_gpiod.chips:
return self.mock_gpiod.chips[chip_name]
chip = MockChip(chip_name, flags, self.mock_gpiod)
self.mock_gpiod.add_chip(chip)
return chip
class MockChip:
def __init__(self,
chip_name: str,
flags: int,
mock_gpiod: MockGpiod
) -> None:
self.name = chip_name
self.flags = flags
self.mock_gpiod = mock_gpiod
self.requested_lines: Dict[str, MockLine] = {}
def get_line(self, pin_id: str) -> MockLine:
if pin_id in self.requested_lines:
raise GpioException(f"Line {pin_id} already reserved")
line = MockLine(self, pin_id, self.mock_gpiod)
self.requested_lines[pin_id] = line
return line
def find_line(self, pin_id: str) -> MockLine:
if pin_id not in self.requested_lines:
raise GpioException(f"Unable to find line {pin_id}")
return self.requested_lines[pin_id]
def pop_line(self, name: str) -> None:
self.requested_lines.pop(name, None)
def close(self) -> None:
for line in list(self.requested_lines.values()):
line.release()
self.requested_lines = {}
self.mock_gpiod.pop_chip(self.name)
class MockLine:
def __init__(self,
chip: MockChip,
name: str,
mock_gpiod: MockGpiod
) -> None:
self.mock_gpiod = mock_gpiod
self.chip = chip
self.name = name
self.consumer_name: str = ""
self.is_event = False
self.invert = False
self.value = 0
self.read_pipe: Optional[int] = None
self.write_pipe: Optional[int] = None
self.bias = "not_configured"
def request(self,
consumer: str,
type: int,
flags: int = 0,
default_vals: Optional[List[int]] = None,
default_val: Optional[int] = None
) -> None:
self.consumer_name = consumer
version = self.mock_gpiod.version_tuple()
if type == MockGpiod.LINE_REQ_DIR_OUT:
self.is_event = False
if default_vals is not None:
if version > (1, 2):
logging.warn("default_vals is deprecated in gpiod 1.3+")
self.value = default_vals[0]
elif default_val is not None:
if version < (1, 3):
raise GpioException(
"default_val not available in gpiod < 1.3")
self.value = default_val
elif type == MockGpiod.LINE_REQ_EV_BOTH_EDGES:
self.is_event = True
if version >= (1, 5):
if flags & MockGpiod.LINE_REQ_FLAG_BIAS_DISABLE:
self.bias = "disabled"
elif flags & MockGpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN:
self.bias = "pulldown"
elif flags & MockGpiod.LINE_REQ_FLAG_BIAS_PULL_UP:
self.bias = "pullup"
self.read_pipe, self.write_pipe = os.pipe2(os.O_NONBLOCK)
else:
raise GpioException("Unsupported GPIO Type")
if flags & MockGpiod.LINE_REQ_FLAG_ACTIVE_LOW:
self.invert = True
def release(self) -> None:
if self.read_pipe is not None:
try:
os.close(self.read_pipe)
except Exception:
pass
if self.write_pipe is not None:
try:
os.close(self.write_pipe)
except Exception:
pass
self.chip.pop_line(self.name)
def set_value(self, value: int) -> None:
if self.is_event:
raise GpioException("Cannot set the value for an input pin")
self.value = int(not not value)
def get_value(self) -> int:
return self.value
def event_read(self) -> MockLineEvent:
if self.read_pipe is None:
raise GpioException
try:
data = os.read(self.read_pipe, 64)
except Exception:
pass
else:
value = int(not not data[-1])
self.value = value
return MockLineEvent(self.value)
def event_get_fd(self) -> int:
if self.read_pipe is None:
raise GpioException("Event not configured")
return self.read_pipe
def simulate_line_event(self, value: int) -> None:
if self.write_pipe is None:
raise GpioException("Event not configured")
val = bytes([int(not not value)])
try:
os.write(self.write_pipe, val)
except Exception:
pass
class MockLineEvent:
RISING_EDGE = 1
FALLING_EDGE = 2
def __init__(self, value: int) -> None:
if value == 1:
self.type = self.RISING_EDGE
else:
self.type = self.FALLING_EDGE

498
tests/test_config.py Normal file
View File

@ -0,0 +1,498 @@
from __future__ import annotations
import pathlib
import pytest
import hashlib
import confighelper
import shutil
from confighelper import ConfigError
from moonraker import Server
from utils import ServerError
from components import gpio
from mocks import MockGpiod
from typing import TYPE_CHECKING, Dict
if TYPE_CHECKING:
from confighelper import ConfigHelper
@pytest.fixture(scope="class")
def config(base_server: Server) -> ConfigHelper:
base_server.load_component(base_server.config, "secrets")
return base_server.config
@pytest.fixture(scope="class")
def test_config(config: ConfigHelper,
path_args: Dict[str, pathlib.Path]
) -> ConfigHelper:
assets = path_args['asset_path']
sup_cfg_path = assets.joinpath("moonraker/supplemental.conf")
if not sup_cfg_path.exists():
pytest.fail("Supplemental config not found")
cfg = config.read_supplemental_config(str(sup_cfg_path))
return cfg["test_options"]
@pytest.fixture(scope="function")
def gpio_config(test_config: ConfigHelper,
monkeypatch: pytest.MonkeyPatch
) -> ConfigHelper:
def load_gpio_mock(name: str) -> MockGpiod:
return MockGpiod()
monkeypatch.setattr(gpio, "load_system_module", load_gpio_mock)
yield test_config
server = test_config.get_server()
gpio_comp = server.lookup_component("gpio", None)
if gpio_comp is not None:
gpio_comp.close()
gpio_comp.reserved_gpios = {}
class TestConfigGeneric:
def test_get_server(self, config: ConfigHelper):
server = config.get_server()
assert isinstance(server, Server)
def test_get_item(self, config: ConfigHelper):
sec = config["file_manager"]
assert sec.section == "file_manager"
def test_get_item_fail(self, config: ConfigHelper):
with pytest.raises(ConfigError):
config["not_available"]
def test_contains(self, config: ConfigHelper):
assert "file_manager" in config
def test_not_contains(self, config: ConfigHelper):
assert "not_available" not in config
def test_has_option(self, config: ConfigHelper):
assert config.has_option("host")
def test_get_name(self, config: ConfigHelper):
assert config.get_name() == "server"
def test_get_options(self,
config: ConfigHelper,
path_args: Dict[str, pathlib.Path]):
expected = {
"host": "0.0.0.0",
"port": "7010",
"ssl_port": "7011",
"klippy_uds_address": str(path_args["klippy_uds_path"])
}
assert expected == config.get_options()
def test_get_hash(self, config: ConfigHelper):
opts = config.get_options()
expected_hash = hashlib.sha256()
for opt, val in opts.items():
expected_hash.update(opt.encode())
expected_hash.update(val.encode())
cfg_hash = config.get_hash().hexdigest()
assert cfg_hash == expected_hash.hexdigest()
def test_missing_supplemental_config(config: ConfigHelper):
no_file = pathlib.Path("nofile")
with pytest.raises(ConfigError):
config.read_supplemental_config(no_file)
def test_error_supplemental_config(config: ConfigHelper,
path_args: Dict[str, pathlib.Path]):
assets = path_args["asset_path"]
invalid_cfg = assets.joinpath("moonraker/invalid_config.conf")
if not invalid_cfg.exists():
pytest.fail("Invalid Config File does not exist")
with pytest.raises(ConfigError):
config.read_supplemental_config(invalid_cfg)
def test_prefix_sections(test_config: ConfigHelper):
prefix = test_config.get_prefix_sections("prefix_sec")
expected = ["prefix_sec one", "prefix_sec two", "prefix_sec three"]
assert prefix == expected
class TestGetString:
def test_get_str_exists(self, test_config: ConfigHelper):
val = test_config.get("test_string")
assert val == "Hello World"
def test_get_st_fail(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.get("invalid_option")
def test_get_st_default(self, test_config: ConfigHelper):
assert test_config.get("invalid_option", None) is None
def test_get_int_deprecate(self, test_config: ConfigHelper):
server = test_config.get_server()
test_config.get("test_string", deprecate=True)
expected = (
f"[test_options]: Option 'test_string' in is "
"deprecated, see the configuration documention "
"at https://moonraker.readthedocs.io"
)
assert expected in server.warnings
class TestGetInt:
def test_get_int_exists(self, test_config: ConfigHelper):
val = test_config.getint("test_int")
assert val == 1
def test_get_int_fail(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getint("invalid_option")
def test_get_int_default(self, test_config: ConfigHelper):
assert test_config.getint("invalid_option", None) is None
def test_get_int_fail_above(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getint("test_int", above=1)
def test_get_int_fail_below(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getint("test_int", below=1)
def test_get_int_fail_minval(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getint("test_int", minval=2)
def test_get_int_fail_maxval(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getint("test_int", maxval=0)
def test_get_int_pass_all(self, test_config: ConfigHelper):
val = test_config.getint("test_int", above=0, below=2,
minval=1, maxval=1)
assert val == 1
def test_get_int_deprecate(self, test_config: ConfigHelper):
server = test_config.get_server()
test_config.getint("test_int", deprecate=True)
expected = (
f"[test_options]: Option 'test_int' in is "
"deprecated, see the configuration documention "
"at https://moonraker.readthedocs.io"
)
assert expected in server.warnings
class TestGetFloat:
def test_get_float_exists(self, test_config: ConfigHelper):
val = test_config.getfloat("test_float")
assert 3.5 == pytest.approx(val)
def test_get_float_fail(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getfloat("invalid_option")
def test_get_float_default(self, test_config: ConfigHelper):
assert test_config.getfloat("invalid_option", None) is None
def test_get_float_fail_above(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getfloat("test_float", above=3.55)
def test_get_float_fail_below(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getfloat("test_float", below=3.45)
def test_get_float_fail_minval(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getfloat("test_float", minval=3.6)
def test_get_float_fail_maxval(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getfloat("test_float", maxval=3.45)
def test_get_float_pass_all(self, test_config: ConfigHelper):
val = test_config.getfloat("test_float", above=3.45, below=3.55,
minval=3, maxval=4)
assert 3.5 == pytest.approx(val)
def test_get_float_deprecate(self, test_config: ConfigHelper):
server = test_config.get_server()
test_config.getfloat("test_float", deprecate=True)
expected = (
f"[test_options]: Option 'test_float' in is "
"deprecated, see the configuration documention "
"at https://moonraker.readthedocs.io"
)
assert expected in server.warnings
class TestGetBoolean:
def test_get_boolean_exists(self, test_config: ConfigHelper):
val = test_config.getboolean("test_bool")
assert val is True
def test_get_float_fail(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getboolean("invalid_option")
def test_get_float_default(self, test_config: ConfigHelper):
assert test_config.getboolean("invalid_option", None) is None
def test_get_int_deprecate(self, test_config: ConfigHelper):
server = test_config.get_server()
test_config.getboolean("test_bool", deprecate=True)
expected = (
f"[test_options]: Option 'test_bool' in is "
"deprecated, see the configuration documention "
"at https://moonraker.readthedocs.io"
)
assert expected in server.warnings
class TestGetList:
def test_get_list_exists(self, test_config: ConfigHelper):
val = test_config.getlist("test_list")
assert val == ["one", "two", "three"]
def test_get_list_fail(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getlist("invalid_option")
def test_get_list_default(self, test_config: ConfigHelper):
assert test_config.getlist("invalid_option", None) is None
def test_get_int_list(self, test_config: ConfigHelper):
val = test_config.getintlist("test_int_list", separator=",")
assert val == [1, 2, 3]
def test_get_float_list(self, test_config: ConfigHelper):
val = test_config.getfloatlist("test_float_list", separator=",")
assert val == pytest.approx([1.5, 2.8, 3.2])
def test_get_multi_list(self, test_config: ConfigHelper):
val = test_config.getlists("test_multi_list", list_type=int,
separators=("\n", ","))
assert val == [[1, 2, 3], [4, 5, 6]]
def test_get_list_deprecate(self, test_config: ConfigHelper):
server = test_config.get_server()
test_config.getlist("test_list", deprecate=True)
expected = (
f"[test_options]: Option 'test_list' in is "
"deprecated, see the configuration documention "
"at https://moonraker.readthedocs.io"
)
assert expected in server.warnings
class TestGetDict:
def test_get_dict_exists(self, test_config: ConfigHelper):
val = test_config.getdict("test_dict", dict_type=int)
assert val == {"one": 1, "two": 2, "three": 3}
def test_get_dict_fail(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getdict("invalid_option")
def test_get_dict_default(self, test_config: ConfigHelper):
assert test_config.getdict("invalid_option", None) is None
def test_get_dict_empty_fields(self, test_config: ConfigHelper):
val = test_config.getdict("test_dict_empty_field",
allow_empty_fields=True)
assert val == {"one": "test", "two": None, "three": None}
def test_get_dict_empty_fields_fail(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.getdict("test_dict_empty_field")
def test_get_dict_deprecate(self, test_config: ConfigHelper):
server = test_config.get_server()
test_config.getdict("test_dict", deprecate=True)
expected = (
f"[test_options]: Option 'test_dict' in is "
"deprecated, see the configuration documention "
"at https://moonraker.readthedocs.io"
)
assert expected in server.warnings
class TestGetTemplate:
def test_get_template_exists(self, test_config: ConfigHelper):
val = test_config.gettemplate("test_template").render()
assert val == "mqttuser"
@pytest.mark.asyncio
async def test_get_template_async(self, test_config: ConfigHelper):
templ = test_config.gettemplate("test_template", is_async=True)
val = await templ.render_async()
assert val == "mqttuser"
def test_get_template_plain(self, test_config: ConfigHelper):
val = test_config.gettemplate("test_string").render()
assert val == "Hello World"
def test_get_template_fail(self, test_config: ConfigHelper):
with pytest.raises(ConfigError):
test_config.gettemplate("invalid_option")
def test_get_template_render_fail(self, test_config: ConfigHelper):
with pytest.raises(ServerError):
test_config.gettemplate("test_template", is_async=True).render()
def test_get_template_default(self, test_config: ConfigHelper):
assert test_config.gettemplate("invalid_option", None) is None
def test_load_template(self, test_config: ConfigHelper):
val = test_config.load_template("test_template").render()
assert val == "mqttuser"
def test_load_template_default(self, test_config: ConfigHelper):
templ = test_config.load_template(
"invalid_option", "{secrets.mqtt_credentials.password}")
val = templ.render()
assert val == "mqttpass"
def test_get_template_deprecate(self, test_config: ConfigHelper):
server = test_config.get_server()
test_config.gettemplate("test_template", deprecate=True)
expected = (
f"[test_options]: Option 'test_template' in is "
"deprecated, see the configuration documention "
"at https://moonraker.readthedocs.io"
)
assert expected in server.warnings
class TestGetGpioOut:
def test_get_gpio_exists(self, gpio_config: ConfigHelper):
val: gpio.GpioOutputPin = gpio_config.getgpioout("test_gpio")
assert (
val.orig == "gpiochip0/gpio26" and
val.name == "gpiochip0:gpio26" and
val.inverted is False and
val.value == 0
)
def test_get_gpio_no_chip(self, gpio_config: ConfigHelper):
val: gpio.GpioOutputPin = gpio_config.getgpioout("test_gpio_no_chip")
assert (
val.orig == "gpio26" and
val.name == "gpiochip0:gpio26" and
val.inverted is False and
val.value == 0
)
def test_get_gpio_invert(self, gpio_config: ConfigHelper):
val: gpio.GpioOutputPin = gpio_config.getgpioout("test_gpio_invert")
assert (
val.orig == "!gpiochip0/gpio26" and
val.name == "gpiochip0:gpio26" and
val.inverted is True and
val.value == 0
)
def test_get_gpio_no_chip_invert(self, gpio_config: ConfigHelper):
val: gpio.GpioOutputPin = gpio_config.getgpioout(
"test_gpio_no_chip_invert")
assert (
val.orig == "!gpio26" and
val.name == "gpiochip0:gpio26" and
val.inverted is True and
val.value == 0
)
def test_get_gpio_initial_value(self, gpio_config: ConfigHelper):
val: gpio.GpioOutputPin = gpio_config.getgpioout(
"test_gpio", initial_value=1)
assert (
val.orig == "gpiochip0/gpio26" and
val.name == "gpiochip0:gpio26" and
val.inverted is False and
val.value == 1
)
def test_get_gpio_fail(self, gpio_config: ConfigHelper):
with pytest.raises(ConfigError):
gpio_config.getgpioout("invalid_option")
def test_get_gpio_default(self, gpio_config: ConfigHelper):
assert gpio_config.getgpioout("invalid_option", None) is None
@pytest.mark.parametrize("opt", ["pullup", "pullup_no_chip",
"pulldown", "pulldown_no_chip"])
def test_get_gpio_invalid(self, gpio_config: ConfigHelper, opt: str):
option = f"test_gpio_{opt}"
if not gpio_config.has_option(option):
pytest.fail(f"No option {option}")
with pytest.raises(ConfigError):
gpio_config.getgpioout(option)
def test_get_gpio_deprecated(self, gpio_config: ConfigHelper):
server = gpio_config.get_server()
gpio_config.getgpioout("test_gpio", deprecate=True)
expected = (
f"[test_options]: Option 'test_gpio' in is "
"deprecated, see the configuration documention "
"at https://moonraker.readthedocs.io"
)
assert expected in server.warnings
class TestGetConfiguration:
def test_get_config_no_exist(self, base_server: Server):
fake_path = pathlib.Path("no_exist")
if fake_path.exists():
pytest.fail("Path exists")
args = dict(base_server.app_args)
args["config_file"] = str(fake_path)
with pytest.raises(ConfigError):
confighelper.get_configuration(base_server, args)
def test_get_config_no_access(self,
base_server: Server,
path_args: Dict[str, pathlib.Path]
):
cfg_path = path_args["config_path"]
test_cfg = cfg_path.joinpath("test.conf")
shutil.copy(path_args["moonraker.conf"], test_cfg)
test_cfg.chmod(mode=222)
args = dict(base_server.app_args)
args["config_file"] = str(test_cfg)
with pytest.raises(ConfigError):
confighelper.get_configuration(base_server, args)
def test_get_config_no_server(self,
base_server: Server,
path_args: Dict[str, pathlib.Path]
):
assets = path_args['asset_path']
sup_cfg_path = assets.joinpath("moonraker/supplemental.conf")
if not sup_cfg_path.exists():
pytest.fail("Supplemental config not found")
args = dict(base_server.app_args)
args["config_file"] = str(sup_cfg_path)
with pytest.raises(ConfigError):
confighelper.get_configuration(base_server, args)
class TestBackupConfig:
def test_backup_fail(self, caplog: pytest.LogCaptureFixture):
fake_path = pathlib.Path("no_exist")
if fake_path.exists():
pytest.fail("Path exists")
confighelper.backup_config(fake_path)
assert "Failed to create a backup" == caplog.messages[-1]
def test_find_backup_fail(self):
fake_path = pathlib.Path("no_exist")
if fake_path.exists():
pytest.fail("Path exists")
result = confighelper.find_config_backup(fake_path)
assert result is None
def test_backup_config_success(self, path_args: Dict[str, pathlib.Path]):
cfg_path = path_args["moonraker.conf"]
bkp_dest = cfg_path.parent.joinpath(".moonraker.conf.bkp")
if bkp_dest.exists():
pytest.fail("Backup Already Exists")
confighelper.backup_config(str(cfg_path))
assert bkp_dest.is_file()
def test_backup_skip(self, path_args: Dict[str, pathlib.Path]):
cfg_path = path_args["moonraker.conf"]
bkp_dest = cfg_path.parent.joinpath(".moonraker.conf.bkp")
if not bkp_dest.exists():
pytest.fail("Backup Not Present")
stat = bkp_dest.stat()
confighelper.backup_config(str(cfg_path))
assert stat == bkp_dest.stat()
def test_find_backup(self, path_args: Dict[str, pathlib.Path]):
cfg_path = path_args["moonraker.conf"]
bkp_dest = cfg_path.parent.joinpath(".moonraker.conf.bkp")
bkp = confighelper.find_config_backup(str(cfg_path))
assert bkp == str(bkp_dest)

View File

@ -0,0 +1,271 @@
from __future__ import annotations
import pytest
import asyncio
import pathlib
from typing import TYPE_CHECKING, Dict
from moonraker import ServerError
from klippy_connection import KlippyRequest
from mocks import MockReader, MockWriter
if TYPE_CHECKING:
from moonraker import Server
from conftest import KlippyProcess
@pytest.mark.usefixtures("klippy")
@pytest.mark.asyncio
async def test_klippy_startup(full_server: Server):
evtloop = full_server.get_event_loop()
futs = [evtloop.create_future() for _ in range(3)]
events = {
"server:klippy_identified": lambda: futs[0].set_result("id"),
"server:klippy_started": lambda x: futs[1].set_result("started"),
"server:klippy_ready": lambda: futs[2].set_result("ready")
}
for name, func in events.items():
full_server.register_event_handler(name, func)
await full_server.start_server()
ret = await asyncio.wait_for(asyncio.gather(*futs), 4.)
assert (
ret == ["id", "started", "ready"] and
full_server.klippy_connection.is_connected()
)
@pytest.mark.asyncio
async def test_gcode_response(ready_server: Server,
klippy: KlippyProcess):
evtloop = ready_server.get_event_loop()
fut = evtloop.create_future()
def on_gc_resp(resp: str):
if not fut.done():
fut.set_result(resp)
ready_server.register_event_handler("server:gcode_response", on_gc_resp)
klippy.send_gcode("M118 Moonraker Test")
await asyncio.wait_for(fut, 1.)
assert "Moonraker Test" in fut.result()
@pytest.mark.asyncio
async def test_klippy_shutdown(ready_server: Server, klippy: KlippyProcess):
evtloop = ready_server.get_event_loop()
fut = evtloop.create_future()
def on_shutdown():
if not fut.done():
fut.set_result("shutdown")
ready_server.register_event_handler("server:klippy_shutdown", on_shutdown)
klippy.send_gcode("M112")
await asyncio.wait_for(fut, 2.)
assert fut.result() == "shutdown"
@pytest.mark.asyncio
async def test_klippy_disconnect(ready_server: Server, klippy: KlippyProcess):
evtloop = ready_server.get_event_loop()
fut = evtloop.create_future()
def on_disconnect():
if not fut.done():
fut.set_result("disconnect")
ready_server.register_event_handler("server:klippy_disconnect",
on_disconnect)
klippy.stop()
await asyncio.wait_for(fut, 2.)
assert fut.result() == "disconnect"
@pytest.mark.asyncio
async def test_klippy_reconnect(ready_server: Server, klippy: KlippyProcess):
evtloop = ready_server.get_event_loop()
fut = evtloop.create_future()
def on_reconnect():
if not fut.done():
fut.set_result("test")
ready_server.register_event_handler("server:klippy_ready",
on_reconnect)
klippy.send_gcode("RESTART")
await asyncio.wait_for(fut, 4.)
assert fut.result() == "test"
@pytest.mark.asyncio
async def test_no_klippy_connection_error(full_server: Server):
await full_server.start_server()
with pytest.raises(ServerError):
kapis = full_server.klippy_connection.klippy_apis
await kapis.run_gcode("M115")
@pytest.mark.asyncio
async def test_status_update(ready_server: Server, klippy: KlippyProcess):
evtloop = ready_server.get_event_loop()
fut = evtloop.create_future()
def on_status_update(data):
if not fut.done():
fut.set_result(data)
ready_server.register_event_handler("server:status_update",
on_status_update)
kapis = ready_server.klippy_connection.klippy_apis
await kapis.subscribe_objects({"toolhead": None})
klippy.send_gcode("G28")
await asyncio.wait_for(fut, 2.)
assert isinstance(fut.result(), dict)
@pytest.mark.run_paths(printer_cfg="error_printer.cfg")
@pytest.mark.asyncio
async def test_klippy_error(ready_server: Server):
kconn = ready_server.klippy_connection
assert kconn.state == "error"
@pytest.mark.run_paths(printer_cfg="missing_reqs.cfg")
@pytest.mark.asyncio
async def test_missing_reqs(ready_server: Server):
mreqs = sorted(ready_server.klippy_connection.missing_requirements)
expected = ["display_status", "pause_resume", "virtual_sdcard"]
assert mreqs == expected
@pytest.mark.asyncio
async def test_connection_close(full_server: Server):
await full_server.start_server()
# Test multiple close attempts, the second to enter
# should wait and exit
ret = full_server.klippy_connection.close(True)
ret2 = full_server.klippy_connection.close(True)
await asyncio.wait_for(asyncio.gather(ret, ret2), 4.)
kconn = full_server.klippy_connection
assert kconn.connection_task.cancelled()
@pytest.mark.asyncio
async def test_init_error(base_server: Server):
base_server.server_running = True
kconn = base_server.klippy_connection
def mock_is_connected():
return kconn.init_attempts < 3
kconn.is_connected = mock_is_connected
ret = await kconn._init_klippy_connection()
assert ret is False
def test_connect_fail(base_server: Server):
ret = base_server.klippy_connection.connect()
assert ret.result() is False
@pytest.mark.asyncio
async def test_wait_connect_fail(base_server: Server):
ret = await base_server.klippy_connection.wait_connected()
assert ret is False
@pytest.mark.asyncio
async def test_no_uds(base_server: Server):
attempts = [1, 2, 3]
def mock_is_running():
attempts.pop(0)
return len(attempts) > 0
base_server.is_running = mock_is_running
ret = await base_server.klippy_connection._do_connect()
assert ret is False
@pytest.mark.asyncio
async def test_no_uds_access(base_server: Server,
path_args: Dict[str, pathlib.Path]):
attempts = [1, 2, 3]
uds_path = path_args['klippy_uds_path']
uds_path.write_text("test")
uds_path.chmod(mode=222)
def mock_is_running():
attempts.pop(0)
return len(attempts) > 0
base_server.is_running = mock_is_running
ret = await base_server.klippy_connection._do_connect()
assert ret is False
@pytest.mark.asyncio
async def test_write_not_connected(base_server: Server):
req = KlippyRequest("", {})
kconn = base_server.klippy_connection
await kconn._write_request(req)
assert isinstance(req.response, ServerError)
@pytest.mark.asyncio
async def test_write_error(base_server: Server):
req = KlippyRequest("", {})
kconn = base_server.klippy_connection
kconn.writer = MockWriter()
await kconn._write_request(req)
assert isinstance(req.response, ServerError)
@pytest.mark.asyncio
async def test_write_cancelled(base_server: Server):
req = KlippyRequest("", {})
kconn = base_server.klippy_connection
kconn.writer = MockWriter(wait_drain=True)
task = base_server.event_loop.create_task(kconn._write_request(req))
base_server.event_loop.delay_callback(.01, task.cancel)
with pytest.raises(asyncio.CancelledError):
await task
@pytest.mark.asyncio
async def test_read_error(base_server: Server,
caplog: pytest.LogCaptureFixture):
mock_reader = MockReader("raise_error")
kconn = base_server.klippy_connection
await kconn._read_stream(mock_reader)
assert "Klippy Stream Read Error" == caplog.messages[-1]
@pytest.mark.asyncio
async def test_read_cancelled(base_server: Server):
mock_reader = MockReader("wait")
kconn = base_server.klippy_connection
task = base_server.event_loop.create_task(
kconn._read_stream(mock_reader))
base_server.event_loop.delay_callback(.01, task.cancel)
with pytest.raises(asyncio.CancelledError):
await task
@pytest.mark.asyncio
async def test_read_decode_error(base_server: Server,
caplog: pytest.LogCaptureFixture):
mock_reader = MockReader()
kconn = base_server.klippy_connection
await kconn._read_stream(mock_reader)
assert "Error processing Klippy Host Response:" in caplog.messages[-1]
def test_process_unknown_method(base_server: Server,
caplog: pytest.LogCaptureFixture):
cmd = {"method": "test_unknown"}
kconn = base_server.klippy_connection
kconn._process_command(cmd)
assert "Unknown method received: test_unknown" == caplog.messages[-1]
def test_process_unknown_request(base_server: Server,
caplog: pytest.LogCaptureFixture):
cmd = {"id": 4543}
kconn = base_server.klippy_connection
kconn._process_command(cmd)
expected = f"No request matching request ID: 4543, response: {cmd}"
assert expected == caplog.messages[-1]
def test_process_invalid_request(base_server: Server):
req = KlippyRequest("", {})
kconn = base_server.klippy_connection
kconn.pending_requests[req.id] = req
cmd = {"id": req.id}
kconn._process_command(cmd)
assert isinstance(req.response, ServerError)
# TODO: This can probably go in a class with test apis
@pytest.mark.asyncio
async def test_call_remote_method(base_server: Server,
klippy: KlippyProcess):
fut = base_server.get_event_loop().create_future()
def method_test(result):
fut.set_result(result)
base_server.register_remote_method("moonraker_test", method_test)
base_server.load_components()
await base_server.server_init()
ret = base_server.klippy_connection.wait_connected()
await asyncio.wait_for(ret, 4.)
klippy.send_gcode("TEST_REMOTE_METHOD")
await fut
await base_server._stop_server("terminate")
assert fut.result() == "test"

510
tests/test_server.py Normal file
View File

@ -0,0 +1,510 @@
from __future__ import annotations
import pytest
import pytest_asyncio
import asyncio
import socket
import pathlib
from collections import namedtuple
from moonraker import CORE_COMPONENTS, Server
from moonraker import main as servermain
from eventloop import EventLoop
from utils import ServerError
from confighelper import ConfigError
from components.klippy_apis import KlippyAPI
from mocks import MockComponent, MockWebsocket
from typing import (
TYPE_CHECKING,
AsyncIterator,
Dict,
Optional
)
if TYPE_CHECKING:
from fixtures import HttpClient, WebsocketClient
MockArgs = namedtuple('MockArgs', ["logfile", "nologfile", "configfile"])
@pytest.mark.run_paths(moonraker_conf="invalid_config.conf")
def test_invalid_config(path_args: Dict[str, pathlib.Path]):
evtloop = EventLoop()
args = {
'config_file': str(path_args['moonraker.conf']),
'log_file': "",
'software_version': "moonraker-pytest"
}
with pytest.raises(ConfigError):
Server(args, None, evtloop)
def test_config_and_log_warnings(path_args: Dict[str, pathlib.Path]):
evtloop = EventLoop()
args = {
'config_file': str(path_args['moonraker.conf']),
'log_file': "",
'software_version': "moonraker-pytest",
'log_warning': "Log Warning Test",
'config_warning': "Config Warning Test"
}
expected = ["Log Warning Test", "Config Warning Test"]
server = Server(args, None, evtloop)
assert server.warnings == expected
@pytest.mark.run_paths(moonraker_conf="unparsed_server.conf")
@pytest.mark.asyncio
async def test_unparsed_config_items(full_server: Server):
expected_warnings = [
"Unparsed config section [machine unparsed] detected.",
"Unparsed config option 'unknown_option: True' detected "
"in section [server]."]
warn_cnt = 0
for warn in full_server.warnings:
for expected in expected_warnings:
if warn.startswith(expected):
warn_cnt += 1
assert warn_cnt == 2
@pytest.mark.run_paths(moonraker_log="moonraker.log")
@pytest.mark.asyncio
async def test_file_logger(base_server: Server,
path_args: Dict[str, pathlib.Path]):
log_path = path_args.get("moonraker.log", None)
assert log_path is not None and log_path.exists()
def test_signal_handler(base_server: Server,
event_loop: asyncio.AbstractEventLoop):
base_server._handle_term_signal()
event_loop.run_forever()
assert base_server.exit_reason == "terminate"
class TestInstantiation:
def test_running(self, base_server: Server):
assert base_server.is_running() is False
def test_app_args(self,
path_args: Dict[str, pathlib.Path],
base_server: Server):
args = {
'config_file': str(path_args['moonraker.conf']),
'log_file': str(path_args.get("moonlog", "")),
'software_version': "moonraker-pytest"
}
assert base_server.get_app_args() == args
def test_pending_tasks(self, base_server: Server):
loop = base_server.get_event_loop().aioloop
assert len(loop._ready) == 0
def test_klippy_info(self, base_server: Server):
assert base_server.get_klippy_info() == {}
def test_klippy_state(self, base_server: Server):
assert base_server.get_klippy_state() == "disconnected"
def test_host_info(self, base_server: Server):
hinfo = {
'hostname': socket.gethostname(),
'address': "0.0.0.0",
'port': 7010,
'ssl_port': 7011
}
assert base_server.get_host_info() == hinfo
def test_klippy_connection(self, base_server: Server):
assert base_server.klippy_connection.is_connected() is False
def test_components(self, base_server: Server):
key_list = sorted(list(base_server.components.keys()))
assert key_list == [
"application",
"internal_transport",
"klippy_connection",
"websockets",
]
def test_endpoint_registered(self, base_server: Server):
app = base_server.moonraker_app
assert "/server/info" in app.api_cache
@pytest.mark.asyncio
async def test_notification(self, base_server: Server):
base_server.register_notification("test:test_event")
fut = base_server.event_loop.create_future()
wsm = base_server.lookup_component("websockets")
wsm.websockets[1] = MockWebsocket(fut)
base_server.send_event("test:test_event", "test")
ret = await fut
expected = {
'jsonrpc': "2.0",
'method': "notify_test_event",
'params': ["test"]
}
assert expected == ret
class TestLoadComponent:
def test_load_component_fail(self, base_server: Server):
with pytest.raises(ServerError):
base_server.load_component(
base_server.config, "invalid_component")
def test_failed_component_set(self, base_server: Server):
assert "invalid_component" in base_server.failed_components
def test_load_component_fail_with_default(self, base_server: Server):
comp = base_server.load_component(
base_server.config, "invalid_component", None)
assert comp is None
def test_lookup_failed(self, base_server: Server):
with pytest.raises(ServerError):
base_server.lookup_component("invalid_component")
def test_lookup_failed_with_default(self, base_server: Server):
comp = base_server.lookup_component("invalid_component", None)
assert comp is None
def test_load_component(self, base_server: Server):
comp = base_server.load_component(base_server.config, "klippy_apis")
assert isinstance(comp, KlippyAPI)
def test_lookup_component(self, base_server: Server):
comp = base_server.lookup_component('klippy_apis')
assert isinstance(comp, KlippyAPI)
def test_component_attr(self, base_server: Server):
key_list = sorted(list(base_server.components.keys()))
assert key_list == [
"application",
"internal_transport",
"klippy_apis",
"klippy_connection",
"websockets",
]
class TestCoreServer:
@pytest_asyncio.fixture(scope="class")
async def core_server(self, base_server: Server) -> AsyncIterator[Server]:
base_server.load_components()
yield base_server
await base_server._stop_server("terminate")
def test_running(self, core_server: Server):
assert core_server.is_running() is False
def test_http_servers(self, core_server: Server):
app = core_server.lookup_component("application")
assert (
app.http_server is None and
app.secure_server is None
)
def test_warnings(self, core_server: Server):
assert len(core_server.warnings) == 0
def test_failed_components(self, core_server: Server):
assert len(core_server.failed_components) == 0
def test_lookup_components(self, core_server: Server):
comps = []
for comp_name in CORE_COMPONENTS:
comps.append(core_server.lookup_component(comp_name, None))
assert None not in comps
def test_pending_tasks(self, core_server: Server):
loop = core_server.get_event_loop().aioloop
assert len(loop._ready) == 0
def test_register_component_fail(self, core_server: Server):
with pytest.raises(ServerError):
core_server.register_component("machine", object())
def test_register_remote_method(self, core_server: Server):
core_server.register_remote_method("moonraker_test", lambda: None)
kconn = core_server.klippy_connection
assert "moonraker_test" in kconn.remote_methods
def test_register_method_exists(self, core_server: Server):
with pytest.raises(ServerError):
core_server.register_remote_method(
"shutdown_machine", lambda: None)
class TestServerInit:
def test_running(self, full_server: Server):
assert full_server.is_running() is False
def test_http_servers(self, full_server: Server):
app = full_server.lookup_component("application")
assert (
app.http_server is None and
app.secure_server is None
)
def test_warnings(self, full_server: Server):
assert len(full_server.warnings) == 0
def test_failed_components(self, full_server: Server):
assert len(full_server.failed_components) == 0
def test_lookup_components(self, full_server: Server):
comps = []
for comp_name in CORE_COMPONENTS:
comps.append(full_server.lookup_component(comp_name, None))
assert None not in comps
def test_config_backup(self,
full_server: Server,
path_args: Dict[str, pathlib.Path]):
cfg = path_args["config_path"].joinpath(".moonraker.conf.bkp")
assert cfg.is_file()
class TestServerStart:
@pytest_asyncio.fixture(scope="class")
async def server(self, full_server: Server) -> Server:
await full_server.start_server(connect_to_klippy=False)
return full_server
def test_running(self, server: Server):
assert server.is_running() is True
def test_http_servers(self, server: Server):
app = server.lookup_component("application")
assert (
app.http_server is not None and
app.secure_server is None
)
@pytest.mark.run_paths(moonraker_conf="base_server_ssl.conf")
class TestSecureServerStart:
@pytest_asyncio.fixture(scope="class")
async def server(self, full_server: Server) -> Server:
await full_server.start_server(connect_to_klippy=False)
return full_server
def test_running(self, server: Server):
assert server.is_running() is True
def test_http_servers(self, server: Server):
app = server.lookup_component("application")
assert (
app.http_server is not None and
app.secure_server is not None
)
@pytest.mark.asyncio
async def test_component_init_error(base_server: Server):
base_server.register_component("testcomp", MockComponent(err_init=True))
await base_server.server_init(False)
assert "testcomp" in base_server.failed_components
@pytest.mark.asyncio
async def test_component_exit_error(base_server: Server,
caplog: pytest.LogCaptureFixture):
base_server.register_component("testcomp", MockComponent(err_exit=True))
await base_server._stop_server("terminate")
expected = "Error executing 'on_exit()' for component: testcomp"
assert expected in caplog.messages
@pytest.mark.asyncio
async def test_component_close_error(base_server: Server,
caplog: pytest.LogCaptureFixture):
base_server.register_component("testcomp", MockComponent(err_close=True))
await base_server._stop_server("terminate")
expected = "Error executing 'close()' for component: testcomp"
assert expected in caplog.messages
def test_register_event(base_server: Server):
def test_func():
pass
base_server.register_event_handler("test:my_test", test_func)
assert base_server.events["test:my_test"] == [test_func]
def test_register_async_event(base_server: Server):
async def test_func():
pass
base_server.register_event_handler("test:my_test", test_func)
assert base_server.events["test:my_test"] == [test_func]
@pytest.mark.asyncio
async def test_send_event(full_server: Server):
evtloop = full_server.get_event_loop()
fut = evtloop.create_future()
def test_func(arg):
fut.set_result(arg)
full_server.register_event_handler("test:my_test", test_func)
full_server.send_event("test:my_test", "test")
result = await fut
assert result == "test"
@pytest.mark.asyncio
async def test_send_async_event(full_server: Server):
evtloop = full_server.get_event_loop()
fut = evtloop.create_future()
async def test_func(arg):
fut.set_result(arg)
full_server.register_event_handler("test:my_test", test_func)
full_server.send_event("test:my_test", "test")
result = await fut
assert result == "test"
@pytest.mark.asyncio
async def test_register_remote_method_running(full_server: Server):
await full_server.start_server(connect_to_klippy=False)
with pytest.raises(ServerError):
full_server.register_remote_method(
"moonraker_test", lambda: None)
@pytest.mark.usefixtures("event_loop")
def test_main(path_args: Dict[str, pathlib.Path],
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture):
tries = [1]
def mock_init(self: Server):
reason = "terminate"
if tries:
reason = "restart"
tries.pop(0)
self.event_loop.delay_callback(.01, self._stop_server, reason)
cfg_path = path_args["moonraker.conf"]
args = MockArgs("", True, str(cfg_path))
monkeypatch.setattr(Server, "server_init", mock_init)
code: Optional[int] = None
try:
servermain(args)
except SystemExit as e:
code = e.code
assert (
code == 0 and
"Attempting Server Restart..." in caplog.messages and
"Server Shutdown" == caplog.messages[-1]
)
@pytest.mark.run_paths(moonraker_conf="invalid_config.conf")
def test_main_config_error(path_args: Dict[str, pathlib.Path],
caplog: pytest.LogCaptureFixture):
cfg_path = path_args["moonraker.conf"]
args = MockArgs("", True, str(cfg_path))
try:
servermain(args)
except SystemExit as e:
code = e.code
assert code == 1 and "Server Config Error" in caplog.messages
@pytest.mark.run_paths(moonraker_conf="invalid_config.conf",
moonraker_bkp=".moonraker.conf.bkp")
@pytest.mark.usefixtures("event_loop")
def test_main_restore_config(path_args: Dict[str, pathlib.Path],
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture):
def mock_init(self: Server):
reason = "terminate"
self.event_loop.delay_callback(.01, self._stop_server, reason)
cfg_path = path_args["moonraker.conf"]
args = MockArgs("", True, str(cfg_path))
monkeypatch.setattr(Server, "server_init", mock_init)
code: Optional[int] = None
try:
servermain(args)
except SystemExit as e:
code = e.code
assert (
code == 0 and
"Loaded server from most recent working configuration:" in caplog.text
)
class TestEndpoints:
@pytest_asyncio.fixture(scope="class")
async def server(self, full_server: Server):
await full_server.start_server()
yield full_server
@pytest.mark.asyncio
async def test_http_server_info(self,
server: Server,
http_client: HttpClient):
ret = await http_client.get("/server/info")
comps = list(server.components.keys())
expected = {
'klippy_connected': False,
'klippy_state': "disconnected",
'components': comps,
'failed_components': [],
'registered_directories': ["config", "logs"],
'warnings': [],
'websocket_count': 0,
'moonraker_version': "moonraker-pytest",
'missing_klippy_requirements': []
}
assert ret["result"] == expected
@pytest.mark.asyncio
async def test_http_server_config(self,
server: Server,
http_client: HttpClient):
cfg = server.config.get_parsed_config()
ret = await http_client.get("/server/config")
assert ret["result"]["config"] == cfg
@pytest.mark.asyncio
async def test_websocket_server_info(self,
server: Server,
websocket_client: WebsocketClient):
ret = await websocket_client.request("server.info")
comps = list(server.components.keys())
expected = {
'klippy_connected': False,
'klippy_state': "disconnected",
'components': comps,
'failed_components': [],
'registered_directories': ["config", "logs"],
'warnings': [],
'websocket_count': 1,
'moonraker_version': "moonraker-pytest",
'missing_klippy_requirements': []
}
assert ret == expected
@pytest.mark.asyncio
async def test_websocket_server_config(self,
server: Server,
websocket_client: WebsocketClient):
cfg = server.config.get_parsed_config()
ret = await websocket_client.request("server.config")
assert ret["config"] == cfg
def test_server_restart(base_server: Server,
http_client: HttpClient,
event_loop: asyncio.AbstractEventLoop):
result = {}
async def do_restart():
base_server.load_components()
await base_server.start_server()
ret = await http_client.post("/server/restart")
result.update(ret)
event_loop.create_task(do_restart())
event_loop.run_forever()
assert result["result"] == "ok" and base_server.exit_reason == "restart"
@pytest.mark.no_ws_connect
def test_websocket_restart(base_server: Server,
websocket_client: WebsocketClient,
event_loop: asyncio.AbstractEventLoop):
result = {}
async def do_restart():
base_server.load_components()
await base_server.start_server()
await websocket_client.connect()
ret = await websocket_client.request("server.restart")
result["result"] = ret
event_loop.create_task(do_restart())
event_loop.run_forever()
assert result["result"] == "ok" and base_server.exit_reason == "restart"
# TODO:
# test invalid cert, key (probably should do that in test_app.py)