wled: Add serial support for direct communication (#325)

wled: Add serial support for direct communication

Signed-off-by:  Richard Mitchell <richardjm+moonraker@gmail.com>
This commit is contained in:
Richard Mitchell 2022-01-13 00:56:14 +00:00 committed by GitHub
parent 59fd35827e
commit cfd3d63a0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 40 deletions

View File

@ -1106,9 +1106,16 @@ Enables control of an WLED strip.
# moonraker.conf
[wled strip_name]
type:
# The type of device. Can be either http, or serial.
# This parameter must be provided.
address:
# The address should be a valid ip or hostname for the wled webserver and
# must be specified
# The address should be a valid ip or hostname for the wled webserver.
# Required when type: http
serial:
# The serial port to be used to communicate directly to wled. Requires wled
# 0.13 Build 2108250 or later.
# Required when type: serial
initial_preset:
# Initial preset ID (favourite) to use. If not specified initial_colors
# will be used instead.
@ -1124,21 +1131,29 @@ color_order:
# Color order for WLED strip, RGB or RGBW (default: RGB)
```
Below are some potential examples:
Below are some examples:
```ini
# moonraker.conf
[wled case]
type: http
address: led1.lan
initial_preset: 45
chain_count: 76
[wled lounge]
type: http
address: 192.168.0.45
initial_red: 0.5
initial_green: 0.4
initial_blue: 0.3
chain_count: 42
[wled stealthburner]
type: serial
serial: /dev/serial/by-id/usb-1a86_USB_Serial-if00-port0
initial_white: 0.6
chain_count: 3
```
It is possible to control wled from the klippy host, this can be done using

View File

@ -1,6 +1,6 @@
# WLED neopixel support
#
# Copyright (C) 2021 Richard Mitchell <richardjm+moonraker@gmail.com>
# Copyright (C) 2021-2022 Richard Mitchell <richardjm+moonraker@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
@ -13,6 +13,8 @@ from enum import Enum
import logging
import json
import asyncio
import os
import serial
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado.escape import json_decode
@ -49,26 +51,17 @@ class OnOff(str, Enum):
on: str = "on"
off: str = "off"
class Strip:
class Strip():
def __init__(self: Strip,
name: str,
color_order: ColorOrder,
cfg: ConfigHelper):
self.server = cfg.get_server()
self.client = AsyncHTTPClient()
self.request_mutex = asyncio.Lock()
self.name = name
self.color_order = color_order
# Read the uri information
addr: str = cfg.get("address")
port: int = cfg.getint("port", 80)
protocol: str = cfg.get("protocol", "http")
self.url = f"{protocol}://{addr}:{port}/json"
self.timeout: float = cfg.getfloat("timeout", 2.)
self.initial_preset: int = cfg.getint("initial_preset", -1)
self.initial_red: float = cfg.getfloat("initial_red", 0.5)
self.initial_green: float = cfg.getfloat("initial_green", 0.5)
@ -128,35 +121,28 @@ class Strip:
elem_size = len(led_data)
self._chain_data[(index-1)*elem_size:index*elem_size] = led_data
async def send_wled_command_impl(self: Strip,
state: Dict[str, Any]) -> None:
pass
def close(self: Strip):
pass
async def _send_wled_command(self: Strip,
state: Dict[str, Any]) -> None:
async with self.request_mutex:
try:
logging.debug(f"WLED: url:{self.url} json:{state}")
try:
await self.send_wled_command_impl(state)
headers = {"Content-Type": "application/json"}
request = HTTPRequest(url=self.url,
method="POST",
headers=headers,
body=json.dumps(state),
connect_timeout=self.timeout,
request_timeout=self.timeout)
response = await self.client.fetch(request)
logging.debug(
f"WLED: url:{self.url} status:{response.code} "
f"response:{response.body}")
self.error_state = None
except Exception as e:
msg = f"WLED: Error {e}"
self.error_state = msg
logging.exception(msg)
raise self.server.error(msg)
self.error_state = None
except Exception as e:
msg = f"WLED: Error {e}"
self.error_state = msg
logging.exception(msg)
raise self.server.error(msg)
async def wled_on(self: Strip, preset: int) -> None:
self.onoff = OnOff.on
logging.debug(f"WLED: on {self.name} PRESET={preset}")
logging.debug(f"WLED: {self.name} on PRESET={preset}")
if preset < 0:
# WLED_ON STRIP=strip (no args) - reset to default
await self.initialize()
@ -166,7 +152,7 @@ class Strip:
await self._send_wled_command({"on": True, "ps": preset})
async def wled_off(self: Strip) -> None:
logging.debug(f"WLED: off {self.name}")
logging.debug(f"WLED: {self.name} off")
self.onoff = OnOff.off
await self._send_wled_command({"on": False})
@ -217,6 +203,74 @@ class Strip:
# next transmitting
self.send_full_chain_data = True
class StripHttp(Strip):
def __init__(self: StripHttp,
name: str,
color_order: ColorOrder,
cfg: ConfigHelper):
super().__init__(name, color_order, cfg)
# Read the uri information
addr: str = cfg.get("address")
port: int = cfg.getint("port", 80)
protocol: str = cfg.get("protocol", "http")
self.url = f"{protocol}://{addr}:{port}/json"
self.timeout: float = cfg.getfloat("timeout", 2.)
self.client = AsyncHTTPClient()
async def send_wled_command_impl(self: StripHttp,
state: Dict[str, Any]) -> None:
async with self.request_mutex:
logging.debug(f"WLED: url:{self.url} json:{state}")
headers = {"Content-Type": "application/json"}
request = HTTPRequest(url=self.url,
method="POST",
headers=headers,
body=json.dumps(state),
connect_timeout=self.timeout,
request_timeout=self.timeout)
response = await self.client.fetch(request)
logging.debug(
f"WLED: url:{self.url} status:{response.code} "
f"response:{response.body}")
class StripSerial(Strip):
def __init__(self: StripSerial,
name: str,
color_order: ColorOrder,
cfg: ConfigHelper):
super().__init__(name, color_order, cfg)
# Read the serial information (requires wled 0.13 2108250 or greater)
serialport: str = cfg.get("serial")
baud: int = cfg.getint("baud", 115200, above=49)
# write_timeout of 0 is non-blocking
self.ser = serial.Serial(serialport, baud,
write_timeout=0)
fd = self.ser.fileno()
os.set_blocking(fd, False)
async def send_wled_command_impl(self: StripSerial,
state: Dict[str, Any]) -> None:
async with self.request_mutex:
logging.debug(f"WLED: serial:{self.ser.name} json:{state}")
if not self.ser.is_open:
self.ser.open()
# asyncio support is still experimental in pySerial
self.ser.write(json.dumps(state).encode())
def close(self: StripSerial):
if self.ser.is_open:
self.ser.close()
logging.info(f"WLED: Closing serial {self.ser.name}")
class WLED:
def __init__(self: WLED, config: ConfigHelper) -> None:
try:
@ -230,6 +284,10 @@ class WLED:
"RGB": ColorOrder.RGB,
"RGBW": ColorOrder.RGBW
}
strip_types = {
"http": StripHttp,
"serial": StripSerial
}
self.strips = {}
for section in prefix_sections:
cfg = config[section]
@ -245,10 +303,22 @@ class WLED:
color_order_cfg: str = cfg.get("color_order", "RGB")
color_order = color_orders.get(color_order_cfg)
if color_order is None:
raise config.error(
raise cfg.error(
f"Color order not supported: {color_order_cfg}")
self.strips[name] = Strip(name, color_order, cfg)
strip_type: str = cfg.get("type", "http")
strip_class: Optional[Type[Strip]]
strip_class = strip_types.get(strip_type)
if strip_class is None:
raise cfg.error(f"Unsupported Strip Type: {strip_type}")
try:
strip = strip_class(name, color_order, cfg)
except Exception as e:
msg = f"Failed to initialise strip [{cfg.get_name()}]\n{e}"
self.server.add_warning(msg)
continue
self.strips[name] = strip
# Register two remote methods for GCODE
self.server.register_remote_method(
@ -429,5 +499,9 @@ class WLED:
raise self.server.error(f"Unsupported wled request: {req}")
def close(self) -> None:
for strip in self.strips.values():
strip.close()
def load_component(config: ConfigHelper) -> WLED:
return WLED(config)