debugVarTool/Src/tms_debugvar_term.py
Razvalyaev f2c4b7b3cd структурирован код debug_tools
доработана демо-терминалка для считывания tms переменных и встроена в DebugVarEdit
2025-07-19 10:56:46 +03:00

729 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PySide2 import QtCore, QtWidgets, QtSerialPort
import datetime
# ------------------------------- Константы протокола ------------------------
WATCH_SERVICE_BIT = 0x8000
DEBUG_OK = 0 # ожидаемый код успешного чтения
SIGN_BIT_MASK = 0x80
FRAC_MASK_FULL = 0x7F # если используем 7 бит дробной части
# ---------------------------------------------------------------- CRC util ---
def crc16_ibm(data: bytes, *, init=0xFFFF) -> int:
"""CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected)."""
crc = init
for b in data:
crc ^= b
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc & 0xFFFF
class Spoiler(QtWidgets.QWidget):
def __init__(self, title="", animationDuration=300, parent=None):
super().__init__(parent)
self._animationDuration = animationDuration
# --- Toggle button ---
self.toggleButton = QtWidgets.QToolButton(self)
self.toggleButton.setStyleSheet("QToolButton { border: none; }")
self.toggleButton.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon)
self.toggleButton.setArrowType(QtCore.Qt.RightArrow)
self.toggleButton.setText(title)
self.toggleButton.setCheckable(True)
# --- Header line ---
self.headerLine = QtWidgets.QFrame(self)
self.headerLine.setFrameShape(QtWidgets.QFrame.HLine)
self.headerLine.setFrameShadow(QtWidgets.QFrame.Sunken)
# --- Content area ---
self.contentArea = QtWidgets.QScrollArea(self)
self.contentArea.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed)
self.contentArea.setFrameShape(QtWidgets.QFrame.NoFrame)
self.contentArea.setWidgetResizable(True)
self._contentWidget = QtWidgets.QWidget()
self.contentArea.setWidget(self._contentWidget)
self.contentArea.setMaximumHeight(0)
# --- Анимация только по контенту ---
self._ani_content = QtCore.QPropertyAnimation(self.contentArea, b"maximumHeight")
self._ani_content.setDuration(animationDuration)
self._ani_content.setEasingCurve(QtCore.QEasingCurve.InOutCubic)
# Следим за шагами анимации → обновляем родителя
self._ani_content.valueChanged.connect(self._adjust_parent_size)
# --- Layout ---
self.mainLayout = QtWidgets.QGridLayout(self)
self.mainLayout.setVerticalSpacing(0)
self.mainLayout.setContentsMargins(0, 0, 0, 0)
self.mainLayout.addWidget(self.toggleButton, 0, 0, 1, 1)
self.mainLayout.addWidget(self.headerLine, 0, 1, 1, 1)
self.mainLayout.addWidget(self.contentArea, 1, 0, 1, 2)
# --- Signals ---
self.toggleButton.clicked.connect(self._on_toggled)
def setContentLayout(self, contentLayout):
old = self._contentWidget.layout()
if old:
QtWidgets.QWidget().setLayout(old)
self._contentWidget.setLayout(contentLayout)
def _adjust_parent_size(self, *_):
top = self.window()
if top:
size = top.size()
size.setHeight(top.sizeHint().height()) # берём новую высоту
top.resize(size) # ширина остаётся прежней
def _on_toggled(self, checked: bool):
self.toggleButton.setArrowType(QtCore.Qt.DownArrow if checked else QtCore.Qt.RightArrow)
contentHeight = self._contentWidget.sizeHint().height()
self._ani_content.stop()
self._ani_content.setStartValue(self.contentArea.maximumHeight())
self._ani_content.setEndValue(contentHeight if checked else 0)
# --- Фиксируем ширину на время анимации ---
w = self.width()
self.setFixedWidth(w)
self._ani_content.finished.connect(lambda: self.setMaximumWidth(16777215)) # сброс фикса
self._ani_content.start()
# --------------------------- DebugTerminalWidget ---------------------------
class DebugTerminalWidget(QtWidgets.QWidget):
nameRead = QtCore.Signal(int, int, int, str) # index, status, iq, name
valueRead = QtCore.Signal(int, int, int, int, float) # для одиночного
valuesRead = QtCore.Signal(int, int, list, list, list, list) # base, count, idx_list, iq_list, raw_list, float_list
portOpened = QtCore.Signal(str)
portClosed = QtCore.Signal(str)
txBytes = QtCore.Signal(bytes)
rxBytes = QtCore.Signal(bytes)
def __init__(self, parent=None, *,
start_byte=0x0A,
cmd_byte=0x44,
iq_scaling=None,
read_timeout_ms=250,
auto_crc_check=True,
drop_if_busy=False,
replace_if_busy=True):
super().__init__(parent)
self.device_addr = start_byte
self.cmd_byte = cmd_byte
self.read_timeout_ms = read_timeout_ms
self.auto_crc_check = auto_crc_check
self._drop_if_busy = drop_if_busy
self._replace_if_busy = replace_if_busy
if iq_scaling is None:
iq_scaling = {n: float(1 << n) for n in range(16)}
iq_scaling[0] = 1.0
self.iq_scaling = iq_scaling
# Serial
self.serial = QtSerialPort.QSerialPort(self)
self.serial.setBaudRate(115200)
self.serial.readyRead.connect(self._on_ready_read)
self.serial.errorOccurred.connect(self._on_serial_error)
# State
self._rx_buf = bytearray()
self._busy = False
self._pending_cmd = None # (frame, meta)
self._txn_meta = None # {'service':bool,'index':int,'varqnt':int,'chain':...}
self._txn_timer = QtCore.QTimer(self)
self._txn_timer.setSingleShot(True)
self._txn_timer.timeout.connect(self._on_txn_timeout)
self._poll_timer = QtCore.QTimer(self)
self._poll_timer.timeout.connect(self._on_poll_timeout)
self._polling = False
# Кэш: index -> (status, iq, name)
self._name_cache = {}
# Очередь требуемых service индексов перед чтением блока
self._service_queue = [] # список индексов
self._pending_data_after_services = None # (base, count)
self._build_ui()
self._connect_ui()
self.set_available_ports()
# ------------------------------ UI ----------------------------------
def _build_ui(self):
layout = QtWidgets.QVBoxLayout(self)
# --- Serial group ---
g_serial = QtWidgets.QGroupBox("Serial Port")
hs = QtWidgets.QHBoxLayout(g_serial)
self.cmb_port = QtWidgets.QComboBox()
self.btn_refresh = QtWidgets.QPushButton("Refresh")
self.cmb_baud = QtWidgets.QComboBox()
self.cmb_baud.addItems(["9600","19200","38400","57600","115200","230400"])
self.cmb_baud.setCurrentText("115200")
self.btn_open = QtWidgets.QPushButton("Open")
hs.addWidget(QtWidgets.QLabel("Port:"))
hs.addWidget(self.cmb_port, 1)
hs.addWidget(self.btn_refresh)
hs.addSpacing(10)
hs.addWidget(QtWidgets.QLabel("Baud:"))
hs.addWidget(self.cmb_baud)
hs.addWidget(self.btn_open)
# --- Watch group (будет растягиваться) ---
g_watch = QtWidgets.QGroupBox("Watch Variables")
grid = QtWidgets.QGridLayout(g_watch)
grid.setHorizontalSpacing(8)
grid.setVerticalSpacing(4)
self.spin_index = QtWidgets.QSpinBox()
self.spin_index.setRange(0, 0x7FFF)
self.spin_index.setAccelerated(True)
self.chk_hex_index = QtWidgets.QCheckBox("Hex")
self.spin_count = QtWidgets.QSpinBox(); self.spin_count.setRange(1,255); self.spin_count.setValue(1)
self.btn_read_service = QtWidgets.QPushButton("Read Name/Type")
self.btn_read_values = QtWidgets.QPushButton("Read Value(s)")
self.btn_poll = QtWidgets.QPushButton("Start Polling")
self.spin_interval = QtWidgets.QSpinBox(); self.spin_interval.setRange(50,10000); self.spin_interval.setValue(500); self.spin_interval.setSuffix(" ms")
self.chk_auto_service = QtWidgets.QCheckBox("Auto service before values if miss cache"); self.chk_auto_service.setChecked(True)
self.chk_raw = QtWidgets.QCheckBox("Raw (no IQ scaling)")
self.lbl_name = QtWidgets.QLineEdit(); self.lbl_name.setReadOnly(True)
self.lbl_iq = QtWidgets.QLabel("-")
self.edit_single_value = QtWidgets.QLineEdit(); self.edit_single_value.setReadOnly(True)
# --- Таблица: теперь 5 столбцов (если уже поменяли) ---
self.tbl_values = QtWidgets.QTableWidget(0, 5)
self.tbl_values.setHorizontalHeaderLabels(["Index","Name","IQ","Raw","Scaled"])
self.tbl_values.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Expanding)
hh = self.tbl_values.horizontalHeader()
hh.setSectionResizeMode(0, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(2, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(3, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch)
vh = self.tbl_values.verticalHeader()
vh.setVisible(False)
r = 0
grid.addWidget(QtWidgets.QLabel("Base Index:"), r, 0)
grid.addWidget(self.spin_index, r, 1)
grid.addWidget(self.chk_hex_index, r, 2); r += 1
grid.addWidget(QtWidgets.QLabel("Count:"), r, 0)
grid.addWidget(self.spin_count, r, 1); r += 1
grid.addWidget(self.btn_read_service, r, 0)
grid.addWidget(self.btn_read_values, r, 1)
grid.addWidget(self.btn_poll, r, 2); r += 1
grid.addWidget(QtWidgets.QLabel("Interval:"), r, 0)
grid.addWidget(self.spin_interval, r, 1)
grid.addWidget(self.chk_auto_service, r, 2); r += 1
grid.addWidget(QtWidgets.QLabel("Name:"), r, 0)
grid.addWidget(self.lbl_name, r, 1, 1, 2); r += 1
grid.addWidget(QtWidgets.QLabel("IQ:"), r, 0)
grid.addWidget(self.lbl_iq, r, 1)
grid.addWidget(self.chk_raw, r, 2); r += 1
grid.addWidget(QtWidgets.QLabel("Single:"), r, 0)
grid.addWidget(self.edit_single_value, r, 1, 1, 2); r += 1
grid.addWidget(QtWidgets.QLabel("Array Values:"), r, 0); r += 1
# --- Строка с таблицей, назначаем stretch ---
grid.addWidget(self.tbl_values, r, 0, 1, 3)
grid.setRowStretch(r, 1) # таблица тянется
# Все предыдущие строки по умолчанию имеют stretch=0
# Можно явно grid.setRowStretch(i, 0) при желании
# --- Добавляем группы в главный layout ---
layout.addWidget(g_serial, 0) # не растягивается (stretch=0)
layout.addWidget(g_watch, 1) # растягивается (stretch=1)
# --- UART Log (минимальная высота) ---
self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self)
self.log_spoiler.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Minimum)
log_layout = QtWidgets.QVBoxLayout()
self.txt_log = QtWidgets.QTextEdit()
self.txt_log.setReadOnly(True)
self.txt_log.setFontFamily("Courier")
self.txt_log.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Minimum)
log_layout.addWidget(self.txt_log)
self.log_spoiler.setContentLayout(log_layout)
# Добавляем лог последним, но без stretch (0)
layout.addWidget(self.log_spoiler, 0)
# Строчки распределения: g_watch = 1, остальное = 0
# Если хочешь принудительно:
# layout.setStretchFactor(g_serial, 0) # PySide2: нет прямого метода, можно:
layout.setStretch(layout.indexOf(g_serial), 0)
layout.setStretch(layout.indexOf(g_watch), 1)
layout.setStretch(layout.indexOf(self.log_spoiler), 0)
def _connect_ui(self):
self.btn_refresh.clicked.connect(self.set_available_ports)
self.btn_open.clicked.connect(self._open_close_port)
self.btn_read_service.clicked.connect(self.request_service_single)
self.btn_read_values.clicked.connect(self.request_values)
self.btn_poll.clicked.connect(self._toggle_polling)
self.chk_hex_index.stateChanged.connect(self._toggle_index_base)
# ----------------------------- SERIAL MGMT ----------------------------
def set_available_ports(self):
cur = self.cmb_port.currentText()
self.cmb_port.blockSignals(True)
self.cmb_port.clear()
for info in QtSerialPort.QSerialPortInfo.availablePorts():
self.cmb_port.addItem(info.portName())
if cur:
ix = self.cmb_port.findText(cur)
if ix >= 0:
self.cmb_port.setCurrentIndex(ix)
self.cmb_port.blockSignals(False)
def _open_close_port(self):
if self.serial.isOpen():
name = self.serial.portName()
self.serial.close()
self.btn_open.setText("Open")
self._log(f"[PORT] Closed {name}")
self.portClosed.emit(name)
return
port = self.cmb_port.currentText()
if not port:
self._log("[ERR] No port selected")
return
self.serial.setPortName(port)
self.serial.setBaudRate(int(self.cmb_baud.currentText()))
if not self.serial.open(QtCore.QIODevice.ReadWrite):
self._log(f"[ERR] Open fail {port}: {self.serial.errorString()}")
return
self.btn_open.setText("Close")
self._log(f"[PORT] Opened {port}")
self.portOpened.emit(port)
# ---------------------------- FRAME BUILD -----------------------------
def _build_request(self, index: int, *, service: bool, varqnt: int) -> bytes:
dbg = index & 0x7FFF
if service:
dbg |= WATCH_SERVICE_BIT
hi = (dbg >> 8) & 0xFF
lo = dbg & 0xFF
q = varqnt & 0xFF
payload = bytes([self.device_addr & 0xFF, self.cmd_byte & 0xFF, hi, lo, q])
crc = crc16_ibm(payload)
return payload + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
# ----------------------------- PUBLIC API -----------------------------
def request_service_single(self):
idx = int(self.spin_index.value())
self._enqueue_or_start(idx, service=True, varqnt=0)
def request_values(self):
base = int(self.spin_index.value())
count = int(self.spin_count.value())
needed = []
if self.chk_auto_service.isChecked():
for i in range(base, base+count):
if i not in self._name_cache:
needed.append(i)
if needed:
self._service_queue = needed[:] # копия
self._pending_data_after_services = (base, count)
self._log(f"[AUTO] Need service for {len(needed)} indices: {needed}")
self._kick_service_queue()
else:
self._enqueue_or_start(base, service=False, varqnt=count)
# -------------------------- SERVICE QUEUE FLOW ------------------------
def _kick_service_queue(self):
if self._busy:
return # дождёмся завершения
if self._service_queue:
nxt = self._service_queue.pop(0)
# не используем chain, просто по завершению снова вызовем _kick_service_queue
self._enqueue_or_start(nxt, service=True, varqnt=0, queue_mode=True)
elif self._pending_data_after_services:
base, count = self._pending_data_after_services
self._pending_data_after_services = None
self._enqueue_or_start(base, service=False, varqnt=count)
# ------------------------ TRANSACTION SCHEDULER -----------------------
def _enqueue_or_start(self, index, service: bool, varqnt: int, chain_after=None, queue_mode=False):
frame = self._build_request(index, service=service, varqnt=varqnt)
meta = {'service': service, 'index': index, 'varqnt': varqnt, 'chain': chain_after, 'queue_mode': queue_mode}
if self._busy:
if self._drop_if_busy and not self._replace_if_busy:
self._log("[LOCKSTEP] Busy -> drop")
return
if self._replace_if_busy:
self._pending_cmd = (frame, meta)
self._log("[LOCKSTEP] Busy -> replaced pending")
else:
self._log("[LOCKSTEP] Busy -> ignore")
return
self._start_txn(frame, meta)
def _start_txn(self, frame: bytes, meta: dict):
self._busy = True
self._txn_meta = meta
self._rx_buf.clear()
self._set_ui_busy(True)
self._send(frame)
self._txn_timer.start(self.read_timeout_ms)
def _end_txn(self):
self._txn_timer.stop()
queue_mode = False
chain = None
if self._txn_meta:
queue_mode = self._txn_meta.get('queue_mode', False)
chain = self._txn_meta.get('chain')
self._txn_meta = None
self._busy = False
self._rx_buf.clear()
self._set_ui_busy(False)
# Если был chain -> запустить его
if chain:
base, serv, q = chain
self._enqueue_or_start(base, service=serv, varqnt=q)
return
if self._pending_cmd is not None:
frame, meta = self._pending_cmd
self._pending_cmd = None
QtCore.QTimer.singleShot(0, lambda f=frame,m=meta: self._start_txn(f,m))
return
# Если это был элемент очереди service -> continue
if queue_mode:
QtCore.QTimer.singleShot(0, self._kick_service_queue)
return
def _on_txn_timeout(self):
if not self._busy:
return
self._log("[TIMEOUT] No response")
if self._rx_buf:
self._log_frame(bytes(self._rx_buf), tx=False)
self._end_txn()
# ------------------------------- TX/RX ---------------------------------
def _send(self, data: bytes):
w = self.serial.write(data)
if w != len(data):
self._log(f"[ERR] Write short {w}/{len(data)}")
self.txBytes.emit(data)
self._log_frame(data, tx=True)
def _on_ready_read(self):
self._rx_buf.extend(self.serial.readAll().data())
if not self._busy:
if self._rx_buf:
self._log("[WARN] Data while idle -> drop")
self._log_frame(bytes(self._rx_buf), tx=False)
self._rx_buf.clear()
return
self._try_parse()
# ------------------------------- PARSING -------------------------------
def _try_parse(self):
if not self._txn_meta:
return
service = self._txn_meta['service']
buf = self._rx_buf
trailer_len = 4
if service:
if len(buf) < 7 + trailer_len: # adr cmd vhi vlo status iq nameLen + trailer
return
name_len = buf[6]
expected = 7 + name_len + trailer_len
if len(buf) < expected:
return
frame = bytes(buf[:expected])
del buf[:expected]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
self._parse_service_frame(frame)
self._end_txn()
else:
if len(buf) < 6 + trailer_len: # adr cmd vhi vlo varqnt status + trailer
return
varqnt = buf[4]
status = buf[5]
if status != DEBUG_OK:
expected = 8 + trailer_len # + errIndex(2)
if len(buf) < expected:
return
frame = bytes(buf[:expected])
del buf[:expected]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=True)
self._end_txn()
else:
expected = 6 + varqnt*2 + trailer_len
if len(buf) < expected:
return
frame = bytes(buf[:expected])
del buf[:expected]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=False)
self._end_txn()
def _check_crc(self, payload: bytes, crc_lo: int, crc_hi: int):
if not self.auto_crc_check:
return True
crc_rx = (crc_hi << 8) | crc_lo
crc_calc = crc16_ibm(payload)
if crc_calc != crc_rx:
self._log(f"[CRC FAIL] calc=0x{crc_calc:04X} rx=0x{crc_rx:04X}")
return False
self._log("[CRC OK]")
return True
@staticmethod
def _clear_service_bit(vhi, vlo):
return ((vhi & 0x7F) << 8) | vlo
def _parse_service_frame(self, frame: bytes):
payload = frame[:-4]
crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 7:
self._log("[ERR] Service frame too short")
return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, status, iq_raw, name_len = payload[:7]
index = self._clear_service_bit(vhi, vlo)
if len(payload) < 7 + name_len:
self._log("[ERR] Service name truncated")
return
name_bytes = payload[7:7+name_len]
name = name_bytes.decode(errors='replace')
# ### PATCH: извлекаем признаки
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
if status == DEBUG_OK:
# Кэшируем расширенный кортеж
self._name_cache[index] = (status, iq_raw, name, is_signed, frac_bits)
self.nameRead.emit(index, status, iq_raw, name)
if self.spin_count.value() == 1 and index == self.spin_index.value():
if status == DEBUG_OK:
self.lbl_name.setText(name)
# Отображаем IQ как «числоробных_бит + s/u»
self.lbl_iq.setText(f"{frac_bits}{'s' if is_signed else 'u'}")
else:
self.lbl_name.setText('<err>')
self.lbl_iq.setText('-')
self._log(f"[SERVICE] idx={index} status={status} iq_raw=0x{iq_raw:02X} "
f"sign={'S' if is_signed else 'U'} frac={frac_bits} name='{name}'")
def _parse_data_frame(self, frame: bytes, *, error_mode: bool):
payload = frame[:-4]
crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 6:
self._log("[ERR] Data frame too short")
return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, varqnt, status = payload[:6]
base = self._clear_service_bit(vhi, vlo)
if error_mode:
if len(payload) < 8:
self._log("[ERR] Error frame truncated")
return
err_hi, err_lo = payload[6:8]
bad_index = (err_hi << 8) | err_lo
self._log(f"[DATA] ERROR status={status} bad_index={bad_index}")
self.valueRead.emit(bad_index, status, 0, 0, float('nan'))
self.valuesRead.emit(base, 0, [], [], [], [])
return
# Нормальный кадр
if len(payload) < 6 + varqnt*2:
self._log("[ERR] Data payload truncated")
return
raw_vals = []
pos = 6
for _ in range(varqnt):
hi = payload[pos]; lo = payload[pos+1]; pos += 2
raw16 = (hi << 8) | lo
raw_vals.append(raw16) # пока храним как 0..65535
idx_list = []
iq_list = []
name_list = []
scaled_list = []
display_raw_list = [] # для таблицы Raw (с учётом знака, если знак есть)
for ofs, raw16 in enumerate(raw_vals):
idx = base + ofs
# В кэше теперь 5 элементов
status_i, iq_raw, name_i, is_signed, frac_bits = self._name_cache.get(
idx,
(DEBUG_OK, 0, '', False, 0)
)
# Приведение знака
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000 # signed 16-bit
else:
value_int = raw16
# Масштаб
if self.chk_raw.isChecked():
scale = 1.0
else:
# scale берём: если в словаре нет — вычисляем 2**frac_bits
scale = self.iq_scaling.get(frac_bits, 2.0 ** frac_bits)
scaled = value_int / scale
idx_list.append(idx)
iq_list.append(iq_raw) # сырой байт (с битом знака)
name_list.append(name_i)
scaled_list.append(scaled)
display_raw_list.append(value_int)
# Populate table
self._populate_table(idx_list, name_list, iq_list, display_raw_list, scaled_list)
if varqnt == 1:
self.edit_single_value.setText(
str(display_raw_list[0]) if self.chk_raw.isChecked() else f"{scaled_list[0]:.6g}"
)
if idx_list[0] == self.spin_index.value():
# Отобразим мета
# Достаём из кэша снова (или можно из name_list/iq_list + пересчитать)
_, iq_raw0, name0, is_signed0, frac0 = self._name_cache.get(idx_list[0], (DEBUG_OK, 0, '', False, 0))
self.lbl_name.setText(name0)
self.lbl_iq.setText(f"{frac0}{'s' if is_signed0 else 'u'}")
self.valueRead.emit(idx_list[0], status, iq_list[0], display_raw_list[0], scaled_list[0])
else:
self.edit_single_value.setText("")
self.valuesRead.emit(base, varqnt, idx_list, iq_list, display_raw_list, scaled_list)
self._log(f"[DATA] base={base} q={varqnt} values={[f'{v:.6g}' for v in scaled_list] if not self.chk_raw.isChecked() else raw_vals}")
def _populate_table(self, idxs, names, iqs, raws, scaled):
self.tbl_values.setRowCount(len(idxs))
for row, (idx, nm, iq_raw, rv, sv) in enumerate(zip(idxs, names, iqs, raws, scaled)):
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
iq_disp = f"{frac_bits}{'s' if is_signed else 'u'}"
raw_display = str(rv)
scaled_display = raw_display if self.chk_raw.isChecked() else f"{sv:.6g}"
items = [
QtWidgets.QTableWidgetItem(str(idx)),
QtWidgets.QTableWidgetItem(nm),
QtWidgets.QTableWidgetItem(iq_disp),
QtWidgets.QTableWidgetItem(raw_display),
QtWidgets.QTableWidgetItem(scaled_display),
]
for it in items:
it.setFlags(it.flags() & ~QtCore.Qt.ItemIsEditable)
for c, it in enumerate(items):
self.tbl_values.setItem(row, c, it)
# ------------------------------ POLLING --------------------------------
def _toggle_polling(self):
if self._polling:
self._poll_timer.stop(); self._polling=False; self.btn_poll.setText("Start Polling"); self._log("[POLL] Stopped")
self.btn_read_service.setEnabled(True)
self.btn_read_values.setEnabled(True)
else:
self._poll_timer.start(self.spin_interval.value()); self._polling=True; self.btn_poll.setText("Stop Polling"); self._log(f"[POLL] Started interval={self.spin_interval.value()}ms")
self.btn_read_service.setEnabled(False)
self.btn_read_values.setEnabled(False)
def _on_poll_timeout(self):
if not self.serial.isOpen() or self._busy:
return
self.request_values()
# ------------------------------ HELPERS --------------------------------
def _toggle_index_base(self, st):
val = self.spin_index.value()
if st == QtCore.Qt.Checked:
self.spin_index.setDisplayIntegerBase(16); self.spin_index.setPrefix("0x")
else:
self.spin_index.setDisplayIntegerBase(10); self.spin_index.setPrefix("")
self.spin_index.setValue(val)
def _set_ui_busy(self, busy: bool):
if self._polling == False:
self.btn_read_service.setEnabled(not busy)
self.btn_read_values.setEnabled(not busy)
def _on_serial_error(self, err):
if err == QtSerialPort.QSerialPort.NoError:
return
self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})")
if self._busy:
self._end_txn()
# ------------------------------ LOGGING --------------------------------
def _log(self, msg: str):
ts = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
self.txt_log.append(f"{ts} {msg}")
def _log_frame(self, data: bytes, *, tx: bool):
tag = 'TX' if tx else 'RX'
hexs = ' '.join(f"{b:02X}" for b in data)
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
self._log(f"[{tag}] {hexs} |{ascii_part}|")
# ---------------------------------------------------------- Demo harness ---
class _DemoWindow(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("DebugVar Terminal")
self.term = DebugTerminalWidget(self)
self.setCentralWidget(self.term)
# connect sample signals -> print
self.term.nameRead.connect(self._on_name)
self.term.valueRead.connect(self._on_value)
def _on_name(self, index, status, iq, name):
print(f"Name idx={index} status={status} iq={iq} name='{name}'")
def _on_value(self, index, status, iq, raw16, floatVal):
print(f"Value idx={index} status={status} iq={iq} raw={raw16} val={floatVal}")
def closeEvent(self, event):
"""Вызывается при закрытии окна."""
# Явно удаляем центральный виджет
self.setCentralWidget(None)
if self.term:
self.term.deleteLater()
self.term = None
super().closeEvent(event)
# ------------------------------- Demo --------------------------------------
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
win = _DemoWindow()
win.show()
sys.exit(app.exec_())