debugVarTool/Src/tms_debugvar_term.py

621 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import struct
import datetime
from PySide2 import QtCore, QtWidgets, QtSerialPort
from PySide2.QtCore import QTimer
# ---------------------------------------------------------------- CRC util ---
def crc16_ibm(data: bytes, *, init=0xFFFF) -> int:
    """Compute a reflected CRC-16 (polynomial 0xA001) over *data*.

    The register is processed bit by bit, least-significant bit first.
    With the default ``init=0xFFFF`` the result matches the CRC-16/MODBUS
    variant; with ``init=0`` it matches CRC-16/ARC.

    Args:
        data: Bytes-like object to checksum (iterates as ints 0..255).
        init: Initial register value. Only the low 16 bits are used.

    Returns:
        The checksum as an int in ``range(0x10000)``.
    """
    # Mask up front: a wider init would otherwise shift stray high bits
    # down into the 16-bit register and corrupt the result.
    crc = init & 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
    return crc & 0xFFFF
class Spoiler(QtWidgets.QWidget):
    """Collapsible section: a toggle button with a header line above a
    content area whose maximum height is animated between 0 and the
    content's size hint.

    Args:
        title: Text shown on the toggle button.
        animationDuration: Duration of the expand/collapse animation in ms.
        parent: Optional parent widget.
    """

    def __init__(self, title="", animationDuration=300, parent=None):
        super().__init__(parent)
        self._animationDuration = animationDuration
        # (minimumWidth, maximumWidth) saved while the width is pinned for
        # an animation; None when the width is not pinned.
        self._saved_width_limits = None

        # --- Toggle button ---
        self.toggleButton = QtWidgets.QToolButton(self)
        self.toggleButton.setStyleSheet("QToolButton { border: none; }")
        self.toggleButton.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon)
        self.toggleButton.setArrowType(QtCore.Qt.RightArrow)
        self.toggleButton.setText(title)
        self.toggleButton.setCheckable(True)

        # --- Header line ---
        self.headerLine = QtWidgets.QFrame(self)
        self.headerLine.setFrameShape(QtWidgets.QFrame.HLine)
        self.headerLine.setFrameShadow(QtWidgets.QFrame.Sunken)

        # --- Content area (starts collapsed: maximum height 0) ---
        self.contentArea = QtWidgets.QScrollArea(self)
        self.contentArea.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed)
        self.contentArea.setFrameShape(QtWidgets.QFrame.NoFrame)
        self.contentArea.setWidgetResizable(True)
        self._contentWidget = QtWidgets.QWidget()
        self.contentArea.setWidget(self._contentWidget)
        self.contentArea.setMaximumHeight(0)

        # --- Animation drives only the content area's maximum height ---
        self._ani_content = QtCore.QPropertyAnimation(self.contentArea, b"maximumHeight")
        self._ani_content.setDuration(animationDuration)
        self._ani_content.setEasingCurve(QtCore.QEasingCurve.InOutCubic)
        # Follow every animation step so the top-level window resizes along.
        self._ani_content.valueChanged.connect(self._adjust_parent_size)
        # Connected exactly once here; connecting inside the click handler
        # would add one more slot per click.
        self._ani_content.finished.connect(self._unlock_width)

        # --- Layout ---
        self.mainLayout = QtWidgets.QGridLayout(self)
        self.mainLayout.setVerticalSpacing(0)
        self.mainLayout.setContentsMargins(0, 0, 0, 0)
        self.mainLayout.addWidget(self.toggleButton, 0, 0, 1, 1)
        self.mainLayout.addWidget(self.headerLine, 0, 1, 1, 1)
        self.mainLayout.addWidget(self.contentArea, 1, 0, 1, 2)

        # --- Signals ---
        self.toggleButton.clicked.connect(self._on_toggled)

    def setContentLayout(self, contentLayout):
        """Install *contentLayout* as the layout of the collapsible content.

        Any layout already installed is handed to a throw-away widget
        first, because a widget that already has a layout cannot take a
        second one.
        """
        previous = self._contentWidget.layout()
        if previous is not None:
            QtWidgets.QWidget().setLayout(previous)
        self._contentWidget.setLayout(contentLayout)

    def _adjust_parent_size(self, *_):
        """Resize the top-level window to its hinted height, keeping its width."""
        top = self.window()
        if top is None:
            return
        size = top.size()
        size.setHeight(top.sizeHint().height())  # take the new height
        top.resize(size)  # width stays as it was

    def _lock_width(self):
        """Pin the widget to its current width for the animation's duration."""
        if self._saved_width_limits is None:
            # Remember the real limits only on the first lock; a re-toggle
            # in mid-animation must not overwrite them with the pinned ones.
            self._saved_width_limits = (self.minimumWidth(), self.maximumWidth())
        self.setFixedWidth(self.width())

    def _unlock_width(self):
        """Restore the width limits that were in force before the animation."""
        if self._saved_width_limits is None:
            return
        min_width, max_width = self._saved_width_limits
        self._saved_width_limits = None
        # setFixedWidth() pins BOTH limits, so both have to be restored.
        self.setMinimumWidth(min_width)
        self.setMaximumWidth(max_width)

    def _on_toggled(self, checked: bool):
        """Animate the content area open (*checked*) or closed."""
        self.toggleButton.setArrowType(QtCore.Qt.DownArrow if checked else QtCore.Qt.RightArrow)
        target_height = self._contentWidget.sizeHint().height() if checked else 0
        self._ani_content.stop()
        # Start from the current height so a re-toggle in mid-animation
        # continues smoothly instead of jumping.
        self._ani_content.setStartValue(self.contentArea.maximumHeight())
        self._ani_content.setEndValue(target_height)
        # Keep the width fixed while the height animates.
        self._lock_width()
        self._ani_content.start()
