debugVarTool/Src/tms_debugvar_term.py
Razvalyaev 788ad19464 кууууча всего по терминалке, надо резгребать и структурировать
базово:
+сделан lowlevel для кучи переменных (пока работает медленно)
+сделан сохранение принимаемых значений в лог
+ gui терминалок подогнаны под один стиль плюс минус
2025-07-22 18:05:12 +03:00

1255 lines
54 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PySide2 import QtCore, QtWidgets, QtSerialPort
from tms_debugvar_lowlevel import LowLevelSelectorWidget
import datetime
import time
from csv_logger import CsvLogger
# ------------------------------- Константы протокола ------------------------
WATCH_SERVICE_BIT = 0x8000
DEBUG_OK = 0 # ожидаемый код успешного чтения
SIGN_BIT_MASK = 0x80
FRAC_MASK_FULL = 0x7F # если используем 7 бит дробной части
# --- Debug status codes (из прошивки) ---
DEBUG_OK = 0x00
DEBUG_ERR = 0x80 # общий флаг ошибки (старший бит)
DEBUG_ERR_VAR_NUMB = DEBUG_ERR | (1 << 0)
DEBUG_ERR_INVALID_VAR = DEBUG_ERR | (1 << 1)
DEBUG_ERR_ADDR = DEBUG_ERR | (1 << 2)
DEBUG_ERR_ADDR_ALIGN = DEBUG_ERR | (1 << 3)
DEBUG_ERR_INTERNAL = DEBUG_ERR | (1 << 4)
DEBUG_ERR_DATATIME = DEBUG_ERR | (1 << 5)
DEBUG_ERR_RS = DEBUG_ERR | (1 << 5)
# для декодирования по битам
_DEBUG_ERR_BITS = (
(1 << 0, "Invalid Variable Index"),
(1 << 1, "Invalid Variable"),
(1 << 2, "Invalid Address"),
(1 << 3, "Invalid Address Align"),
(1 << 4, "Internal Code Error"),
(1 << 5, "Invalid Data or Time"),
(1 << 6, "Error with RS"),
)
# ---------------------------------------------------------------- CRC util ---
def crc16_ibm(data: bytes, *, init=0xFFFF) -> int:
"""CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected)."""
crc = init
for b in data:
crc ^= b
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc & 0xFFFF
def _decode_debug_status(status: int) -> str:
"""Преобразует код статуса прошивки в строку.
Возвращает 'OK' или перечисление битов через '|'.
Не зависит от того, WATCH или LowLevel.
"""
if status == DEBUG_OK:
return "OK"
parts = []
if status & DEBUG_ERR:
for mask, name in _DEBUG_ERR_BITS:
if status & mask:
parts.append(name)
if not parts: # старший бит есть, но ни один из известных младших не выставлен
parts.append("ERR")
else:
# Неожиданно: статус !=0, но бит DEBUG_ERR не стоит
parts.append(f"0x{status:02X}")
return "|".join(parts)
class Spoiler(QtWidgets.QWidget):
def __init__(self, title="", animationDuration=300, parent=None):
super().__init__(parent)
self._animationDuration = animationDuration
self.state = False
# --- Toggle button ---
self.toggleButton = QtWidgets.QToolButton(self)
self.toggleButton.setStyleSheet("QToolButton { border: none; }")
self.toggleButton.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon)
self.toggleButton.setArrowType(QtCore.Qt.RightArrow)
self.toggleButton.setText(title)
self.toggleButton.setCheckable(True)
# --- Header line ---
self.headerLine = QtWidgets.QFrame(self)
self.headerLine.setFrameShape(QtWidgets.QFrame.HLine)
self.headerLine.setFrameShadow(QtWidgets.QFrame.Sunken)
# --- Content area ---
self.contentArea = QtWidgets.QScrollArea(self)
self.contentArea.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed)
self.contentArea.setFrameShape(QtWidgets.QFrame.NoFrame)
self.contentArea.setWidgetResizable(True)
self._contentWidget = QtWidgets.QWidget()
self.contentArea.setWidget(self._contentWidget)
self.contentArea.setMaximumHeight(0)
# --- Анимация только по контенту ---
self._ani_content = QtCore.QPropertyAnimation(self.contentArea, b"maximumHeight")
self._ani_content.setDuration(animationDuration)
self._ani_content.setEasingCurve(QtCore.QEasingCurve.InOutCubic)
# Следим за шагами анимации → обновляем родителя
self._ani_content.valueChanged.connect(self._adjust_parent_size)
# --- Layout ---
self.mainLayout = QtWidgets.QGridLayout(self)
self.mainLayout.setVerticalSpacing(0)
self.mainLayout.setContentsMargins(0, 0, 0, 0)
self.mainLayout.addWidget(self.toggleButton, 0, 0, 1, 1)
self.mainLayout.addWidget(self.headerLine, 0, 1, 1, 1)
self.mainLayout.addWidget(self.contentArea, 1, 0, 1, 2)
# --- Signals ---
self.toggleButton.clicked.connect(self._on_toggled)
def setContentLayout(self, contentLayout):
old = self._contentWidget.layout()
if old:
QtWidgets.QWidget().setLayout(old)
self._contentWidget.setLayout(contentLayout)
def getState(self):
return self.state
def _adjust_parent_size(self, *_):
top = self.window()
if top:
size = top.size()
size.setHeight(top.sizeHint().height()) # берём новую высоту
top.resize(size) # ширина остаётся прежней
def _on_toggled(self, checked: bool):
self.state = checked
self.toggleButton.setArrowType(QtCore.Qt.DownArrow if checked else QtCore.Qt.RightArrow)
contentHeight = self._contentWidget.sizeHint().height()
self._ani_content.stop()
self._ani_content.setStartValue(self.contentArea.maximumHeight())
self._ani_content.setEndValue(contentHeight if checked else 0)
# --- Фиксируем ширину на время анимации ---
w = self.width()
self.setFixedWidth(w)
self._ani_content.finished.connect(lambda: self.setMaximumWidth(16777215)) # сброс фикса
self._ani_content.start()
# --------------------------- DebugTerminalWidget ---------------------------
class DebugTerminalWidget(QtWidgets.QWidget):
# Существующие сигналы (Watch)
nameRead = QtCore.Signal(int, int, int, str)
valueRead = QtCore.Signal(int, int, int, int, float)
valuesRead = QtCore.Signal(int, int, list, list, list, list)
# Новые сигналы (LowLevel)
llValueRead = QtCore.Signal(int, int, int, int, float) # addr, status, rettype_raw, raw16_signed, scaled
portOpened = QtCore.Signal(str)
portClosed = QtCore.Signal(str)
txBytes = QtCore.Signal(bytes)
rxBytes = QtCore.Signal(bytes)
def __init__(self, parent=None, *,
start_byte=0x0A,
cmd_byte=0x46,
cmd_lowlevel=0x47,
iq_scaling=None,
read_timeout_ms=250,
auto_crc_check=True,
drop_if_busy=False,
replace_if_busy=True):
super().__init__(parent)
self.device_addr = start_byte
self.cmd_byte = cmd_byte
self.cmd_lowlevel = cmd_lowlevel
self.read_timeout_ms = read_timeout_ms
self.auto_crc_check = auto_crc_check
self._drop_if_busy = drop_if_busy
self._replace_if_busy = replace_if_busy
self._last_txn_timestamp = 0
self._ll_polling_active = False
if iq_scaling is None:
iq_scaling = {n: float(1 << n) for n in range(31)}
iq_scaling[0] = 1.0
self.iq_scaling = iq_scaling
# Serial
self.serial = QtSerialPort.QSerialPort(self)
self.serial.setBaudRate(115200)
self.serial.readyRead.connect(self._on_ready_read)
self.serial.errorOccurred.connect(self._on_serial_error)
# State
self._rx_buf = bytearray()
self._busy = False
self._pending_cmd = None # (frame, meta)
self._txn_meta = None # {'service':bool,'index':int,'varqnt':int,'chain':...,'lowlevel':bool}
self._txn_timer = QtCore.QTimer(self)
self._txn_timer.setSingleShot(True)
self._txn_timer.timeout.connect(self._on_txn_timeout)
# Watch polling
self._poll_timer = QtCore.QTimer(self)
self._poll_timer.timeout.connect(self._on_poll_timeout)
self._polling = False
# LowLevel polling
self._ll_poll_timer = QtCore.QTimer(self)
self._ll_poll_timer.timeout.connect(self._on_ll_poll_timeout)
self._ll_polling = False
self._ll_polling_variables = [] # List of selected variables for polling
self._ll_current_poll_index = -1 # Index of the variable currently being polled in the _ll_polling_variables list
self._ll_current_var_info = []
self.csv_logger = CsvLogger()
self._csv_logging_active = False
self._last_csv_timestamp = 0 # Для отслеживания времени записи
# Кэш: index -> (status, iq, name, is_signed, frac_bits)
self._name_cache = {}
# Очередь service индексов
self._service_queue = []
self._pending_data_after_services = None # (base, count)
self._build_ui()
self._connect_ui()
self.set_available_ports()
# ------------------------------ UI ----------------------------------
def _build_ui(self):
layout = QtWidgets.QVBoxLayout(self)
# --- Serial group ---
g_serial = QtWidgets.QGroupBox("Serial Port")
hs = QtWidgets.QHBoxLayout(g_serial)
self.cmb_port = QtWidgets.QComboBox()
self.btn_refresh = QtWidgets.QPushButton("Refresh")
self.cmb_baud = QtWidgets.QComboBox()
self.cmb_baud.addItems(["9600","19200","38400","57600","115200","230400"])
self.cmb_baud.setCurrentText("115200")
self.btn_open = QtWidgets.QPushButton("Open")
hs.addWidget(QtWidgets.QLabel("Port:"))
hs.addWidget(self.cmb_port, 1)
hs.addWidget(self.btn_refresh)
hs.addSpacing(10)
hs.addWidget(QtWidgets.QLabel("Baud:"))
hs.addWidget(self.cmb_baud)
hs.addWidget(self.btn_open)
# --- TabWidget ---
self.tabs = QtWidgets.QTabWidget()
self._build_watch_tab()
self._build_lowlevel_tab() # <-- Вызываем новый метод
g_control = QtWidgets.QGroupBox("Control / Status")
control_layout = QtWidgets.QHBoxLayout(g_control) # Используем QHBoxLayout
# Форма для статусов слева
form_control = QtWidgets.QFormLayout()
self.lbl_status = QtWidgets.QLabel("Idle")
self.lbl_status.setStyleSheet("font-weight: bold; color: grey;")
form_control.addRow("Status:", self.lbl_status)
self.lbl_actual_interval = QtWidgets.QLabel("-")
form_control.addRow("Actual Interval:", self.lbl_actual_interval)
control_layout.addLayout(form_control, 1) # Растягиваем форму
# Галочка Raw справа
self.chk_raw = QtWidgets.QCheckBox("Raw (no IQ scaling)")
control_layout.addWidget(self.chk_raw)
# Создаем QGroupBox для группировки элементов управления CSV
self.csv_log_groupbox = QtWidgets.QGroupBox("CSV Logging")
csv_log_layout = QtWidgets.QVBoxLayout(self.csv_log_groupbox) # Передаем groupbox как родительский layout
# Элементы управления CSV
h_file_select = QtWidgets.QHBoxLayout()
self.btn_select_csv_file = QtWidgets.QPushButton("Выбрать файл CSV")
# Убедитесь, что self.csv_logger инициализирован где-то до этого момента
self.lbl_csv_filename = QtWidgets.QLabel(self.csv_logger.filename)
h_file_select.addWidget(self.btn_select_csv_file)
h_file_select.addWidget(self.lbl_csv_filename, 1)
csv_log_layout.addLayout(h_file_select)
h_control_buttons = QtWidgets.QHBoxLayout()
self.btn_start_csv_logging = QtWidgets.QPushButton("Начать запись в CSV")
self.btn_stop_csv_logging = QtWidgets.QPushButton("Остановить запись в CSV")
self.btn_save_csv_data = QtWidgets.QPushButton("Сохранить данные в CSV")
self.btn_stop_csv_logging.setEnabled(False) # По умолчанию остановлена
h_control_buttons.addWidget(self.btn_start_csv_logging)
h_control_buttons.addWidget(self.btn_stop_csv_logging)
h_control_buttons.addWidget(self.btn_save_csv_data)
csv_log_layout.addLayout(h_control_buttons)
# Добавляем QGroupBox в основной лейаут
# --- UART Log ---
self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self)
self.log_spoiler.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Minimum)
log_layout = QtWidgets.QVBoxLayout()
self.txt_log = QtWidgets.QTextEdit(); self.txt_log.setReadOnly(True)
self.txt_log.setFontFamily("Courier")
log_layout.addWidget(self.txt_log)
self.log_spoiler.setContentLayout(log_layout)
layout.addWidget(g_serial)
layout.addWidget(self.tabs, 1)
layout.addWidget(g_control)
layout.addWidget(self.csv_log_groupbox)
layout.addWidget(self.log_spoiler)
layout.setStretch(layout.indexOf(g_serial), 0)
layout.setStretch(layout.indexOf(self.tabs), 1)
def _build_watch_tab(self):
tab = QtWidgets.QWidget()
main_layout = QtWidgets.QVBoxLayout(tab)
# --- Variable Selector ---
g_selector = QtWidgets.QGroupBox("Variable Selector")
selector_layout = QtWidgets.QVBoxLayout(g_selector)
form_selector = QtWidgets.QFormLayout()
h_layout = QtWidgets.QHBoxLayout()
self.spin_index = QtWidgets.QSpinBox()
self.spin_index.setRange(0, 0x7FFF)
self.spin_index.setAccelerated(True)
self.chk_hex_index = QtWidgets.QCheckBox("Hex")
self.spin_count = QtWidgets.QSpinBox()
self.spin_count.setRange(1, 255)
self.spin_count.setValue(1)
# Первая группа: Base Index + spin + checkbox
base_index_layout = QtWidgets.QHBoxLayout()
base_index_label = QtWidgets.QLabel("Base Index")
base_index_layout.addWidget(base_index_label)
base_index_layout.addWidget(self.spin_index)
base_index_layout.addWidget(self.chk_hex_index)
base_index_layout.setSpacing(5)
# Вторая группа: spin_count + метка справа
count_layout = QtWidgets.QHBoxLayout()
count_layout.setSpacing(2) # минимальный отступ
count_layout.addWidget(self.spin_count)
count_label = QtWidgets.QLabel("Cnt")
count_layout.addWidget(count_label)
# Добавляем обе группы в общий горизонтальный лэйаут
h_layout.addLayout(base_index_layout)
h_layout.addSpacing(20)
h_layout.addLayout(count_layout)
form_selector.addRow(h_layout)
self.spin_interval = QtWidgets.QSpinBox()
self.spin_interval.setRange(50, 10000)
self.spin_interval.setValue(500)
self.spin_interval.setSuffix(" ms")
form_selector.addRow("Interval:", self.spin_interval)
selector_layout.addLayout(form_selector)
btn_layout = QtWidgets.QHBoxLayout()
self.btn_update_service = QtWidgets.QPushButton("Update Service")
self.btn_read_values = QtWidgets.QPushButton("Read Value(s)")
self.btn_poll = QtWidgets.QPushButton("Start Polling")
btn_layout.addWidget(self.btn_update_service)
btn_layout.addWidget(self.btn_read_values)
btn_layout.addWidget(self.btn_poll)
selector_layout.addLayout(btn_layout)
# --- Table ---
g_table = QtWidgets.QGroupBox("Table")
table_layout = QtWidgets.QVBoxLayout(g_table)
self.tbl_values = QtWidgets.QTableWidget(0, 5)
self.tbl_values.setHorizontalHeaderLabels(["Index", "Name", "IQ", "Raw", "Scaled"])
hh = self.tbl_values.horizontalHeader()
for i in range(4):
hh.setSectionResizeMode(i, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch)
self.tbl_values.verticalHeader().setVisible(False)
table_layout.addWidget(self.tbl_values)
# --- Вертикальный сплиттер ---
v_split = QtWidgets.QSplitter(QtCore.Qt.Vertical)
v_split.addWidget(g_selector)
v_split.addWidget(g_table)
v_split.setStretchFactor(0, 1)
v_split.setStretchFactor(1, 3)
v_split.setStretchFactor(2, 1)
main_layout.addWidget(v_split)
self.tabs.addTab(tab, "Watch")
table_layout.addWidget(self.tbl_values)
def _build_lowlevel_tab(self):
# создаём виджет LowLevelSelectorWidget
self.ll_selector = LowLevelSelectorWidget()
# добавляем как корневой виджет вкладки
self.tabs.addTab(self.ll_selector, "LowLevel")
def _connect_ui(self):
# Watch
self.btn_refresh.clicked.connect(self.set_available_ports)
self.btn_open.clicked.connect(self._open_close_port)
self.btn_update_service.clicked.connect(self.request_service_update_for_table)
self.btn_read_values.clicked.connect(self.request_values)
self.btn_poll.clicked.connect(self._toggle_polling)
self.chk_hex_index.stateChanged.connect(self._toggle_index_base)
# LowLevel (новые и переделанные)
self.ll_selector.variablePrepared.connect(self._on_ll_variable_prepared)
self.ll_selector.xmlLoaded.connect(lambda p: self._log(f"[LL] XML loaded: {p}"))
self.ll_selector.btn_read_once.clicked.connect(self.request_lowlevel_once)
self.ll_selector.btn_start_polling.clicked.connect(self._toggle_ll_polling)
# --- CSV Logging ---
self.btn_select_csv_file.clicked.connect(self._select_csv_file)
self.btn_start_csv_logging.clicked.connect(self._start_csv_logging)
self.btn_stop_csv_logging.clicked.connect(self._stop_csv_logging)
self.btn_save_csv_data.clicked.connect(self._save_csv_data)
def set_status(self, text: str, mode: str = "idle"):
colors = {
"idle": "gray",
"service": "blue",
"values": "green",
"error": "red"
}
color = colors.get(mode.lower(), "black")
self.lbl_status.setText(text)
self.lbl_status.setStyleSheet(f"font-weight: bold; color: {color};")
# ----------------------------- SERIAL MGMT ----------------------------
def set_available_ports(self):
cur = self.cmb_port.currentText()
self.cmb_port.blockSignals(True)
self.cmb_port.clear()
for info in QtSerialPort.QSerialPortInfo.availablePorts():
self.cmb_port.addItem(info.portName())
if cur:
ix = self.cmb_port.findText(cur)
if ix >= 0:
self.cmb_port.setCurrentIndex(ix)
self.cmb_port.blockSignals(False)
def _open_close_port(self):
if self.serial.isOpen():
name = self.serial.portName()
self.serial.close()
self.btn_open.setText("Open")
self._log(f"[PORT OK] Closed {name}")
self.portClosed.emit(name)
return
port = self.cmb_port.currentText()
if not port:
self._log("[ERR] No port selected")
return
self.serial.setPortName(port)
self.serial.setBaudRate(int(self.cmb_baud.currentText()))
if not self.serial.open(QtCore.QIODevice.ReadWrite):
self._log(f"[ERR] Open fail {port}: {self.serial.errorString()}")
return
self.btn_open.setText("Close")
self._log(f"[PORT OK] Opened {port}")
self.portOpened.emit(port)
# ---------------------------- FRAME BUILD -----------------------------
def _build_request(self, index: int, *, service: bool, varqnt: int) -> bytes:
dbg = index & 0x7FFF
if service:
dbg |= WATCH_SERVICE_BIT
hi = (dbg >> 8) & 0xFF
lo = dbg & 0xFF
q = varqnt & 0xFF
payload = bytes([self.device_addr & 0xFF, self.cmd_byte & 0xFF, hi, lo, q])
crc = crc16_ibm(payload)
return payload + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def _build_lowlevel_request(self, var_info: dict) -> bytes:
# Формат: [adr][cmd_lowlevel][year_hi][year_lo][month][day][hour][minute][addr2][addr1][addr0][pt_type][iq_type][return_type]
# Пытаемся получить время из переданной информации
dt_info = self.ll_selector.get_datetime()
if dt_info:
# Используем время из var_info
year = dt_info.year
month = dt_info.month
day = dt_info.day
hour = dt_info.hour
minute = dt_info.minute
self._log("[LL] Using time from selector.")
else:
return
addr = var_info.get('address', 0)
addr2 = (addr >> 16) & 0xFF
addr1 = (addr >> 8) & 0xFF
addr0 = addr & 0xFF
# Ensure 'ptr_type' and 'iq_type' from var_info are integers (enum values)
# Use a fallback to 0 if they are not found or not integers
pt_type = var_info.get('ptr_type_enum', 0) & 0xFF
iq_type = var_info.get('iq_type_enum', 0) & 0xFF
ret_type = var_info.get('return_type_enum', 0) & 0xFF
frame_wo_crc = bytes([
self.device_addr & 0xFF, self.cmd_lowlevel & 0xFF,
(year >> 8) & 0xFF, year & 0xFF,
month & 0xFF, day & 0xFF, hour & 0xFF, minute & 0xFF,
addr2, addr1, addr0, pt_type, iq_type, ret_type
])
crc = crc16_ibm(frame_wo_crc)
return frame_wo_crc + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
# ----------------------------- PUBLIC API -----------------------------
def request_service_single(self):
idx = int(self.spin_index.value())
self._enqueue_or_start(idx, service=True, varqnt=0)
def request_service_update_for_table(self):
"""
Очищает кеш имен/типов для всех видимых в таблице переменных
и инициирует их повторное чтение.
"""
indices_to_update = []
for row in range(self.tbl_values.rowCount()):
item = self.tbl_values.item(row, 0)
if item and item.text().isdigit():
indices_to_update.append(int(item.text()))
if not indices_to_update:
self._log("[SERVICE] No variables in table to update.")
return
self._log(f"[SERVICE] Queuing name/type update for {len(indices_to_update)} variables.")
# Очищаем кеш для этих индексов, чтобы принудительно их перечитать
for index in indices_to_update:
if index in self._name_cache:
del self._name_cache[index]
# Запускаем стандартный запрос значений. Он автоматически обработает
# отсутствующую сервисную информацию (имена/типы) перед запросом данных.
if not (self._polling or self._ll_polling):
self.request_values()
def request_values(self):
self._update_interval()
base = int(self.spin_index.value())
count = int(self.spin_count.value())
needed = []
for i in range(base, base+count):
if i not in self._name_cache:
needed.append(i)
if needed:
self._service_queue = needed[:]
self._pending_data_after_services = (base, count)
self._log(f"[AUTO] Need service for {len(needed)} indices: {needed}")
self.set_status("Read service...", "service")
self._kick_service_queue()
else:
self.set_status("Read values...", "values")
self._enqueue_or_start(base, service=False, varqnt=count)
def request_lowlevel_once(self):
"""Запрашивает чтение выбранной LowLevel переменной (однократно)."""
if not self.serial.isOpen():
self._log("[LL] Port is not open.")
return
if self._busy:
self._log("[LL] Busy, request dropped.")
return
# Если переменная не подготовлена, или нет актуальной информации
if not hasattr(self, '_ll_current_var_info') or not self._ll_current_var_info:
self._log("[LL] No variable prepared/selected for single read!")
return
frame = self._build_lowlevel_request(self._ll_current_var_info)
# --- НОВОЕ: Передаем ll_var_info в метаданные транзакции ---
meta = {'lowlevel': True, 'll_polling': False, 'll_var_info': self._ll_current_var_info}
self.set_status("Read lowlevel...", "values")
self._enqueue_raw(frame, meta)
# -------------------------- SERVICE QUEUE FLOW ------------------------
# ... (код без изменений)
def _kick_service_queue(self):
if self._busy:
return
if self._service_queue:
nxt = self._service_queue.pop(0)
self._enqueue_or_start(nxt, service=True, varqnt=0, queue_mode=True)
elif self._pending_data_after_services:
base, count = self._pending_data_after_services
self._pending_data_after_services = None
self._enqueue_or_start(base, service=False, varqnt=count)
# ------------------------ TRANSACTION SCHEDULER -----------------------
# ... (код без изменений)
def _enqueue_raw(self, frame: bytes, meta: dict):
# Добавляем ll_var_info, если это LL запрос
if meta.get('lowlevel', False) and 'll_var_info' not in meta:
# Это должно быть установлено вызывающим кодом, но для безопасности
# или если LL polling не передал var_info явно
meta['ll_var_info'] = self._ll_current_var_info # Используем last prepared var info for single shots
if self._busy:
# ... существующий код ...
if self._replace_if_busy:
self._pending_cmd = (frame, meta)
self._log("[LOCKSTEP] Busy -> replaced pending")
else:
self._log("[LOCKSTEP] Busy -> ignore")
return
self._start_txn(frame, meta)
def _enqueue_or_start(self, index, service: bool, varqnt: int, chain_after=None, queue_mode=False):
frame = self._build_request(index, service=service, varqnt=varqnt)
meta = {'service': service, 'index': index, 'varqnt': varqnt, 'chain': chain_after, 'queue_mode': queue_mode, 'lowlevel': False}
if self._busy:
if self._drop_if_busy and not self._replace_if_busy:
self._log("[LOCKSTEP] Busy -> drop")
return
if self._replace_if_busy:
self._pending_cmd = (frame, meta)
self._log("[LOCKSTEP] Busy -> replaced pending")
else:
self._log("[LOCKSTEP] Busy -> ignore")
return
self._start_txn(frame, meta)
def _start_txn(self, frame: bytes, meta: dict):
if(meta.get('service')):
self._update_interval()
self._busy = True
self._txn_meta = meta
self._rx_buf.clear()
self._set_ui_busy(True)
self._send(frame)
self._txn_timer.start(self.read_timeout_ms)
def _end_txn(self):
self._txn_timer.stop()
queue_mode = False
chain = None
meta = self._txn_meta
if meta:
queue_mode = meta.get('queue_mode', False)
chain = meta.get('chain')
self._txn_meta = None
self._busy = False
self._rx_buf.clear()
self._set_ui_busy(False)
if chain:
base, serv, q = chain
self._enqueue_or_start(base, service=serv, varqnt=q)
return
if self._pending_cmd is not None:
frame, meta = self._pending_cmd
self._pending_cmd = None
QtCore.QTimer.singleShot(0, lambda f=frame, m=meta: self._start_txn(f, m))
return
if queue_mode:
QtCore.QTimer.singleShot(0, self._kick_service_queue)
# !!! Раньше тут было `return`, его убираем
# Если идёт LL polling — переходим сразу к следующей переменной
if self._ll_polling and (self._ll_poll_index < len(self._ll_polling_variables)):
self._process_next_ll_variable_in_cycle()
return
def _on_txn_timeout(self):
if not self._busy: return
is_ll = self._txn_meta.get('lowlevel', False) if self._txn_meta else False
log_prefix = "[LL TIMEOUT]" if is_ll else "[TIMEOUT]"
self._log(f"{log_prefix} No response")
if self._rx_buf:
self._log_frame(bytes(self._rx_buf), tx=False)
self._end_txn()
self.set_status("Timeout", "error")
# ------------------------------- TX/RX ---------------------------------
# ... (код без изменений)
def _send(self, data: bytes):
w = self.serial.write(data)
if w != len(data):
self._log(f"[ERR] Write short {w}/{len(data)}")
self.txBytes.emit(data)
self._log_frame(data, tx=True)
def _on_ready_read(self):
self._rx_buf.extend(self.serial.readAll().data())
if not self._busy:
if self._rx_buf:
self._log("[WARN] Data while idle -> drop")
self._log_frame(bytes(self._rx_buf), tx=False)
self._rx_buf.clear()
return
self._try_parse()
if not (self._polling or self._ll_polling):
self.set_status("Idle", "idle")
# ------------------------------- PARSING -------------------------------
def _try_parse(self):
if not self._txn_meta:
return
if self._txn_meta.get('lowlevel', False):
self._try_parse_lowlevel()
else:
self._try_parse_watch()
def _try_parse_watch(self):
# ... (код без изменений)
service = self._txn_meta['service']
buf = self._rx_buf
trailer_len = 4
if service:
if len(buf) < 7 + trailer_len:
return
name_len = buf[6]
expected = 7 + name_len + trailer_len
if len(buf) < expected:
return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_service_frame(frame)
self._end_txn()
else:
if len(buf) < 6 + trailer_len:
return
varqnt = buf[4]; status = buf[5]
if status != DEBUG_OK:
expected = 8 + trailer_len
if len(buf) < expected: return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=True)
self._end_txn()
else:
expected = 6 + varqnt*2 + trailer_len
if len(buf) < expected: return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=False)
self._end_txn()
def _try_parse_lowlevel(self):
# Ожидаемая длина: Успех=13, Ошибка=10
buf = self._rx_buf
if len(buf) < 10: # Минимальная длина (ошибка)
return
# Проверяем, что ответ для нас
if buf[1] != self.cmd_lowlevel:
self._log("[LL] Unexpected cmd in lowlevel parser, flushing.")
self._log_frame(bytes(self._rx_buf), tx=False)
self._rx_buf.clear()
# Не завершаем транзакцию, ждём таймаута
return
status = buf[2]
expected_len = 13 if status == DEBUG_OK else 10
if len(buf) >= expected_len:
frame = bytes(buf[:expected_len])
del buf[:expected_len]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
self._parse_lowlevel_frame(frame, success=(status == DEBUG_OK))
self._end_txn()
def _check_crc(self, payload: bytes, crc_lo: int, crc_hi: int):
if not self.auto_crc_check:
return True
crc_rx = (crc_hi << 8) | crc_lo
crc_calc = crc16_ibm(payload)
if crc_calc != crc_rx:
self._log(f"[CRC FAIL] calc=0x{crc_calc:04X} rx=0x{crc_rx:04X}")
return False
self._log("[CRC OK]")
return True
@staticmethod
def _clear_service_bit(vhi, vlo):
return ((vhi & 0x7F) << 8) | vlo
def _parse_service_frame(self, frame: bytes):
# ... (код без изменений)
payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 7:
self._log("[ERR] Service frame too short"); return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, status, iq_raw, name_len = payload[:7]
status_desc = _decode_debug_status(status)
index = self._clear_service_bit(vhi, vlo)
if len(payload) < 7 + name_len:
self._log("[ERR] Service name truncated"); return
name_bytes = payload[7:7+name_len]; name = name_bytes.decode(errors='replace')
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
if status == DEBUG_OK:
self._name_cache[index] = (status, iq_raw, name, is_signed, frac_bits)
self.nameRead.emit(index, status, iq_raw, name)
self._log(f"[SERVICE] idx={index} status={status} iq_raw=0x{iq_raw:02X} sign={'S' if is_signed else 'U'} frac={frac_bits} name='{name}'")
def _parse_data_frame(self, frame: bytes, *, error_mode: bool):
payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 6:
self._log("[ERR] Data frame too short"); return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, varqnt, status = payload[:6]
base = self._clear_service_bit(vhi, vlo)
if error_mode:
self.set_status("Error", "error")
if len(payload) < 8:
self._log("[ERR] Error frame truncated"); return
err_hi, err_lo = payload[6:8]
bad_index = (err_hi << 8) | err_lo
desc = _decode_debug_status(status)
self._log(f"[DATA] ERROR status=0x{status:02X} ({desc}) bad_index={bad_index}")
# Обновим UI
self._populate_watch_error(bad_index, status)
# Сигналы (оставляем совместимость)
self.valueRead.emit(bad_index, status, 0, 0, float('nan'))
self.valuesRead.emit(base, 0, [], [], [], [])
return
if len(payload) < 6 + varqnt*2:
self._log("[ERR] Data payload truncated"); return
raw_vals = []
pos = 6
for _ in range(varqnt):
hi = payload[pos]; lo = payload[pos+1]; pos += 2
raw16 = (hi << 8) | lo
raw_vals.append(raw16)
idx_list = []; iq_list = []; name_list = []; scaled_list = []; display_raw_list = []
# Получаем текущее время один раз для всех переменных в этом фрейме
current_time = time.time()
for ofs, raw16 in enumerate(raw_vals):
idx = base + ofs
status_i, iq_raw, name_i, is_signed, frac_bits = self._name_cache.get(idx, (DEBUG_OK, 0, '', False, 0))
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000
else:
value_int = raw16
if self.chk_raw.isChecked():
scale = 1.0
else:
scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits))
scaled = float(value_int) / scale if frac_bits > 0 else float(value_int)
idx_list.append(idx); iq_list.append(iq_raw); name_list.append(name_i)
scaled_list.append(scaled); display_raw_list.append(value_int)
# --- Здесь записываем имя и значение в csv_logger ---
self.csv_logger.set_value(current_time, name_i, scaled)
self._populate_table(idx_list, name_list, iq_list, display_raw_list, scaled_list)
if varqnt == 1:
if idx_list[0] == self.spin_index.value():
_, iq_raw0, name0, is_signed0, frac0 = self._name_cache.get(idx_list[0], (DEBUG_OK, 0, '', False, 0))
self.valueRead.emit(idx_list[0], status, iq_list[0], display_raw_list[0], scaled_list[0])
else:
self.valuesRead.emit(base, varqnt, idx_list, iq_list, display_raw_list, scaled_list)
self._log(f"[DATA] base={base} q={varqnt} values={[f'{v:.6g}' for v in scaled_list] if not self.chk_raw.isChecked() else raw_vals}")
def _parse_lowlevel_frame(self, frame: bytes, success: bool):
payload_len = 9 if success else 6
crc_pos = payload_len
payload = frame[:payload_len]
crc_lo, crc_hi = frame[crc_pos], frame[crc_pos+1]
self._check_crc(payload, crc_lo, crc_hi)
status = payload[2]
addr2, addr1, addr0 = payload[3], payload[4], payload[5]
addr24 = (addr2 << 16) | (addr1 << 8) | addr0
status_desc = _decode_debug_status(status)
return_type = payload[6]
data_hi, data_lo = payload[7], payload[8]
raw16 = (data_hi << 8) | data_lo
is_signed = (return_type & SIGN_BIT_MASK) != 0
frac_bits = return_type & FRAC_MASK_FULL
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000
else:
value_int = raw16
if self.chk_raw.isChecked():
scale = 1.0
else:
scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits)) # 1 / 2^N
scaled = float(value_int) / scale
self.llValueRead.emit(addr24, status, return_type, value_int, scaled)
var_name = None
if self._ll_current_var_info.get("address") == addr24:
var_name = self._ll_current_var_info.get("name")
display_val = value_int if self.chk_raw.isChecked() else scaled
if var_name:
self.ll_selector.set_variable_value(var_name, display_val)
self._log(f"[LL] OK addr=0x{addr24:06X} type=0x{return_type:02X} raw={value_int} scaled={scaled:.6g}")
current_time = time.time() # Получаем текущее время
self.csv_logger.set_value(current_time, var_name, display_val)
def _populate_watch_error(self, bad_index: int, status: int):
"""Отобразить строку ошибки при неудачном ответе WATCH."""
desc = _decode_debug_status(status)
self.tbl_values.setRowCount(1)
self.tbl_values.setItem(0, 0, QtWidgets.QTableWidgetItem(str(bad_index)))
self.tbl_values.setItem(0, 1, QtWidgets.QTableWidgetItem(f"<ERROR:{desc}>"))
self.tbl_values.setItem(0, 2, QtWidgets.QTableWidgetItem("-"))
self.tbl_values.setItem(0, 3, QtWidgets.QTableWidgetItem("-"))
self.tbl_values.setItem(0, 4, QtWidgets.QTableWidgetItem("<ERROR>"))\
def _populate_table(self, idxs, names, iqs, raws, scaled):
"""
Быстрое массовое обновление таблицы значений.
- Не пересоздаём QTableWidgetItem при каждом вызове: обновляем текст.
- Блокируем сортировку, сигналы и обновления на время заполнения.
- Предвычисляем отображаемые строки (особенно формат scaled).
"""
tbl = self.tbl_values
n = len(idxs)
# Заморозка UI на время массового обновления
prev_sorting = tbl.isSortingEnabled()
tbl.setSortingEnabled(False)
tbl.blockSignals(True)
tbl.setUpdatesEnabled(False)
# Подготовка размера
if tbl.rowCount() != n:
tbl.setRowCount(n)
# Предварительно решаем: показывать сырые или масштабированные значения
show_raw = self.chk_raw.isChecked()
# Готовим строки (ускоряет при больших объёмах)
# str() заранее, чтобы не повторять в цикле
idx_strs = [str(v) for v in idxs]
raw_strs = [str(v) for v in raws]
scaled_strs = raw_strs if show_raw else [f"{v:.6g}" for v in scaled]
# Флаги необновляемых ячеек (только выбор/просмотр)
flags_ro = QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled
# Локальный шорткат для быстрой установки текста в ячейку
def _set_text(row, col, text):
item = tbl.item(row, col)
if item is None:
item = QtWidgets.QTableWidgetItem(text)
item.setFlags(flags_ro)
tbl.setItem(row, col, item)
else:
# обновим текст только при изменении (немного экономит на больших данных)
if item.text() != text:
item.setText(text)
if item.flags() != flags_ro:
item.setFlags(flags_ro)
# Основной цикл
for row in range(n):
iq_raw = iqs[row]
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
iq_disp = f"{frac_bits}{'s' if is_signed else 'u'}"
_set_text(row, 0, idx_strs[row])
_set_text(row, 1, names[row])
_set_text(row, 2, iq_disp)
_set_text(row, 3, raw_strs[row])
_set_text(row, 4, scaled_strs[row])
# Разморозка
tbl.blockSignals(False)
tbl.setUpdatesEnabled(True)
tbl.setSortingEnabled(prev_sorting)
tbl.viewport().update()
# ------------------------------ POLLING --------------------------------
def _toggle_polling(self):
if self._polling:
self._poll_timer.stop()
self._polling = False
self.btn_poll.setText("Start Polling")
self.set_status("Idle", "idle")
self._log("[POLL] Stopped")
else:
interval = self.spin_interval.value()
self._poll_timer.start(interval)
self._polling = True
self.btn_poll.setText("Stop Polling")
self.set_status("Idle", "idle")
self._log(f"[POLL] Started interval={interval}ms")
self._set_ui_busy(False) # Обновить доступность кнопок
def _on_poll_timeout(self):
self.request_values()
def _toggle_ll_polling(self):
if self._ll_polling: # If currently polling, stop
self._ll_polling = False
self.ll_selector.btn_start_polling.setText("Start Polling")
self._ll_poll_timer.stop()
self._ll_polling_variables.clear()
self._ll_current_poll_index = -1
self._log("[LL Polling] Stopped.")
else: # If not polling, start
# Get all selected variables from the LowLevelSelectorWidget
self._ll_polling_variables = self.ll_selector.get_selected_variables_and_addresses()
if not self._ll_polling_variables:
self._log("[LL] No variables selected for polling. Aborting.")
self.set_status("Error.", "error")
return
self._ll_polling = True
self.ll_selector.btn_start_polling.setText("Stop Polling")
self._ll_current_poll_index = 0 # Start from the first variable
self._log(f"[LL Polling] Started. Polling {len(self._ll_polling_variables)} variables.")
# Start the timer. It will trigger _on_ll_poll_timeout, which starts the cycle.
# The first cycle starts immediately, subsequent cycles wait for the interval.
self._ll_poll_timer.setInterval(self.ll_selector.spin_interval.value())
self._ll_poll_timer.start() # Start the timer for recurrent cycles
# Immediately kick off the first variable read of the first cycle
self._start_ll_cycle()
def _on_ll_poll_timeout(self):
"""Вызывается по таймеру для старта нового цикла."""
if self._ll_polling and not self._busy:
self._start_ll_cycle()
elif self._busy:
self._log("[LL Polling] Busy, skip cycle start.")
def _start_ll_cycle(self):
self._update_interval()
"""Запускает новый цикл опроса всех переменных."""
if not self._ll_polling or not self._ll_polling_variables:
return
self._ll_poll_index = 0
self._process_next_ll_variable_in_cycle()
def _on_ll_variable_prepared(self, var_info: dict):
"""Срабатывает при выборе переменной в селекторе."""
self._ll_current_var_info = var_info
def _process_next_ll_variable_in_cycle(self):
if not self._ll_polling: # Добавим проверку, чтобы избежать вызова, если LL polling отключен
return
if self._ll_poll_index < len(self._ll_polling_variables):
var_info = self._ll_polling_variables[self._ll_poll_index]
self._on_ll_variable_prepared(var_info)
self._ll_poll_index += 1
frame = self._build_lowlevel_request(var_info)
# --- НОВОЕ: Передаем var_info в метаданные транзакции для LL polling ---
meta = {'lowlevel': True, 'll_polling': True, 'll_var_info': var_info}
self.set_status(f"Polling LL: {var_info.get('name')}", "values")
self._enqueue_raw(frame, meta)
else:
# Цикл завершен, перезапускаем таймер для следующего полного цикла
self._ll_poll_index = 0
self._ll_poll_timer.start(self.ll_selector.spin_interval.value())
self.set_status("LL polling cycle done, waiting...", "idle")
# ------------------------------ HELPERS --------------------------------
def _toggle_index_base(self, st):
# ... (код без изменений)
val = self.spin_index.value()
if st == QtCore.Qt.Checked:
self.spin_index.setDisplayIntegerBase(16); self.spin_index.setPrefix("0x")
else:
self.spin_index.setDisplayIntegerBase(10); self.spin_index.setPrefix("")
self.spin_index.setValue(val)
def _set_ui_busy(self, busy: bool):
# Блокируем кнопки в зависимости от состояния 'busy' и 'polling'
# Watch tab
can_use_watch = not busy and not (self._polling or self._ll_polling)
#self.btn_update_service.setEnabled(can_use_watch)
self.btn_read_values.setEnabled(can_use_watch)
# LowLevel tab
can_use_ll = not busy and not (self._ll_polling or self._polling)
self.ll_selector.btn_read_once.setEnabled(can_use_ll)
def _on_serial_error(self, err):
# ... (код без изменений)
if err == QtSerialPort.QSerialPort.NoError: return
self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})")
if self._busy: self._end_txn()
# ------------------------------ LOGGING --------------------------------
def _select_csv_file(self):
"""Открывает диалог выбора файла для CSV и обновляет UI."""
if self.csv_logger.select_file(self): # Передаем self как parent для диалога
self.lbl_csv_filename.setText(self.csv_logger.filename)
self._log(f"CSV file set to: {self.csv_logger.filename}")
def _start_csv_logging(self):
"""Начинает запись данных в CSV. Устанавливает заголовки в зависимости от активной вкладки."""
if not self.serial.isOpen():
self._log("[CSV] Невозможно начать запись: COM порт не открыт.")
self.set_status("Port closed", "error")
return
# Определяем активную вкладку и устанавливаем заголовки
current_tab_index = self.tabs.currentIndex()
varnames_for_csv = []
if self.tabs.tabText(current_tab_index) == "Watch":
# Для вкладки Watch берем имена из кэша, если они есть, иначе используем Index_X
base_index = self.spin_index.value()
count = self.spin_count.value()
for i in range(base_index, base_index + count):
if i in self._name_cache and self._name_cache[i][2]: # status, iq_raw, name, is_signed, frac_bits
varnames_for_csv.append(self._name_cache[i][2])
else:
varnames_for_csv.append(f"Index_{i}")
self._log(f"[CSV] Начинается запись для Watch переменных: {varnames_for_csv}")
elif self.tabs.tabText(current_tab_index) == "LowLevel":
# Для вкладки LowLevel берем имена из ll_selector
selected_vars = self.ll_selector.get_selected_variables_and_addresses()
varnames_for_csv = [var['name'] for var in selected_vars if 'name' in var]
if not varnames_for_csv:
self._log("[CSV] Внимание: На вкладке LowLevel не выбраны переменные для записи.")
self._log(f"[CSV] Начинается запись для LowLevel переменных: {varnames_for_csv}")
else:
self._log("[CSV] Неизвестная активная вкладка. Невозможно определить заголовки CSV.")
return
if not varnames_for_csv:
self._log("[CSV] Нет переменных для записи в CSV. Запись не начата.")
return
self.csv_logger.set_titles(varnames_for_csv)
self._csv_logging_active = True
self.btn_start_csv_logging.setEnabled(False)
self.btn_stop_csv_logging.setEnabled(True)
self.set_status("CSV Logging ACTIVE", "values")
self._log("[CSV] Запись данных в CSV началась.")
def _stop_csv_logging(self):
"""Останавливает запись данных в CSV."""
self._csv_logging_active = False
self.btn_start_csv_logging.setEnabled(True)
self.btn_stop_csv_logging.setEnabled(False)
self.set_status("CSV Logging STOPPED", "idle")
self._log("[CSV] Запись данных в CSV остановлена.")
def _save_csv_data(self):
"""Сохраняет все собранные данные в CSV файл."""
if self._csv_logging_active:
self._log("[CSV] Запись активна. Сначала остановите запись.")
self.set_status("Stop logging first", "error")
return
self.csv_logger.write_to_csv()
self.set_status("CSV data saved", "idle")
def _log(self, msg: str):
# ... (код без изменений)
if 'ERR' in msg:
self.set_status(msg, 'error')
if 'OK' in msg:
self.set_status('Idle', 'idle')
if not self.log_spoiler.getState():
return
ts = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
self.txt_log.append(f"{ts} {msg}")
def _log_frame(self, data: bytes, *, tx: bool):
# ... (код без изменений)
if not self.log_spoiler.getState():
return
tag = 'TX' if tx else 'RX'
hexs = ' '.join(f"{b:02X}" for b in data)
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
self._log(f"[{tag}] {hexs} |{ascii_part}|")
def _update_interval(self):
now = time.perf_counter()
if self._last_txn_timestamp is not None:
delta_ms = (now - self._last_txn_timestamp) * 1000
# Обновляем UI только если он уже создан
if hasattr(self, 'lbl_actual_interval'):
self.lbl_actual_interval.setText(f"{delta_ms:.1f} ms")
self._last_txn_timestamp = now
# ---------------------------------------------------------- Demo harness ---
class _DemoWindow(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("DebugVar Terminal")
self.term = DebugTerminalWidget(self)
self.setCentralWidget(self.term)
self.resize(1000, 600)
def closeEvent(self, event):
self.setCentralWidget(None)
if self.term:
self.term.deleteLater(); self.term = None
super().closeEvent(event)
# ------------------------------- Demo --------------------------------------
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
win = _DemoWindow(); win.show()
sys.exit(app.exec_())