Files
ds18b20-MODBUS/john103C6T6NewVer/mock_server.py
2026-06-25 17:25:41 +03:00

315 lines
11 KiB
Python

import json
import random
import threading
import time
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from urllib.parse import parse_qs, urlparse
SENSOR_COUNT = 16
VALVE_COUNT = 32
DEFAULT_OPEN_DEGREES_MAX = 90
def make_default_sensors():
return [
{
"id": f"zone_{number}",
"name": f"Датчик {number}",
"value": 24.0 + (number % 4),
"setpoint": 28.0,
"unit": "°C",
"zone": str(number),
"ds18b20Id": f"28-00-00-00-00-00-00-{number:02X}",
}
for number in range(1, SENSOR_COUNT + 1)
]
def make_default_valves():
valves = []
for index in range(VALVE_COUNT):
number = index + 1
zone = (index % SENSOR_COUNT) + 1
valves.append(
{
"id": f"valve_{number}",
"name": f"Клапан {number}",
"zone": str(zone),
"mode": "auto",
"position": 0,
"targetTemp": 28,
"isOpen": False,
"openDegrees": 0,
"openDegreesMax": DEFAULT_OPEN_DEGREES_MAX,
}
)
return valves
SENSORS = make_default_sensors()
VALVES = make_default_valves()
SERIAL_STATE = {"connected": False, "transport": "rtu", "port": None, "unitId": 3}
state_lock = threading.Lock()
def clamp(value, min_value, max_value):
return max(min_value, min(max_value, value))
def open_degrees_max(valve):
return max(1, int(valve.get("openDegreesMax", DEFAULT_OPEN_DEGREES_MAX)))
def position_to_degrees(position, valve):
max_degrees = open_degrees_max(valve)
return clamp(round(position * max_degrees / 100), 0, max_degrees)
def apply_open_counter(valve):
valve["openDegrees"] = position_to_degrees(valve.get("position", 0), valve)
def sensor_by_zone(zone):
for sensor in SENSORS:
if str(sensor["zone"]) == str(zone):
return sensor
return None
def valve_by_id(valve_id):
for valve in VALVES:
if valve["id"] == valve_id:
return valve
return None
def calibrate_valve(valve):
valve["position"] = 0
valve["openDegrees"] = 0
valve["isOpen"] = False
def calibrate_all_valves():
for valve in VALVES:
calibrate_valve(valve)
def update_physics_loop():
while True:
with state_lock:
for sensor in SENSORS:
valve = valve_by_id(f"valve_{sensor['zone']}")
if not valve:
continue
if valve["mode"] == "manual":
target = 20 + clamp(valve["position"], 0, 100) * 0.7
else:
target = float(valve.get("targetTemp", sensor["setpoint"]))
drift = (target - sensor["value"]) * 0.07
noise = (random.random() - 0.5) * 0.2
sensor["value"] = clamp(sensor["value"] + drift + noise, -40, 150)
sensor["value"] = round(sensor["value"], 2)
for valve in VALVES:
apply_open_counter(valve)
valve["isOpen"] = valve.get("position", 0) > 0
time.sleep(2.0)
class Handler(SimpleHTTPRequestHandler):
def end_headers(self):
self.send_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
super().end_headers()
def _send_json(self, payload, code=200):
body = json.dumps(payload).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_OPTIONS(self):
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, PUT, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
def do_POST(self):
parsed = urlparse(self.path)
parts = [seg for seg in parsed.path.split("/") if seg]
if parsed.path in ("/api/serial/connect", "/api/connect"):
length = int(self.headers.get("Content-Length", "0") or "0")
body = self.rfile.read(length)
try:
payload = json.loads(body.decode("utf-8", errors="ignore")) if body else {}
except Exception:
payload = {}
transport = str(payload.get("transport", payload.get("mode", "rtu"))).lower()
try:
unit_id = int(payload.get("unitId", payload.get("slaveId", payload.get("slave", SERIAL_STATE.get("unitId", 3)))))
except Exception:
unit_id = 3
unit_id = max(1, min(247, unit_id))
if transport in ("tcp", "modbus_tcp", "modbus-tcp"):
host = str(payload.get("host") or payload.get("ip") or "127.0.0.1").strip()
tcp_port = int(payload.get("tcpPort", payload.get("tcp_port", payload.get("port", 502))))
SERIAL_STATE.update({
"connected": True,
"transport": "tcp",
"unitId": unit_id,
"host": host,
"tcpPort": tcp_port,
"address": f"{host}:{tcp_port}",
"port": f"{host}:{tcp_port}",
})
else:
port = str(payload.get("port") or "COM1").strip()
SERIAL_STATE.update({"connected": True, "transport": "rtu", "port": port, "unitId": unit_id})
self._send_json({"ok": True, **SERIAL_STATE})
return
if parsed.path in ("/api/serial/disconnect", "/api/disconnect"):
transport = SERIAL_STATE.get("transport", "rtu")
SERIAL_STATE.update({"connected": False, "transport": transport, "port": None})
self._send_json({"ok": True, **SERIAL_STATE})
return
if len(parts) == 3 and parts[0] == "api" and parts[1] == "valves" and parts[2] == "calibrate-all":
with state_lock:
calibrate_all_valves()
self._send_json({"ok": True, "valves": [dict(item) for item in VALVES], "message": "calibration started"})
return
if len(parts) == 4 and parts[0] == "api" and parts[1] == "valves" and parts[3] == "calibrate":
valve_id = parts[2]
with state_lock:
valve = valve_by_id(valve_id)
if not valve:
self.send_response(404)
self.end_headers()
return
calibrate_valve(valve)
self._send_json({"ok": True, "valve": dict(valve), "message": "calibration started"})
return
self.send_response(404)
self.end_headers()
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path in ("/api/serial/status", "/api/state"):
self._send_json({"ok": True, **SERIAL_STATE})
return
if parsed.path in ("/api/serial/ports", "/api/ports"):
self._send_json({"ok": True, "ports": [], "selected": SERIAL_STATE.get("port")})
return
if parsed.path == "/api/sensors":
self._send_json(self._snapshot_sensors())
return
if parsed.path == "/api/valves":
self._send_json(self._snapshot_valves())
return
# Serve static files (index.html, app.js, styles.css)
file = Path(parsed.path.lstrip("/"))
if parsed.path == "/":
file = Path("index.html")
if file.exists():
return super().do_GET()
self.send_response(404)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.end_headers()
self.wfile.write(b"not found")
def do_PUT(self):
parsed = urlparse(self.path)
segments = [seg for seg in parsed.path.split("/") if seg]
if len(segments) == 2 and segments[0] == "api":
self.send_response(400)
self.end_headers()
return
if len(segments) != 3 or segments[0] != "api":
self.send_response(404)
self.end_headers()
return
_, resource, resource_id = segments
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
try:
payload = json.loads(body.decode("utf-8")) if body else {}
except Exception:
payload = {}
with state_lock:
if resource == "sensors":
sensor = next((s for s in SENSORS if s["id"] == resource_id), None)
if not sensor:
self.send_response(404)
self.end_headers()
return
if "setpoint" in payload:
sensor["setpoint"] = float(payload["setpoint"])
self._send_json(sensor)
return
if resource == "valves":
valve = valve_by_id(resource_id)
if not valve:
self.send_response(404)
self.end_headers()
return
max_degrees = open_degrees_max(valve)
if "openDegreesMax" in payload:
valve["openDegreesMax"] = max(1, int(payload["openDegreesMax"]))
max_degrees = open_degrees_max(valve)
valve["openDegrees"] = position_to_degrees(valve["position"], valve)
if "openDegrees" in payload:
valve["openDegrees"] = clamp(int(payload["openDegrees"]), 0, open_degrees_max(valve))
valve["position"] = clamp(round(valve["openDegrees"] * 100 / max_degrees), 0, 100)
elif "position" in payload:
valve["position"] = clamp(int(payload["position"]), 0, 100)
valve["openDegrees"] = position_to_degrees(valve["position"], valve)
if "mode" in payload:
valve["mode"] = payload["mode"]
if "targetTemp" in payload:
valve["targetTemp"] = float(payload["targetTemp"])
valve["isOpen"] = valve["position"] > 0
self._send_json(valve)
return
self.send_response(404)
self.end_headers()
def _snapshot_sensors(self):
with state_lock:
return [dict(item) for item in SENSORS]
def _snapshot_valves(self):
with state_lock:
return [dict(item) for item in VALVES]
def main():
calibrate_all_valves()
threading.Thread(target=update_physics_loop, daemon=True).start()
server = HTTPServer(("0.0.0.0", 8080), Handler)
print("Server started: http://127.0.0.1:8080")
server.serve_forever()
if __name__ == "__main__":
main()