Files
ds18b20-MODBUS/john103C6T6NewVer/serial_bridge.py
2026-06-25 17:25:41 +03:00

1137 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import json
import os
import re
import socket
import threading
import time
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
import serial
from serial.tools import list_ports
SENSOR_COUNT = 16
VALVE_COUNT = 32
DEFAULT_OPEN_DEGREES_MAX = 90
MODBUS_UNIT_ID = 3
MODBUS_TEMP_INPUT_BASE = 0
MODBUS_DS18B20_ID_INPUT_BASE = 1000
MODBUS_ROOM_INPUT_BASE = 400
MODBUS_ROOM_INPUT_REGS = 18
MODBUS_SETPOINT_HOLDING_BASE = 0
MODBUS_TEMPERATURE_SCALE = 10.0
MODBUS_ROOM_HOLDING_BASE = 300
MODBUS_ROOM_HOLDING_REGS = 8
MODBUS_ROOM_HOLDING_SETPOINT_X10 = 0
MODBUS_ROOM_HOLDING_HYST_X10 = 1
MODBUS_ROOM_HOLDING_POSITION_PCT = 2
MODBUS_ROOM_HOLDING_ANGLE_MAX = 3
MODBUS_ROOM_HOLDING_MODE = 4
MODBUS_ROOM_HOLDING_COMMAND = 5
MODBUS_ROOM_HOLDING_LOCATION = 6
MODBUS_ROOM_HOLDING_APPLY = 7
MODBUS_SENSOR_STATUS_COIL_BASE = 128
MODBUS_VALVE_OPEN_COIL_BASE = 256
MODBUS_VALVE_CLOSE_COIL_BASE = 288
MODBUS_APPLY_PARAMS_COIL = 384
MODBUS_POLL_INTERVAL = 1.0
ROOM_MODE_AUTO = 0
ROOM_MODE_MANUAL = 1
ROOM_COMMAND_STOP = 0
ROOM_COMMAND_OPEN = 1
ROOM_COMMAND_CLOSE = 2
def modbus_crc16(data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc & 0xFFFF
def to_signed16(value: int) -> int:
value = int(value) & 0xFFFF
return value - 0x10000 if value & 0x8000 else value
def numeric_suffix(value: object, fallback: int = 1) -> int:
match = re.search(r"\d+", str(value or ""))
return int(match.group(0)) if match else fallback
def ds18b20_id_from_registers(registers: list[int]) -> str:
if len(registers) < 4:
return ""
data = bytearray()
for value in registers[:4]:
data.extend((int(value) & 0xFFFF).to_bytes(2, "little"))
if not any(data):
return ""
return "-".join(f"{byte:02X}" for byte in data)
def make_default_sensors():
return [
{
"id": f"zone_{number}",
"name": f"Датчик {number}",
"value": 0.0,
"setpoint": 28.0,
"unit": "°C",
"zone": str(number),
"ds18b20Id": f"28-00-00-00-00-00-00-{number:02X}",
}
for number in range(1, SENSOR_COUNT + 1)
]
def make_default_valves():
valves = []
for index in range(VALVE_COUNT):
number = index + 1
zone = (index % SENSOR_COUNT) + 1
valves.append(
{
"id": f"valve_{number}",
"name": f"Клапан {number}",
"zone": str(zone),
"mode": "auto",
"position": 0,
"targetTemp": 28,
"isOpen": False,
"openDegrees": 0,
"openDegreesMax": DEFAULT_OPEN_DEGREES_MAX,
}
)
return valves
DEFAULT_SENSORS = make_default_sensors()
DEFAULT_VALVES = make_default_valves()
def clamp(value, min_value, max_value):
try:
v = float(value)
except Exception:
return min_value
if v < min_value:
return min_value
if v > max_value:
return max_value
return v
def discover_serial_ports() -> list[dict[str, str]]:
ports: dict[str, dict[str, str]] = {}
try:
available_ports = list_ports.comports(include_links=True)
except TypeError:
available_ports = list_ports.comports()
except Exception:
available_ports = []
for item in available_ports:
add_serial_port(ports, item.device, item.description, item.hwid)
if os.name == "nt":
try:
import winreg
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"HARDWARE\DEVICEMAP\SERIALCOMM") as key:
index = 0
while True:
try:
value_name, device, _ = winreg.EnumValue(key, index)
except OSError:
break
if str(device).upper() not in ports:
add_serial_port(ports, device, "Windows serial port", f"registry:{value_name}")
index += 1
except (ImportError, OSError):
pass
return sorted(ports.values(), key=port_sort_key)
class BridgeState:
def __init__(self):
self.sensors = [dict(item) for item in DEFAULT_SENSORS]
self.valves = [dict(item) for item in DEFAULT_VALVES]
state = BridgeState()
PREFERRED_PORT_TOKENS = (
"st-link",
"stm",
"stmicro",
"usb",
"vcp",
"virtual com",
"usb serial",
)
def add_serial_port(ports: dict[str, dict[str, str]], device: object, description: object = "", hwid: object = "") -> None:
device_text = str(device).strip()
if not device_text:
return
key = device_text.upper()
ports[key] = {
"device": device_text,
"description": str(description or "").strip(),
"hwid": str(hwid or "").strip(),
}
def port_text(port: dict[str, str]) -> str:
return " ".join([port.get("device", ""), port.get("description", ""), port.get("hwid", "")]).lower()
def port_com_number(port: dict[str, str]) -> int:
device = port.get("device", "").upper()
if device.startswith("COM") and device[3:].isdigit():
return int(device[3:])
return 0
def port_sort_key(port: dict[str, str]) -> tuple[int, int, str]:
text = port_text(port)
rank = 0 if any(token in text for token in PREFERRED_PORT_TOKENS) else 1
com_num = port_com_number(port)
if 1 <= com_num <= 4:
rank = 2
return (rank, com_num, port.get("device", "").upper())
def preferred_serial_port(ports: list[dict[str, str]]) -> str | None:
if not ports:
return None
return sorted(ports, key=port_sort_key)[0]["device"]
state_lock = threading.Lock()
def valve_open_degrees_max(valve: dict) -> int:
return int(clamp(valve.get("openDegreesMax", DEFAULT_OPEN_DEGREES_MAX), 1, 10000))
def position_from_degrees(degrees: int, max_degrees: int) -> int:
if max_degrees <= 0:
return 0
return int(round(clamp(degrees, 0, max_degrees) * 100 / max_degrees))
def degrees_from_position(position: int, max_degrees: int) -> int:
return int(round(clamp(position, 0, 100) * max_degrees / 100))
class SerialBridge:
def __init__(self, port: str, baudrate: int, parity: str, stopbits: float, bytesize: int, timeout: float):
self.port = port
self.baudrate = baudrate
self.parity = parity
self.stopbits = stopbits
self.bytesize = bytesize
self.timeout = timeout
self.transport = "rtu"
self.tcp_host = ""
self.tcp_port = 502
self.tcp_socket = None
self.tcp_file = None
self.unit_id = MODBUS_UNIT_ID
self.transaction_id = 0
self.request_lock = threading.Lock()
self.serial = None
self.running = False
self.thread = None
def configure_rtu(self, port: str, baudrate: int, parity: str, stopbits: float, bytesize: int, timeout: float):
self.transport = "rtu"
self.port = port
self.baudrate = baudrate
self.parity = parity
self.stopbits = stopbits
self.bytesize = bytesize
self.timeout = timeout
def configure_tcp(self, host: str, tcp_port: int = 502, timeout: float = 0.8):
self.transport = "tcp"
self.tcp_host = host
self.tcp_port = int(tcp_port)
self.timeout = timeout
self.port = f"{self.tcp_host}:{self.tcp_port}"
def connect(self):
if self.running and self.thread and self.thread.is_alive():
self.disconnect()
if self.transport == "tcp":
self.tcp_socket = socket.create_connection((self.tcp_host, self.tcp_port), timeout=self.timeout)
self.tcp_socket.settimeout(self.timeout)
self.tcp_file = self.tcp_socket.makefile("rb")
else:
self.serial = serial.Serial(
self.port,
self.baudrate,
parity=self.parity,
stopbits=self.stopbits,
bytesize=self.bytesize,
timeout=self.timeout,
)
self.running = True
self.thread = threading.Thread(target=self.modbus_poll_loop, daemon=True)
self.thread.start()
def disconnect(self):
self.running = False
if self.thread and self.thread.is_alive():
try:
self.thread.join(timeout=0.5)
except Exception:
pass
if self.serial and self.serial.is_open:
self.serial.close()
if self.tcp_file:
try:
self.tcp_file.close()
except Exception:
pass
self.tcp_file = None
if self.tcp_socket:
try:
self.tcp_socket.close()
except Exception:
pass
self.tcp_socket = None
def is_connected(self):
if self.transport == "tcp":
return self.tcp_socket is not None
return bool(self.serial and self.serial.is_open)
def status(self):
connected = self.is_connected()
if self.transport == "tcp":
return {
"ok": True,
"connected": connected,
"transport": "tcp",
"unitId": self.unit_id,
"host": self.tcp_host,
"tcpPort": self.tcp_port,
"address": f"{self.tcp_host}:{self.tcp_port}" if self.tcp_host else "",
"port": f"{self.tcp_host}:{self.tcp_port}" if connected and self.tcp_host else None,
}
return {
"ok": True,
"connected": connected,
"transport": "rtu",
"unitId": self.unit_id,
"port": self.port if connected else None,
"baudrate": self.baudrate,
}
def send(self, payload: dict):
if not self.is_connected():
return
typ = str(payload.get("type", "")).lower()
if typ == "sensor":
self.write_sensor_payload(payload)
return
if typ == "valve":
self.write_valve_payload(payload)
return
def _recv_exact_tcp(self, size: int) -> bytes:
data = bytearray()
while len(data) < size:
chunk = self.tcp_socket.recv(size - len(data))
if not chunk:
raise ConnectionError("Modbus TCP connection closed")
data.extend(chunk)
return bytes(data)
def _read_exact_serial(self, size: int) -> bytes:
data = bytearray()
deadline = time.monotonic() + max(float(self.timeout or 0.3), 0.3)
while len(data) < size and time.monotonic() < deadline:
chunk = self.serial.read(size - len(data))
if chunk:
data.extend(chunk)
else:
time.sleep(0.005)
if len(data) != size:
raise TimeoutError("Modbus RTU response timeout")
return bytes(data)
def modbus_request(self, function: int, payload: bytes) -> bytes:
pdu = bytes([function]) + payload
with self.request_lock:
if self.transport == "tcp":
self.transaction_id = (self.transaction_id + 1) & 0xFFFF
mbap = (
self.transaction_id.to_bytes(2, "big")
+ b"\x00\x00"
+ (len(pdu) + 1).to_bytes(2, "big")
+ bytes([self.unit_id])
)
self.tcp_socket.sendall(mbap + pdu)
header = self._recv_exact_tcp(7)
length = int.from_bytes(header[4:6], "big")
response_pdu = self._recv_exact_tcp(max(0, length - 1))
else:
frame = bytes([self.unit_id]) + pdu
crc = modbus_crc16(frame)
request = frame + crc.to_bytes(2, "little")
try:
self.serial.reset_input_buffer()
except Exception:
pass
self.serial.write(request)
header = self._read_exact_serial(3)
if header[0] != self.unit_id:
raise ValueError("Unexpected Modbus RTU unit id")
if header[1] & 0x80:
tail = self._read_exact_serial(2)
frame = header + tail
if modbus_crc16(frame[:-2]) != int.from_bytes(frame[-2:], "little"):
raise ValueError("Bad Modbus RTU CRC")
raise RuntimeError(f"Modbus exception {header[2]}")
if header[1] in (0x01, 0x02, 0x03, 0x04):
tail = self._read_exact_serial(header[2] + 2)
else:
tail = self._read_exact_serial(5)
frame = header + tail
if modbus_crc16(frame[:-2]) != int.from_bytes(frame[-2:], "little"):
raise ValueError("Bad Modbus RTU CRC")
response_pdu = frame[1:-2]
if not response_pdu:
raise TimeoutError("Empty Modbus response")
if response_pdu[0] & 0x80:
code = response_pdu[1] if len(response_pdu) > 1 else 0
raise RuntimeError(f"Modbus exception {code}")
if response_pdu[0] != function:
raise ValueError("Unexpected Modbus function")
return response_pdu
def read_registers(self, function: int, address: int, count: int) -> list[int]:
response = self.modbus_request(function, address.to_bytes(2, "big") + count.to_bytes(2, "big"))
byte_count = response[1]
data = response[2:2 + byte_count]
return [int.from_bytes(data[index:index + 2], "big") for index in range(0, len(data), 2)]
def read_input_registers(self, address: int, count: int) -> list[int]:
return self.read_registers(0x04, address, count)
def read_holding_registers(self, address: int, count: int) -> list[int]:
return self.read_registers(0x03, address, count)
def read_coils(self, address: int, count: int) -> list[bool]:
response = self.modbus_request(0x01, address.to_bytes(2, "big") + count.to_bytes(2, "big"))
data = response[2:2 + response[1]]
bits: list[bool] = []
for byte in data:
for bit in range(8):
bits.append(bool(byte & (1 << bit)))
if len(bits) >= count:
return bits
return bits
def write_register(self, address: int, value: int):
self.modbus_request(0x06, address.to_bytes(2, "big") + (int(value) & 0xFFFF).to_bytes(2, "big"))
def write_coil(self, address: int, value: bool):
raw_value = 0xFF00 if value else 0x0000
self.modbus_request(0x05, address.to_bytes(2, "big") + raw_value.to_bytes(2, "big"))
def write_sensor_payload(self, payload: dict):
sensor_number = numeric_suffix(payload.get("id"), 1)
channel = max(0, min(SENSOR_COUNT - 1, sensor_number - 1))
if "setpoint" in payload:
setpoint_x10 = round(float(payload["setpoint"]) * MODBUS_TEMPERATURE_SCALE)
self.write_register(MODBUS_SETPOINT_HOLDING_BASE + channel, setpoint_x10)
self.write_register(MODBUS_ROOM_HOLDING_BASE + channel * MODBUS_ROOM_HOLDING_REGS + MODBUS_ROOM_HOLDING_SETPOINT_X10, setpoint_x10)
self.write_register(MODBUS_ROOM_HOLDING_BASE + channel * MODBUS_ROOM_HOLDING_REGS + MODBUS_ROOM_HOLDING_APPLY, 1)
self.write_coil(MODBUS_APPLY_PARAMS_COIL, True)
def write_valve_payload(self, payload: dict):
valve_number = numeric_suffix(payload.get("id"), 1)
channel = max(0, min(SENSOR_COUNT - 1, (valve_number - 1) // 2))
room_base = MODBUS_ROOM_HOLDING_BASE + channel * MODBUS_ROOM_HOLDING_REGS
if "targetTemp" in payload:
setpoint_x10 = round(float(payload["targetTemp"]) * MODBUS_TEMPERATURE_SCALE)
self.write_register(MODBUS_SETPOINT_HOLDING_BASE + channel, setpoint_x10)
self.write_register(room_base + MODBUS_ROOM_HOLDING_SETPOINT_X10, setpoint_x10)
self.write_register(room_base + MODBUS_ROOM_HOLDING_APPLY, 1)
self.write_coil(MODBUS_APPLY_PARAMS_COIL, True)
if "mode" in payload:
mode = ROOM_MODE_MANUAL if str(payload.get("mode")).lower() == "manual" else ROOM_MODE_AUTO
self.write_register(room_base + MODBUS_ROOM_HOLDING_MODE, mode)
if "position" not in payload and "isOpen" not in payload:
return
if "position" in payload:
position_pct = int(clamp(payload.get("position") or 0, 0, 100))
should_open = position_pct > 0
else:
should_open = bool(payload.get("isOpen"))
position_pct = 100 if should_open else 0
self.write_register(room_base + MODBUS_ROOM_HOLDING_POSITION_PCT, position_pct)
self.write_register(room_base + MODBUS_ROOM_HOLDING_COMMAND, ROOM_COMMAND_OPEN if should_open else ROOM_COMMAND_CLOSE)
self.write_coil(MODBUS_VALVE_OPEN_COIL_BASE + channel, should_open)
self.write_coil(MODBUS_VALVE_CLOSE_COIL_BASE + channel, not should_open)
def modbus_poll_loop(self):
while self.running:
try:
self.poll_modbus_state()
except Exception:
time.sleep(0.2)
time.sleep(MODBUS_POLL_INTERVAL)
def apply_room_registers(self, room_regs: list[int]):
with state_lock:
for index in range(SENSOR_COUNT):
start = index * MODBUS_ROOM_INPUT_REGS
room = room_regs[start:start + MODBUS_ROOM_INPUT_REGS]
if len(room) < MODBUS_ROOM_INPUT_REGS:
continue
sensor = state.sensors[index]
sensor["value"] = round(to_signed16(room[6]) / MODBUS_TEMPERATURE_SCALE, 1)
sensor["setpoint"] = round(to_signed16(room[7]) / MODBUS_TEMPERATURE_SCALE, 1)
sensor["ds18b20Id"] = ds18b20_id_from_registers(room[2:6])
sensor["connected"] = bool(room[12])
sensor["locationCode"] = int(room[1])
position = int(clamp(room[9], 0, 100))
open_degrees_max = int(room[11] or DEFAULT_OPEN_DEGREES_MAX)
open_degrees = int(clamp(room[10], 0, open_degrees_max))
mode = "manual" if int(room[15]) == ROOM_MODE_MANUAL else "auto"
open_state = bool(room[13])
close_state = bool(room[14])
open_index = index * 2
close_index = open_index + 1
if open_index < len(state.valves):
valve = state.valves[open_index]
valve["targetTemp"] = sensor["setpoint"]
valve["mode"] = mode
valve["position"] = position
valve["openDegrees"] = open_degrees
valve["openDegreesMax"] = open_degrees_max
valve["isOpen"] = open_state or position > 0
valve["commandState"] = int(room[16])
if close_index < len(state.valves):
valve = state.valves[close_index]
valve["targetTemp"] = sensor["setpoint"]
valve["mode"] = mode
valve["position"] = 100 if close_state else 0
valve["openDegrees"] = degrees_from_position(valve["position"], open_degrees_max)
valve["openDegreesMax"] = open_degrees_max
valve["isOpen"] = close_state
valve["commandState"] = int(room[16])
def poll_modbus_state(self):
if not self.is_connected():
return
try:
room_regs = self.read_input_registers(MODBUS_ROOM_INPUT_BASE, SENSOR_COUNT * MODBUS_ROOM_INPUT_REGS)
self.apply_room_registers(room_regs)
return
except Exception:
pass
temperatures = self.read_input_registers(MODBUS_TEMP_INPUT_BASE, SENSOR_COUNT)
sensor_ids = self.read_input_registers(MODBUS_DS18B20_ID_INPUT_BASE, SENSOR_COUNT * 4)
setpoints = self.read_holding_registers(MODBUS_SETPOINT_HOLDING_BASE, SENSOR_COUNT)
try:
sensor_connected = self.read_coils(MODBUS_SENSOR_STATUS_COIL_BASE, SENSOR_COUNT)
except Exception:
sensor_connected = [True] * SENSOR_COUNT
try:
valve_open = self.read_coils(MODBUS_VALVE_OPEN_COIL_BASE, SENSOR_COUNT)
except Exception:
valve_open = [False] * SENSOR_COUNT
try:
valve_close = self.read_coils(MODBUS_VALVE_CLOSE_COIL_BASE, SENSOR_COUNT)
except Exception:
valve_close = [False] * SENSOR_COUNT
with state_lock:
for index, sensor in enumerate(state.sensors[:SENSOR_COUNT]):
if index < len(temperatures):
sensor["value"] = round(to_signed16(temperatures[index]) / 10.0, 1)
id_start = index * 4
ds18b20_id = ds18b20_id_from_registers(sensor_ids[id_start:id_start + 4])
if ds18b20_id:
sensor["ds18b20Id"] = ds18b20_id
if index < len(setpoints):
sensor["setpoint"] = round(to_signed16(setpoints[index]) / MODBUS_TEMPERATURE_SCALE, 1)
if index < len(sensor_connected):
sensor["connected"] = bool(sensor_connected[index])
for index in range(SENSOR_COUNT):
open_index = index * 2
close_index = open_index + 1
open_state = bool(valve_open[index]) if index < len(valve_open) else False
close_state = bool(valve_close[index]) if index < len(valve_close) else False
position = 100 if open_state else 0 if close_state else int(state.valves[open_index].get("position", 0))
if open_index < len(state.valves):
valve = state.valves[open_index]
valve["isOpen"] = open_state
valve["position"] = position
valve["openDegrees"] = degrees_from_position(position, valve_open_degrees_max(valve))
if index < len(setpoints):
valve["targetTemp"] = round(to_signed16(setpoints[index]) / MODBUS_TEMPERATURE_SCALE, 1)
if close_index < len(state.valves):
valve = state.valves[close_index]
valve["isOpen"] = close_state
valve["position"] = 100 if close_state else 0
valve["openDegrees"] = degrees_from_position(valve["position"], valve_open_degrees_max(valve))
if index < len(setpoints):
valve["targetTemp"] = round(to_signed16(setpoints[index]) / MODBUS_TEMPERATURE_SCALE, 1)
def read_loop(self):
while self.running:
try:
raw = self.tcp_file.readline() if self.transport == "tcp" else self.serial.readline()
if not raw:
continue
try:
line = raw.decode("utf-8", errors="ignore").strip()
except Exception:
line = ""
if not line:
continue
updated = parse_input_line(line)
if not updated:
continue
with state_lock:
apply_updates(updated)
except Exception:
time.sleep(0.05)
def zone_to_id(zone: Optional[str], kind: str):
if zone is None:
return None
idx = str(zone).strip().replace("zone_", "")
if kind == "sensor":
return f"zone_{idx}"
return f"valve_{idx}"
def parse_input_line(line: str):
# JSON:
# {"sensors":[...], "valves":[...]} or {"s1": {"value": 24.6}, ...}
try:
payload = json.loads(line)
except Exception:
payload = None
if isinstance(payload, dict):
if isinstance(payload.get("sensors"), list) or isinstance(payload.get("valves"), list):
return {
"sensors": payload.get("sensors") if isinstance(payload.get("sensors"), list) else None,
"valves": payload.get("valves") if isinstance(payload.get("valves"), list) else None,
}
if any(isinstance(k, str) and (k.startswith("T") or k.startswith("S")) for k in payload.keys()):
return {"kv": payload}
if isinstance(payload, list):
# allow list with {"type":"sensor",...}
return {"items": payload}
# key=value format:
# T1=24.7;T2=25.1;V1_MODE=auto;V1_POS=45;...
pairs = [p.strip() for p in line.split(";") if "=" in p]
updates = {"kv": {}}
for pair in pairs:
key, value = pair.split("=", 1)
updates["kv"][key.strip().upper()] = value.strip()
if not updates["kv"]:
return None
return updates
def apply_updates(payload):
if not payload:
return
if "sensors" in payload and isinstance(payload["sensors"], list):
for item in payload["sensors"]:
sid = str(item.get("id") or item.get("sensorId") or item.get("code") or "").strip()
sensor = next((s for s in state.sensors if s["id"] == sid), None)
if not sensor:
continue
if "value" in item:
sensor["value"] = clamp(item["value"], -1000, 1000)
if "setpoint" in item:
sensor["setpoint"] = clamp(item["setpoint"], -1000, 1000)
if "unit" in item:
sensor["unit"] = item["unit"]
if "valves" in payload and isinstance(payload["valves"], list):
for item in payload["valves"]:
vid = str(item.get("id") or item.get("valveId") or item.get("code") or "").strip()
valve = next((v for v in state.valves if v["id"] == vid), None)
if not valve:
continue
if "mode" in item:
mode = str(item["mode"]).lower()
valve["mode"] = "manual" if mode == "manual" else "auto"
if "position" in item:
valve["position"] = int(clamp(item["position"], 0, 100))
valve["openDegrees"] = degrees_from_position(valve["position"], valve_open_degrees_max(valve))
if "openDegreesMax" in item:
valve["openDegreesMax"] = int(clamp(item["openDegreesMax"], 1, 10000))
if "openDegrees" in item:
max_degrees = valve_open_degrees_max(valve)
valve["openDegrees"] = int(clamp(item["openDegrees"], 0, max_degrees))
valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees)
if "targetTemp" in item:
valve["targetTemp"] = clamp(item["targetTemp"], -1000, 1000)
if "isOpen" in item:
valve["isOpen"] = bool(item["isOpen"])
if "items" in payload and isinstance(payload["items"], list):
for item in payload["items"]:
if not isinstance(item, dict):
continue
typ = str(item.get("type", "")).lower()
if typ == "sensor":
sid = str(item.get("id") or item.get("sensorId") or item.get("zone") or "")
sensor = next((s for s in state.sensors if s["id"] == sid), None)
if sensor:
if "value" in item:
sensor["value"] = clamp(item["value"], -1000, 1000)
elif typ == "valve":
vid = str(item.get("id") or item.get("valveId") or item.get("zone") or "")
valve = next((v for v in state.valves if v["id"] == vid), None)
if valve:
if "mode" in item:
mode = str(item["mode"]).lower()
valve["mode"] = "manual" if mode == "manual" else "auto"
if "openDegreesMax" in item:
valve["openDegreesMax"] = int(clamp(item["openDegreesMax"], 1, 10000))
if "openDegrees" in item:
max_degrees = valve_open_degrees_max(valve)
valve["openDegrees"] = int(clamp(item["openDegrees"], 0, max_degrees))
valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees)
if "position" in item and "openDegrees" not in item:
max_degrees = valve_open_degrees_max(valve)
valve["position"] = int(clamp(item["position"], 0, 100))
valve["openDegrees"] = degrees_from_position(valve["position"], max_degrees)
kv = payload.get("kv", {})
if not kv:
return
sensor_map = {sensor["id"].split("_")[1]: sensor for sensor in state.sensors if "_" in sensor["id"]}
valve_map = {valve["id"].split("_")[1]: valve for valve in state.valves if "_" in valve["id"]}
# Temperature value keys
# T1=24.5 | TEMP1=24.5 | Z1_TEMP=24.5
for key, raw_value in kv.items():
value = raw_value
key_upper = key.upper()
num = re.search(r"\d+", key_upper)
idx = num.group(0) if num else None
if key_upper.startswith("T") or key_upper.startswith("TEMP") or "TEMP" in key_upper:
zone = idx
if zone and zone in sensor_map:
try:
sensor_map[zone]["value"] = clamp(value, -1000, 1000)
except Exception:
pass
continue
if key_upper.startswith("S") and idx and "SETPOINT" in key_upper:
zone = idx
if zone in sensor_map:
try:
sensor_map[zone]["setpoint"] = clamp(value, -1000, 1000)
except Exception:
pass
continue
# valve keys: V1_MODE=auto, V1_POS=45, V1_TGT=28
if key_upper.startswith("V") and idx:
valve = valve_map.get(idx)
if not valve:
continue
if "MODE" in key_upper:
valve["mode"] = "manual" if "MANUAL" in key_upper and str(value).lower() == "manual" else str(value).lower()
if "POS" in key_upper:
try:
valve["position"] = int(clamp(value, 0, 100))
valve["openDegrees"] = degrees_from_position(valve["position"], valve_open_degrees_max(valve))
except Exception:
pass
if "TGT" in key_upper:
try:
valve["targetTemp"] = clamp(value, -1000, 1000)
except Exception:
pass
if "MAXDEG" in key_upper:
max_degrees = int(clamp(value, 1, 10000))
valve["openDegreesMax"] = max_degrees
max_degrees = valve_open_degrees_max(valve)
try:
valve["openDegrees"] = int(
clamp(valve.get("openDegrees", degrees_from_position(valve.get("position", 0), max_degrees)), 0, max_degrees)
)
valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees)
except Exception:
pass
elif "DEG" in key_upper:
try:
max_degrees = valve_open_degrees_max(valve)
valve["openDegrees"] = int(clamp(value, 0, max_degrees))
valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees)
except Exception:
pass
for valve in state.valves:
max_degrees = valve_open_degrees_max(valve)
valve["openDegrees"] = int(clamp(valve.get("openDegrees", degrees_from_position(valve.get("position", 0), max_degrees)), 0, max_degrees))
valve["isOpen"] = int(valve.get("position", 0)) > 0
def update_sensor(id_, patch):
sid = str(id_)
with state_lock:
sensor = next((item for item in state.sensors if item["id"] == sid), None)
if not sensor:
return False
sensor.update(patch)
return True
def update_valve(id_, patch):
vid = str(id_)
with state_lock:
valve = next((item for item in state.valves if item["id"] == vid), None)
if not valve:
return False
if "openDegreesMax" in patch:
valve["openDegreesMax"] = int(clamp(patch["openDegreesMax"], 1, 10000))
if "mode" in patch:
mode = str(patch["mode"]).lower()
valve["mode"] = "manual" if mode == "manual" else "auto"
if "targetTemp" in patch:
valve["targetTemp"] = clamp(patch["targetTemp"], -1000, 1000)
if "isOpen" in patch:
valve["isOpen"] = bool(patch["isOpen"])
if "name" in patch:
valve["name"] = patch["name"]
if "openDegrees" in patch:
max_degrees = valve_open_degrees_max(valve)
valve["openDegrees"] = int(clamp(patch["openDegrees"], 0, max_degrees))
valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees)
if "position" in patch:
max_degrees = valve_open_degrees_max(valve)
valve["position"] = int(clamp(patch["position"], 0, 100))
valve["openDegrees"] = degrees_from_position(valve["position"], max_degrees)
if "openDegreesMax" in patch and "openDegrees" not in patch and "position" not in patch:
max_degrees = valve_open_degrees_max(valve)
valve["openDegrees"] = int(clamp(valve.get("openDegrees", 0), 0, max_degrees))
valve["isOpen"] = int(valve.get("position", 0)) > 0
return True
def _calibrate_valve_locked(valve: dict) -> dict:
valve["position"] = 0
valve["openDegrees"] = 0
valve["isOpen"] = False
return dict(valve)
def calibrate_valve(channel_id: str) -> dict | None:
with state_lock:
valve = next((item for item in state.valves if item["id"] == str(channel_id)), None)
if not valve:
return None
updated = _calibrate_valve_locked(valve)
bridge.send({
"type": "valve",
"id": str(channel_id),
"command": "calibrate",
"position": 0,
"openDegrees": 0,
"close": True,
})
return updated
def calibrate_all_valves() -> list[dict]:
with state_lock:
calibrated = [_calibrate_valve_locked(valve) for valve in state.valves]
for item in calibrated:
bridge.send({
"type": "valve",
"id": item["id"],
"command": "calibrate",
"position": 0,
"openDegrees": 0,
"close": True,
})
return [dict(item) for item in calibrated]
def snapshot_sensors():
with state_lock:
return [dict(item) for item in state.sensors]
def snapshot_valves():
with state_lock:
return [dict(item) for item in state.valves]
class Handler(SimpleHTTPRequestHandler):
def end_headers(self):
self.send_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
super().end_headers()
def _send_json(self, payload, code=200):
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, PUT, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_OPTIONS(self):
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, PUT, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path == "/api/sensors":
self._send_json(snapshot_sensors())
return
if parsed.path == "/api/valves":
self._send_json(snapshot_valves())
return
if parsed.path in ("/api/serial/ports", "/api/ports"):
ports = discover_serial_ports()
self._send_json(
{"ok": True, "ports": ports, "selected": bridge.port or preferred_serial_port(ports)}
)
return
if parsed.path in ("/api/serial/status", "/api/state"):
status = bridge.status()
if status.get("connected") and bridge.port:
status["selected"] = bridge.port
self._send_json(status)
return
if parsed.path == "/":
self.path = "/index.html"
return super().do_GET()
def do_POST(self):
parsed = urlparse(self.path)
parts = [seg for seg in parsed.path.split("/") if seg]
if len(parts) == 3 and parts[0] == "api" and parts[1] == "valves" and parts[2] == "calibrate-all":
status = {"ok": True, "valves": calibrate_all_valves(), "message": "calibration started"}
self._send_json(status)
return
if len(parts) == 4 and parts[0] == "api" and parts[1] == "valves" and parts[3] == "calibrate":
valve_id = parts[2]
updated = calibrate_valve(valve_id)
if not updated:
self.send_response(404)
self.end_headers()
return
self._send_json({"ok": True, "valve": updated, "message": "calibration started"})
return
if parsed.path in ("/api/serial/connect", "/api/connect"):
length = int(self.headers.get("Content-Length", "0") or "0")
body = self.rfile.read(length)
try:
payload = json.loads(body.decode("utf-8", errors="ignore")) if body else {}
except Exception:
payload = {}
transport = str(payload.get("transport", payload.get("mode", "rtu"))).lower()
timeout = float(payload.get("timeout", 0.8 if transport == "tcp" else 0.3))
try:
unit_id = int(payload.get("unitId", payload.get("slaveId", payload.get("slave", bridge.unit_id))))
except Exception:
unit_id = MODBUS_UNIT_ID
bridge.unit_id = int(clamp(unit_id, 1, 247))
if transport in ("tcp", "modbus_tcp", "modbus-tcp"):
host = str(payload.get("host") or payload.get("ip") or payload.get("address") or "").strip()
if not host:
self._send_json({"ok": False, "error": "TCP host/IP is required"}, 400)
return
tcp_port = int(payload.get("tcpPort", payload.get("tcp_port", payload.get("modbusPort", payload.get("port", 502)))))
bridge.configure_tcp(host, tcp_port, timeout)
else:
port = str(payload.get("port", "")).strip()
if not port:
self.send_response(400)
self.end_headers()
return
baud = payload.get("baudrate", payload.get("baud", 115200))
parity = payload.get("parity", "N")
stopbits = payload.get("stopbits", payload.get("stopBits", 1))
bytesize = payload.get("bytesize", payload.get("byteSize", 8))
bridge.configure_rtu(port, baud, parity, stopbits, bytesize, timeout)
try:
bridge.connect()
except Exception as exc:
self._send_json({"ok": False, "error": str(exc)}, 500)
return
calibrate_all_valves()
status = bridge.status()
status["ok"] = True
self._send_json(status)
return
if parsed.path in ("/api/serial/disconnect", "/api/disconnect"):
transport = bridge.transport
bridge.disconnect()
self._send_json({"ok": True, "connected": False, "transport": transport})
return
self.send_response(404)
self.end_headers()
def do_PUT(self):
parsed = urlparse(self.path)
parts = [seg for seg in parsed.path.split("/") if seg]
if len(parts) != 3 or parts[0] != "api":
self.send_response(404)
self.end_headers()
return
_, resource, raw_id = parts
item_id = raw_id
length = int(self.headers.get("Content-Length", "0") or "0")
body = self.rfile.read(length)
try:
payload = json.loads(body.decode("utf-8", errors="ignore")) if body else {}
except Exception:
payload = {}
if resource == "sensors":
patch = {}
if "setpoint" in payload:
patch["setpoint"] = float(payload["setpoint"])
if not patch:
patch = {k: v for k, v in payload.items() if k in ("setpoint", "value", "unit", "name")}
if not update_sensor(item_id, patch):
self.send_response(404)
self.end_headers()
return
if bridge.is_connected():
try:
bridge.send({"type": "sensor", "id": item_id, **patch})
except Exception as exc:
self._send_json({"ok": False, "error": str(exc)}, 500)
return
self._send_json(next(item for item in state.sensors if item["id"] == item_id))
return
if resource == "valves":
patch = {}
if "mode" in payload:
mode = str(payload["mode"]).lower()
patch["mode"] = "manual" if mode == "manual" else "auto"
if "position" in payload:
patch["position"] = int(clamp(payload["position"], 0, 100))
if "targetTemp" in payload:
patch["targetTemp"] = float(payload["targetTemp"])
if not patch:
patch = {k: v for k, v in payload.items() if k in ("mode", "position", "targetTemp", "isOpen", "name")}
if not update_valve(item_id, patch):
self.send_response(404)
self.end_headers()
return
if bridge.is_connected():
try:
bridge.send({"type": "valve", "id": item_id, **patch})
except Exception as exc:
self._send_json({"ok": False, "error": str(exc)}, 500)
return
self._send_json(next(item for item in state.valves if item["id"] == item_id))
return
self.send_response(404)
self.end_headers()
def parse_args():
parser = argparse.ArgumentParser(description="Serial bridge for MCU data to Web GUI")
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument("--serial-port", default="")
parser.add_argument("--baudrate", type=int, default=115200)
parser.add_argument("--parity", default="N")
parser.add_argument("--stopbits", type=float, default=1)
parser.add_argument("--bytesize", type=int, default=8)
parser.add_argument("--timeout", type=float, default=0.3)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
bridge = SerialBridge(
args.serial_port,
baudrate=args.baudrate,
parity=args.parity,
stopbits=args.stopbits,
bytesize=args.bytesize,
timeout=args.timeout,
)
if args.serial_port:
try:
bridge.connect()
print(f"COM connected: {args.serial_port} @ {args.baudrate}")
calibrate_all_valves()
except Exception as exc:
print(f"Не удалось открыть {args.serial_port}: {exc}. Запуск только в памяти.")
else:
print("COM порт не указан. Сервер стартует в локальном режиме без чтения порта.")
server = HTTPServer((args.host, args.port), Handler)
print(f"Server started: http://{args.host}:{args.port}")
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
bridge.disconnect()
server.server_close()