debugVarTool/Src/tms_debugvar_term.py

971 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PySide2 import QtCore, QtWidgets, QtSerialPort
from tms_debugvar_lowlevel import LowLevelSelectorWidget
import datetime
# ------------------------------- Константы протокола ------------------------
WATCH_SERVICE_BIT = 0x8000
DEBUG_OK = 0 # ожидаемый код успешного чтения
SIGN_BIT_MASK = 0x80
FRAC_MASK_FULL = 0x7F # если используем 7 бит дробной части
# ---------------------------------------------------------------- CRC util ---
def crc16_ibm(data: bytes, *, init=0xFFFF) -> int:
"""CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected)."""
crc = init
for b in data:
crc ^= b
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc & 0xFFFF
class Spoiler(QtWidgets.QWidget):
def __init__(self, title="", animationDuration=300, parent=None):
super().__init__(parent)
self._animationDuration = animationDuration
self.state = False
# --- Toggle button ---
self.toggleButton = QtWidgets.QToolButton(self)
self.toggleButton.setStyleSheet("QToolButton { border: none; }")
self.toggleButton.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon)
self.toggleButton.setArrowType(QtCore.Qt.RightArrow)
self.toggleButton.setText(title)
self.toggleButton.setCheckable(True)
# --- Header line ---
self.headerLine = QtWidgets.QFrame(self)
self.headerLine.setFrameShape(QtWidgets.QFrame.HLine)
self.headerLine.setFrameShadow(QtWidgets.QFrame.Sunken)
# --- Content area ---
self.contentArea = QtWidgets.QScrollArea(self)
self.contentArea.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed)
self.contentArea.setFrameShape(QtWidgets.QFrame.NoFrame)
self.contentArea.setWidgetResizable(True)
self._contentWidget = QtWidgets.QWidget()
self.contentArea.setWidget(self._contentWidget)
self.contentArea.setMaximumHeight(0)
# --- Анимация только по контенту ---
self._ani_content = QtCore.QPropertyAnimation(self.contentArea, b"maximumHeight")
self._ani_content.setDuration(animationDuration)
self._ani_content.setEasingCurve(QtCore.QEasingCurve.InOutCubic)
# Следим за шагами анимации → обновляем родителя
self._ani_content.valueChanged.connect(self._adjust_parent_size)
# --- Layout ---
self.mainLayout = QtWidgets.QGridLayout(self)
self.mainLayout.setVerticalSpacing(0)
self.mainLayout.setContentsMargins(0, 0, 0, 0)
self.mainLayout.addWidget(self.toggleButton, 0, 0, 1, 1)
self.mainLayout.addWidget(self.headerLine, 0, 1, 1, 1)
self.mainLayout.addWidget(self.contentArea, 1, 0, 1, 2)
# --- Signals ---
self.toggleButton.clicked.connect(self._on_toggled)
def setContentLayout(self, contentLayout):
old = self._contentWidget.layout()
if old:
QtWidgets.QWidget().setLayout(old)
self._contentWidget.setLayout(contentLayout)
def getState(self):
return self.state
def _adjust_parent_size(self, *_):
top = self.window()
if top:
size = top.size()
size.setHeight(top.sizeHint().height()) # берём новую высоту
top.resize(size) # ширина остаётся прежней
def _on_toggled(self, checked: bool):
self.state = checked
self.toggleButton.setArrowType(QtCore.Qt.DownArrow if checked else QtCore.Qt.RightArrow)
contentHeight = self._contentWidget.sizeHint().height()
self._ani_content.stop()
self._ani_content.setStartValue(self.contentArea.maximumHeight())
self._ani_content.setEndValue(contentHeight if checked else 0)
# --- Фиксируем ширину на время анимации ---
w = self.width()
self.setFixedWidth(w)
self._ani_content.finished.connect(lambda: self.setMaximumWidth(16777215)) # сброс фикса
self._ani_content.start()
# --------------------------- DebugTerminalWidget ---------------------------
class DebugTerminalWidget(QtWidgets.QWidget):
# Существующие сигналы (Watch)
nameRead = QtCore.Signal(int, int, int, str)
valueRead = QtCore.Signal(int, int, int, int, float)
valuesRead = QtCore.Signal(int, int, list, list, list, list)
# Новые сигналы (LowLevel)
llValueRead = QtCore.Signal(int, int, int, int, float) # addr, status, rettype_raw, raw16_signed, scaled
portOpened = QtCore.Signal(str)
portClosed = QtCore.Signal(str)
txBytes = QtCore.Signal(bytes)
rxBytes = QtCore.Signal(bytes)
def __init__(self, parent=None, *,
start_byte=0x0A,
cmd_byte=0x46,
cmd_lowlevel=0x47,
iq_scaling=None,
read_timeout_ms=250,
auto_crc_check=True,
drop_if_busy=False,
replace_if_busy=True):
super().__init__(parent)
self.device_addr = start_byte
self.cmd_byte = cmd_byte
self.cmd_lowlevel = cmd_lowlevel
self.read_timeout_ms = read_timeout_ms
self.auto_crc_check = auto_crc_check
self._drop_if_busy = drop_if_busy
self._replace_if_busy = replace_if_busy
if iq_scaling is None:
iq_scaling = {n: float(1 << n) for n in range(31)}
iq_scaling[0] = 1.0
self.iq_scaling = iq_scaling
# Serial
self.serial = QtSerialPort.QSerialPort(self)
self.serial.setBaudRate(115200)
self.serial.readyRead.connect(self._on_ready_read)
self.serial.errorOccurred.connect(self._on_serial_error)
# State
self._rx_buf = bytearray()
self._busy = False
self._pending_cmd = None # (frame, meta)
self._txn_meta = None # {'service':bool,'index':int,'varqnt':int,'chain':...,'lowlevel':bool}
self._txn_timer = QtCore.QTimer(self)
self._txn_timer.setSingleShot(True)
self._txn_timer.timeout.connect(self._on_txn_timeout)
# Watch polling
self._poll_timer = QtCore.QTimer(self)
self._poll_timer.timeout.connect(self._on_poll_timeout)
self._polling = False
# LowLevel polling
self._ll_poll_timer = QtCore.QTimer(self)
self._ll_poll_timer.timeout.connect(self._on_ll_poll_timeout)
self._ll_polling = False
self._ll_current_var_info = None # Хранит инфо о выбранной LL переменной
# Кэш: index -> (status, iq, name, is_signed, frac_bits)
self._name_cache = {}
# Очередь service индексов
self._service_queue = []
self._pending_data_after_services = None # (base, count)
self._build_ui()
self._connect_ui()
self.set_available_ports()
# ------------------------------ UI ----------------------------------
def _build_ui(self):
layout = QtWidgets.QVBoxLayout(self)
# --- Serial group ---
g_serial = QtWidgets.QGroupBox("Serial Port")
hs = QtWidgets.QHBoxLayout(g_serial)
self.cmb_port = QtWidgets.QComboBox()
self.btn_refresh = QtWidgets.QPushButton("Refresh")
self.cmb_baud = QtWidgets.QComboBox()
self.cmb_baud.addItems(["9600","19200","38400","57600","115200","230400"])
self.cmb_baud.setCurrentText("115200")
self.btn_open = QtWidgets.QPushButton("Open")
hs.addWidget(QtWidgets.QLabel("Port:"))
hs.addWidget(self.cmb_port, 1)
hs.addWidget(self.btn_refresh)
hs.addSpacing(10)
hs.addWidget(QtWidgets.QLabel("Baud:"))
hs.addWidget(self.cmb_baud)
hs.addWidget(self.btn_open)
# --- TabWidget ---
self.tabs = QtWidgets.QTabWidget()
self._build_watch_tab()
self._build_lowlevel_tab() # <-- Вызываем новый метод
# --- UART Log ---
self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self)
self.log_spoiler.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Minimum)
log_layout = QtWidgets.QVBoxLayout()
self.txt_log = QtWidgets.QTextEdit(); self.txt_log.setReadOnly(True)
self.txt_log.setFontFamily("Courier")
log_layout.addWidget(self.txt_log)
self.log_spoiler.setContentLayout(log_layout)
layout.addWidget(g_serial, 0)
layout.addWidget(self.tabs, 1)
layout.addWidget(self.log_spoiler, 0)
layout.setStretch(layout.indexOf(g_serial), 0)
layout.setStretch(layout.indexOf(self.tabs), 1)
layout.setStretch(layout.indexOf(self.log_spoiler), 0)
def _build_watch_tab(self):
# ... (код для вкладки Watch остаётся без изменений)
tab = QtWidgets.QWidget()
vtab = QtWidgets.QVBoxLayout(tab)
g_watch = QtWidgets.QGroupBox("Watch Variables")
grid = QtWidgets.QGridLayout(g_watch)
grid.setHorizontalSpacing(8)
grid.setVerticalSpacing(4)
self.spin_index = QtWidgets.QSpinBox(); self.spin_index.setRange(0, 0x7FFF); self.spin_index.setAccelerated(True)
self.chk_hex_index = QtWidgets.QCheckBox("Hex")
self.spin_count = QtWidgets.QSpinBox(); self.spin_count.setRange(1,255); self.spin_count.setValue(1)
self.btn_read_service = QtWidgets.QPushButton("Read Name/Type")
self.btn_read_values = QtWidgets.QPushButton("Read Value(s)")
self.btn_poll = QtWidgets.QPushButton("Start Polling")
self.spin_interval = QtWidgets.QSpinBox(); self.spin_interval.setRange(50,10000); self.spin_interval.setValue(500); self.spin_interval.setSuffix(" ms")
self.chk_auto_service = QtWidgets.QCheckBox("Auto service before values if miss cache"); self.chk_auto_service.setChecked(True)
self.chk_raw = QtWidgets.QCheckBox("Raw (no IQ scaling)")
self.lbl_name = QtWidgets.QLineEdit(); self.lbl_name.setReadOnly(True)
self.lbl_iq = QtWidgets.QLabel("-")
self.edit_single_value = QtWidgets.QLineEdit(); self.edit_single_value.setReadOnly(True)
self.tbl_values = QtWidgets.QTableWidget(0, 5)
self.tbl_values.setHorizontalHeaderLabels(["Index","Name","IQ","Raw","Scaled"])
hh = self.tbl_values.horizontalHeader()
hh.setSectionResizeMode(0, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(2, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(3, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch)
self.tbl_values.verticalHeader().setVisible(False)
r = 0
grid.addWidget(QtWidgets.QLabel("Base Index:"), r, 0); grid.addWidget(self.spin_index, r, 1); grid.addWidget(self.chk_hex_index, r, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Count:"), r, 0); grid.addWidget(self.spin_count, r, 1); r+=1
grid.addWidget(self.btn_read_service, r, 0); grid.addWidget(self.btn_read_values, r, 1); grid.addWidget(self.btn_poll, r, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Interval:"), r, 0); grid.addWidget(self.spin_interval, r, 1); grid.addWidget(self.chk_auto_service, r, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Name:"), r, 0); grid.addWidget(self.lbl_name, r, 1, 1, 2); r+=1
grid.addWidget(QtWidgets.QLabel("IQ:"), r, 0); grid.addWidget(self.lbl_iq, r, 1); grid.addWidget(self.chk_raw, r, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Single:"), r, 0); grid.addWidget(self.edit_single_value, r, 1, 1, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Array Values:"), r, 0); r+=1
grid.addWidget(self.tbl_values, r, 0, 1, 3); grid.setRowStretch(r, 1)
vtab.addWidget(g_watch, 1)
self.tabs.addTab(tab, "Watch")
def _build_lowlevel_tab(self):
tab = QtWidgets.QWidget()
main_layout = QtWidgets.QVBoxLayout(tab)
# --- Селектор переменной ---
self.ll_selector = LowLevelSelectorWidget(tab)
main_layout.addWidget(self.ll_selector, 1) # Даём ему растягиваться
# --- Панель управления и статуса ---
g_controls = QtWidgets.QGroupBox("Controls & Status")
grid = QtWidgets.QGridLayout(g_controls)
self.btn_ll_read = QtWidgets.QPushButton("Read Once")
self.btn_ll_poll = QtWidgets.QPushButton("Start Polling")
self.spin_ll_interval = QtWidgets.QSpinBox()
self.spin_ll_interval.setRange(50, 10000)
self.spin_ll_interval.setValue(500)
self.spin_ll_interval.setSuffix(" ms")
self.chk_ll_raw = QtWidgets.QCheckBox("Raw (no IQ scaling)")
self.ll_val_status = QtWidgets.QLabel("-")
self.ll_val_rettype = QtWidgets.QLabel("-")
self.ll_val_raw = QtWidgets.QLabel("-")
self.ll_val_scaled = QtWidgets.QLabel("-")
# Размещение виджетов
grid.addWidget(self.btn_ll_read, 0, 0)
grid.addWidget(self.btn_ll_poll, 0, 1)
grid.addWidget(QtWidgets.QLabel("Interval:"), 1, 0)
grid.addWidget(self.spin_ll_interval, 1, 1)
grid.addWidget(self.chk_ll_raw, 0, 2, 2, 1) # Растянем на 2 строки
# Результаты в виде формы
form_layout = QtWidgets.QFormLayout()
form_layout.addRow("Status:", self.ll_val_status)
form_layout.addRow("Return Type:", self.ll_val_rettype)
form_layout.addRow("Raw Value:", self.ll_val_raw)
form_layout.addRow("Scaled Value:", self.ll_val_scaled)
grid.addLayout(form_layout, 2, 0, 1, 3)
grid.setColumnStretch(2, 1)
main_layout.addWidget(g_controls)
main_layout.setStretchFactor(g_controls, 0) # Не растягивать
self.tabs.addTab(tab, "LowLevel")
def _connect_ui(self):
# Watch
self.btn_refresh.clicked.connect(self.set_available_ports)
self.btn_open.clicked.connect(self._open_close_port)
self.btn_read_service.clicked.connect(self.request_service_single)
self.btn_read_values.clicked.connect(self.request_values)
self.btn_poll.clicked.connect(self._toggle_polling)
self.chk_hex_index.stateChanged.connect(self._toggle_index_base)
# LowLevel (новые и переделанные)
self.ll_selector.variablePrepared.connect(self._on_ll_variable_prepared)
self.ll_selector.xmlLoaded.connect(lambda p: self._log(f"[LL] XML loaded: {p}"))
self.btn_ll_read.clicked.connect(self.request_lowlevel_once)
self.btn_ll_poll.clicked.connect(self._toggle_ll_polling)
# ----------------------------- SERIAL MGMT ----------------------------
# ... (код без изменений)
def set_available_ports(self):
cur = self.cmb_port.currentText()
self.cmb_port.blockSignals(True)
self.cmb_port.clear()
for info in QtSerialPort.QSerialPortInfo.availablePorts():
self.cmb_port.addItem(info.portName())
if cur:
ix = self.cmb_port.findText(cur)
if ix >= 0:
self.cmb_port.setCurrentIndex(ix)
self.cmb_port.blockSignals(False)
def _open_close_port(self):
if self.serial.isOpen():
name = self.serial.portName()
self.serial.close()
self.btn_open.setText("Open")
self._log(f"[PORT] Closed {name}")
self.portClosed.emit(name)
return
port = self.cmb_port.currentText()
if not port:
self._log("[ERR] No port selected")
return
self.serial.setPortName(port)
self.serial.setBaudRate(int(self.cmb_baud.currentText()))
if not self.serial.open(QtCore.QIODevice.ReadWrite):
self._log(f"[ERR] Open fail {port}: {self.serial.errorString()}")
return
self.btn_open.setText("Close")
self._log(f"[PORT] Opened {port}")
self.portOpened.emit(port)
# ---------------------------- FRAME BUILD -----------------------------
def _build_request(self, index: int, *, service: bool, varqnt: int) -> bytes:
dbg = index & 0x7FFF
if service:
dbg |= WATCH_SERVICE_BIT
hi = (dbg >> 8) & 0xFF
lo = dbg & 0xFF
q = varqnt & 0xFF
payload = bytes([self.device_addr & 0xFF, self.cmd_byte & 0xFF, hi, lo, q])
crc = crc16_ibm(payload)
return payload + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def _build_lowlevel_request(self, var_info: dict) -> bytes:
# Формат: [adr][cmd_lowlevel][year_hi][year_lo][month][day][hour][minute][addr2][addr1][addr0][pt_type][iq_type][return_type]
# Пытаемся получить время из переданной информации
dt_info = var_info.get('datetime')
if dt_info:
# Используем время из var_info
year = dt_info.get('year', 2000)
month = dt_info.get('month', 1)
day = dt_info.get('day', 1)
hour = dt_info.get('hour', 0)
minute = dt_info.get('minute', 0)
self._log("[LL] Using time from selector.")
else:
# Если в var_info времени нет, используем текущее системное время (старое поведение)
now = QtCore.QDateTime.currentDateTime()
year = now.date().year()
month = now.date().month()
day = now.date().day()
hour = now.time().hour()
minute = now.time().minute()
self._log("[LL] Fallback to current system time.")
addr = var_info.get('address', 0)
addr2 = (addr >> 16) & 0xFF
addr1 = (addr >> 8) & 0xFF
addr0 = addr & 0xFF
pt_type = var_info.get('ptr_type', 0) & 0xFF
iq_type = var_info.get('iq_type', 0) & 0xFF
ret_type = var_info.get('return_type', 0) & 0xFF
frame_wo_crc = bytes([
self.device_addr & 0xFF, self.cmd_lowlevel & 0xFF,
(year >> 8) & 0xFF, year & 0xFF,
month & 0xFF, day & 0xFF, hour & 0xFF, minute & 0xFF,
addr2, addr1, addr0, pt_type, iq_type, ret_type
])
crc = crc16_ibm(frame_wo_crc)
return frame_wo_crc + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
# ----------------------------- PUBLIC API -----------------------------
def request_service_single(self):
idx = int(self.spin_index.value())
self._enqueue_or_start(idx, service=True, varqnt=0)
def request_values(self):
base = int(self.spin_index.value())
count = int(self.spin_count.value())
needed = []
if self.chk_auto_service.isChecked():
for i in range(base, base+count):
if i not in self._name_cache:
needed.append(i)
if needed:
self._service_queue = needed[:]
self._pending_data_after_services = (base, count)
self._log(f"[AUTO] Need service for {len(needed)} indices: {needed}")
self._kick_service_queue()
else:
self._enqueue_or_start(base, service=False, varqnt=count)
def request_lowlevel_once(self):
"""Запрашивает чтение выбранной LowLevel переменной."""
if not self.serial.isOpen():
self._log("[LL] Port is not open.")
return
if self._busy:
self._log("[LL] Busy, request dropped.")
return
if not self._ll_current_var_info:
self._log("[LL] No variable selected!")
if self._ll_polling: # Если поллинг активен, но переменная пропала - стоп
self._toggle_ll_polling()
return
frame = self._build_lowlevel_request(self._ll_current_var_info)
meta = {'lowlevel': True}
self._enqueue_raw(frame, meta)
# -------------------------- SERVICE QUEUE FLOW ------------------------
# ... (код без изменений)
def _kick_service_queue(self):
if self._busy:
return
if self._service_queue:
nxt = self._service_queue.pop(0)
self._enqueue_or_start(nxt, service=True, varqnt=0, queue_mode=True)
elif self._pending_data_after_services:
base, count = self._pending_data_after_services
self._pending_data_after_services = None
self._enqueue_or_start(base, service=False, varqnt=count)
# ------------------------ TRANSACTION SCHEDULER -----------------------
# ... (код без изменений)
def _enqueue_raw(self, frame: bytes, meta: dict):
if self._busy:
if self._drop_if_busy and not self._replace_if_busy:
self._log("[LOCKSTEP] Busy -> drop")
return
if self._replace_if_busy:
self._pending_cmd = (frame, meta)
self._log("[LOCKSTEP] Busy -> replaced pending")
else:
self._log("[LOCKSTEP] Busy -> ignore")
return
self._start_txn(frame, meta)
def _enqueue_or_start(self, index, service: bool, varqnt: int, chain_after=None, queue_mode=False):
frame = self._build_request(index, service=service, varqnt=varqnt)
meta = {'service': service, 'index': index, 'varqnt': varqnt, 'chain': chain_after, 'queue_mode': queue_mode, 'lowlevel': False}
if self._busy:
if self._drop_if_busy and not self._replace_if_busy:
self._log("[LOCKSTEP] Busy -> drop")
return
if self._replace_if_busy:
self._pending_cmd = (frame, meta)
self._log("[LOCKSTEP] Busy -> replaced pending")
else:
self._log("[LOCKSTEP] Busy -> ignore")
return
self._start_txn(frame, meta)
def _start_txn(self, frame: bytes, meta: dict):
self._busy = True
self._txn_meta = meta
self._rx_buf.clear()
self._set_ui_busy(True)
self._send(frame)
self._txn_timer.start(self.read_timeout_ms)
def _end_txn(self):
self._txn_timer.stop()
queue_mode = False
chain = None
meta = self._txn_meta
if meta:
queue_mode = meta.get('queue_mode', False)
chain = meta.get('chain')
self._txn_meta = None
self._busy = False
self._rx_buf.clear()
self._set_ui_busy(False)
if chain:
base, serv, q = chain
self._enqueue_or_start(base, service=serv, varqnt=q)
return
if self._pending_cmd is not None:
frame, meta = self._pending_cmd; self._pending_cmd = None
QtCore.QTimer.singleShot(0, lambda f=frame,m=meta: self._start_txn(f,m))
return
if queue_mode:
QtCore.QTimer.singleShot(0, self._kick_service_queue)
return
def _on_txn_timeout(self):
if not self._busy: return
is_ll = self._txn_meta.get('lowlevel', False) if self._txn_meta else False
log_prefix = "[LL TIMEOUT]" if is_ll else "[TIMEOUT]"
self._log(f"{log_prefix} No response")
if self._rx_buf:
self._log_frame(bytes(self._rx_buf), tx=False)
self._end_txn()
# ------------------------------- TX/RX ---------------------------------
# ... (код без изменений)
def _send(self, data: bytes):
w = self.serial.write(data)
if w != len(data):
self._log(f"[ERR] Write short {w}/{len(data)}")
self.txBytes.emit(data)
self._log_frame(data, tx=True)
def _on_ready_read(self):
self._rx_buf.extend(self.serial.readAll().data())
if not self._busy:
if self._rx_buf:
self._log("[WARN] Data while idle -> drop")
self._log_frame(bytes(self._rx_buf), tx=False)
self._rx_buf.clear()
return
self._try_parse()
# ------------------------------- PARSING -------------------------------
def _try_parse(self):
if not self._txn_meta:
return
if self._txn_meta.get('lowlevel', False):
self._try_parse_lowlevel()
else:
self._try_parse_watch()
def _try_parse_watch(self):
# ... (код без изменений)
service = self._txn_meta['service']
buf = self._rx_buf
trailer_len = 4
if service:
if len(buf) < 7 + trailer_len:
return
name_len = buf[6]
expected = 7 + name_len + trailer_len
if len(buf) < expected:
return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_service_frame(frame)
self._end_txn()
else:
if len(buf) < 6 + trailer_len:
return
varqnt = buf[4]; status = buf[5]
if status != DEBUG_OK:
expected = 8 + trailer_len
if len(buf) < expected: return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=True)
self._end_txn()
else:
expected = 6 + varqnt*2 + trailer_len
if len(buf) < expected: return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=False)
self._end_txn()
def _try_parse_lowlevel(self):
# Ожидаемая длина: Успех=13, Ошибка=10
buf = self._rx_buf
if len(buf) < 10: # Минимальная длина (ошибка)
return
# Проверяем, что ответ для нас
if buf[1] != self.cmd_lowlevel:
self._log("[LL] Unexpected cmd in lowlevel parser, flushing.")
self._log_frame(bytes(self._rx_buf), tx=False)
self._rx_buf.clear()
# Не завершаем транзакцию, ждём таймаута
return
status = buf[2]
expected_len = 13 if status == DEBUG_OK else 10
if len(buf) >= expected_len:
frame = bytes(buf[:expected_len])
del buf[:expected_len]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
self._parse_lowlevel_frame(frame, success=(status == DEBUG_OK))
self._end_txn()
def _check_crc(self, payload: bytes, crc_lo: int, crc_hi: int):
if not self.auto_crc_check:
return True
crc_rx = (crc_hi << 8) | crc_lo
crc_calc = crc16_ibm(payload)
if crc_calc != crc_rx:
self._log(f"[CRC FAIL] calc=0x{crc_calc:04X} rx=0x{crc_rx:04X}")
return False
self._log("[CRC OK]")
return True
@staticmethod
def _clear_service_bit(vhi, vlo):
return ((vhi & 0x7F) << 8) | vlo
def _parse_service_frame(self, frame: bytes):
# ... (код без изменений)
payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 7:
self._log("[ERR] Service frame too short"); return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, status, iq_raw, name_len = payload[:7]
index = self._clear_service_bit(vhi, vlo)
if len(payload) < 7 + name_len:
self._log("[ERR] Service name truncated"); return
name_bytes = payload[7:7+name_len]; name = name_bytes.decode(errors='replace')
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
if status == DEBUG_OK:
self._name_cache[index] = (status, iq_raw, name, is_signed, frac_bits)
self.nameRead.emit(index, status, iq_raw, name)
if self.spin_count.value() == 1 and index == self.spin_index.value():
if status == DEBUG_OK:
self.lbl_name.setText(name); self.lbl_iq.setText(f"{frac_bits}{'s' if is_signed else 'u'}")
else:
self.lbl_name.setText('<err>'); self.lbl_iq.setText('-')
self._log(f"[SERVICE] idx={index} status={status} iq_raw=0x{iq_raw:02X} sign={'S' if is_signed else 'U'} frac={frac_bits} name='{name}'")
def _parse_data_frame(self, frame: bytes, *, error_mode: bool):
# ... (код без изменений)
payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 6:
self._log("[ERR] Data frame too short"); return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, varqnt, status = payload[:6]
base = self._clear_service_bit(vhi, vlo)
if error_mode:
if len(payload) < 8:
self._log("[ERR] Error frame truncated"); return
err_hi, err_lo = payload[6:8]; bad_index = (err_hi << 8) | err_lo
self._log(f"[DATA] ERROR status={status} bad_index={bad_index}")
self.valueRead.emit(bad_index, status, 0, 0, float('nan'))
self.valuesRead.emit(base, 0, [], [], [], [])
return
if len(payload) < 6 + varqnt*2:
self._log("[ERR] Data payload truncated"); return
raw_vals = []
pos = 6
for _ in range(varqnt):
hi = payload[pos]; lo = payload[pos+1]; pos += 2
raw16 = (hi << 8) | lo
raw_vals.append(raw16)
idx_list = []; iq_list = []; name_list = []; scaled_list = []; display_raw_list = []
for ofs, raw16 in enumerate(raw_vals):
idx = base + ofs
status_i, iq_raw, name_i, is_signed, frac_bits = self._name_cache.get(idx, (DEBUG_OK, 0, '', False, 0))
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000
else:
value_int = raw16
if self.chk_raw.isChecked():
scale = 1.0
else:
scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits))
scaled = float(value_int) / scale if frac_bits > 0 else float(value_int)
idx_list.append(idx); iq_list.append(iq_raw); name_list.append(name_i)
scaled_list.append(scaled); display_raw_list.append(value_int)
self._populate_table(idx_list, name_list, iq_list, display_raw_list, scaled_list)
if varqnt == 1:
self.edit_single_value.setText(str(display_raw_list[0]) if self.chk_raw.isChecked() else f"{scaled_list[0]:.6g}")
if idx_list[0] == self.spin_index.value():
_, iq_raw0, name0, is_signed0, frac0 = self._name_cache.get(idx_list[0], (DEBUG_OK, 0, '', False, 0))
self.lbl_name.setText(name0); self.lbl_iq.setText(f"{frac0}{'s' if is_signed0 else 'u'}")
self.valueRead.emit(idx_list[0], status, iq_list[0], display_raw_list[0], scaled_list[0])
else:
self.edit_single_value.setText("")
self.valuesRead.emit(base, varqnt, idx_list, iq_list, display_raw_list, scaled_list)
self._log(f"[DATA] base={base} q={varqnt} values={[f'{v:.6g}' for v in scaled_list] if not self.chk_raw.isChecked() else raw_vals}")
def _parse_lowlevel_frame(self, frame: bytes, success: bool):
payload_len = 9 if success else 6
crc_pos = payload_len
payload = frame[:payload_len]
crc_lo, crc_hi = frame[crc_pos], frame[crc_pos+1]
self._check_crc(payload, crc_lo, crc_hi)
status = payload[2]
addr2, addr1, addr0 = payload[3], payload[4], payload[5]
addr24 = (addr2 << 16) | (addr1 << 8) | addr0
self.ll_val_status.setText(f"0x{status:02X} ({'OK' if status == DEBUG_OK else 'ERR'})")
if not success:
self.ll_val_rettype.setText('-')
self.ll_val_raw.setText('-')
self.ll_val_scaled.setText('<ERROR>')
self.llValueRead.emit(addr24, status, 0, 0, float('nan'))
self._log(f"[LL] ERROR status=0x{status:02X} addr=0x{addr24:06X}")
return
return_type = payload[6]
data_hi, data_lo = payload[7], payload[8]
raw16 = (data_hi << 8) | data_lo
is_signed = (return_type & SIGN_BIT_MASK) != 0
frac_bits = return_type & FRAC_MASK_FULL
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000
else:
value_int = raw16
if self.chk_ll_raw.isChecked():
scale = 1.0
else:
scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits)) # 1 / 2^N
scaled = float(value_int) / scale
# Обновляем UI
self.ll_val_rettype.setText(f"0x{return_type:02X} ({frac_bits}{'s' if is_signed else 'u'})")
self.ll_val_raw.setText(str(value_int))
self.ll_val_scaled.setText(f"{scaled:.6g}")
self.llValueRead.emit(addr24, status, return_type, value_int, scaled)
self._log(f"[LL] OK addr=0x{addr24:06X} type=0x{return_type:02X} raw={value_int} scaled={scaled:.6g}")
def _populate_table(self, idxs, names, iqs, raws, scaled):
"""
Быстрое массовое обновление таблицы значений.
- Не пересоздаём QTableWidgetItem при каждом вызове: обновляем текст.
- Блокируем сортировку, сигналы и обновления на время заполнения.
- Предвычисляем отображаемые строки (особенно формат scaled).
"""
tbl = self.tbl_values
n = len(idxs)
# Заморозка UI на время массового обновления
prev_sorting = tbl.isSortingEnabled()
tbl.setSortingEnabled(False)
tbl.blockSignals(True)
tbl.setUpdatesEnabled(False)
# Подготовка размера
if tbl.rowCount() != n:
tbl.setRowCount(n)
# Предварительно решаем: показывать сырые или масштабированные значения
show_raw = self.chk_raw.isChecked()
# Готовим строки (ускоряет при больших объёмах)
# str() заранее, чтобы не повторять в цикле
idx_strs = [str(v) for v in idxs]
raw_strs = [str(v) for v in raws]
scaled_strs = raw_strs if show_raw else [f"{v:.6g}" for v in scaled]
# Флаги необновляемых ячеек (только выбор/просмотр)
flags_ro = QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled
# Локальный шорткат для быстрой установки текста в ячейку
def _set_text(row, col, text):
item = tbl.item(row, col)
if item is None:
item = QtWidgets.QTableWidgetItem(text)
item.setFlags(flags_ro)
tbl.setItem(row, col, item)
else:
# обновим текст только при изменении (немного экономит на больших данных)
if item.text() != text:
item.setText(text)
if item.flags() != flags_ro:
item.setFlags(flags_ro)
# Основной цикл
for row in range(n):
iq_raw = iqs[row]
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
iq_disp = f"{frac_bits}{'s' if is_signed else 'u'}"
_set_text(row, 0, idx_strs[row])
_set_text(row, 1, names[row])
_set_text(row, 2, iq_disp)
_set_text(row, 3, raw_strs[row])
_set_text(row, 4, scaled_strs[row])
# Разморозка
tbl.blockSignals(False)
tbl.setUpdatesEnabled(True)
tbl.setSortingEnabled(prev_sorting)
tbl.viewport().update()
# ------------------------------ POLLING --------------------------------
def _toggle_polling(self):
if self._polling:
self._poll_timer.stop()
self._polling = False
self.btn_poll.setText("Start Polling")
self._log("[POLL] Stopped")
else:
interval = self.spin_interval.value()
self._poll_timer.start(interval)
self._polling = True
self.btn_poll.setText("Stop Polling")
self._log(f"[POLL] Started interval={interval}ms")
self._set_ui_busy(False) # Обновить доступность кнопок
def _on_poll_timeout(self):
self.request_values()
def _toggle_ll_polling(self):
"""Включает и выключает поллинг для LowLevel вкладки."""
if self._ll_polling:
self._ll_poll_timer.stop()
self._ll_polling = False
self.btn_ll_poll.setText("Start Polling")
self._log("[LL POLL] Stopped")
else:
if not self._ll_current_var_info:
self._log("[LL POLL] Cannot start: no variable selected.")
return
interval = self.spin_ll_interval.value()
self._ll_poll_timer.start(interval)
self._ll_polling = True
self.btn_ll_poll.setText("Stop Polling")
self._log(f"[LL POLL] Started interval={interval}ms")
self._set_ui_busy(False) # Обновить доступность кнопок
def _on_ll_poll_timeout(self):
"""Слот таймера поллинга для LowLevel."""
self.request_lowlevel_once()
def _on_ll_variable_prepared(self, var_info: dict):
"""Срабатывает при выборе переменной в селекторе."""
self._ll_current_var_info = var_info
self._log(f"[LL] Selected variable '{var_info['path']}' @ {var_info['address_hex']}")
# Сбрасываем старые значения
self.ll_val_status.setText("-")
self.ll_val_rettype.setText("-")
self.ll_val_raw.setText("-")
self.ll_val_scaled.setText("-")
# ------------------------------ HELPERS --------------------------------
def _toggle_index_base(self, st):
# ... (код без изменений)
val = self.spin_index.value()
if st == QtCore.Qt.Checked:
self.spin_index.setDisplayIntegerBase(16); self.spin_index.setPrefix("0x")
else:
self.spin_index.setDisplayIntegerBase(10); self.spin_index.setPrefix("")
self.spin_index.setValue(val)
def _set_ui_busy(self, busy: bool):
# Блокируем кнопки в зависимости от состояния 'busy' и 'polling'
# Watch tab
can_use_watch = not busy and not self._polling
self.btn_read_service.setEnabled(can_use_watch)
self.btn_read_values.setEnabled(can_use_watch)
# LowLevel tab
can_use_ll = not busy and not self._ll_polling
self.btn_ll_read.setEnabled(can_use_ll)
def _on_serial_error(self, err):
# ... (код без изменений)
if err == QtSerialPort.QSerialPort.NoError: return
self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})")
if self._busy: self._end_txn()
# ------------------------------ LOGGING --------------------------------
def _log(self, msg: str):
# ... (код без изменений)
if not self.log_spoiler.getState():
return
ts = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
self.txt_log.append(f"{ts} {msg}")
def _log_frame(self, data: bytes, *, tx: bool):
# ... (код без изменений)
if not self.log_spoiler.getState():
return
tag = 'TX' if tx else 'RX'
hexs = ' '.join(f"{b:02X}" for b in data)
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
self._log(f"[{tag}] {hexs} |{ascii_part}|")
# ---------------------------------------------------------- Demo harness ---
class _DemoWindow(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("DebugVar Terminal")
self.term = DebugTerminalWidget(self)
self.setCentralWidget(self.term)
self.term.nameRead.connect(self._on_name)
self.term.valueRead.connect(self._on_value)
self.term.llValueRead.connect(self._on_ll_value)
def _on_name(self, index, status, iq, name):
return
print(f"Name idx={index} status={status} iq={iq} name='{name}'")
def _on_value(self, index, status, iq, raw16, floatVal):
return
print(f"Value idx={index} status={status} iq={iq} raw={raw16} val={floatVal}")
def _on_ll_value(self, addr, status, rettype_raw, raw16, scaled):
return
print(f"LL addr=0x{addr:06X} status={status} type=0x{rettype_raw:02X} raw={raw16} scaled={scaled}")
def format_address(addr_text: str) -> str:
try:
value = int(addr_text, 16)
except ValueError:
value = 0
return f"0x{value:06X}"
def closeEvent(self, event):
self.setCentralWidget(None)
if self.term:
self.term.deleteLater(); self.term = None
super().closeEvent(event)
# ------------------------------- Demo --------------------------------------
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
win = _DemoWindow(); win.show()
sys.exit(app.exec_())