debugVarTool/Src/tms_debugvar_term.py
Razvalyaev 96496a0256 все неплохо работает.
сейв перед попыткой улучшить lowlevel debug
2025-07-21 13:40:52 +03:00

1138 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PySide2 import QtCore, QtWidgets, QtSerialPort
from tms_debugvar_lowlevel import LowLevelSelectorWidget
import datetime
import time
# ------------------------------- Константы протокола ------------------------
WATCH_SERVICE_BIT = 0x8000
DEBUG_OK = 0 # ожидаемый код успешного чтения
SIGN_BIT_MASK = 0x80
FRAC_MASK_FULL = 0x7F # если используем 7 бит дробной части
# --- Debug status codes (из прошивки) ---
DEBUG_OK = 0x00
DEBUG_ERR = 0x80 # общий флаг ошибки (старший бит)
DEBUG_ERR_VAR_NUMB = DEBUG_ERR | (1 << 0)
DEBUG_ERR_INVALID_VAR = DEBUG_ERR | (1 << 1)
DEBUG_ERR_ADDR = DEBUG_ERR | (1 << 2)
DEBUG_ERR_ADDR_ALIGN = DEBUG_ERR | (1 << 3)
DEBUG_ERR_INTERNAL = DEBUG_ERR | (1 << 4)
DEBUG_ERR_DATATIME = DEBUG_ERR | (1 << 5)
DEBUG_ERR_RS = DEBUG_ERR | (1 << 5)
# для декодирования по битам
_DEBUG_ERR_BITS = (
(1 << 0, "Invalid Variable Index"),
(1 << 1, "Invalid Variable"),
(1 << 2, "Invalid Address"),
(1 << 3, "Invalid Address Align"),
(1 << 4, "Internal Code Error"),
(1 << 5, "Invalid Data or Time"),
(1 << 6, "Error with RS"),
)
# ---------------------------------------------------------------- CRC util ---
def crc16_ibm(data: bytes, *, init=0xFFFF) -> int:
"""CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected)."""
crc = init
for b in data:
crc ^= b
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc & 0xFFFF
def _decode_debug_status(status: int) -> str:
"""Преобразует код статуса прошивки в строку.
Возвращает 'OK' или перечисление битов через '|'.
Не зависит от того, WATCH или LowLevel.
"""
if status == DEBUG_OK:
return "OK"
parts = []
if status & DEBUG_ERR:
for mask, name in _DEBUG_ERR_BITS:
if status & mask:
parts.append(name)
if not parts: # старший бит есть, но ни один из известных младших не выставлен
parts.append("ERR")
else:
# Неожиданно: статус !=0, но бит DEBUG_ERR не стоит
parts.append(f"0x{status:02X}")
return "|".join(parts)
class Spoiler(QtWidgets.QWidget):
def __init__(self, title="", animationDuration=300, parent=None):
super().__init__(parent)
self._animationDuration = animationDuration
self.state = False
# --- Toggle button ---
self.toggleButton = QtWidgets.QToolButton(self)
self.toggleButton.setStyleSheet("QToolButton { border: none; }")
self.toggleButton.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon)
self.toggleButton.setArrowType(QtCore.Qt.RightArrow)
self.toggleButton.setText(title)
self.toggleButton.setCheckable(True)
# --- Header line ---
self.headerLine = QtWidgets.QFrame(self)
self.headerLine.setFrameShape(QtWidgets.QFrame.HLine)
self.headerLine.setFrameShadow(QtWidgets.QFrame.Sunken)
# --- Content area ---
self.contentArea = QtWidgets.QScrollArea(self)
self.contentArea.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed)
self.contentArea.setFrameShape(QtWidgets.QFrame.NoFrame)
self.contentArea.setWidgetResizable(True)
self._contentWidget = QtWidgets.QWidget()
self.contentArea.setWidget(self._contentWidget)
self.contentArea.setMaximumHeight(0)
# --- Анимация только по контенту ---
self._ani_content = QtCore.QPropertyAnimation(self.contentArea, b"maximumHeight")
self._ani_content.setDuration(animationDuration)
self._ani_content.setEasingCurve(QtCore.QEasingCurve.InOutCubic)
# Следим за шагами анимации → обновляем родителя
self._ani_content.valueChanged.connect(self._adjust_parent_size)
# --- Layout ---
self.mainLayout = QtWidgets.QGridLayout(self)
self.mainLayout.setVerticalSpacing(0)
self.mainLayout.setContentsMargins(0, 0, 0, 0)
self.mainLayout.addWidget(self.toggleButton, 0, 0, 1, 1)
self.mainLayout.addWidget(self.headerLine, 0, 1, 1, 1)
self.mainLayout.addWidget(self.contentArea, 1, 0, 1, 2)
# --- Signals ---
self.toggleButton.clicked.connect(self._on_toggled)
def setContentLayout(self, contentLayout):
old = self._contentWidget.layout()
if old:
QtWidgets.QWidget().setLayout(old)
self._contentWidget.setLayout(contentLayout)
def getState(self):
return self.state
def _adjust_parent_size(self, *_):
top = self.window()
if top:
size = top.size()
size.setHeight(top.sizeHint().height()) # берём новую высоту
top.resize(size) # ширина остаётся прежней
def _on_toggled(self, checked: bool):
self.state = checked
self.toggleButton.setArrowType(QtCore.Qt.DownArrow if checked else QtCore.Qt.RightArrow)
contentHeight = self._contentWidget.sizeHint().height()
self._ani_content.stop()
self._ani_content.setStartValue(self.contentArea.maximumHeight())
self._ani_content.setEndValue(contentHeight if checked else 0)
# --- Фиксируем ширину на время анимации ---
w = self.width()
self.setFixedWidth(w)
self._ani_content.finished.connect(lambda: self.setMaximumWidth(16777215)) # сброс фикса
self._ani_content.start()
# --------------------------- DebugTerminalWidget ---------------------------
class DebugTerminalWidget(QtWidgets.QWidget):
# Существующие сигналы (Watch)
nameRead = QtCore.Signal(int, int, int, str)
valueRead = QtCore.Signal(int, int, int, int, float)
valuesRead = QtCore.Signal(int, int, list, list, list, list)
# Новые сигналы (LowLevel)
llValueRead = QtCore.Signal(int, int, int, int, float) # addr, status, rettype_raw, raw16_signed, scaled
portOpened = QtCore.Signal(str)
portClosed = QtCore.Signal(str)
txBytes = QtCore.Signal(bytes)
rxBytes = QtCore.Signal(bytes)
def __init__(self, parent=None, *,
start_byte=0x0A,
cmd_byte=0x46,
cmd_lowlevel=0x47,
iq_scaling=None,
read_timeout_ms=250,
auto_crc_check=True,
drop_if_busy=False,
replace_if_busy=True):
super().__init__(parent)
self.device_addr = start_byte
self.cmd_byte = cmd_byte
self.cmd_lowlevel = cmd_lowlevel
self.read_timeout_ms = read_timeout_ms
self.auto_crc_check = auto_crc_check
self._drop_if_busy = drop_if_busy
self._replace_if_busy = replace_if_busy
self._last_txn_timestamp = 0
if iq_scaling is None:
iq_scaling = {n: float(1 << n) for n in range(31)}
iq_scaling[0] = 1.0
self.iq_scaling = iq_scaling
# Serial
self.serial = QtSerialPort.QSerialPort(self)
self.serial.setBaudRate(115200)
self.serial.readyRead.connect(self._on_ready_read)
self.serial.errorOccurred.connect(self._on_serial_error)
# State
self._rx_buf = bytearray()
self._busy = False
self._pending_cmd = None # (frame, meta)
self._txn_meta = None # {'service':bool,'index':int,'varqnt':int,'chain':...,'lowlevel':bool}
self._txn_timer = QtCore.QTimer(self)
self._txn_timer.setSingleShot(True)
self._txn_timer.timeout.connect(self._on_txn_timeout)
# Watch polling
self._poll_timer = QtCore.QTimer(self)
self._poll_timer.timeout.connect(self._on_poll_timeout)
self._polling = False
# LowLevel polling
self._ll_poll_timer = QtCore.QTimer(self)
self._ll_poll_timer.timeout.connect(self._on_ll_poll_timeout)
self._ll_polling = False
self._ll_current_var_info = None # Хранит инфо о выбранной LL переменной
# Кэш: index -> (status, iq, name, is_signed, frac_bits)
self._name_cache = {}
# Очередь service индексов
self._service_queue = []
self._pending_data_after_services = None # (base, count)
self._build_ui()
self._connect_ui()
self.set_available_ports()
# ------------------------------ UI ----------------------------------
def _build_ui(self):
layout = QtWidgets.QVBoxLayout(self)
# --- Serial group ---
g_serial = QtWidgets.QGroupBox("Serial Port")
hs = QtWidgets.QHBoxLayout(g_serial)
self.cmb_port = QtWidgets.QComboBox()
self.btn_refresh = QtWidgets.QPushButton("Refresh")
self.cmb_baud = QtWidgets.QComboBox()
self.cmb_baud.addItems(["9600","19200","38400","57600","115200","230400"])
self.cmb_baud.setCurrentText("115200")
self.btn_open = QtWidgets.QPushButton("Open")
hs.addWidget(QtWidgets.QLabel("Port:"))
hs.addWidget(self.cmb_port, 1)
hs.addWidget(self.btn_refresh)
hs.addSpacing(10)
hs.addWidget(QtWidgets.QLabel("Baud:"))
hs.addWidget(self.cmb_baud)
hs.addWidget(self.btn_open)
# --- TabWidget ---
self.tabs = QtWidgets.QTabWidget()
self._build_watch_tab()
self._build_lowlevel_tab() # <-- Вызываем новый метод
g_control = QtWidgets.QGroupBox("Control / Status")
control_layout = QtWidgets.QHBoxLayout(g_control) # Используем QHBoxLayout
# Форма для статусов слева
form_control = QtWidgets.QFormLayout()
self.lbl_status = QtWidgets.QLabel("Idle")
self.lbl_status.setStyleSheet("font-weight: bold; color: grey;")
form_control.addRow("Status:", self.lbl_status)
self.lbl_actual_interval = QtWidgets.QLabel("-")
form_control.addRow("Actual Interval:", self.lbl_actual_interval)
control_layout.addLayout(form_control, 1) # Растягиваем форму
# Галочка Raw справа
self.chk_raw = QtWidgets.QCheckBox("Raw (no IQ scaling)")
control_layout.addWidget(self.chk_raw)
# --- UART Log ---
self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self)
self.log_spoiler.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Minimum)
log_layout = QtWidgets.QVBoxLayout()
self.txt_log = QtWidgets.QTextEdit(); self.txt_log.setReadOnly(True)
self.txt_log.setFontFamily("Courier")
log_layout.addWidget(self.txt_log)
self.log_spoiler.setContentLayout(log_layout)
layout.addWidget(g_serial)
layout.addWidget(self.tabs, 1)
layout.addWidget(g_control)
layout.addWidget(self.log_spoiler)
layout.setStretch(layout.indexOf(g_serial), 0)
layout.setStretch(layout.indexOf(self.tabs), 1)
def _build_watch_tab(self):
tab = QtWidgets.QWidget()
main_layout = QtWidgets.QVBoxLayout(tab)
# --- Variable Selector ---
g_selector = QtWidgets.QGroupBox("Variable Selector")
selector_layout = QtWidgets.QVBoxLayout(g_selector)
form_selector = QtWidgets.QFormLayout()
h_layout = QtWidgets.QHBoxLayout()
self.spin_index = QtWidgets.QSpinBox()
self.spin_index.setRange(0, 0x7FFF)
self.spin_index.setAccelerated(True)
self.chk_hex_index = QtWidgets.QCheckBox("Hex")
self.spin_count = QtWidgets.QSpinBox()
self.spin_count.setRange(1, 255)
self.spin_count.setValue(1)
# Первая группа: Base Index + spin + checkbox
base_index_layout = QtWidgets.QHBoxLayout()
base_index_label = QtWidgets.QLabel("Base Index")
base_index_layout.addWidget(base_index_label)
base_index_layout.addWidget(self.spin_index)
base_index_layout.addWidget(self.chk_hex_index)
base_index_layout.setSpacing(5)
# Вторая группа: spin_count + метка справа
count_layout = QtWidgets.QHBoxLayout()
count_layout.setSpacing(2) # минимальный отступ
count_layout.addWidget(self.spin_count)
count_label = QtWidgets.QLabel("Cnt")
count_layout.addWidget(count_label)
# Добавляем обе группы в общий горизонтальный лэйаут
h_layout.addLayout(base_index_layout)
h_layout.addSpacing(20)
h_layout.addLayout(count_layout)
form_selector.addRow(h_layout)
self.spin_interval = QtWidgets.QSpinBox()
self.spin_interval.setRange(50, 10000)
self.spin_interval.setValue(500)
self.spin_interval.setSuffix(" ms")
form_selector.addRow("Interval:", self.spin_interval)
selector_layout.addLayout(form_selector)
btn_layout = QtWidgets.QHBoxLayout()
self.btn_update_service = QtWidgets.QPushButton("Update Service")
self.btn_read_values = QtWidgets.QPushButton("Read Value(s)")
self.btn_poll = QtWidgets.QPushButton("Start Polling")
btn_layout.addWidget(self.btn_update_service)
btn_layout.addWidget(self.btn_read_values)
btn_layout.addWidget(self.btn_poll)
selector_layout.addLayout(btn_layout)
# --- Table ---
g_table = QtWidgets.QGroupBox("Table")
table_layout = QtWidgets.QVBoxLayout(g_table)
self.tbl_values = QtWidgets.QTableWidget(0, 5)
self.tbl_values.setHorizontalHeaderLabels(["Index", "Name", "IQ", "Raw", "Scaled"])
hh = self.tbl_values.horizontalHeader()
for i in range(4):
hh.setSectionResizeMode(i, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch)
self.tbl_values.verticalHeader().setVisible(False)
table_layout.addWidget(self.tbl_values)
# --- Вертикальный сплиттер ---
v_split = QtWidgets.QSplitter(QtCore.Qt.Vertical)
v_split.addWidget(g_selector)
v_split.addWidget(g_table)
v_split.setStretchFactor(0, 1)
v_split.setStretchFactor(1, 3)
v_split.setStretchFactor(2, 1)
main_layout.addWidget(v_split)
self.tabs.addTab(tab, "Watch")
table_layout.addWidget(self.tbl_values)
def _build_lowlevel_tab(self):
tab = QtWidgets.QWidget()
main_layout = QtWidgets.QVBoxLayout(tab)
# --- GroupBox для селектора переменной ---
group_var_selector = QtWidgets.QGroupBox("Variable Selector", tab)
var_selector_layout = QtWidgets.QVBoxLayout(group_var_selector)
self.ll_selector = LowLevelSelectorWidget(group_var_selector)
var_selector_layout.addWidget(self.ll_selector)
# --- GroupBox для панели управления чтением ---
group_read_controls = QtWidgets.QGroupBox("Read Selected Variable", tab)
grid = QtWidgets.QGridLayout(group_read_controls)
self.btn_ll_read = QtWidgets.QPushButton("Read Once")
self.btn_ll_poll = QtWidgets.QPushButton("Start Polling")
self.spin_ll_interval = QtWidgets.QSpinBox()
self.spin_ll_interval.setRange(50, 10000)
self.spin_ll_interval.setValue(500)
self.spin_ll_interval.setSuffix(" ms")
# Поля для отображения результата
self.ll_val_status = QtWidgets.QLabel("-")
self.ll_val_rettype = QtWidgets.QLabel("-")
self.ll_val_scaled = QtWidgets.QLabel("-")
# Размещение виджетов в grid
grid.addWidget(self.btn_ll_read, 0, 0)
grid.addWidget(self.btn_ll_poll, 0, 1)
grid.addWidget(QtWidgets.QLabel("Interval:"), 1, 0)
grid.addWidget(self.spin_ll_interval, 1, 1)
# Форма для результатов
form_layout = QtWidgets.QFormLayout()
form_layout.addRow("Status:", self.ll_val_status)
form_layout.addRow("Return Type:", self.ll_val_rettype)
form_layout.addRow("Scaled Value:", self.ll_val_scaled)
# Поле Raw Value убрано
grid.addLayout(form_layout, 2, 0, 1, 2) # Растягиваем на 2 колонки
grid.setColumnStretch(1, 1)
# Собираем layout вкладки
v_split = QtWidgets.QSplitter(QtCore.Qt.Vertical, tab)
v_split.addWidget(group_var_selector)
v_split.addWidget(group_read_controls)
v_split.setStretchFactor(0, 1) # Селектор растягивается
v_split.setStretchFactor(1, 0) # Панель чтения - нет
main_layout.addWidget(v_split)
self.tabs.addTab(tab, "LowLevel")
def _connect_ui(self):
# Watch
self.btn_refresh.clicked.connect(self.set_available_ports)
self.btn_open.clicked.connect(self._open_close_port)
self.btn_update_service.clicked.connect(self.request_service_update_for_table)
self.btn_read_values.clicked.connect(self.request_values)
self.btn_poll.clicked.connect(self._toggle_polling)
self.chk_hex_index.stateChanged.connect(self._toggle_index_base)
# LowLevel (новые и переделанные)
self.ll_selector.variablePrepared.connect(self._on_ll_variable_prepared)
self.ll_selector.xmlLoaded.connect(lambda p: self._log(f"[LL] XML loaded: {p}"))
self.btn_ll_read.clicked.connect(self.request_lowlevel_once)
self.btn_ll_poll.clicked.connect(self._toggle_ll_polling)
def set_status(self, text: str, mode: str = "idle"):
colors = {
"idle": "gray",
"service": "blue",
"values": "green",
"error": "red"
}
color = colors.get(mode.lower(), "black")
self.lbl_status.setText(text)
self.lbl_status.setStyleSheet(f"font-weight: bold; color: {color};")
# ----------------------------- SERIAL MGMT ----------------------------
def set_available_ports(self):
cur = self.cmb_port.currentText()
self.cmb_port.blockSignals(True)
self.cmb_port.clear()
for info in QtSerialPort.QSerialPortInfo.availablePorts():
self.cmb_port.addItem(info.portName())
if cur:
ix = self.cmb_port.findText(cur)
if ix >= 0:
self.cmb_port.setCurrentIndex(ix)
self.cmb_port.blockSignals(False)
def _open_close_port(self):
if self.serial.isOpen():
name = self.serial.portName()
self.serial.close()
self.btn_open.setText("Open")
self._log(f"[PORT] Closed {name}")
self.portClosed.emit(name)
return
port = self.cmb_port.currentText()
if not port:
self._log("[ERR] No port selected")
return
self.serial.setPortName(port)
self.serial.setBaudRate(int(self.cmb_baud.currentText()))
if not self.serial.open(QtCore.QIODevice.ReadWrite):
self._log(f"[ERR] Open fail {port}: {self.serial.errorString()}")
return
self.btn_open.setText("Close")
self._log(f"[PORT] Opened {port}")
self.portOpened.emit(port)
# ---------------------------- FRAME BUILD -----------------------------
def _build_request(self, index: int, *, service: bool, varqnt: int) -> bytes:
dbg = index & 0x7FFF
if service:
dbg |= WATCH_SERVICE_BIT
hi = (dbg >> 8) & 0xFF
lo = dbg & 0xFF
q = varqnt & 0xFF
payload = bytes([self.device_addr & 0xFF, self.cmd_byte & 0xFF, hi, lo, q])
crc = crc16_ibm(payload)
return payload + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def _build_lowlevel_request(self, var_info: dict) -> bytes:
# Формат: [adr][cmd_lowlevel][year_hi][year_lo][month][day][hour][minute][addr2][addr1][addr0][pt_type][iq_type][return_type]
# Пытаемся получить время из переданной информации
dt_info = var_info.get('datetime')
if dt_info:
# Используем время из var_info
year = dt_info.get('year', 2000)
month = dt_info.get('month', 1)
day = dt_info.get('day', 1)
hour = dt_info.get('hour', 0)
minute = dt_info.get('minute', 0)
self._log("[LL] Using time from selector.")
else:
# Если в var_info времени нет, используем текущее системное время (старое поведение)
now = QtCore.QDateTime.currentDateTime()
year = now.date().year()
month = now.date().month()
day = now.date().day()
hour = now.time().hour()
minute = now.time().minute()
self._log("[LL] Fallback to current system time.")
addr = var_info.get('address', 0)
addr2 = (addr >> 16) & 0xFF
addr1 = (addr >> 8) & 0xFF
addr0 = addr & 0xFF
pt_type = var_info.get('ptr_type', 0) & 0xFF
iq_type = var_info.get('iq_type', 0) & 0xFF
ret_type = var_info.get('return_type', 0) & 0xFF
frame_wo_crc = bytes([
self.device_addr & 0xFF, self.cmd_lowlevel & 0xFF,
(year >> 8) & 0xFF, year & 0xFF,
month & 0xFF, day & 0xFF, hour & 0xFF, minute & 0xFF,
addr2, addr1, addr0, pt_type, iq_type, ret_type
])
crc = crc16_ibm(frame_wo_crc)
return frame_wo_crc + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
# ----------------------------- PUBLIC API -----------------------------
def request_service_single(self):
idx = int(self.spin_index.value())
self._enqueue_or_start(idx, service=True, varqnt=0)
def request_service_update_for_table(self):
"""
Очищает кеш имен/типов для всех видимых в таблице переменных
и инициирует их повторное чтение.
"""
indices_to_update = []
for row in range(self.tbl_values.rowCount()):
item = self.tbl_values.item(row, 0)
if item and item.text().isdigit():
indices_to_update.append(int(item.text()))
if not indices_to_update:
self._log("[SERVICE] No variables in table to update.")
return
self._log(f"[SERVICE] Queuing name/type update for {len(indices_to_update)} variables.")
# Очищаем кеш для этих индексов, чтобы принудительно их перечитать
for index in indices_to_update:
if index in self._name_cache:
del self._name_cache[index]
# Запускаем стандартный запрос значений. Он автоматически обработает
# отсутствующую сервисную информацию (имена/типы) перед запросом данных.
#self.request_values()
def request_values(self):
base = int(self.spin_index.value())
count = int(self.spin_count.value())
needed = []
for i in range(base, base+count):
if i not in self._name_cache:
needed.append(i)
if needed:
self._service_queue = needed[:]
self._pending_data_after_services = (base, count)
self._log(f"[AUTO] Need service for {len(needed)} indices: {needed}")
self.set_status("read service...", "service")
self._kick_service_queue()
else:
self.set_status("read values...", "values")
self._enqueue_or_start(base, service=False, varqnt=count)
def request_lowlevel_once(self):
"""Запрашивает чтение выбранной LowLevel переменной."""
if not self.serial.isOpen():
self._log("[LL] Port is not open.")
return
if self._busy:
self._log("[LL] Busy, request dropped.")
return
if not self._ll_current_var_info:
self._log("[LL] No variable selected!")
if self._ll_polling: # Если поллинг активен, но переменная пропала - стоп
self._toggle_ll_polling()
return
frame = self._build_lowlevel_request(self._ll_current_var_info)
meta = {'lowlevel': True}
self.set_status("read lowlevel...", "values")
self._enqueue_raw(frame, meta)
# -------------------------- SERVICE QUEUE FLOW ------------------------
# ... (код без изменений)
def _kick_service_queue(self):
if self._busy:
return
if self._service_queue:
nxt = self._service_queue.pop(0)
self._enqueue_or_start(nxt, service=True, varqnt=0, queue_mode=True)
elif self._pending_data_after_services:
base, count = self._pending_data_after_services
self._pending_data_after_services = None
self._enqueue_or_start(base, service=False, varqnt=count)
# ------------------------ TRANSACTION SCHEDULER -----------------------
# ... (код без изменений)
def _enqueue_raw(self, frame: bytes, meta: dict):
if self._busy:
if self._drop_if_busy and not self._replace_if_busy:
self._log("[LOCKSTEP] Busy -> drop")
return
if self._replace_if_busy:
self._pending_cmd = (frame, meta)
self._log("[LOCKSTEP] Busy -> replaced pending")
else:
self._log("[LOCKSTEP] Busy -> ignore")
return
self._start_txn(frame, meta)
def _enqueue_or_start(self, index, service: bool, varqnt: int, chain_after=None, queue_mode=False):
frame = self._build_request(index, service=service, varqnt=varqnt)
meta = {'service': service, 'index': index, 'varqnt': varqnt, 'chain': chain_after, 'queue_mode': queue_mode, 'lowlevel': False}
if self._busy:
if self._drop_if_busy and not self._replace_if_busy:
self._log("[LOCKSTEP] Busy -> drop")
return
if self._replace_if_busy:
self._pending_cmd = (frame, meta)
self._log("[LOCKSTEP] Busy -> replaced pending")
else:
self._log("[LOCKSTEP] Busy -> ignore")
return
self._start_txn(frame, meta)
def _start_txn(self, frame: bytes, meta: dict):
now = time.perf_counter()
if self._last_txn_timestamp is not None:
delta_ms = (now - self._last_txn_timestamp) * 1000
# Обновляем UI только если он уже создан
if hasattr(self, 'lbl_actual_interval'):
self.lbl_actual_interval.setText(f"{delta_ms:.1f} ms")
self._last_txn_timestamp = now
self._busy = True
self._txn_meta = meta
self._rx_buf.clear()
self._set_ui_busy(True)
self._send(frame)
self._txn_timer.start(self.read_timeout_ms)
def _end_txn(self):
self._txn_timer.stop()
queue_mode = False
chain = None
meta = self._txn_meta
if meta:
queue_mode = meta.get('queue_mode', False)
chain = meta.get('chain')
self._txn_meta = None
self._busy = False
self._rx_buf.clear()
self._set_ui_busy(False)
if chain:
base, serv, q = chain
self._enqueue_or_start(base, service=serv, varqnt=q)
return
if self._pending_cmd is not None:
frame, meta = self._pending_cmd; self._pending_cmd = None
QtCore.QTimer.singleShot(0, lambda f=frame,m=meta: self._start_txn(f,m))
return
if queue_mode:
QtCore.QTimer.singleShot(0, self._kick_service_queue)
return
def _on_txn_timeout(self):
if not self._busy: return
is_ll = self._txn_meta.get('lowlevel', False) if self._txn_meta else False
log_prefix = "[LL TIMEOUT]" if is_ll else "[TIMEOUT]"
self._log(f"{log_prefix} No response")
if self._rx_buf:
self._log_frame(bytes(self._rx_buf), tx=False)
self._end_txn()
# ------------------------------- TX/RX ---------------------------------
# ... (код без изменений)
def _send(self, data: bytes):
w = self.serial.write(data)
if w != len(data):
self._log(f"[ERR] Write short {w}/{len(data)}")
self.txBytes.emit(data)
self._log_frame(data, tx=True)
def _on_ready_read(self):
self._rx_buf.extend(self.serial.readAll().data())
if not self._busy:
if self._rx_buf:
self._log("[WARN] Data while idle -> drop")
self._log_frame(bytes(self._rx_buf), tx=False)
self._rx_buf.clear()
return
self._try_parse()
# ------------------------------- PARSING -------------------------------
def _try_parse(self):
if not self._txn_meta:
return
self.set_status("IDLE", "idle")
if self._txn_meta.get('lowlevel', False):
self._try_parse_lowlevel()
else:
self._try_parse_watch()
def _try_parse_watch(self):
# ... (код без изменений)
service = self._txn_meta['service']
buf = self._rx_buf
trailer_len = 4
if service:
if len(buf) < 7 + trailer_len:
return
name_len = buf[6]
expected = 7 + name_len + trailer_len
if len(buf) < expected:
return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_service_frame(frame)
self._end_txn()
else:
if len(buf) < 6 + trailer_len:
return
varqnt = buf[4]; status = buf[5]
if status != DEBUG_OK:
expected = 8 + trailer_len
if len(buf) < expected: return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=True)
self._end_txn()
else:
expected = 6 + varqnt*2 + trailer_len
if len(buf) < expected: return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=False)
self._end_txn()
def _try_parse_lowlevel(self):
# Ожидаемая длина: Успех=13, Ошибка=10
buf = self._rx_buf
if len(buf) < 10: # Минимальная длина (ошибка)
return
# Проверяем, что ответ для нас
if buf[1] != self.cmd_lowlevel:
self._log("[LL] Unexpected cmd in lowlevel parser, flushing.")
self._log_frame(bytes(self._rx_buf), tx=False)
self._rx_buf.clear()
# Не завершаем транзакцию, ждём таймаута
return
status = buf[2]
expected_len = 13 if status == DEBUG_OK else 10
if len(buf) >= expected_len:
frame = bytes(buf[:expected_len])
del buf[:expected_len]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
self._parse_lowlevel_frame(frame, success=(status == DEBUG_OK))
self._end_txn()
def _check_crc(self, payload: bytes, crc_lo: int, crc_hi: int):
if not self.auto_crc_check:
return True
crc_rx = (crc_hi << 8) | crc_lo
crc_calc = crc16_ibm(payload)
if crc_calc != crc_rx:
self._log(f"[CRC FAIL] calc=0x{crc_calc:04X} rx=0x{crc_rx:04X}")
return False
self._log("[CRC OK]")
return True
@staticmethod
def _clear_service_bit(vhi, vlo):
return ((vhi & 0x7F) << 8) | vlo
def _parse_service_frame(self, frame: bytes):
# ... (код без изменений)
payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 7:
self._log("[ERR] Service frame too short"); return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, status, iq_raw, name_len = payload[:7]
status_desc = _decode_debug_status(status)
index = self._clear_service_bit(vhi, vlo)
if len(payload) < 7 + name_len:
self._log("[ERR] Service name truncated"); return
name_bytes = payload[7:7+name_len]; name = name_bytes.decode(errors='replace')
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
if status == DEBUG_OK:
self._name_cache[index] = (status, iq_raw, name, is_signed, frac_bits)
self.nameRead.emit(index, status, iq_raw, name)
self._log(f"[SERVICE] idx={index} status={status} iq_raw=0x{iq_raw:02X} sign={'S' if is_signed else 'U'} frac={frac_bits} name='{name}'")
def _parse_data_frame(self, frame: bytes, *, error_mode: bool):
# ... (код без изменений)
payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 6:
self._log("[ERR] Data frame too short"); return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, varqnt, status = payload[:6]
base = self._clear_service_bit(vhi, vlo)
if error_mode:
self.set_status("error", "error")
if len(payload) < 8:
self._log("[ERR] Error frame truncated"); return
err_hi, err_lo = payload[6:8]
bad_index = (err_hi << 8) | err_lo
desc = _decode_debug_status(status)
self._log(f"[DATA] ERROR status=0x{status:02X} ({desc}) bad_index={bad_index}")
# Обновим UI
self._populate_watch_error(bad_index, status)
# Сигналы (оставляем совместимость)
self.valueRead.emit(bad_index, status, 0, 0, float('nan'))
self.valuesRead.emit(base, 0, [], [], [], [])
return
if len(payload) < 6 + varqnt*2:
self._log("[ERR] Data payload truncated"); return
raw_vals = []
pos = 6
for _ in range(varqnt):
hi = payload[pos]; lo = payload[pos+1]; pos += 2
raw16 = (hi << 8) | lo
raw_vals.append(raw16)
idx_list = []; iq_list = []; name_list = []; scaled_list = []; display_raw_list = []
for ofs, raw16 in enumerate(raw_vals):
idx = base + ofs
status_i, iq_raw, name_i, is_signed, frac_bits = self._name_cache.get(idx, (DEBUG_OK, 0, '', False, 0))
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000
else:
value_int = raw16
if self.chk_raw.isChecked():
scale = 1.0
else:
scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits))
scaled = float(value_int) / scale if frac_bits > 0 else float(value_int)
idx_list.append(idx); iq_list.append(iq_raw); name_list.append(name_i)
scaled_list.append(scaled); display_raw_list.append(value_int)
self._populate_table(idx_list, name_list, iq_list, display_raw_list, scaled_list)
if varqnt == 1:
if idx_list[0] == self.spin_index.value():
_, iq_raw0, name0, is_signed0, frac0 = self._name_cache.get(idx_list[0], (DEBUG_OK, 0, '', False, 0))
self.valueRead.emit(idx_list[0], status, iq_list[0], display_raw_list[0], scaled_list[0])
else:
self.valuesRead.emit(base, varqnt, idx_list, iq_list, display_raw_list, scaled_list)
self._log(f"[DATA] base={base} q={varqnt} values={[f'{v:.6g}' for v in scaled_list] if not self.chk_raw.isChecked() else raw_vals}")
def _parse_lowlevel_frame(self, frame: bytes, success: bool):
payload_len = 9 if success else 6
crc_pos = payload_len
payload = frame[:payload_len]
crc_lo, crc_hi = frame[crc_pos], frame[crc_pos+1]
self._check_crc(payload, crc_lo, crc_hi)
status = payload[2]
addr2, addr1, addr0 = payload[3], payload[4], payload[5]
addr24 = (addr2 << 16) | (addr1 << 8) | addr0
status_desc = _decode_debug_status(status)
self.ll_val_status.setText(f"0x{status:02X} ({status_desc})")
if not success:
self.ll_val_rettype.setText('-')
self.ll_val_scaled.setText(f"<ERROR:{status_desc}>")
self._log(f"[LL] ERROR status=0x{status:02X} ({status_desc}) addr=0x{addr24:06X}")
return
return_type = payload[6]
data_hi, data_lo = payload[7], payload[8]
raw16 = (data_hi << 8) | data_lo
is_signed = (return_type & SIGN_BIT_MASK) != 0
frac_bits = return_type & FRAC_MASK_FULL
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000
else:
value_int = raw16
if self.chk_raw.isChecked():
scale = 1.0
else:
scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits)) # 1 / 2^N
scaled = float(value_int) / scale
# Обновляем UI
self.ll_val_rettype.setText(f"0x{return_type:02X} ({frac_bits}{'s' if is_signed else 'u'})")
self.ll_val_scaled.setText(f"{scaled:.6g}")
self.llValueRead.emit(addr24, status, return_type, value_int, scaled)
self._log(f"[LL] OK addr=0x{addr24:06X} type=0x{return_type:02X} raw={value_int} scaled={scaled:.6g}")
def _populate_watch_error(self, bad_index: int, status: int):
"""Отобразить строку ошибки при неудачном ответе WATCH."""
desc = _decode_debug_status(status)
self.tbl_values.setRowCount(1)
self.tbl_values.setItem(0, 0, QtWidgets.QTableWidgetItem(str(bad_index)))
self.tbl_values.setItem(0, 1, QtWidgets.QTableWidgetItem(f"<ERROR:{desc}>"))
self.tbl_values.setItem(0, 2, QtWidgets.QTableWidgetItem("-"))
self.tbl_values.setItem(0, 3, QtWidgets.QTableWidgetItem("-"))
self.tbl_values.setItem(0, 4, QtWidgets.QTableWidgetItem("<ERROR>"))\
def _populate_table(self, idxs, names, iqs, raws, scaled):
"""
Быстрое массовое обновление таблицы значений.
- Не пересоздаём QTableWidgetItem при каждом вызове: обновляем текст.
- Блокируем сортировку, сигналы и обновления на время заполнения.
- Предвычисляем отображаемые строки (особенно формат scaled).
"""
tbl = self.tbl_values
n = len(idxs)
# Заморозка UI на время массового обновления
prev_sorting = tbl.isSortingEnabled()
tbl.setSortingEnabled(False)
tbl.blockSignals(True)
tbl.setUpdatesEnabled(False)
# Подготовка размера
if tbl.rowCount() != n:
tbl.setRowCount(n)
# Предварительно решаем: показывать сырые или масштабированные значения
show_raw = self.chk_raw.isChecked()
# Готовим строки (ускоряет при больших объёмах)
# str() заранее, чтобы не повторять в цикле
idx_strs = [str(v) for v in idxs]
raw_strs = [str(v) for v in raws]
scaled_strs = raw_strs if show_raw else [f"{v:.6g}" for v in scaled]
# Флаги необновляемых ячеек (только выбор/просмотр)
flags_ro = QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled
# Локальный шорткат для быстрой установки текста в ячейку
def _set_text(row, col, text):
item = tbl.item(row, col)
if item is None:
item = QtWidgets.QTableWidgetItem(text)
item.setFlags(flags_ro)
tbl.setItem(row, col, item)
else:
# обновим текст только при изменении (немного экономит на больших данных)
if item.text() != text:
item.setText(text)
if item.flags() != flags_ro:
item.setFlags(flags_ro)
# Основной цикл
for row in range(n):
iq_raw = iqs[row]
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
iq_disp = f"{frac_bits}{'s' if is_signed else 'u'}"
_set_text(row, 0, idx_strs[row])
_set_text(row, 1, names[row])
_set_text(row, 2, iq_disp)
_set_text(row, 3, raw_strs[row])
_set_text(row, 4, scaled_strs[row])
# Разморозка
tbl.blockSignals(False)
tbl.setUpdatesEnabled(True)
tbl.setSortingEnabled(prev_sorting)
tbl.viewport().update()
# ------------------------------ POLLING --------------------------------
def _toggle_polling(self):
if self._polling:
self._poll_timer.stop()
self._polling = False
self.btn_poll.setText("Start Polling")
self._log("[POLL] Stopped")
else:
interval = self.spin_interval.value()
self._poll_timer.start(interval)
self._polling = True
self.btn_poll.setText("Stop Polling")
self._log(f"[POLL] Started interval={interval}ms")
self._set_ui_busy(False) # Обновить доступность кнопок
def _on_poll_timeout(self):
self.request_values()
def _toggle_ll_polling(self):
"""Включает и выключает поллинг для LowLevel вкладки."""
if self._ll_polling:
self._ll_poll_timer.stop()
self._ll_polling = False
self.btn_ll_poll.setText("Start Polling")
self._log("[LL POLL] Stopped")
else:
if not self._ll_current_var_info:
self._log("[LL POLL] Cannot start: no variable selected.")
return
interval = self.spin_ll_interval.value()
self._ll_poll_timer.start(interval)
self._ll_polling = True
self.btn_ll_poll.setText("Stop Polling")
self._log(f"[LL POLL] Started interval={interval}ms")
self._set_ui_busy(False) # Обновить доступность кнопок
def _on_ll_poll_timeout(self):
"""Слот таймера поллинга для LowLevel."""
self.request_lowlevel_once()
def _on_ll_variable_prepared(self, var_info: dict):
"""Срабатывает при выборе переменной в селекторе."""
self._ll_current_var_info = var_info
self._log(f"[LL] Selected variable '{var_info['path']}' @ {var_info['address_hex']}")
# Сбрасываем старые значения
self.ll_val_status.setText("-")
self.ll_val_rettype.setText("-")
self.ll_val_scaled.setText("-")
# ------------------------------ HELPERS --------------------------------
def _toggle_index_base(self, st):
# ... (код без изменений)
val = self.spin_index.value()
if st == QtCore.Qt.Checked:
self.spin_index.setDisplayIntegerBase(16); self.spin_index.setPrefix("0x")
else:
self.spin_index.setDisplayIntegerBase(10); self.spin_index.setPrefix("")
self.spin_index.setValue(val)
def _set_ui_busy(self, busy: bool):
# Блокируем кнопки в зависимости от состояния 'busy' и 'polling'
# Watch tab
can_use_watch = not busy and not (self._polling or self._ll_polling)
#self.btn_update_service.setEnabled(can_use_watch)
self.btn_read_values.setEnabled(can_use_watch)
# LowLevel tab
can_use_ll = not busy and not (self._ll_polling or self._polling)
self.btn_ll_read.setEnabled(can_use_ll)
def _on_serial_error(self, err):
# ... (код без изменений)
if err == QtSerialPort.QSerialPort.NoError: return
self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})")
if self._busy: self._end_txn()
# ------------------------------ LOGGING --------------------------------
def _log(self, msg: str):
# ... (код без изменений)
if not self.log_spoiler.getState():
return
ts = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
self.txt_log.append(f"{ts} {msg}")
def _log_frame(self, data: bytes, *, tx: bool):
# ... (код без изменений)
if not self.log_spoiler.getState():
return
tag = 'TX' if tx else 'RX'
hexs = ' '.join(f"{b:02X}" for b in data)
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
self._log(f"[{tag}] {hexs} |{ascii_part}|")
# ---------------------------------------------------------- Demo harness ---
class _DemoWindow(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("DebugVar Terminal")
self.term = DebugTerminalWidget(self)
self.setCentralWidget(self.term)
self.term.nameRead.connect(self._on_name)
self.term.valueRead.connect(self._on_value)
self.term.llValueRead.connect(self._on_ll_value)
def _on_name(self, index, status, iq, name):
return
print(f"Name idx={index} status={status} iq={iq} name='{name}'")
def _on_value(self, index, status, iq, raw16, floatVal):
return
print(f"Value idx={index} status={status} iq={iq} raw={raw16} val={floatVal}")
def _on_ll_value(self, addr, status, rettype_raw, raw16, scaled):
return
print(f"LL addr=0x{addr:06X} status={status} type=0x{rettype_raw:02X} raw={raw16} scaled={scaled}")
def format_address(addr_text: str) -> str:
try:
value = int(addr_text, 16)
except ValueError:
value = 0
return f"0x{value:06X}"
def closeEvent(self, event):
self.setCentralWidget(None)
if self.term:
self.term.deleteLater(); self.term = None
super().closeEvent(event)
# ------------------------------- Demo --------------------------------------
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
win = _DemoWindow(); win.show()
win.resize(640, 520)
sys.exit(app.exec_())