500 lines
19 KiB
Python
500 lines
19 KiB
Python
"""
|
||
PySide2 Serial Debug Terminal Widget
|
||
=====================================
|
||
|
||
A small, embeddable terminal-style widget for talking to your MCU debug monitor
|
||
via the simple protocol you described:
|
||
|
||
Request (host -> target):
|
||
0x0A 0x44 <DbgNumbHigh> <DbgNumbLow>
|
||
* In <DbgNumbHigh>, the MSB (bit15 of the 16‑bit dbg number) selects read type:
|
||
0 = read value, 1 = read name.
|
||
* Remaining 15 bits = variable index (0..0x7FFF).
|
||
|
||
Response (target -> host): built per your WatchVar() C function. Layout:
|
||
[0] addr_recive (target address / echo)
|
||
[1] CMD_RS232_WATCH (command echo, e.g., 0x44)
|
||
[2] status (0x00 = OK, 0xFF = error; else vendor‑defined)
|
||
[3] IQType (enum: 0=int, 1=IQ1, 2=IQ2, ...)
|
||
if ReadName == 1:
|
||
[4..N] DebugVarName_t raw bytes (fixed length, default 32, configurable)
|
||
else (ReadName == 0):
|
||
[4] dataLo (LSB of 16‑bit value)
|
||
[5] dataHi (MSB of 16‑bit value)
|
||
[...next] crcLo crcHi 0x00 0x00 (CRC16‑IBM over the bytes preceding CRC)
|
||
|
||
NOTE: The C code shows a 32‑bit signed Data but only transmits the low 16 bits.
|
||
This widget decodes the 16‑bit word and sign‑extends to 32 when converting IQ,
|
||
keeping behavior close to the firmware sample. Adjust in subclass if needed.
|
||
|
||
Features
|
||
--------
|
||
- COM port selection (auto‑populate) & open/close.
|
||
- Variable index entry (0..0x7FFF) via spin box (hex display optional).
|
||
- Buttons: "Read Name", "Read Value".
|
||
- Raw vs Formatted output checkbox: when unchecked, IQ scaling -> float.
|
||
- Displays: Name, Value, IQ type.
|
||
- UART log window (timestamped hex + ASCII, TX/RX tagged).
|
||
- CRC16‑IBM check; bad CRC flagged in log.
|
||
- Signals for integration: nameRead(index, status, iq, nameStr), valueRead(index, status, iq, raw16, floatVal).
|
||
- Non‑blocking I/O via QSerialPort (QtSerialPort) — *preferred* since you are on Windows and already in Qt/PySide2. No extra threads needed.
|
||
(If you strongly prefer pyserial, see commented alt implementation at bottom.)
|
||
|
||
Tested Python target: 3.7 w/ PySide2.
|
||
|
||
Integration
|
||
-----------
|
||
Instantiate `DebugTerminalWidget(parent=None)` and embed into your main window or layout.
|
||
Call `set_available_ports()` periodically or on refresh to re‑enumerate COM ports.
|
||
|
||
Customization knobs (constructor args):
|
||
- cmd_byte: defaults to 0x44 (as in your request).
|
||
- start_byte: defaults to 0x0A.
|
||
- name_field_len: bytes to read when name requested (default 32; set to len(DebugVarName_t)).
|
||
- iq_scaling: dict mapping iq enum -> divisor (float). default: {0:1.0,1:2.0,2:4.0,3:8.0,... up to 15:2**n}.
|
||
- signed: treat 16‑bit value as signed (default True). If False, unsigned.
|
||
|
||
Limitations / TODO hooks
|
||
------------------------
|
||
- If firmware later sends full 32‑bit Data, override `_parse_value_payload()`.
|
||
- Multi‑frame or streaming modes not supported (only single WatchVar replies).
|
||
- Timeout & retry basic; expand as needed.
|
||
|
||
"""
|
||
|
||
import sys
|
||
import struct
|
||
import datetime
|
||
from PySide2 import QtCore, QtWidgets, QtSerialPort
|
||
from collections import deque
|
||
|
||
class DebugTerminalWidget(QtWidgets.QWidget):
|
||
nameRead = QtCore.Signal(int, int, int, str) # index, status, iq, name
|
||
valueRead = QtCore.Signal(int, int, int, int, float) # index, status, iq, raw16, floatVal
|
||
portOpened = QtCore.Signal(str)
|
||
portClosed = QtCore.Signal(str)
|
||
txBytes = QtCore.Signal(bytes) # raw bytes sent
|
||
rxBytes = QtCore.Signal(bytes) # raw bytes received (frame only)
|
||
|
||
def __init__(self, parent=None, *,
|
||
start_byte=0x0A,
|
||
cmd_byte=0x44,
|
||
name_field_len=11,
|
||
signed=True,
|
||
iq_scaling=None,
|
||
read_timeout_ms=200,
|
||
auto_crc_check=True):
|
||
super().__init__(parent)
|
||
self.start_byte = start_byte
|
||
self.cmd_byte = cmd_byte
|
||
self.name_field_len = name_field_len
|
||
self.signed = signed
|
||
self.read_timeout_ms = read_timeout_ms
|
||
self.auto_crc_check = auto_crc_check
|
||
self._busy = False
|
||
self._queue = deque()
|
||
|
||
if iq_scaling is None:
|
||
iq_scaling = {n: float(1 << n) for n in range(16)}
|
||
iq_scaling[0] = 1.0
|
||
self.iq_scaling = iq_scaling
|
||
|
||
# Serial port
|
||
self.serial = QtSerialPort.QSerialPort(self)
|
||
self.serial.setBaudRate(115200)
|
||
self.serial.readyRead.connect(self._on_ready_read)
|
||
self.serial.errorOccurred.connect(self._on_serial_error)
|
||
|
||
self._rx_buf = bytearray()
|
||
self._waiting_name = False
|
||
self._expected_min_len = 0
|
||
|
||
# Timer for polling
|
||
self._poll_timer = QtCore.QTimer(self)
|
||
self._poll_timer.timeout.connect(self._on_poll_timeout)
|
||
self._polling = False
|
||
|
||
self._build_ui()
|
||
|
||
self.btn_open.clicked.connect(self._open_close_port)
|
||
self.btn_refresh.clicked.connect(self.set_available_ports)
|
||
self.btn_read_name.clicked.connect(self.request_name)
|
||
self.btn_read_value.clicked.connect(self.request_value)
|
||
self.btn_poll.clicked.connect(self._toggle_polling)
|
||
|
||
self.set_available_ports()
|
||
|
||
# ------------------------------------------------------------------ UI ---
|
||
def _build_ui(self):
|
||
main_layout = QtWidgets.QVBoxLayout(self)
|
||
main_layout.setContentsMargins(10, 10, 10, 10)
|
||
main_layout.setSpacing(12)
|
||
|
||
# --- Serial Port Group ---
|
||
port_group = QtWidgets.QGroupBox("Serial Port")
|
||
port_layout = QtWidgets.QHBoxLayout(port_group)
|
||
|
||
self.cmb_port = QtWidgets.QComboBox()
|
||
self.btn_refresh = QtWidgets.QPushButton("Refresh")
|
||
self.cmb_baud = QtWidgets.QComboBox()
|
||
self.cmb_baud.addItems(["9600", "19200", "38400", "57600", "115200", "230400"])
|
||
self.cmb_baud.setCurrentText("115200")
|
||
self.btn_open = QtWidgets.QPushButton("Open")
|
||
|
||
port_layout.addWidget(QtWidgets.QLabel("Port:"))
|
||
port_layout.addWidget(self.cmb_port, 1)
|
||
port_layout.addWidget(self.btn_refresh)
|
||
port_layout.addSpacing(20)
|
||
port_layout.addWidget(QtWidgets.QLabel("Baud rate:"))
|
||
port_layout.addWidget(self.cmb_baud, 0)
|
||
port_layout.addWidget(self.btn_open)
|
||
|
||
main_layout.addWidget(port_group)
|
||
|
||
# --- Variable Control Group ---
|
||
var_group = QtWidgets.QGroupBox("Watch Variable")
|
||
var_layout = QtWidgets.QGridLayout(var_group)
|
||
var_layout.setHorizontalSpacing(10)
|
||
var_layout.setVerticalSpacing(6)
|
||
|
||
self.spin_index = QtWidgets.QSpinBox()
|
||
self.spin_index.setRange(0, 0x7FFF)
|
||
self.spin_index.setAccelerated(True)
|
||
self.spin_index.valueChanged.connect(self._on_index_changed)
|
||
|
||
self.chk_hex_index = QtWidgets.QCheckBox("Hex")
|
||
self.chk_hex_index.stateChanged.connect(self._toggle_index_base)
|
||
|
||
self.btn_read_name = QtWidgets.QPushButton("Read Name")
|
||
self.btn_read_value = QtWidgets.QPushButton("Read Value")
|
||
self.btn_poll = QtWidgets.QPushButton("Start Polling")
|
||
|
||
self.spin_interval = QtWidgets.QSpinBox()
|
||
self.spin_interval.setRange(100, 5000)
|
||
self.spin_interval.setValue(500)
|
||
self.spin_interval.setSuffix(" ms")
|
||
|
||
self.edit_name = QtWidgets.QLineEdit()
|
||
self.edit_name.setReadOnly(True)
|
||
|
||
self.edit_value = QtWidgets.QLineEdit()
|
||
self.edit_value.setReadOnly(True)
|
||
|
||
self.lbl_iq = QtWidgets.QLabel("-")
|
||
self.chk_raw = QtWidgets.QCheckBox("Raw (no IQ format)")
|
||
|
||
var_layout.addWidget(QtWidgets.QLabel("Index:"), 0, 0)
|
||
var_layout.addWidget(self.spin_index, 0, 1)
|
||
var_layout.addWidget(self.chk_hex_index, 0, 2)
|
||
|
||
var_layout.addWidget(self.btn_read_name, 1, 0)
|
||
var_layout.addWidget(self.btn_read_value, 1, 1)
|
||
var_layout.addWidget(self.btn_poll, 1, 2)
|
||
|
||
var_layout.addWidget(QtWidgets.QLabel("Interval:"), 2, 0)
|
||
var_layout.addWidget(self.spin_interval, 2, 1)
|
||
|
||
var_layout.addWidget(QtWidgets.QLabel("Name:"), 3, 0)
|
||
var_layout.addWidget(self.edit_name, 3, 1, 1, 2)
|
||
|
||
var_layout.addWidget(QtWidgets.QLabel("Value:"), 4, 0)
|
||
var_layout.addWidget(self.edit_value, 4, 1, 1, 2)
|
||
|
||
var_layout.addWidget(QtWidgets.QLabel("IQ:"), 5, 0)
|
||
var_layout.addWidget(self.lbl_iq, 5, 1)
|
||
var_layout.addWidget(self.chk_raw, 5, 2)
|
||
|
||
main_layout.addWidget(var_group)
|
||
|
||
# --- UART Log Group ---
|
||
log_group = QtWidgets.QGroupBox("UART Log")
|
||
log_layout = QtWidgets.QVBoxLayout(log_group)
|
||
|
||
self.txt_log = QtWidgets.QTextEdit()
|
||
self.txt_log.setReadOnly(True)
|
||
self.txt_log.setFontFamily("Courier")
|
||
log_layout.addWidget(self.txt_log)
|
||
|
||
main_layout.addWidget(log_group, 1)
|
||
|
||
|
||
# ----------------------------------------------------------- Port mgmt ---
|
||
def set_available_ports(self):
|
||
"""Enumerate COM ports and repopulate combo box."""
|
||
current = self.cmb_port.currentText()
|
||
self.cmb_port.blockSignals(True)
|
||
self.cmb_port.clear()
|
||
for info in QtSerialPort.QSerialPortInfo.availablePorts():
|
||
self.cmb_port.addItem(info.portName())
|
||
if current:
|
||
ix = self.cmb_port.findText(current)
|
||
if ix >= 0:
|
||
self.cmb_port.setCurrentIndex(ix)
|
||
self.cmb_port.blockSignals(False)
|
||
|
||
def _open_close_port(self):
|
||
if self.serial.isOpen():
|
||
name = self.serial.portName()
|
||
self.serial.close()
|
||
self.btn_open.setText("Open")
|
||
self._log(f"[PORT] Closed {name}")
|
||
self.portClosed.emit(name)
|
||
return
|
||
|
||
port_name = self.cmb_port.currentText()
|
||
if not port_name:
|
||
self._log("[ERR] No port selected")
|
||
return
|
||
|
||
baud = int(self.cmb_baud.currentText())
|
||
self.serial.setPortName(port_name)
|
||
self.serial.setBaudRate(baud)
|
||
if not self.serial.open(QtCore.QIODevice.ReadWrite):
|
||
self._log(f"[ERR] Failed to open {port_name}: {self.serial.errorString()}")
|
||
return
|
||
self.btn_open.setText("Close")
|
||
self._log(f"[PORT] Opened {port_name} @ {baud}")
|
||
self.portOpened.emit(port_name)
|
||
|
||
# --------------------------------------------------------- Frame build ---
|
||
def _build_request(self, index: int, read_name: bool) -> bytes:
|
||
if read_name:
|
||
dbg = 0x8000 | (index & 0x7FFF)
|
||
else:
|
||
dbg = index & 0x7FFF
|
||
hi = (dbg >> 8) & 0xFF
|
||
lo = dbg & 0xFF
|
||
return bytes([self.start_byte & 0xFF, self.cmd_byte & 0xFF, hi, lo])
|
||
|
||
def request_name(self):
|
||
index = int(self.spin_index.value())
|
||
frame = self._build_request(index, True)
|
||
self._enqueue_command(frame, is_name=True)
|
||
|
||
def request_value(self):
|
||
index = int(self.spin_index.value())
|
||
frame = self._build_request(index, False)
|
||
self._enqueue_command(frame, is_name=False)
|
||
|
||
def _enqueue_command(self, frame: bytes, is_name: bool):
|
||
self._queue.append((frame, is_name))
|
||
if not self._busy:
|
||
self._process_next_command()
|
||
|
||
def _process_next_command(self):
|
||
if not self._queue:
|
||
return
|
||
frame, is_name = self._queue.popleft()
|
||
self._busy = True
|
||
self._waiting_name = is_name
|
||
if is_name:
|
||
self._expected_min_len = 4 + self.name_field_len + 4
|
||
else:
|
||
self._expected_min_len = 4 + 2 + 4
|
||
self._send(frame)
|
||
|
||
def _finish_command(self):
|
||
self._busy = False
|
||
self._process_next_command()
|
||
# --------------------------------------------------------------- TX/RX ---
|
||
def _send(self, data: bytes):
|
||
n = self.serial.write(data)
|
||
if n != len(data):
|
||
self._log(f"[ERR] Write incomplete: {n}/{len(data)}")
|
||
self.txBytes.emit(data)
|
||
self._log_frame(data, tx=True)
|
||
# start a timeout timer to clear buffer if no response
|
||
QtCore.QTimer.singleShot(self.read_timeout_ms, self._check_timeout)
|
||
|
||
def _check_timeout(self):
|
||
self._busy = False
|
||
data = self.serial.readAll()
|
||
chunk = bytes(data)
|
||
if chunk:
|
||
self._log_frame(chunk, tx=False) # <-- логируем каждую порцию
|
||
self._rx_buf.extend(chunk)
|
||
if self._expected_min_len and len(self._rx_buf) >= self._expected_min_len:
|
||
self._log("[TIMEOUT] No complete response")
|
||
self._rx_buf.clear()
|
||
self._expected_min_len = 0
|
||
self._finish_command()
|
||
|
||
def _on_ready_read(self):
|
||
self._rx_buf.extend(self.serial.readAll().data())
|
||
# if we know the minimum expected length, test
|
||
if self._expected_min_len and len(self._rx_buf) >= self._expected_min_len:
|
||
# parse frame
|
||
frame = bytes(self._rx_buf)
|
||
self._rx_buf.clear()
|
||
self._expected_min_len = 0
|
||
self.rxBytes.emit(frame)
|
||
self._log_frame(frame, tx=False)
|
||
self._parse_response(frame)
|
||
|
||
self._finish_command()
|
||
|
||
def _on_serial_error(self, err):
|
||
if err == QtSerialPort.QSerialPort.NoError:
|
||
return
|
||
self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})")
|
||
|
||
# ------------------------------------------------------------- Parsing ---
|
||
def _parse_response(self, frame: bytes):
|
||
# basic length check
|
||
if len(frame) < 8: # minimal structure
|
||
self._log("[ERR] Frame too short")
|
||
return
|
||
|
||
# trailer: crcLo crcHi 0 0
|
||
if len(frame) < 4:
|
||
return # can't parse yet
|
||
crc_lo = frame[-4]
|
||
crc_hi = frame[-3]
|
||
crc_rx = (crc_hi << 8) | crc_lo
|
||
z1 = frame[-2]
|
||
z2 = frame[-1]
|
||
if z1 != 0 or z2 != 0:
|
||
self._log("[WARN] Frame trailer not 0,0")
|
||
|
||
payload = frame[:-4]
|
||
if self.auto_crc_check:
|
||
crc_calc = crc16_ibm(payload)
|
||
if crc_calc != crc_rx:
|
||
self._log(f"[CRC FAIL] calc=0x{crc_calc:04X} rx=0x{crc_rx:04X}")
|
||
else:
|
||
self._log("[CRC OK]")
|
||
|
||
# header fields
|
||
addr = payload[0]
|
||
cmd = payload[1]
|
||
status = payload[2]
|
||
iq = payload[3]
|
||
|
||
if cmd != self.cmd_byte:
|
||
self._log(f"[WARN] Unexpected cmd 0x{cmd:02X}")
|
||
|
||
if self._waiting_name:
|
||
name_bytes = payload[4:4 + self.name_field_len]
|
||
# stop at first NUL
|
||
nul = name_bytes.find(b"\x00")
|
||
if nul >= 0:
|
||
name_str = name_bytes[:nul].decode(errors="replace")
|
||
else:
|
||
name_str = name_bytes.decode(errors="replace")
|
||
self.edit_name.setText(name_str)
|
||
self.lbl_iq.setText(str(iq))
|
||
self.nameRead.emit(self.spin_index.value(), status, iq, name_str)
|
||
else:
|
||
raw_lo = payload[4] if len(payload) > 4 else 0
|
||
raw_hi = payload[5] if len(payload) > 5 else 0
|
||
raw16 = (raw_hi << 8) | raw_lo
|
||
if self.signed and raw16 & 0x8000:
|
||
raw_signed = raw16 - 0x10000
|
||
else:
|
||
raw_signed = raw16
|
||
if self.chk_raw.isChecked():
|
||
disp = str(raw_signed)
|
||
float_val = float(raw_signed)
|
||
else:
|
||
scale = self.iq_scaling.get(iq, 1.0)
|
||
float_val = raw_signed / scale
|
||
disp = f"{float_val:.6g}" # compact
|
||
self.edit_value.setText(disp)
|
||
self.lbl_iq.setText(str(iq))
|
||
self.valueRead.emit(self.spin_index.value(), status, iq, raw_signed, float_val)
|
||
|
||
# -------------------------------------------------------------- Helpers ---
|
||
def _toggle_index_base(self, state):
|
||
val = self.spin_index.value()
|
||
if state == QtCore.Qt.Checked:
|
||
self.spin_index.setDisplayIntegerBase(16)
|
||
self.spin_index.setPrefix("0x")
|
||
self.spin_index.setValue(val) # refresh display
|
||
else:
|
||
self.spin_index.setDisplayIntegerBase(10)
|
||
self.spin_index.setPrefix("")
|
||
self.spin_index.setValue(val)
|
||
|
||
def _on_index_changed(self, new_index: int):
|
||
if self._polling:
|
||
# В режиме polling при изменении индекса автоматически запрашиваем имя
|
||
self.request_name()
|
||
|
||
def _toggle_polling(self):
|
||
if self._polling:
|
||
self._poll_timer.stop()
|
||
self._polling = False
|
||
self.btn_poll.setText("Start Polling")
|
||
self._log("[POLL] Polling stopped")
|
||
else:
|
||
if not self.serial.isOpen():
|
||
self._log("[WARN] Port not open. Cannot start polling.")
|
||
return
|
||
interval = self.spin_interval.value()
|
||
self._poll_timer.start(interval)
|
||
self._polling = True
|
||
self.btn_poll.setText("Stop Polling")
|
||
self._log(f"[POLL] Polling started with interval {interval} ms")
|
||
self.request_name()
|
||
self._poll_once() # immediate first poll
|
||
|
||
def _on_poll_timeout(self):
|
||
self._poll_once()
|
||
|
||
def _poll_once(self):
|
||
if self._polling and self.serial.isOpen():
|
||
self.request_value()
|
||
|
||
def _log(self, msg: str):
|
||
ts = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
||
self.txt_log.append(f"{ts} {msg}")
|
||
|
||
def _log_frame(self, data: bytes, *, tx: bool):
|
||
dir_tag = "TX" if tx else "RX"
|
||
hex_bytes = ' '.join(f"{b:02X}" for b in data)
|
||
# ascii printable map
|
||
ascii_bytes = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
|
||
self._log(f"[{dir_tag}] {hex_bytes} |{ascii_bytes}|")
|
||
|
||
|
||
# ---------------------------------------------------------------- CRC util ---
|
||
def crc16_ibm(data: bytes, *, init=0xFFFF) -> int:
|
||
"""CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected)."""
|
||
crc = init
|
||
for b in data:
|
||
crc ^= b
|
||
for _ in range(8):
|
||
if crc & 1:
|
||
crc = (crc >> 1) ^ 0xA001
|
||
else:
|
||
crc >>= 1
|
||
return crc & 0xFFFF
|
||
|
||
|
||
# ---------------------------------------------------------- Demo harness ---
|
||
class _DemoWindow(QtWidgets.QMainWindow):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.setWindowTitle("Debug Terminal Demo")
|
||
self.term = DebugTerminalWidget(self)
|
||
self.setCentralWidget(self.term)
|
||
# connect sample signals -> print
|
||
self.term.nameRead.connect(self._on_name)
|
||
self.term.valueRead.connect(self._on_value)
|
||
|
||
def _on_name(self, index, status, iq, name):
|
||
print(f"Name idx={index} status={status} iq={iq} name='{name}'")
|
||
|
||
def _on_value(self, index, status, iq, raw16, floatVal):
|
||
print(f"Value idx={index} status={status} iq={iq} raw={raw16} val={floatVal}")
|
||
|
||
|
||
# ----------------------------------------------------------------- main ---
|
||
if __name__ == "__main__":
|
||
app = QtWidgets.QApplication(sys.argv)
|
||
win = _DemoWindow()
|
||
win.resize(600, 500)
|
||
win.show()
|
||
sys.exit(app.exec_())
|
||
|