class DebugTerminalWidget(QtWidgets.QWidget):
    """Serial terminal widget that reads one debug variable's name and value.

    Wire format as implemented by this class:

    * Request (4 bytes): ``start_byte, cmd_byte, hi, lo`` where ``hi:lo`` is
      the 15-bit variable index; bit 15 set means "read name", clear means
      "read value".
    * Response: 4-byte header ``addr, cmd, status, iq``, then the payload
      (``name_field_len`` name bytes, or 2 value bytes low byte first),
      then a 4-byte trailer ``crcLo, crcHi, 0, 0``. The CRC is checked over
      everything before the trailer.

    Only one request is outstanding at a time ("lockstep"): a transaction
    ends when the expected number of bytes has arrived, on timeout, or on a
    serial error.

    Args:
        parent: Optional parent widget.
        start_byte: First byte of every request frame.
        cmd_byte: Second byte of every request; also expected in responses.
        name_field_len: Size in bytes of the name field in a name response.
        signed: Interpret the raw 16-bit value as two's complement.
        iq_scaling: Mapping ``iq -> divisor`` for scaling raw values.
            Defaults to ``2**n`` for ``n`` in 0..15.
        read_timeout_ms: Per-transaction response timeout.
        auto_crc_check: Verify the response CRC and log the outcome. A
            mismatch is only logged; the frame is still parsed.
        drop_if_busy: While a transaction is active, drop new requests.
        replace_if_busy: While a transaction is active (and not dropping),
            remember the newest request and send it afterwards.
    """

    nameRead = QtCore.Signal(int, int, int, str)  # index, status, iq, name
    # index, status, iq, raw value (sign-extended when ``signed``), scaled value
    valueRead = QtCore.Signal(int, int, int, int, float)
    portOpened = QtCore.Signal(str)  # port name
    portClosed = QtCore.Signal(str)  # port name
    txBytes = QtCore.Signal(bytes)  # raw bytes sent
    rxBytes = QtCore.Signal(bytes)  # raw bytes received (frame only)

    # Upper bound on retained log lines, so continuous polling cannot grow
    # the log document without limit.
    _LOG_MAX_BLOCKS = 5000

    def __init__(self, parent=None, *,
                 start_byte=0x0A,
                 cmd_byte=0x44,
                 name_field_len=11,
                 signed=True,
                 iq_scaling=None,
                 read_timeout_ms=200,
                 auto_crc_check=True,
                 drop_if_busy=False,
                 replace_if_busy=True):
        super().__init__(parent)
        self.start_byte = start_byte
        self.cmd_byte = cmd_byte
        self.name_field_len = name_field_len
        self.signed = signed
        self.read_timeout_ms = read_timeout_ms
        self.auto_crc_check = auto_crc_check

        # Lockstep policy flags
        self._drop_if_busy = drop_if_busy
        self._replace_if_busy = replace_if_busy

        if iq_scaling is None:
            # Built locally, so a caller-supplied mapping is never mutated.
            iq_scaling = {n: float(1 << n) for n in range(16)}
            iq_scaling[0] = 1.0
        self.iq_scaling = iq_scaling

        # Serial port ---------------------------------------------------------
        self.serial = QtSerialPort.QSerialPort(self)
        self.serial.setBaudRate(115200)
        self.serial.readyRead.connect(self._on_ready_read)
        self.serial.errorOccurred.connect(self._on_serial_error)

        # Debounce timer: waits for the index spin box to settle before a
        # request for the new index is sent.
        self._index_change_timer = QtCore.QTimer(self)
        self._index_change_timer.setSingleShot(True)
        self._index_change_timer.timeout.connect(self._on_index_change_timeout)
        self._index_change_delay_ms = 200  # delay before sending the request

        # RX state ------------------------------------------------------------
        self._rx_buf = bytearray()
        self._waiting_name = False
        self._expected_min_len = 0
        self._expected_exact_len = None  # if known exactly

        # Lockstep tx/rx state ------------------------------------------------
        self._busy = False  # True => request sent, waiting for reply/timeout
        self._pending_cmd = None  # (frame, is_name, index) waiting to be sent
        self._active_index = None  # index of the current request (for signals)

        # Timer for per-transaction timeout ----------------------------------
        self._txn_timer = QtCore.QTimer(self)
        self._txn_timer.setSingleShot(True)
        self._txn_timer.timeout.connect(self._on_txn_timeout)

        # Polling timer -------------------------------------------------------
        self._poll_timer = QtCore.QTimer(self)
        self._poll_timer.timeout.connect(self._on_poll_timeout)
        self._polling = False

        self._build_ui()
        self.btn_open.clicked.connect(self._open_close_port)
        self.btn_refresh.clicked.connect(self.set_available_ports)
        self.btn_read_name.clicked.connect(self.request_name)
        self.btn_read_value.clicked.connect(self.request_value)
        self.btn_poll.clicked.connect(self._toggle_polling)
        self.set_available_ports()

    # ------------------------------------------------------------------ UI ---
    def _build_ui(self):
        """Create the port controls, the variable controls and the log panel."""
        main_layout = QtWidgets.QVBoxLayout(self)
        main_layout.setContentsMargins(10, 10, 10, 10)
        main_layout.setSpacing(12)

        # --- Serial Port Group ---
        port_group = QtWidgets.QGroupBox("Serial Port")
        port_layout = QtWidgets.QHBoxLayout(port_group)
        self.cmb_port = QtWidgets.QComboBox()
        self.btn_refresh = QtWidgets.QPushButton("Refresh")
        self.cmb_baud = QtWidgets.QComboBox()
        self.cmb_baud.addItems(["9600", "19200", "38400", "57600", "115200", "230400"])
        self.cmb_baud.setCurrentText("115200")
        self.btn_open = QtWidgets.QPushButton("Open")
        port_layout.addWidget(QtWidgets.QLabel("Port:"))
        port_layout.addWidget(self.cmb_port, 1)
        port_layout.addWidget(self.btn_refresh)
        port_layout.addSpacing(20)
        port_layout.addWidget(QtWidgets.QLabel("Baud rate:"))
        port_layout.addWidget(self.cmb_baud, 0)
        port_layout.addWidget(self.btn_open)
        main_layout.addWidget(port_group)

        # --- Variable Control Group ---
        var_group = QtWidgets.QGroupBox("Watch Variable")
        var_layout = QtWidgets.QGridLayout(var_group)
        var_layout.setHorizontalSpacing(10)
        var_layout.setVerticalSpacing(6)
        self.spin_index = QtWidgets.QSpinBox()
        self.spin_index.setRange(0, 0x7FFF)
        self.spin_index.setAccelerated(True)
        self.spin_index.valueChanged.connect(self._on_index_changed)
        self.chk_hex_index = QtWidgets.QCheckBox("Hex")
        self.chk_hex_index.stateChanged.connect(self._toggle_index_base)
        self.btn_read_name = QtWidgets.QPushButton("Read Name")
        self.btn_read_value = QtWidgets.QPushButton("Read Value")
        self.btn_poll = QtWidgets.QPushButton("Start Polling")
        self.spin_interval = QtWidgets.QSpinBox()
        self.spin_interval.setRange(100, 5000)
        self.spin_interval.setValue(500)
        self.spin_interval.setSuffix(" ms")
        self.edit_name = QtWidgets.QLineEdit()
        self.edit_name.setReadOnly(True)
        self.edit_value = QtWidgets.QLineEdit()
        self.edit_value.setReadOnly(True)
        self.lbl_iq = QtWidgets.QLabel("-")
        self.chk_raw = QtWidgets.QCheckBox("Raw (no IQ format)")
        var_layout.addWidget(QtWidgets.QLabel("Index:"), 0, 0)
        var_layout.addWidget(self.spin_index, 0, 1)
        var_layout.addWidget(self.chk_hex_index, 0, 2)
        var_layout.addWidget(self.btn_read_name, 1, 0)
        var_layout.addWidget(self.btn_read_value, 1, 1)
        var_layout.addWidget(self.btn_poll, 1, 2)
        var_layout.addWidget(QtWidgets.QLabel("Interval:"), 2, 0)
        var_layout.addWidget(self.spin_interval, 2, 1)
        var_layout.addWidget(QtWidgets.QLabel("Name:"), 3, 0)
        var_layout.addWidget(self.edit_name, 3, 1, 1, 2)
        var_layout.addWidget(QtWidgets.QLabel("Value:"), 4, 0)
        var_layout.addWidget(self.edit_value, 4, 1, 1, 2)
        var_layout.addWidget(QtWidgets.QLabel("IQ:"), 5, 0)
        var_layout.addWidget(self.lbl_iq, 5, 1)
        var_layout.addWidget(self.chk_raw, 5, 2)
        main_layout.addWidget(var_group)

        # --- Collapsible UART Log ---
        self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self)
        log_layout = QtWidgets.QVBoxLayout()
        self.txt_log = QtWidgets.QTextEdit()
        self.txt_log.setReadOnly(True)
        self.txt_log.setFontFamily("Courier")
        # Oldest lines are dropped once the cap is reached.
        self.txt_log.document().setMaximumBlockCount(self._LOG_MAX_BLOCKS)
        log_layout.addWidget(self.txt_log)
        self.log_spoiler.setContentLayout(log_layout)
        main_layout.addWidget(self.log_spoiler, 1)

    def _toggle_log_panel(self, checked):
        """Expand (*checked* true) or collapse the UART log panel.

        Does nothing when the panel is already in the requested state.
        """
        button = self.log_spoiler.toggleButton
        if button.isChecked() != bool(checked):
            # click() flips the checkable button and emits clicked(), which
            # runs the spoiler's own expand/collapse animation.
            button.click()

    # ----------------------------------------------------------- Port mgmt ---
    def set_available_ports(self):
        """Enumerate serial ports and repopulate the port combo box.

        The previously selected port stays selected if it is still present.
        """
        current = self.cmb_port.currentText()
        self.cmb_port.blockSignals(True)
        self.cmb_port.clear()
        for info in QtSerialPort.QSerialPortInfo.availablePorts():
            self.cmb_port.addItem(info.portName())
        if current:
            ix = self.cmb_port.findText(current)
            if ix >= 0:
                self.cmb_port.setCurrentIndex(ix)
        self.cmb_port.blockSignals(False)

    def _open_close_port(self):
        """Toggle the serial port: close it if open, otherwise open it
        with the selected port name and baud rate."""
        if self.serial.isOpen():
            name = self.serial.portName()
            # Nothing can be answered once the port is closed: forget the
            # queued request and release the active transaction now rather
            # than waiting for its timeout.
            self._pending_cmd = None
            if self._busy:
                self._log("[PORT] Closing -> active transaction aborted")
                self._end_transaction()
            self.serial.close()
            self.btn_open.setText("Open")
            self._log(f"[PORT] Closed {name}")
            self.portClosed.emit(name)
            return

        port_name = self.cmb_port.currentText()
        if not port_name:
            self._log("[ERR] No port selected")
            return
        baud = int(self.cmb_baud.currentText())
        self.serial.setPortName(port_name)
        self.serial.setBaudRate(baud)
        if not self.serial.open(QtCore.QIODevice.ReadWrite):
            self._log(f"[ERR] Failed to open {port_name}: {self.serial.errorString()}")
            return
        self.btn_open.setText("Close")
        self._log(f"[PORT] Opened {port_name} @ {baud}")
        self.portOpened.emit(port_name)

    # --------------------------------------------------------- Frame build ---
    def _build_request(self, index: int, read_name: bool) -> bytes:
        """Return the 4-byte request frame for *index*.

        Bit 15 of the 16-bit index word selects a name read (set) or a
        value read (clear); the word is sent high byte first.
        """
        dbg = index & 0x7FFF
        if read_name:
            dbg |= 0x8000
        hi = (dbg >> 8) & 0xFF
        lo = dbg & 0xFF
        return bytes([self.start_byte & 0xFF, self.cmd_byte & 0xFF, hi, lo])

    # ------------------------------- PUBLIC API (safe single outstanding) ---
    def request_name(self):
        """Request the name of the variable at the currently selected index."""
        self._issue_command(is_name=True)

    def request_value(self):
        """Request the value of the variable at the currently selected index."""
        self._issue_command(is_name=False)

    def _issue_command(self, *, is_name: bool):
        """Send a request now, or apply the busy policy if one is in flight."""
        if not self.serial.isOpen():
            # Writing to a closed port can never be answered; fail fast
            # instead of burning a full transaction timeout.
            self._log("[ERR] Port is not open")
            return

        index = int(self.spin_index.value())
        frame = self._build_request(index, is_name)
        if self._busy:
            if self._drop_if_busy:
                self._log("[LOCKSTEP] Busy -> drop new request")
                return
            if self._replace_if_busy:
                self._pending_cmd = (frame, is_name, index)
                self._log("[LOCKSTEP] Busy -> replaced pending request")
            else:
                # queue disabled; ignore
                self._log("[LOCKSTEP] Busy -> ignore (no replace)")
            return
        # idle -> send immediately
        self._start_transaction(frame, is_name, index)

    # ------------------------------------------------------ TXN lifecycle ---
    def _start_transaction(self, frame: bytes, is_name: bool, index: int):
        """Mark busy, compute expected length, send frame, start timeout."""
        self._busy = True
        self._active_index = index
        self._waiting_name = is_name
        # Expected len: hdr[4] + payload(name/val) + crc/trailer[4]
        payload_len = self.name_field_len if is_name else 2
        self._expected_min_len = 4 + payload_len + 4
        self._expected_exact_len = self._expected_min_len  # protocol fixed-size now
        self._rx_buf.clear()
        self._set_ui_busy(True)
        self._send(frame)
        self._txn_timer.start(self.read_timeout_ms)

    def _end_transaction(self):
        """Common exit path after parse, timeout or error.

        Resets all per-transaction state and, if a request was queued while
        busy, schedules it to start from the event loop.
        """
        self._txn_timer.stop()
        self._busy = False
        self._active_index = None
        self._expected_min_len = 0
        self._expected_exact_len = None
        self._rx_buf.clear()
        self._set_ui_busy(False)
        if self._pending_cmd is not None:
            # Deferred so the next transaction starts from a clean stack.
            QtCore.QTimer.singleShot(0, self._start_pending)

    def _start_pending(self):
        """Start the queued request, unless something else got in first."""
        if self._pending_cmd is None or self._busy:
            # Busy: another request started between scheduling and now.
            # The queued one stays put and is rescheduled when that
            # transaction ends, so two requests are never in flight.
            return
        frame, is_name, index = self._pending_cmd
        self._pending_cmd = None
        if not self.serial.isOpen():
            self._log("[LOCKSTEP] Port closed -> pending request dropped")
            return
        self._start_transaction(frame, is_name, index)

    def _on_txn_timeout(self):
        """Abort the active transaction when no full response arrived in time."""
        if not self._busy:
            return
        self._log("[TIMEOUT] Response not received in time; aborting transaction")
        # log any garbage that came in
        if self._rx_buf:
            self._log_frame(bytes(self._rx_buf), tx=False)
        self._end_transaction()

    # --------------------------------------------------------------- TX/RX ---
    def _send(self, data: bytes):
        """Write *data* to the port, log it and emit ``txBytes``."""
        n = self.serial.write(data)
        if n != len(data):
            self._log(f"[ERR] Write incomplete: {n}/{len(data)}")
        self.txBytes.emit(data)
        self._log_frame(data, tx=True)

    def _on_ready_read(self):
        """Collect incoming bytes and parse once the full frame has arrived."""
        chunk = self.serial.readAll().data()
        if not self._busy:
            # unexpected data while idle -> just log & drop
            if chunk:
                self._log("[WARN] RX while idle -> ignored")
                self._log_frame(chunk, tx=False)
            return

        self._rx_buf.extend(chunk)
        expected = self._expected_exact_len
        if expected is None or len(self._rx_buf) < expected:
            return  # wait for more data or for the timeout

        frame = bytes(self._rx_buf[:expected])
        extra = bytes(self._rx_buf[expected:])
        self.rxBytes.emit(frame)
        self._log_frame(frame, tx=False)
        self._parse_response(frame)
        if extra:
            # _end_transaction() clears the RX buffer, so surplus bytes are
            # dropped rather than carried over; show them in the log.
            self._log("[WARN] Extra RX bytes after frame -> discarded")
            self._log_frame(extra, tx=False)
        self._end_transaction()

    def _on_serial_error(self, err):
        """Log a serial error and fail the active transaction, if any."""
        if err == QtSerialPort.QSerialPort.NoError:
            return
        self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})")
        # treat as txn failure if busy
        if self._busy:
            self._end_transaction()

    # ------------------------------------------------------------- Parsing ---
    def _parse_response(self, frame: bytes):
        """Validate *frame*, update the UI and emit nameRead / valueRead.

        A bad trailer, CRC mismatch or unexpected command byte is logged
        but does not stop parsing; only a frame shorter than header plus
        trailer (8 bytes) is rejected.
        """
        if len(frame) < 8:  # header[4] + trailer[4] is the minimal structure
            self._log("[ERR] Frame too short")
            return

        # trailer: crcLo crcHi 0 0
        crc_lo, crc_hi, z1, z2 = frame[-4:]
        crc_rx = (crc_hi << 8) | crc_lo
        if z1 != 0 or z2 != 0:
            self._log("[WARN] Frame trailer not 0,0")

        payload = frame[:-4]
        if self.auto_crc_check:
            crc_calc = crc16_ibm(payload)
            if crc_calc != crc_rx:
                self._log(f"[CRC FAIL] calc=0x{crc_calc:04X} rx=0x{crc_rx:04X}")
            else:
                self._log("[CRC OK]")

        # header fields: payload[0] is the address byte, which is not used
        cmd = payload[1]
        status = payload[2]
        iq = payload[3]
        if cmd != self.cmd_byte:
            self._log(f"[WARN] Unexpected cmd 0x{cmd:02X}")

        index = self._active_index if self._active_index is not None else self.spin_index.value()
        if self._waiting_name:
            self._show_name(payload, index, status, iq)
        else:
            self._show_value(payload, index, status, iq)

    def _show_name(self, payload: bytes, index: int, status: int, iq: int):
        """Decode the name field of *payload*, display it and emit nameRead."""
        name_bytes = payload[4:4 + self.name_field_len]
        # stop at first NUL
        name_str = name_bytes.split(b"\x00", 1)[0].decode(errors="replace")
        self.edit_name.setText(name_str)
        self.lbl_iq.setText(str(iq))
        self.nameRead.emit(index, status, iq, name_str)

    def _show_value(self, payload: bytes, index: int, status: int, iq: int):
        """Decode the 16-bit value of *payload*, display it and emit valueRead."""
        if status != 0:
            # Read error reported by the device: treat the variable as invalid.
            self.edit_value.setText("INVALID")
            self.edit_value.setStyleSheet("color: red; font-weight: bold;")
            self.lbl_iq.setText("-")
            self.valueRead.emit(index, status, iq, 0, float('nan'))
            self._log(f"[ERR] Variable at index {index} invalid, status={status}")
            return

        raw_lo = payload[4] if len(payload) > 4 else 0
        raw_hi = payload[5] if len(payload) > 5 else 0
        raw16 = (raw_hi << 8) | raw_lo
        if self.signed and (raw16 & 0x8000):
            raw_signed = raw16 - 0x10000
        else:
            raw_signed = raw16

        if self.chk_raw.isChecked():
            disp = str(raw_signed)
            float_val = float(raw_signed)
        else:
            scale = self.iq_scaling.get(iq, 1.0)
            float_val = raw_signed / scale
            disp = f"{float_val:.6g}"  # compact

        # Clear the red "INVALID" styling left behind by an earlier failed read.
        self.edit_value.setStyleSheet("")
        self.edit_value.setText(disp)
        self.lbl_iq.setText(str(iq))
        self.valueRead.emit(index, status, iq, raw_signed, float_val)

    # -------------------------------------------------------------- Helpers ---
    def _toggle_index_base(self, state):
        """Show the index spin box in hex (checked) or decimal (unchecked)."""
        val = self.spin_index.value()
        if state == QtCore.Qt.Checked:
            base, prefix = 16, "0x"
        else:
            base, prefix = 10, ""
        self.spin_index.setDisplayIntegerBase(base)
        self.spin_index.setPrefix(prefix)
        self.spin_index.setValue(val)  # refresh display

    def _on_index_changed(self, new_index: int):
        """While polling, (re)start the debounce timer on every index change."""
        if self._polling:
            self._index_change_timer.start(self._index_change_delay_ms)

    def _on_index_change_timeout(self):
        """The index has settled: restart the polling cycle for the new index."""
        if self._polling:
            self._restart_polling_cycle()

    def _restart_polling_cycle(self):
        """Read the name for the new index as soon as the link is idle."""
        if not self._polling or not self.serial.isOpen():
            # Polling was stopped or the port was closed while waiting:
            # stop retrying instead of issuing a stray request later.
            return
        if self._busy:
            # A transaction is in flight; check again in 10 ms.
            QtCore.QTimer.singleShot(10, self._restart_polling_cycle)
            return
        # Refresh the displayed name for the new index.
        self.request_name()
        # Trigger a poll; it is skipped while the name request is in flight,
        # in which case the next poll tick reads the value.
        self._on_poll_timeout()

    def _set_polling_ui(self, polling: bool):
        """Disable the manual read buttons while polling; re-enable after."""
        self.btn_read_name.setDisabled(polling)
        self.btn_read_value.setDisabled(polling)

    def _toggle_polling(self):
        """Start or stop periodic value polling at the configured interval."""
        if self._polling:
            self._poll_timer.stop()
            self._polling = False
            self.btn_poll.setText("Start Polling")
            self._set_polling_ui(False)
            self._log("[POLL] Stopped")
        else:
            interval = self.spin_interval.value()
            self._poll_timer.start(interval)
            self._polling = True
            self.btn_poll.setText("Stop Polling")
            self._set_polling_ui(True)
            self._log(f"[POLL] Started, interval {interval} ms")

    def _on_poll_timeout(self):
        """Poll timer tick."""
        self._poll_once()

    def _poll_once(self):
        """Request the value once if polling, the port is open and the link is idle."""
        if self._polling and self.serial.isOpen() and not self._busy:
            self.request_value()
        # If busy, the tick is simply skipped; nothing is queued, so
        # requests cannot pile up.

    def _set_ui_busy(self, busy: bool):
        """Hook called when a transaction starts or ends; intentionally a no-op.

        Disabling the read buttons and the index spin box during a
        transaction was switched off; "Stop Polling" in particular must stay
        usable so that a stuck request can be interrupted.
        """

    def _log(self, msg: str):
        """Append *msg* to the log view with a millisecond timestamp."""
        ts = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
        self.txt_log.append(f"{ts} {msg}")

    def _log_frame(self, data: bytes, *, tx: bool):
        """Log *data* as a hex dump plus printable-ASCII column, tagged TX or RX."""
        dir_tag = "TX" if tx else "RX"
        hex_bytes = ' '.join(f"{b:02X}" for b in data)
        # ascii printable map
        ascii_bytes = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
        self._log(f"[{dir_tag}] {hex_bytes} |{ascii_bytes}|")
# ---------------------------------------------------------- Demo harness ---
class _DemoWindow(QtWidgets.QMainWindow):
    """Demo main window: hosts a DebugTerminalWidget as its central widget
    and prints every name/value it reports to stdout."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("DebugVar Terminal")

        terminal = DebugTerminalWidget(self)
        self.term = terminal
        self.setCentralWidget(terminal)

        # Echo the widget's result signals to the console.
        terminal.nameRead.connect(self._on_name)
        terminal.valueRead.connect(self._on_value)

    def _on_name(self, index, status, iq, name):
        """Print a name-read result."""
        line = f"Name idx={index} status={status} iq={iq} name='{name}'"
        print(line)

    def _on_value(self, index, status, iq, raw16, floatVal):
        """Print a value-read result."""
        line = f"Value idx={index} status={status} iq={iq} raw={raw16} val={floatVal}"
        print(line)
# ----------------------------------------------------------------- main ---
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the demo window,
    # and exit with the event loop's return code.
    application = QtWidgets.QApplication(sys.argv)
    demo_window = _DemoWindow()
    demo_window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)