import argparse import json import os import re import socket import threading import time from http.server import HTTPServer, SimpleHTTPRequestHandler from pathlib import Path from typing import Optional from urllib.parse import urlparse import serial from serial.tools import list_ports SENSOR_COUNT = 16 VALVE_COUNT = 32 DEFAULT_OPEN_DEGREES_MAX = 90 MODBUS_UNIT_ID = 3 MODBUS_TEMP_INPUT_BASE = 0 MODBUS_DS18B20_ID_INPUT_BASE = 1000 MODBUS_ROOM_INPUT_BASE = 400 MODBUS_ROOM_INPUT_REGS = 18 MODBUS_SETPOINT_HOLDING_BASE = 0 MODBUS_TEMPERATURE_SCALE = 10.0 MODBUS_ROOM_HOLDING_BASE = 300 MODBUS_ROOM_HOLDING_REGS = 8 MODBUS_ROOM_HOLDING_SETPOINT_X10 = 0 MODBUS_ROOM_HOLDING_HYST_X10 = 1 MODBUS_ROOM_HOLDING_POSITION_PCT = 2 MODBUS_ROOM_HOLDING_ANGLE_MAX = 3 MODBUS_ROOM_HOLDING_MODE = 4 MODBUS_ROOM_HOLDING_COMMAND = 5 MODBUS_ROOM_HOLDING_LOCATION = 6 MODBUS_ROOM_HOLDING_APPLY = 7 MODBUS_SENSOR_STATUS_COIL_BASE = 128 MODBUS_VALVE_OPEN_COIL_BASE = 256 MODBUS_VALVE_CLOSE_COIL_BASE = 288 MODBUS_APPLY_PARAMS_COIL = 384 MODBUS_POLL_INTERVAL = 1.0 ROOM_MODE_AUTO = 0 ROOM_MODE_MANUAL = 1 ROOM_COMMAND_STOP = 0 ROOM_COMMAND_OPEN = 1 ROOM_COMMAND_CLOSE = 2 def modbus_crc16(data: bytes) -> int: crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 1: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 return crc & 0xFFFF def to_signed16(value: int) -> int: value = int(value) & 0xFFFF return value - 0x10000 if value & 0x8000 else value def numeric_suffix(value: object, fallback: int = 1) -> int: match = re.search(r"\d+", str(value or "")) return int(match.group(0)) if match else fallback def ds18b20_id_from_registers(registers: list[int]) -> str: if len(registers) < 4: return "" data = bytearray() for value in registers[:4]: data.extend((int(value) & 0xFFFF).to_bytes(2, "little")) if not any(data): return "" return "-".join(f"{byte:02X}" for byte in data) def make_default_sensors(): return [ { "id": f"zone_{number}", "name": f"Датчик {number}", "value": 0.0, "setpoint": 28.0, "unit": "°C", "zone": str(number), "ds18b20Id": f"28-00-00-00-00-00-00-{number:02X}", } for number in range(1, SENSOR_COUNT + 1) ] def make_default_valves(): valves = [] for index in range(VALVE_COUNT): number = index + 1 zone = (index % SENSOR_COUNT) + 1 valves.append( { "id": f"valve_{number}", "name": f"Клапан {number}", "zone": str(zone), "mode": "auto", "position": 0, "targetTemp": 28, "isOpen": False, "openDegrees": 0, "openDegreesMax": DEFAULT_OPEN_DEGREES_MAX, } ) return valves DEFAULT_SENSORS = make_default_sensors() DEFAULT_VALVES = make_default_valves() def clamp(value, min_value, max_value): try: v = float(value) except Exception: return min_value if v < min_value: return min_value if v > max_value: return max_value return v def discover_serial_ports() -> list[dict[str, str]]: ports: dict[str, dict[str, str]] = {} try: available_ports = list_ports.comports(include_links=True) except TypeError: available_ports = list_ports.comports() except Exception: available_ports = [] for item in available_ports: add_serial_port(ports, item.device, item.description, item.hwid) if os.name == "nt": try: import winreg with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"HARDWARE\DEVICEMAP\SERIALCOMM") as key: index = 0 while True: try: value_name, device, _ = winreg.EnumValue(key, index) except OSError: break if str(device).upper() not in ports: add_serial_port(ports, device, "Windows serial port", f"registry:{value_name}") index += 1 except (ImportError, OSError): pass return sorted(ports.values(), key=port_sort_key) class BridgeState: def __init__(self): self.sensors = [dict(item) for item in DEFAULT_SENSORS] self.valves = [dict(item) for item in DEFAULT_VALVES] state = BridgeState() PREFERRED_PORT_TOKENS = ( "st-link", "stm", "stmicro", "usb", "vcp", "virtual com", "usb serial", ) def add_serial_port(ports: dict[str, dict[str, str]], device: object, description: object = "", hwid: object = "") -> None: device_text = str(device).strip() if not device_text: return key = device_text.upper() ports[key] = { "device": device_text, "description": str(description or "").strip(), "hwid": str(hwid or "").strip(), } def port_text(port: dict[str, str]) -> str: return " ".join([port.get("device", ""), port.get("description", ""), port.get("hwid", "")]).lower() def port_com_number(port: dict[str, str]) -> int: device = port.get("device", "").upper() if device.startswith("COM") and device[3:].isdigit(): return int(device[3:]) return 0 def port_sort_key(port: dict[str, str]) -> tuple[int, int, str]: text = port_text(port) rank = 0 if any(token in text for token in PREFERRED_PORT_TOKENS) else 1 com_num = port_com_number(port) if 1 <= com_num <= 4: rank = 2 return (rank, com_num, port.get("device", "").upper()) def preferred_serial_port(ports: list[dict[str, str]]) -> str | None: if not ports: return None return sorted(ports, key=port_sort_key)[0]["device"] state_lock = threading.Lock() def valve_open_degrees_max(valve: dict) -> int: return int(clamp(valve.get("openDegreesMax", DEFAULT_OPEN_DEGREES_MAX), 1, 10000)) def position_from_degrees(degrees: int, max_degrees: int) -> int: if max_degrees <= 0: return 0 return int(round(clamp(degrees, 0, max_degrees) * 100 / max_degrees)) def degrees_from_position(position: int, max_degrees: int) -> int: return int(round(clamp(position, 0, 100) * max_degrees / 100)) class SerialBridge: def __init__(self, port: str, baudrate: int, parity: str, stopbits: float, bytesize: int, timeout: float): self.port = port self.baudrate = baudrate self.parity = parity self.stopbits = stopbits self.bytesize = bytesize self.timeout = timeout self.transport = "rtu" self.tcp_host = "" self.tcp_port = 502 self.tcp_socket = None self.tcp_file = None self.unit_id = MODBUS_UNIT_ID self.transaction_id = 0 self.request_lock = threading.Lock() self.serial = None self.running = False self.thread = None def configure_rtu(self, port: str, baudrate: int, parity: str, stopbits: float, bytesize: int, timeout: float): self.transport = "rtu" self.port = port self.baudrate = baudrate self.parity = parity self.stopbits = stopbits self.bytesize = bytesize self.timeout = timeout def configure_tcp(self, host: str, tcp_port: int = 502, timeout: float = 0.8): self.transport = "tcp" self.tcp_host = host self.tcp_port = int(tcp_port) self.timeout = timeout self.port = f"{self.tcp_host}:{self.tcp_port}" def connect(self): if self.running and self.thread and self.thread.is_alive(): self.disconnect() if self.transport == "tcp": self.tcp_socket = socket.create_connection((self.tcp_host, self.tcp_port), timeout=self.timeout) self.tcp_socket.settimeout(self.timeout) self.tcp_file = self.tcp_socket.makefile("rb") else: self.serial = serial.Serial( self.port, self.baudrate, parity=self.parity, stopbits=self.stopbits, bytesize=self.bytesize, timeout=self.timeout, ) self.running = True self.thread = threading.Thread(target=self.modbus_poll_loop, daemon=True) self.thread.start() def disconnect(self): self.running = False if self.thread and self.thread.is_alive(): try: self.thread.join(timeout=0.5) except Exception: pass if self.serial and self.serial.is_open: self.serial.close() if self.tcp_file: try: self.tcp_file.close() except Exception: pass self.tcp_file = None if self.tcp_socket: try: self.tcp_socket.close() except Exception: pass self.tcp_socket = None def is_connected(self): if self.transport == "tcp": return self.tcp_socket is not None return bool(self.serial and self.serial.is_open) def status(self): connected = self.is_connected() if self.transport == "tcp": return { "ok": True, "connected": connected, "transport": "tcp", "unitId": self.unit_id, "host": self.tcp_host, "tcpPort": self.tcp_port, "address": f"{self.tcp_host}:{self.tcp_port}" if self.tcp_host else "", "port": f"{self.tcp_host}:{self.tcp_port}" if connected and self.tcp_host else None, } return { "ok": True, "connected": connected, "transport": "rtu", "unitId": self.unit_id, "port": self.port if connected else None, "baudrate": self.baudrate, } def send(self, payload: dict): if not self.is_connected(): return typ = str(payload.get("type", "")).lower() if typ == "sensor": self.write_sensor_payload(payload) return if typ == "valve": self.write_valve_payload(payload) return def _recv_exact_tcp(self, size: int) -> bytes: data = bytearray() while len(data) < size: chunk = self.tcp_socket.recv(size - len(data)) if not chunk: raise ConnectionError("Modbus TCP connection closed") data.extend(chunk) return bytes(data) def _read_exact_serial(self, size: int) -> bytes: data = bytearray() deadline = time.monotonic() + max(float(self.timeout or 0.3), 0.3) while len(data) < size and time.monotonic() < deadline: chunk = self.serial.read(size - len(data)) if chunk: data.extend(chunk) else: time.sleep(0.005) if len(data) != size: raise TimeoutError("Modbus RTU response timeout") return bytes(data) def modbus_request(self, function: int, payload: bytes) -> bytes: pdu = bytes([function]) + payload with self.request_lock: if self.transport == "tcp": self.transaction_id = (self.transaction_id + 1) & 0xFFFF mbap = ( self.transaction_id.to_bytes(2, "big") + b"\x00\x00" + (len(pdu) + 1).to_bytes(2, "big") + bytes([self.unit_id]) ) self.tcp_socket.sendall(mbap + pdu) header = self._recv_exact_tcp(7) length = int.from_bytes(header[4:6], "big") response_pdu = self._recv_exact_tcp(max(0, length - 1)) else: frame = bytes([self.unit_id]) + pdu crc = modbus_crc16(frame) request = frame + crc.to_bytes(2, "little") try: self.serial.reset_input_buffer() except Exception: pass self.serial.write(request) header = self._read_exact_serial(3) if header[0] != self.unit_id: raise ValueError("Unexpected Modbus RTU unit id") if header[1] & 0x80: tail = self._read_exact_serial(2) frame = header + tail if modbus_crc16(frame[:-2]) != int.from_bytes(frame[-2:], "little"): raise ValueError("Bad Modbus RTU CRC") raise RuntimeError(f"Modbus exception {header[2]}") if header[1] in (0x01, 0x02, 0x03, 0x04): tail = self._read_exact_serial(header[2] + 2) else: tail = self._read_exact_serial(5) frame = header + tail if modbus_crc16(frame[:-2]) != int.from_bytes(frame[-2:], "little"): raise ValueError("Bad Modbus RTU CRC") response_pdu = frame[1:-2] if not response_pdu: raise TimeoutError("Empty Modbus response") if response_pdu[0] & 0x80: code = response_pdu[1] if len(response_pdu) > 1 else 0 raise RuntimeError(f"Modbus exception {code}") if response_pdu[0] != function: raise ValueError("Unexpected Modbus function") return response_pdu def read_registers(self, function: int, address: int, count: int) -> list[int]: response = self.modbus_request(function, address.to_bytes(2, "big") + count.to_bytes(2, "big")) byte_count = response[1] data = response[2:2 + byte_count] return [int.from_bytes(data[index:index + 2], "big") for index in range(0, len(data), 2)] def read_input_registers(self, address: int, count: int) -> list[int]: return self.read_registers(0x04, address, count) def read_holding_registers(self, address: int, count: int) -> list[int]: return self.read_registers(0x03, address, count) def read_coils(self, address: int, count: int) -> list[bool]: response = self.modbus_request(0x01, address.to_bytes(2, "big") + count.to_bytes(2, "big")) data = response[2:2 + response[1]] bits: list[bool] = [] for byte in data: for bit in range(8): bits.append(bool(byte & (1 << bit))) if len(bits) >= count: return bits return bits def write_register(self, address: int, value: int): self.modbus_request(0x06, address.to_bytes(2, "big") + (int(value) & 0xFFFF).to_bytes(2, "big")) def write_coil(self, address: int, value: bool): raw_value = 0xFF00 if value else 0x0000 self.modbus_request(0x05, address.to_bytes(2, "big") + raw_value.to_bytes(2, "big")) def write_sensor_payload(self, payload: dict): sensor_number = numeric_suffix(payload.get("id"), 1) channel = max(0, min(SENSOR_COUNT - 1, sensor_number - 1)) if "setpoint" in payload: setpoint_x10 = round(float(payload["setpoint"]) * MODBUS_TEMPERATURE_SCALE) self.write_register(MODBUS_SETPOINT_HOLDING_BASE + channel, setpoint_x10) self.write_register(MODBUS_ROOM_HOLDING_BASE + channel * MODBUS_ROOM_HOLDING_REGS + MODBUS_ROOM_HOLDING_SETPOINT_X10, setpoint_x10) self.write_register(MODBUS_ROOM_HOLDING_BASE + channel * MODBUS_ROOM_HOLDING_REGS + MODBUS_ROOM_HOLDING_APPLY, 1) self.write_coil(MODBUS_APPLY_PARAMS_COIL, True) def write_valve_payload(self, payload: dict): valve_number = numeric_suffix(payload.get("id"), 1) channel = max(0, min(SENSOR_COUNT - 1, (valve_number - 1) // 2)) room_base = MODBUS_ROOM_HOLDING_BASE + channel * MODBUS_ROOM_HOLDING_REGS if "targetTemp" in payload: setpoint_x10 = round(float(payload["targetTemp"]) * MODBUS_TEMPERATURE_SCALE) self.write_register(MODBUS_SETPOINT_HOLDING_BASE + channel, setpoint_x10) self.write_register(room_base + MODBUS_ROOM_HOLDING_SETPOINT_X10, setpoint_x10) self.write_register(room_base + MODBUS_ROOM_HOLDING_APPLY, 1) self.write_coil(MODBUS_APPLY_PARAMS_COIL, True) if "mode" in payload: mode = ROOM_MODE_MANUAL if str(payload.get("mode")).lower() == "manual" else ROOM_MODE_AUTO self.write_register(room_base + MODBUS_ROOM_HOLDING_MODE, mode) if "position" not in payload and "isOpen" not in payload: return if "position" in payload: position_pct = int(clamp(payload.get("position") or 0, 0, 100)) should_open = position_pct > 0 else: should_open = bool(payload.get("isOpen")) position_pct = 100 if should_open else 0 self.write_register(room_base + MODBUS_ROOM_HOLDING_POSITION_PCT, position_pct) self.write_register(room_base + MODBUS_ROOM_HOLDING_COMMAND, ROOM_COMMAND_OPEN if should_open else ROOM_COMMAND_CLOSE) self.write_coil(MODBUS_VALVE_OPEN_COIL_BASE + channel, should_open) self.write_coil(MODBUS_VALVE_CLOSE_COIL_BASE + channel, not should_open) def modbus_poll_loop(self): while self.running: try: self.poll_modbus_state() except Exception: time.sleep(0.2) time.sleep(MODBUS_POLL_INTERVAL) def apply_room_registers(self, room_regs: list[int]): with state_lock: for index in range(SENSOR_COUNT): start = index * MODBUS_ROOM_INPUT_REGS room = room_regs[start:start + MODBUS_ROOM_INPUT_REGS] if len(room) < MODBUS_ROOM_INPUT_REGS: continue sensor = state.sensors[index] sensor["value"] = round(to_signed16(room[6]) / MODBUS_TEMPERATURE_SCALE, 1) sensor["setpoint"] = round(to_signed16(room[7]) / MODBUS_TEMPERATURE_SCALE, 1) sensor["ds18b20Id"] = ds18b20_id_from_registers(room[2:6]) sensor["connected"] = bool(room[12]) sensor["locationCode"] = int(room[1]) position = int(clamp(room[9], 0, 100)) open_degrees_max = int(room[11] or DEFAULT_OPEN_DEGREES_MAX) open_degrees = int(clamp(room[10], 0, open_degrees_max)) mode = "manual" if int(room[15]) == ROOM_MODE_MANUAL else "auto" open_state = bool(room[13]) close_state = bool(room[14]) open_index = index * 2 close_index = open_index + 1 if open_index < len(state.valves): valve = state.valves[open_index] valve["targetTemp"] = sensor["setpoint"] valve["mode"] = mode valve["position"] = position valve["openDegrees"] = open_degrees valve["openDegreesMax"] = open_degrees_max valve["isOpen"] = open_state or position > 0 valve["commandState"] = int(room[16]) if close_index < len(state.valves): valve = state.valves[close_index] valve["targetTemp"] = sensor["setpoint"] valve["mode"] = mode valve["position"] = 100 if close_state else 0 valve["openDegrees"] = degrees_from_position(valve["position"], open_degrees_max) valve["openDegreesMax"] = open_degrees_max valve["isOpen"] = close_state valve["commandState"] = int(room[16]) def poll_modbus_state(self): if not self.is_connected(): return try: room_regs = self.read_input_registers(MODBUS_ROOM_INPUT_BASE, SENSOR_COUNT * MODBUS_ROOM_INPUT_REGS) self.apply_room_registers(room_regs) return except Exception: pass temperatures = self.read_input_registers(MODBUS_TEMP_INPUT_BASE, SENSOR_COUNT) sensor_ids = self.read_input_registers(MODBUS_DS18B20_ID_INPUT_BASE, SENSOR_COUNT * 4) setpoints = self.read_holding_registers(MODBUS_SETPOINT_HOLDING_BASE, SENSOR_COUNT) try: sensor_connected = self.read_coils(MODBUS_SENSOR_STATUS_COIL_BASE, SENSOR_COUNT) except Exception: sensor_connected = [True] * SENSOR_COUNT try: valve_open = self.read_coils(MODBUS_VALVE_OPEN_COIL_BASE, SENSOR_COUNT) except Exception: valve_open = [False] * SENSOR_COUNT try: valve_close = self.read_coils(MODBUS_VALVE_CLOSE_COIL_BASE, SENSOR_COUNT) except Exception: valve_close = [False] * SENSOR_COUNT with state_lock: for index, sensor in enumerate(state.sensors[:SENSOR_COUNT]): if index < len(temperatures): sensor["value"] = round(to_signed16(temperatures[index]) / 10.0, 1) id_start = index * 4 ds18b20_id = ds18b20_id_from_registers(sensor_ids[id_start:id_start + 4]) if ds18b20_id: sensor["ds18b20Id"] = ds18b20_id if index < len(setpoints): sensor["setpoint"] = round(to_signed16(setpoints[index]) / MODBUS_TEMPERATURE_SCALE, 1) if index < len(sensor_connected): sensor["connected"] = bool(sensor_connected[index]) for index in range(SENSOR_COUNT): open_index = index * 2 close_index = open_index + 1 open_state = bool(valve_open[index]) if index < len(valve_open) else False close_state = bool(valve_close[index]) if index < len(valve_close) else False position = 100 if open_state else 0 if close_state else int(state.valves[open_index].get("position", 0)) if open_index < len(state.valves): valve = state.valves[open_index] valve["isOpen"] = open_state valve["position"] = position valve["openDegrees"] = degrees_from_position(position, valve_open_degrees_max(valve)) if index < len(setpoints): valve["targetTemp"] = round(to_signed16(setpoints[index]) / MODBUS_TEMPERATURE_SCALE, 1) if close_index < len(state.valves): valve = state.valves[close_index] valve["isOpen"] = close_state valve["position"] = 100 if close_state else 0 valve["openDegrees"] = degrees_from_position(valve["position"], valve_open_degrees_max(valve)) if index < len(setpoints): valve["targetTemp"] = round(to_signed16(setpoints[index]) / MODBUS_TEMPERATURE_SCALE, 1) def read_loop(self): while self.running: try: raw = self.tcp_file.readline() if self.transport == "tcp" else self.serial.readline() if not raw: continue try: line = raw.decode("utf-8", errors="ignore").strip() except Exception: line = "" if not line: continue updated = parse_input_line(line) if not updated: continue with state_lock: apply_updates(updated) except Exception: time.sleep(0.05) def zone_to_id(zone: Optional[str], kind: str): if zone is None: return None idx = str(zone).strip().replace("zone_", "") if kind == "sensor": return f"zone_{idx}" return f"valve_{idx}" def parse_input_line(line: str): # JSON: # {"sensors":[...], "valves":[...]} or {"s1": {"value": 24.6}, ...} try: payload = json.loads(line) except Exception: payload = None if isinstance(payload, dict): if isinstance(payload.get("sensors"), list) or isinstance(payload.get("valves"), list): return { "sensors": payload.get("sensors") if isinstance(payload.get("sensors"), list) else None, "valves": payload.get("valves") if isinstance(payload.get("valves"), list) else None, } if any(isinstance(k, str) and (k.startswith("T") or k.startswith("S")) for k in payload.keys()): return {"kv": payload} if isinstance(payload, list): # allow list with {"type":"sensor",...} return {"items": payload} # key=value format: # T1=24.7;T2=25.1;V1_MODE=auto;V1_POS=45;... pairs = [p.strip() for p in line.split(";") if "=" in p] updates = {"kv": {}} for pair in pairs: key, value = pair.split("=", 1) updates["kv"][key.strip().upper()] = value.strip() if not updates["kv"]: return None return updates def apply_updates(payload): if not payload: return if "sensors" in payload and isinstance(payload["sensors"], list): for item in payload["sensors"]: sid = str(item.get("id") or item.get("sensorId") or item.get("code") or "").strip() sensor = next((s for s in state.sensors if s["id"] == sid), None) if not sensor: continue if "value" in item: sensor["value"] = clamp(item["value"], -1000, 1000) if "setpoint" in item: sensor["setpoint"] = clamp(item["setpoint"], -1000, 1000) if "unit" in item: sensor["unit"] = item["unit"] if "valves" in payload and isinstance(payload["valves"], list): for item in payload["valves"]: vid = str(item.get("id") or item.get("valveId") or item.get("code") or "").strip() valve = next((v for v in state.valves if v["id"] == vid), None) if not valve: continue if "mode" in item: mode = str(item["mode"]).lower() valve["mode"] = "manual" if mode == "manual" else "auto" if "position" in item: valve["position"] = int(clamp(item["position"], 0, 100)) valve["openDegrees"] = degrees_from_position(valve["position"], valve_open_degrees_max(valve)) if "openDegreesMax" in item: valve["openDegreesMax"] = int(clamp(item["openDegreesMax"], 1, 10000)) if "openDegrees" in item: max_degrees = valve_open_degrees_max(valve) valve["openDegrees"] = int(clamp(item["openDegrees"], 0, max_degrees)) valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees) if "targetTemp" in item: valve["targetTemp"] = clamp(item["targetTemp"], -1000, 1000) if "isOpen" in item: valve["isOpen"] = bool(item["isOpen"]) if "items" in payload and isinstance(payload["items"], list): for item in payload["items"]: if not isinstance(item, dict): continue typ = str(item.get("type", "")).lower() if typ == "sensor": sid = str(item.get("id") or item.get("sensorId") or item.get("zone") or "") sensor = next((s for s in state.sensors if s["id"] == sid), None) if sensor: if "value" in item: sensor["value"] = clamp(item["value"], -1000, 1000) elif typ == "valve": vid = str(item.get("id") or item.get("valveId") or item.get("zone") or "") valve = next((v for v in state.valves if v["id"] == vid), None) if valve: if "mode" in item: mode = str(item["mode"]).lower() valve["mode"] = "manual" if mode == "manual" else "auto" if "openDegreesMax" in item: valve["openDegreesMax"] = int(clamp(item["openDegreesMax"], 1, 10000)) if "openDegrees" in item: max_degrees = valve_open_degrees_max(valve) valve["openDegrees"] = int(clamp(item["openDegrees"], 0, max_degrees)) valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees) if "position" in item and "openDegrees" not in item: max_degrees = valve_open_degrees_max(valve) valve["position"] = int(clamp(item["position"], 0, 100)) valve["openDegrees"] = degrees_from_position(valve["position"], max_degrees) kv = payload.get("kv", {}) if not kv: return sensor_map = {sensor["id"].split("_")[1]: sensor for sensor in state.sensors if "_" in sensor["id"]} valve_map = {valve["id"].split("_")[1]: valve for valve in state.valves if "_" in valve["id"]} # Temperature value keys # T1=24.5 | TEMP1=24.5 | Z1_TEMP=24.5 for key, raw_value in kv.items(): value = raw_value key_upper = key.upper() num = re.search(r"\d+", key_upper) idx = num.group(0) if num else None if key_upper.startswith("T") or key_upper.startswith("TEMP") or "TEMP" in key_upper: zone = idx if zone and zone in sensor_map: try: sensor_map[zone]["value"] = clamp(value, -1000, 1000) except Exception: pass continue if key_upper.startswith("S") and idx and "SETPOINT" in key_upper: zone = idx if zone in sensor_map: try: sensor_map[zone]["setpoint"] = clamp(value, -1000, 1000) except Exception: pass continue # valve keys: V1_MODE=auto, V1_POS=45, V1_TGT=28 if key_upper.startswith("V") and idx: valve = valve_map.get(idx) if not valve: continue if "MODE" in key_upper: valve["mode"] = "manual" if "MANUAL" in key_upper and str(value).lower() == "manual" else str(value).lower() if "POS" in key_upper: try: valve["position"] = int(clamp(value, 0, 100)) valve["openDegrees"] = degrees_from_position(valve["position"], valve_open_degrees_max(valve)) except Exception: pass if "TGT" in key_upper: try: valve["targetTemp"] = clamp(value, -1000, 1000) except Exception: pass if "MAXDEG" in key_upper: max_degrees = int(clamp(value, 1, 10000)) valve["openDegreesMax"] = max_degrees max_degrees = valve_open_degrees_max(valve) try: valve["openDegrees"] = int( clamp(valve.get("openDegrees", degrees_from_position(valve.get("position", 0), max_degrees)), 0, max_degrees) ) valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees) except Exception: pass elif "DEG" in key_upper: try: max_degrees = valve_open_degrees_max(valve) valve["openDegrees"] = int(clamp(value, 0, max_degrees)) valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees) except Exception: pass for valve in state.valves: max_degrees = valve_open_degrees_max(valve) valve["openDegrees"] = int(clamp(valve.get("openDegrees", degrees_from_position(valve.get("position", 0), max_degrees)), 0, max_degrees)) valve["isOpen"] = int(valve.get("position", 0)) > 0 def update_sensor(id_, patch): sid = str(id_) with state_lock: sensor = next((item for item in state.sensors if item["id"] == sid), None) if not sensor: return False sensor.update(patch) return True def update_valve(id_, patch): vid = str(id_) with state_lock: valve = next((item for item in state.valves if item["id"] == vid), None) if not valve: return False if "openDegreesMax" in patch: valve["openDegreesMax"] = int(clamp(patch["openDegreesMax"], 1, 10000)) if "mode" in patch: mode = str(patch["mode"]).lower() valve["mode"] = "manual" if mode == "manual" else "auto" if "targetTemp" in patch: valve["targetTemp"] = clamp(patch["targetTemp"], -1000, 1000) if "isOpen" in patch: valve["isOpen"] = bool(patch["isOpen"]) if "name" in patch: valve["name"] = patch["name"] if "openDegrees" in patch: max_degrees = valve_open_degrees_max(valve) valve["openDegrees"] = int(clamp(patch["openDegrees"], 0, max_degrees)) valve["position"] = position_from_degrees(valve["openDegrees"], max_degrees) if "position" in patch: max_degrees = valve_open_degrees_max(valve) valve["position"] = int(clamp(patch["position"], 0, 100)) valve["openDegrees"] = degrees_from_position(valve["position"], max_degrees) if "openDegreesMax" in patch and "openDegrees" not in patch and "position" not in patch: max_degrees = valve_open_degrees_max(valve) valve["openDegrees"] = int(clamp(valve.get("openDegrees", 0), 0, max_degrees)) valve["isOpen"] = int(valve.get("position", 0)) > 0 return True def _calibrate_valve_locked(valve: dict) -> dict: valve["position"] = 0 valve["openDegrees"] = 0 valve["isOpen"] = False return dict(valve) def calibrate_valve(channel_id: str) -> dict | None: with state_lock: valve = next((item for item in state.valves if item["id"] == str(channel_id)), None) if not valve: return None updated = _calibrate_valve_locked(valve) bridge.send({ "type": "valve", "id": str(channel_id), "command": "calibrate", "position": 0, "openDegrees": 0, "close": True, }) return updated def calibrate_all_valves() -> list[dict]: with state_lock: calibrated = [_calibrate_valve_locked(valve) for valve in state.valves] for item in calibrated: bridge.send({ "type": "valve", "id": item["id"], "command": "calibrate", "position": 0, "openDegrees": 0, "close": True, }) return [dict(item) for item in calibrated] def snapshot_sensors(): with state_lock: return [dict(item) for item in state.sensors] def snapshot_valves(): with state_lock: return [dict(item) for item in state.valves] class Handler(SimpleHTTPRequestHandler): def end_headers(self): self.send_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0") self.send_header("Pragma", "no-cache") self.send_header("Expires", "0") super().end_headers() def _send_json(self, payload, code=200): body = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(code) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, PUT, POST, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_OPTIONS(self): self.send_response(204) self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, PUT, POST, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type") self.end_headers() def do_GET(self): parsed = urlparse(self.path) if parsed.path == "/api/sensors": self._send_json(snapshot_sensors()) return if parsed.path == "/api/valves": self._send_json(snapshot_valves()) return if parsed.path in ("/api/serial/ports", "/api/ports"): ports = discover_serial_ports() self._send_json( {"ok": True, "ports": ports, "selected": bridge.port or preferred_serial_port(ports)} ) return if parsed.path in ("/api/serial/status", "/api/state"): status = bridge.status() if status.get("connected") and bridge.port: status["selected"] = bridge.port self._send_json(status) return if parsed.path == "/": self.path = "/index.html" return super().do_GET() def do_POST(self): parsed = urlparse(self.path) parts = [seg for seg in parsed.path.split("/") if seg] if len(parts) == 3 and parts[0] == "api" and parts[1] == "valves" and parts[2] == "calibrate-all": status = {"ok": True, "valves": calibrate_all_valves(), "message": "calibration started"} self._send_json(status) return if len(parts) == 4 and parts[0] == "api" and parts[1] == "valves" and parts[3] == "calibrate": valve_id = parts[2] updated = calibrate_valve(valve_id) if not updated: self.send_response(404) self.end_headers() return self._send_json({"ok": True, "valve": updated, "message": "calibration started"}) return if parsed.path in ("/api/serial/connect", "/api/connect"): length = int(self.headers.get("Content-Length", "0") or "0") body = self.rfile.read(length) try: payload = json.loads(body.decode("utf-8", errors="ignore")) if body else {} except Exception: payload = {} transport = str(payload.get("transport", payload.get("mode", "rtu"))).lower() timeout = float(payload.get("timeout", 0.8 if transport == "tcp" else 0.3)) try: unit_id = int(payload.get("unitId", payload.get("slaveId", payload.get("slave", bridge.unit_id)))) except Exception: unit_id = MODBUS_UNIT_ID bridge.unit_id = int(clamp(unit_id, 1, 247)) if transport in ("tcp", "modbus_tcp", "modbus-tcp"): host = str(payload.get("host") or payload.get("ip") or payload.get("address") or "").strip() if not host: self._send_json({"ok": False, "error": "TCP host/IP is required"}, 400) return tcp_port = int(payload.get("tcpPort", payload.get("tcp_port", payload.get("modbusPort", payload.get("port", 502))))) bridge.configure_tcp(host, tcp_port, timeout) else: port = str(payload.get("port", "")).strip() if not port: self.send_response(400) self.end_headers() return baud = payload.get("baudrate", payload.get("baud", 115200)) parity = payload.get("parity", "N") stopbits = payload.get("stopbits", payload.get("stopBits", 1)) bytesize = payload.get("bytesize", payload.get("byteSize", 8)) bridge.configure_rtu(port, baud, parity, stopbits, bytesize, timeout) try: bridge.connect() except Exception as exc: self._send_json({"ok": False, "error": str(exc)}, 500) return calibrate_all_valves() status = bridge.status() status["ok"] = True self._send_json(status) return if parsed.path in ("/api/serial/disconnect", "/api/disconnect"): transport = bridge.transport bridge.disconnect() self._send_json({"ok": True, "connected": False, "transport": transport}) return self.send_response(404) self.end_headers() def do_PUT(self): parsed = urlparse(self.path) parts = [seg for seg in parsed.path.split("/") if seg] if len(parts) != 3 or parts[0] != "api": self.send_response(404) self.end_headers() return _, resource, raw_id = parts item_id = raw_id length = int(self.headers.get("Content-Length", "0") or "0") body = self.rfile.read(length) try: payload = json.loads(body.decode("utf-8", errors="ignore")) if body else {} except Exception: payload = {} if resource == "sensors": patch = {} if "setpoint" in payload: patch["setpoint"] = float(payload["setpoint"]) if not patch: patch = {k: v for k, v in payload.items() if k in ("setpoint", "value", "unit", "name")} if not update_sensor(item_id, patch): self.send_response(404) self.end_headers() return if bridge.is_connected(): try: bridge.send({"type": "sensor", "id": item_id, **patch}) except Exception as exc: self._send_json({"ok": False, "error": str(exc)}, 500) return self._send_json(next(item for item in state.sensors if item["id"] == item_id)) return if resource == "valves": patch = {} if "mode" in payload: mode = str(payload["mode"]).lower() patch["mode"] = "manual" if mode == "manual" else "auto" if "position" in payload: patch["position"] = int(clamp(payload["position"], 0, 100)) if "targetTemp" in payload: patch["targetTemp"] = float(payload["targetTemp"]) if not patch: patch = {k: v for k, v in payload.items() if k in ("mode", "position", "targetTemp", "isOpen", "name")} if not update_valve(item_id, patch): self.send_response(404) self.end_headers() return if bridge.is_connected(): try: bridge.send({"type": "valve", "id": item_id, **patch}) except Exception as exc: self._send_json({"ok": False, "error": str(exc)}, 500) return self._send_json(next(item for item in state.valves if item["id"] == item_id)) return self.send_response(404) self.end_headers() def parse_args(): parser = argparse.ArgumentParser(description="Serial bridge for MCU data to Web GUI") parser.add_argument("--host", default="0.0.0.0") parser.add_argument("--port", type=int, default=8080) parser.add_argument("--serial-port", default="") parser.add_argument("--baudrate", type=int, default=115200) parser.add_argument("--parity", default="N") parser.add_argument("--stopbits", type=float, default=1) parser.add_argument("--bytesize", type=int, default=8) parser.add_argument("--timeout", type=float, default=0.3) return parser.parse_args() if __name__ == "__main__": args = parse_args() bridge = SerialBridge( args.serial_port, baudrate=args.baudrate, parity=args.parity, stopbits=args.stopbits, bytesize=args.bytesize, timeout=args.timeout, ) if args.serial_port: try: bridge.connect() print(f"COM connected: {args.serial_port} @ {args.baudrate}") calibrate_all_valves() except Exception as exc: print(f"Не удалось открыть {args.serial_port}: {exc}. Запуск только в памяти.") else: print("COM порт не указан. Сервер стартует в локальном режиме без чтения порта.") server = HTTPServer((args.host, args.port), Handler) print(f"Server started: http://{args.host}:{args.port}") try: server.serve_forever() except KeyboardInterrupt: pass finally: bridge.disconnect() server.server_close()