from PySide2 import QtCore, QtWidgets, QtSerialPort from tms_debugvar_lowlevel import LowLevelSelectorWidget import datetime import time from csv_logger import CsvLogger # ------------------------------- Константы протокола ------------------------ WATCH_SERVICE_BIT = 0x8000 DEBUG_OK = 0 # ожидаемый код успешного чтения SIGN_BIT_MASK = 0x80 FRAC_MASK_FULL = 0x7F # если используем 7 бит дробной части # --- Debug status codes (из прошивки) --- DEBUG_OK = 0x00 DEBUG_ERR = 0x80 # общий флаг ошибки (старший бит) DEBUG_ERR_VAR_NUMB = DEBUG_ERR | (1 << 0) DEBUG_ERR_INVALID_VAR = DEBUG_ERR | (1 << 1) DEBUG_ERR_ADDR = DEBUG_ERR | (1 << 2) DEBUG_ERR_ADDR_ALIGN = DEBUG_ERR | (1 << 3) DEBUG_ERR_INTERNAL = DEBUG_ERR | (1 << 4) DEBUG_ERR_DATATIME = DEBUG_ERR | (1 << 5) DEBUG_ERR_RS = DEBUG_ERR | (1 << 5) # для декодирования по битам _DEBUG_ERR_BITS = ( (1 << 0, "Invalid Variable Index"), (1 << 1, "Invalid Variable"), (1 << 2, "Invalid Address"), (1 << 3, "Invalid Address Align"), (1 << 4, "Internal Code Error"), (1 << 5, "Invalid Data or Time"), (1 << 6, "Error with RS"), ) # ---------------------------------------------------------------- CRC util --- def crc16_ibm(data: bytes, *, init=0xFFFF) -> int: """CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected).""" crc = init for b in data: crc ^= b for _ in range(8): if crc & 1: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 return crc & 0xFFFF def _decode_debug_status(status: int) -> str: """Преобразует код статуса прошивки в строку. Возвращает 'OK' или перечисление битов через '|'. Не зависит от того, WATCH или LowLevel. """ if status == DEBUG_OK: return "OK" parts = [] if status & DEBUG_ERR: for mask, name in _DEBUG_ERR_BITS: if status & mask: parts.append(name) if not parts: # старший бит есть, но ни один из известных младших не выставлен parts.append("ERR") else: # Неожиданно: статус !=0, но бит DEBUG_ERR не стоит parts.append(f"0x{status:02X}") return "|".join(parts) class Spoiler(QtWidgets.QWidget): def __init__(self, title="", animationDuration=300, parent=None): super().__init__(parent) self._animationDuration = animationDuration self.state = False # --- Toggle button --- self.toggleButton = QtWidgets.QToolButton(self) self.toggleButton.setStyleSheet("QToolButton { border: none; }") self.toggleButton.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon) self.toggleButton.setArrowType(QtCore.Qt.RightArrow) self.toggleButton.setText(title) self.toggleButton.setCheckable(True) # --- Header line --- self.headerLine = QtWidgets.QFrame(self) self.headerLine.setFrameShape(QtWidgets.QFrame.HLine) self.headerLine.setFrameShadow(QtWidgets.QFrame.Sunken) # --- Content area --- self.contentArea = QtWidgets.QScrollArea(self) self.contentArea.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed) self.contentArea.setFrameShape(QtWidgets.QFrame.NoFrame) self.contentArea.setWidgetResizable(True) self._contentWidget = QtWidgets.QWidget() self.contentArea.setWidget(self._contentWidget) self.contentArea.setMaximumHeight(0) # --- Анимация только по контенту --- self._ani_content = QtCore.QPropertyAnimation(self.contentArea, b"maximumHeight") self._ani_content.setDuration(animationDuration) self._ani_content.setEasingCurve(QtCore.QEasingCurve.InOutCubic) # Следим за шагами анимации → обновляем родителя self._ani_content.valueChanged.connect(self._adjust_parent_size) # --- Layout --- self.mainLayout = QtWidgets.QGridLayout(self) self.mainLayout.setVerticalSpacing(0) self.mainLayout.setContentsMargins(0, 0, 0, 0) self.mainLayout.addWidget(self.toggleButton, 0, 0, 1, 1) self.mainLayout.addWidget(self.headerLine, 0, 1, 1, 1) self.mainLayout.addWidget(self.contentArea, 1, 0, 1, 2) # --- Signals --- self.toggleButton.clicked.connect(self._on_toggled) def setContentLayout(self, contentLayout): old = self._contentWidget.layout() if old: QtWidgets.QWidget().setLayout(old) self._contentWidget.setLayout(contentLayout) def getState(self): return self.state def _adjust_parent_size(self, *_): top = self.window() if top: size = top.size() size.setHeight(top.sizeHint().height()) # берём новую высоту top.resize(size) # ширина остаётся прежней def _on_toggled(self, checked: bool): self.state = checked self.toggleButton.setArrowType(QtCore.Qt.DownArrow if checked else QtCore.Qt.RightArrow) contentHeight = self._contentWidget.sizeHint().height() self._ani_content.stop() self._ani_content.setStartValue(self.contentArea.maximumHeight()) self._ani_content.setEndValue(contentHeight if checked else 0) # --- Фиксируем ширину на время анимации --- w = self.width() self.setFixedWidth(w) self._ani_content.finished.connect(lambda: self.setMaximumWidth(16777215)) # сброс фикса self._ani_content.start() # --------------------------- DebugTerminalWidget --------------------------- class DebugTerminalWidget(QtWidgets.QWidget): # Существующие сигналы (Watch) nameRead = QtCore.Signal(int, int, int, str) valueRead = QtCore.Signal(int, int, int, int, float) valuesRead = QtCore.Signal(int, int, list, list, list, list) # Новые сигналы (LowLevel) llValueRead = QtCore.Signal(int, int, int, int, float) # addr, status, rettype_raw, raw16_signed, scaled portOpened = QtCore.Signal(str) portClosed = QtCore.Signal(str) txBytes = QtCore.Signal(bytes) rxBytes = QtCore.Signal(bytes) def __init__(self, parent=None, *, start_byte=0x0A, cmd_byte=0x46, cmd_lowlevel=0x47, iq_scaling=None, read_timeout_ms=250, auto_crc_check=True, drop_if_busy=False, replace_if_busy=True): super().__init__(parent) self.device_addr = start_byte self.cmd_byte = cmd_byte self.cmd_lowlevel = cmd_lowlevel self.read_timeout_ms = read_timeout_ms self.auto_crc_check = auto_crc_check self._drop_if_busy = drop_if_busy self._replace_if_busy = replace_if_busy self._last_txn_timestamp = 0 self._ll_polling_active = False if iq_scaling is None: iq_scaling = {n: float(1 << n) for n in range(31)} iq_scaling[0] = 1.0 self.iq_scaling = iq_scaling # Serial self.serial = QtSerialPort.QSerialPort(self) self.serial.setBaudRate(115200) self.serial.readyRead.connect(self._on_ready_read) self.serial.errorOccurred.connect(self._on_serial_error) # State self._rx_buf = bytearray() self._busy = False self._pending_cmd = None # (frame, meta) self._txn_meta = None # {'service':bool,'index':int,'varqnt':int,'chain':...,'lowlevel':bool} self._txn_timer = QtCore.QTimer(self) self._txn_timer.setSingleShot(True) self._txn_timer.timeout.connect(self._on_txn_timeout) # Watch polling self._poll_timer = QtCore.QTimer(self) self._poll_timer.timeout.connect(self._on_poll_timeout) self._polling = False # LowLevel polling self._ll_poll_timer = QtCore.QTimer(self) self._ll_poll_timer.timeout.connect(self._on_ll_poll_timeout) self._ll_polling = False self._ll_polling_variables = [] # List of selected variables for polling self._ll_current_poll_index = -1 # Index of the variable currently being polled in the _ll_polling_variables list self._ll_current_var_info = [] self.csv_logger = CsvLogger() self._csv_logging_active = False self._last_csv_timestamp = 0 # Для отслеживания времени записи # Кэш: index -> (status, iq, name, is_signed, frac_bits) self._name_cache = {} # Очередь service индексов self._service_queue = [] self._pending_data_after_services = None # (base, count) self._build_ui() self._connect_ui() self.set_available_ports() # ------------------------------ UI ---------------------------------- def _build_ui(self): layout = QtWidgets.QVBoxLayout(self) # --- Serial group --- g_serial = QtWidgets.QGroupBox("Serial Port") hs = QtWidgets.QHBoxLayout(g_serial) self.cmb_port = QtWidgets.QComboBox() self.btn_refresh = QtWidgets.QPushButton("Refresh") self.cmb_baud = QtWidgets.QComboBox() self.cmb_baud.addItems(["9600","19200","38400","57600","115200","230400"]) self.cmb_baud.setCurrentText("115200") self.btn_open = QtWidgets.QPushButton("Open") hs.addWidget(QtWidgets.QLabel("Port:")) hs.addWidget(self.cmb_port, 1) hs.addWidget(self.btn_refresh) hs.addSpacing(10) hs.addWidget(QtWidgets.QLabel("Baud:")) hs.addWidget(self.cmb_baud) hs.addWidget(self.btn_open) # --- TabWidget --- self.tabs = QtWidgets.QTabWidget() self._build_watch_tab() self._build_lowlevel_tab() # <-- Вызываем новый метод g_control = QtWidgets.QGroupBox("Control / Status") control_layout = QtWidgets.QHBoxLayout(g_control) # Используем QHBoxLayout # Форма для статусов слева form_control = QtWidgets.QFormLayout() self.lbl_status = QtWidgets.QLabel("Idle") self.lbl_status.setStyleSheet("font-weight: bold; color: grey;") form_control.addRow("Status:", self.lbl_status) self.lbl_actual_interval = QtWidgets.QLabel("-") form_control.addRow("Actual Interval:", self.lbl_actual_interval) control_layout.addLayout(form_control, 1) # Растягиваем форму # Галочка Raw справа self.chk_raw = QtWidgets.QCheckBox("Raw (no IQ scaling)") control_layout.addWidget(self.chk_raw) # Создаем QGroupBox для группировки элементов управления CSV self.csv_log_groupbox = QtWidgets.QGroupBox("CSV Logging") csv_log_layout = QtWidgets.QVBoxLayout(self.csv_log_groupbox) # Передаем groupbox как родительский layout # Элементы управления CSV h_file_select = QtWidgets.QHBoxLayout() self.btn_select_csv_file = QtWidgets.QPushButton("Выбрать файл CSV") # Убедитесь, что self.csv_logger инициализирован где-то до этого момента self.lbl_csv_filename = QtWidgets.QLabel(self.csv_logger.filename) h_file_select.addWidget(self.btn_select_csv_file) h_file_select.addWidget(self.lbl_csv_filename, 1) csv_log_layout.addLayout(h_file_select) h_control_buttons = QtWidgets.QHBoxLayout() self.btn_start_csv_logging = QtWidgets.QPushButton("Начать запись в CSV") self.btn_stop_csv_logging = QtWidgets.QPushButton("Остановить запись в CSV") self.btn_save_csv_data = QtWidgets.QPushButton("Сохранить данные в CSV") self.btn_stop_csv_logging.setEnabled(False) # По умолчанию остановлена h_control_buttons.addWidget(self.btn_start_csv_logging) h_control_buttons.addWidget(self.btn_stop_csv_logging) h_control_buttons.addWidget(self.btn_save_csv_data) csv_log_layout.addLayout(h_control_buttons) # Добавляем QGroupBox в основной лейаут # --- UART Log --- self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self) self.log_spoiler.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum) log_layout = QtWidgets.QVBoxLayout() self.txt_log = QtWidgets.QTextEdit(); self.txt_log.setReadOnly(True) self.txt_log.setFontFamily("Courier") log_layout.addWidget(self.txt_log) self.log_spoiler.setContentLayout(log_layout) layout.addWidget(g_serial) layout.addWidget(self.tabs, 1) layout.addWidget(g_control) layout.addWidget(self.csv_log_groupbox) layout.addWidget(self.log_spoiler) layout.setStretch(layout.indexOf(g_serial), 0) layout.setStretch(layout.indexOf(self.tabs), 1) def _build_watch_tab(self): tab = QtWidgets.QWidget() main_layout = QtWidgets.QVBoxLayout(tab) # --- Variable Selector --- g_selector = QtWidgets.QGroupBox("Variable Selector") selector_layout = QtWidgets.QVBoxLayout(g_selector) form_selector = QtWidgets.QFormLayout() h_layout = QtWidgets.QHBoxLayout() self.spin_index = QtWidgets.QSpinBox() self.spin_index.setRange(0, 0x7FFF) self.spin_index.setAccelerated(True) self.chk_hex_index = QtWidgets.QCheckBox("Hex") self.spin_count = QtWidgets.QSpinBox() self.spin_count.setRange(1, 255) self.spin_count.setValue(1) # Первая группа: Base Index + spin + checkbox base_index_layout = QtWidgets.QHBoxLayout() base_index_label = QtWidgets.QLabel("Base Index") base_index_layout.addWidget(base_index_label) base_index_layout.addWidget(self.spin_index) base_index_layout.addWidget(self.chk_hex_index) base_index_layout.setSpacing(5) # Вторая группа: spin_count + метка справа count_layout = QtWidgets.QHBoxLayout() count_layout.setSpacing(2) # минимальный отступ count_layout.addWidget(self.spin_count) count_label = QtWidgets.QLabel("Cnt") count_layout.addWidget(count_label) # Добавляем обе группы в общий горизонтальный лэйаут h_layout.addLayout(base_index_layout) h_layout.addSpacing(20) h_layout.addLayout(count_layout) form_selector.addRow(h_layout) self.spin_interval = QtWidgets.QSpinBox() self.spin_interval.setRange(50, 10000) self.spin_interval.setValue(500) self.spin_interval.setSuffix(" ms") form_selector.addRow("Interval:", self.spin_interval) selector_layout.addLayout(form_selector) btn_layout = QtWidgets.QHBoxLayout() self.btn_update_service = QtWidgets.QPushButton("Update Service") self.btn_read_values = QtWidgets.QPushButton("Read Value(s)") self.btn_poll = QtWidgets.QPushButton("Start Polling") btn_layout.addWidget(self.btn_update_service) btn_layout.addWidget(self.btn_read_values) btn_layout.addWidget(self.btn_poll) selector_layout.addLayout(btn_layout) # --- Table --- g_table = QtWidgets.QGroupBox("Table") table_layout = QtWidgets.QVBoxLayout(g_table) self.tbl_values = QtWidgets.QTableWidget(0, 5) self.tbl_values.setHorizontalHeaderLabels(["Index", "Name", "IQ", "Raw", "Scaled"]) hh = self.tbl_values.horizontalHeader() for i in range(4): hh.setSectionResizeMode(i, QtWidgets.QHeaderView.ResizeToContents) hh.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch) self.tbl_values.verticalHeader().setVisible(False) table_layout.addWidget(self.tbl_values) # --- Вертикальный сплиттер --- v_split = QtWidgets.QSplitter(QtCore.Qt.Vertical) v_split.addWidget(g_selector) v_split.addWidget(g_table) v_split.setStretchFactor(0, 1) v_split.setStretchFactor(1, 3) v_split.setStretchFactor(2, 1) main_layout.addWidget(v_split) self.tabs.addTab(tab, "Watch") table_layout.addWidget(self.tbl_values) def _build_lowlevel_tab(self): # создаём виджет LowLevelSelectorWidget self.ll_selector = LowLevelSelectorWidget() # добавляем как корневой виджет вкладки self.tabs.addTab(self.ll_selector, "LowLevel") def _connect_ui(self): # Watch self.btn_refresh.clicked.connect(self.set_available_ports) self.btn_open.clicked.connect(self._open_close_port) self.btn_update_service.clicked.connect(self.request_service_update_for_table) self.btn_read_values.clicked.connect(self.request_values) self.btn_poll.clicked.connect(self._toggle_polling) self.chk_hex_index.stateChanged.connect(self._toggle_index_base) # LowLevel (новые и переделанные) self.ll_selector.variablePrepared.connect(self._on_ll_variable_prepared) self.ll_selector.xmlLoaded.connect(lambda p: self._log(f"[LL] XML loaded: {p}")) self.ll_selector.btn_read_once.clicked.connect(self.request_lowlevel_once) self.ll_selector.btn_start_polling.clicked.connect(self._toggle_ll_polling) # --- CSV Logging --- self.btn_select_csv_file.clicked.connect(self._select_csv_file) self.btn_start_csv_logging.clicked.connect(self._start_csv_logging) self.btn_stop_csv_logging.clicked.connect(self._stop_csv_logging) self.btn_save_csv_data.clicked.connect(self._save_csv_data) def set_status(self, text: str, mode: str = "idle"): colors = { "idle": "gray", "service": "blue", "values": "green", "error": "red" } color = colors.get(mode.lower(), "black") self.lbl_status.setText(text) self.lbl_status.setStyleSheet(f"font-weight: bold; color: {color};") # ----------------------------- SERIAL MGMT ---------------------------- def set_available_ports(self): cur = self.cmb_port.currentText() self.cmb_port.blockSignals(True) self.cmb_port.clear() for info in QtSerialPort.QSerialPortInfo.availablePorts(): self.cmb_port.addItem(info.portName()) if cur: ix = self.cmb_port.findText(cur) if ix >= 0: self.cmb_port.setCurrentIndex(ix) self.cmb_port.blockSignals(False) def _open_close_port(self): if self.serial.isOpen(): name = self.serial.portName() self.serial.close() self.btn_open.setText("Open") self._log(f"[PORT OK] Closed {name}") self.portClosed.emit(name) return port = self.cmb_port.currentText() if not port: self._log("[ERR] No port selected") return self.serial.setPortName(port) self.serial.setBaudRate(int(self.cmb_baud.currentText())) if not self.serial.open(QtCore.QIODevice.ReadWrite): self._log(f"[ERR] Open fail {port}: {self.serial.errorString()}") return self.btn_open.setText("Close") self._log(f"[PORT OK] Opened {port}") self.portOpened.emit(port) # ---------------------------- FRAME BUILD ----------------------------- def _build_request(self, index: int, *, service: bool, varqnt: int) -> bytes: dbg = index & 0x7FFF if service: dbg |= WATCH_SERVICE_BIT hi = (dbg >> 8) & 0xFF lo = dbg & 0xFF q = varqnt & 0xFF payload = bytes([self.device_addr & 0xFF, self.cmd_byte & 0xFF, hi, lo, q]) crc = crc16_ibm(payload) return payload + bytes([crc & 0xFF, (crc >> 8) & 0xFF]) def _build_lowlevel_request(self, var_info: dict) -> bytes: # Формат: [adr][cmd_lowlevel][year_hi][year_lo][month][day][hour][minute][addr2][addr1][addr0][pt_type][iq_type][return_type] # Пытаемся получить время из переданной информации dt_info = self.ll_selector.get_datetime() if dt_info: # Используем время из var_info year = dt_info.year month = dt_info.month day = dt_info.day hour = dt_info.hour minute = dt_info.minute self._log("[LL] Using time from selector.") else: return addr = var_info.get('address', 0) addr2 = (addr >> 16) & 0xFF addr1 = (addr >> 8) & 0xFF addr0 = addr & 0xFF # Ensure 'ptr_type' and 'iq_type' from var_info are integers (enum values) # Use a fallback to 0 if they are not found or not integers pt_type = var_info.get('ptr_type_enum', 0) & 0xFF iq_type = var_info.get('iq_type_enum', 0) & 0xFF ret_type = var_info.get('return_type_enum', 0) & 0xFF frame_wo_crc = bytes([ self.device_addr & 0xFF, self.cmd_lowlevel & 0xFF, (year >> 8) & 0xFF, year & 0xFF, month & 0xFF, day & 0xFF, hour & 0xFF, minute & 0xFF, addr2, addr1, addr0, pt_type, iq_type, ret_type ]) crc = crc16_ibm(frame_wo_crc) return frame_wo_crc + bytes([crc & 0xFF, (crc >> 8) & 0xFF]) # ----------------------------- PUBLIC API ----------------------------- def request_service_single(self): idx = int(self.spin_index.value()) self._enqueue_or_start(idx, service=True, varqnt=0) def request_service_update_for_table(self): """ Очищает кеш имен/типов для всех видимых в таблице переменных и инициирует их повторное чтение. """ indices_to_update = [] for row in range(self.tbl_values.rowCount()): item = self.tbl_values.item(row, 0) if item and item.text().isdigit(): indices_to_update.append(int(item.text())) if not indices_to_update: self._log("[SERVICE] No variables in table to update.") return self._log(f"[SERVICE] Queuing name/type update for {len(indices_to_update)} variables.") # Очищаем кеш для этих индексов, чтобы принудительно их перечитать for index in indices_to_update: if index in self._name_cache: del self._name_cache[index] # Запускаем стандартный запрос значений. Он автоматически обработает # отсутствующую сервисную информацию (имена/типы) перед запросом данных. if not (self._polling or self._ll_polling): self.request_values() def request_values(self): self._update_interval() base = int(self.spin_index.value()) count = int(self.spin_count.value()) needed = [] for i in range(base, base+count): if i not in self._name_cache: needed.append(i) if needed: self._service_queue = needed[:] self._pending_data_after_services = (base, count) self._log(f"[AUTO] Need service for {len(needed)} indices: {needed}") self.set_status("Read service...", "service") self._kick_service_queue() else: self.set_status("Read values...", "values") self._enqueue_or_start(base, service=False, varqnt=count) def request_lowlevel_once(self): """Запрашивает чтение выбранной LowLevel переменной (однократно).""" if not self.serial.isOpen(): self._log("[LL] Port is not open.") return if self._busy: self._log("[LL] Busy, request dropped.") return # Если переменная не подготовлена, или нет актуальной информации if not hasattr(self, '_ll_current_var_info') or not self._ll_current_var_info: self._log("[LL] No variable prepared/selected for single read!") return frame = self._build_lowlevel_request(self._ll_current_var_info) # --- НОВОЕ: Передаем ll_var_info в метаданные транзакции --- meta = {'lowlevel': True, 'll_polling': False, 'll_var_info': self._ll_current_var_info} self.set_status("Read lowlevel...", "values") self._enqueue_raw(frame, meta) # -------------------------- SERVICE QUEUE FLOW ------------------------ # ... (код без изменений) def _kick_service_queue(self): if self._busy: return if self._service_queue: nxt = self._service_queue.pop(0) self._enqueue_or_start(nxt, service=True, varqnt=0, queue_mode=True) elif self._pending_data_after_services: base, count = self._pending_data_after_services self._pending_data_after_services = None self._enqueue_or_start(base, service=False, varqnt=count) # ------------------------ TRANSACTION SCHEDULER ----------------------- # ... (код без изменений) def _enqueue_raw(self, frame: bytes, meta: dict): # Добавляем ll_var_info, если это LL запрос if meta.get('lowlevel', False) and 'll_var_info' not in meta: # Это должно быть установлено вызывающим кодом, но для безопасности # или если LL polling не передал var_info явно meta['ll_var_info'] = self._ll_current_var_info # Используем last prepared var info for single shots if self._busy: # ... существующий код ... if self._replace_if_busy: self._pending_cmd = (frame, meta) self._log("[LOCKSTEP] Busy -> replaced pending") else: self._log("[LOCKSTEP] Busy -> ignore") return self._start_txn(frame, meta) def _enqueue_or_start(self, index, service: bool, varqnt: int, chain_after=None, queue_mode=False): frame = self._build_request(index, service=service, varqnt=varqnt) meta = {'service': service, 'index': index, 'varqnt': varqnt, 'chain': chain_after, 'queue_mode': queue_mode, 'lowlevel': False} if self._busy: if self._drop_if_busy and not self._replace_if_busy: self._log("[LOCKSTEP] Busy -> drop") return if self._replace_if_busy: self._pending_cmd = (frame, meta) self._log("[LOCKSTEP] Busy -> replaced pending") else: self._log("[LOCKSTEP] Busy -> ignore") return self._start_txn(frame, meta) def _start_txn(self, frame: bytes, meta: dict): if(meta.get('service')): self._update_interval() self._busy = True self._txn_meta = meta self._rx_buf.clear() self._set_ui_busy(True) self._send(frame) self._txn_timer.start(self.read_timeout_ms) def _end_txn(self): self._txn_timer.stop() queue_mode = False chain = None meta = self._txn_meta if meta: queue_mode = meta.get('queue_mode', False) chain = meta.get('chain') self._txn_meta = None self._busy = False self._rx_buf.clear() self._set_ui_busy(False) if chain: base, serv, q = chain self._enqueue_or_start(base, service=serv, varqnt=q) return if self._pending_cmd is not None: frame, meta = self._pending_cmd self._pending_cmd = None QtCore.QTimer.singleShot(0, lambda f=frame, m=meta: self._start_txn(f, m)) return if queue_mode: QtCore.QTimer.singleShot(0, self._kick_service_queue) # !!! Раньше тут было `return`, его убираем # Если идёт LL polling — переходим сразу к следующей переменной if self._ll_polling and (self._ll_poll_index < len(self._ll_polling_variables)): self._process_next_ll_variable_in_cycle() return def _on_txn_timeout(self): if not self._busy: return is_ll = self._txn_meta.get('lowlevel', False) if self._txn_meta else False log_prefix = "[LL TIMEOUT]" if is_ll else "[TIMEOUT]" self._log(f"{log_prefix} No response") if self._rx_buf: self._log_frame(bytes(self._rx_buf), tx=False) self._end_txn() self.set_status("Timeout", "error") # ------------------------------- TX/RX --------------------------------- # ... (код без изменений) def _send(self, data: bytes): w = self.serial.write(data) if w != len(data): self._log(f"[ERR] Write short {w}/{len(data)}") self.txBytes.emit(data) self._log_frame(data, tx=True) def _on_ready_read(self): self._rx_buf.extend(self.serial.readAll().data()) if not self._busy: if self._rx_buf: self._log("[WARN] Data while idle -> drop") self._log_frame(bytes(self._rx_buf), tx=False) self._rx_buf.clear() return self._try_parse() if not (self._polling or self._ll_polling): self.set_status("Idle", "idle") # ------------------------------- PARSING ------------------------------- def _try_parse(self): if not self._txn_meta: return if self._txn_meta.get('lowlevel', False): self._try_parse_lowlevel() else: self._try_parse_watch() def _try_parse_watch(self): # ... (код без изменений) service = self._txn_meta['service'] buf = self._rx_buf trailer_len = 4 if service: if len(buf) < 7 + trailer_len: return name_len = buf[6] expected = 7 + name_len + trailer_len if len(buf) < expected: return frame = bytes(buf[:expected]); del buf[:expected] self.rxBytes.emit(frame); self._log_frame(frame, tx=False) self._parse_service_frame(frame) self._end_txn() else: if len(buf) < 6 + trailer_len: return varqnt = buf[4]; status = buf[5] if status != DEBUG_OK: expected = 8 + trailer_len if len(buf) < expected: return frame = bytes(buf[:expected]); del buf[:expected] self.rxBytes.emit(frame); self._log_frame(frame, tx=False) self._parse_data_frame(frame, error_mode=True) self._end_txn() else: expected = 6 + varqnt*2 + trailer_len if len(buf) < expected: return frame = bytes(buf[:expected]); del buf[:expected] self.rxBytes.emit(frame); self._log_frame(frame, tx=False) self._parse_data_frame(frame, error_mode=False) self._end_txn() def _try_parse_lowlevel(self): # Ожидаемая длина: Успех=13, Ошибка=10 buf = self._rx_buf if len(buf) < 10: # Минимальная длина (ошибка) return # Проверяем, что ответ для нас if buf[1] != self.cmd_lowlevel: self._log("[LL] Unexpected cmd in lowlevel parser, flushing.") self._log_frame(bytes(self._rx_buf), tx=False) self._rx_buf.clear() # Не завершаем транзакцию, ждём таймаута return status = buf[2] expected_len = 13 if status == DEBUG_OK else 10 if len(buf) >= expected_len: frame = bytes(buf[:expected_len]) del buf[:expected_len] self.rxBytes.emit(frame) self._log_frame(frame, tx=False) self._parse_lowlevel_frame(frame, success=(status == DEBUG_OK)) self._end_txn() def _check_crc(self, payload: bytes, crc_lo: int, crc_hi: int): if not self.auto_crc_check: return True crc_rx = (crc_hi << 8) | crc_lo crc_calc = crc16_ibm(payload) if crc_calc != crc_rx: self._log(f"[CRC FAIL] calc=0x{crc_calc:04X} rx=0x{crc_rx:04X}") return False self._log("[CRC OK]") return True @staticmethod def _clear_service_bit(vhi, vlo): return ((vhi & 0x7F) << 8) | vlo def _parse_service_frame(self, frame: bytes): # ... (код без изменений) payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3] if len(payload) < 7: self._log("[ERR] Service frame too short"); return self._check_crc(payload, crc_lo, crc_hi) adr, cmd, vhi, vlo, status, iq_raw, name_len = payload[:7] status_desc = _decode_debug_status(status) index = self._clear_service_bit(vhi, vlo) if len(payload) < 7 + name_len: self._log("[ERR] Service name truncated"); return name_bytes = payload[7:7+name_len]; name = name_bytes.decode(errors='replace') is_signed = (iq_raw & SIGN_BIT_MASK) != 0 frac_bits = iq_raw & FRAC_MASK_FULL if status == DEBUG_OK: self._name_cache[index] = (status, iq_raw, name, is_signed, frac_bits) self.nameRead.emit(index, status, iq_raw, name) self._log(f"[SERVICE] idx={index} status={status} iq_raw=0x{iq_raw:02X} sign={'S' if is_signed else 'U'} frac={frac_bits} name='{name}'") def _parse_data_frame(self, frame: bytes, *, error_mode: bool): payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3] if len(payload) < 6: self._log("[ERR] Data frame too short"); return self._check_crc(payload, crc_lo, crc_hi) adr, cmd, vhi, vlo, varqnt, status = payload[:6] base = self._clear_service_bit(vhi, vlo) if error_mode: self.set_status("Error", "error") if len(payload) < 8: self._log("[ERR] Error frame truncated"); return err_hi, err_lo = payload[6:8] bad_index = (err_hi << 8) | err_lo desc = _decode_debug_status(status) self._log(f"[DATA] ERROR status=0x{status:02X} ({desc}) bad_index={bad_index}") # Обновим UI self._populate_watch_error(bad_index, status) # Сигналы (оставляем совместимость) self.valueRead.emit(bad_index, status, 0, 0, float('nan')) self.valuesRead.emit(base, 0, [], [], [], []) return if len(payload) < 6 + varqnt*2: self._log("[ERR] Data payload truncated"); return raw_vals = [] pos = 6 for _ in range(varqnt): hi = payload[pos]; lo = payload[pos+1]; pos += 2 raw16 = (hi << 8) | lo raw_vals.append(raw16) idx_list = []; iq_list = []; name_list = []; scaled_list = []; display_raw_list = [] # Получаем текущее время один раз для всех переменных в этом фрейме current_time = time.time() for ofs, raw16 in enumerate(raw_vals): idx = base + ofs status_i, iq_raw, name_i, is_signed, frac_bits = self._name_cache.get(idx, (DEBUG_OK, 0, '', False, 0)) if is_signed and (raw16 & 0x8000): value_int = raw16 - 0x10000 else: value_int = raw16 if self.chk_raw.isChecked(): scale = 1.0 else: scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits)) scaled = float(value_int) / scale if frac_bits > 0 else float(value_int) idx_list.append(idx); iq_list.append(iq_raw); name_list.append(name_i) scaled_list.append(scaled); display_raw_list.append(value_int) # --- Здесь записываем имя и значение в csv_logger --- self.csv_logger.set_value(current_time, name_i, scaled) self._populate_table(idx_list, name_list, iq_list, display_raw_list, scaled_list) if varqnt == 1: if idx_list[0] == self.spin_index.value(): _, iq_raw0, name0, is_signed0, frac0 = self._name_cache.get(idx_list[0], (DEBUG_OK, 0, '', False, 0)) self.valueRead.emit(idx_list[0], status, iq_list[0], display_raw_list[0], scaled_list[0]) else: self.valuesRead.emit(base, varqnt, idx_list, iq_list, display_raw_list, scaled_list) self._log(f"[DATA] base={base} q={varqnt} values={[f'{v:.6g}' for v in scaled_list] if not self.chk_raw.isChecked() else raw_vals}") def _parse_lowlevel_frame(self, frame: bytes, success: bool): payload_len = 9 if success else 6 crc_pos = payload_len payload = frame[:payload_len] crc_lo, crc_hi = frame[crc_pos], frame[crc_pos+1] self._check_crc(payload, crc_lo, crc_hi) status = payload[2] addr2, addr1, addr0 = payload[3], payload[4], payload[5] addr24 = (addr2 << 16) | (addr1 << 8) | addr0 status_desc = _decode_debug_status(status) return_type = payload[6] data_hi, data_lo = payload[7], payload[8] raw16 = (data_hi << 8) | data_lo is_signed = (return_type & SIGN_BIT_MASK) != 0 frac_bits = return_type & FRAC_MASK_FULL if is_signed and (raw16 & 0x8000): value_int = raw16 - 0x10000 else: value_int = raw16 if self.chk_raw.isChecked(): scale = 1.0 else: scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits)) # 1 / 2^N scaled = float(value_int) / scale self.llValueRead.emit(addr24, status, return_type, value_int, scaled) var_name = None if self._ll_current_var_info.get("address") == addr24: var_name = self._ll_current_var_info.get("name") display_val = value_int if self.chk_raw.isChecked() else scaled if var_name: self.ll_selector.set_variable_value(var_name, display_val) self._log(f"[LL] OK addr=0x{addr24:06X} type=0x{return_type:02X} raw={value_int} scaled={scaled:.6g}") current_time = time.time() # Получаем текущее время self.csv_logger.set_value(current_time, var_name, display_val) def _populate_watch_error(self, bad_index: int, status: int): """Отобразить строку ошибки при неудачном ответе WATCH.""" desc = _decode_debug_status(status) self.tbl_values.setRowCount(1) self.tbl_values.setItem(0, 0, QtWidgets.QTableWidgetItem(str(bad_index))) self.tbl_values.setItem(0, 1, QtWidgets.QTableWidgetItem(f"")) self.tbl_values.setItem(0, 2, QtWidgets.QTableWidgetItem("-")) self.tbl_values.setItem(0, 3, QtWidgets.QTableWidgetItem("-")) self.tbl_values.setItem(0, 4, QtWidgets.QTableWidgetItem(""))\ def _populate_table(self, idxs, names, iqs, raws, scaled): """ Быстрое массовое обновление таблицы значений. - Не пересоздаём QTableWidgetItem при каждом вызове: обновляем текст. - Блокируем сортировку, сигналы и обновления на время заполнения. - Предвычисляем отображаемые строки (особенно формат scaled). """ tbl = self.tbl_values n = len(idxs) # Заморозка UI на время массового обновления prev_sorting = tbl.isSortingEnabled() tbl.setSortingEnabled(False) tbl.blockSignals(True) tbl.setUpdatesEnabled(False) # Подготовка размера if tbl.rowCount() != n: tbl.setRowCount(n) # Предварительно решаем: показывать сырые или масштабированные значения show_raw = self.chk_raw.isChecked() # Готовим строки (ускоряет при больших объёмах) # str() заранее, чтобы не повторять в цикле idx_strs = [str(v) for v in idxs] raw_strs = [str(v) for v in raws] scaled_strs = raw_strs if show_raw else [f"{v:.6g}" for v in scaled] # Флаги необновляемых ячеек (только выбор/просмотр) flags_ro = QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled # Локальный шорткат для быстрой установки текста в ячейку def _set_text(row, col, text): item = tbl.item(row, col) if item is None: item = QtWidgets.QTableWidgetItem(text) item.setFlags(flags_ro) tbl.setItem(row, col, item) else: # обновим текст только при изменении (немного экономит на больших данных) if item.text() != text: item.setText(text) if item.flags() != flags_ro: item.setFlags(flags_ro) # Основной цикл for row in range(n): iq_raw = iqs[row] is_signed = (iq_raw & SIGN_BIT_MASK) != 0 frac_bits = iq_raw & FRAC_MASK_FULL iq_disp = f"{frac_bits}{'s' if is_signed else 'u'}" _set_text(row, 0, idx_strs[row]) _set_text(row, 1, names[row]) _set_text(row, 2, iq_disp) _set_text(row, 3, raw_strs[row]) _set_text(row, 4, scaled_strs[row]) # Разморозка tbl.blockSignals(False) tbl.setUpdatesEnabled(True) tbl.setSortingEnabled(prev_sorting) tbl.viewport().update() # ------------------------------ POLLING -------------------------------- def _toggle_polling(self): if self._polling: self._poll_timer.stop() self._polling = False self.btn_poll.setText("Start Polling") self.set_status("Idle", "idle") self._log("[POLL] Stopped") else: interval = self.spin_interval.value() self._poll_timer.start(interval) self._polling = True self.btn_poll.setText("Stop Polling") self.set_status("Idle", "idle") self._log(f"[POLL] Started interval={interval}ms") self._set_ui_busy(False) # Обновить доступность кнопок def _on_poll_timeout(self): self.request_values() def _toggle_ll_polling(self): if self._ll_polling: # If currently polling, stop self._ll_polling = False self.ll_selector.btn_start_polling.setText("Start Polling") self._ll_poll_timer.stop() self._ll_polling_variables.clear() self._ll_current_poll_index = -1 self._log("[LL Polling] Stopped.") else: # If not polling, start # Get all selected variables from the LowLevelSelectorWidget self._ll_polling_variables = self.ll_selector.get_selected_variables_and_addresses() if not self._ll_polling_variables: self._log("[LL] No variables selected for polling. Aborting.") self.set_status("Error.", "error") return self._ll_polling = True self.ll_selector.btn_start_polling.setText("Stop Polling") self._ll_current_poll_index = 0 # Start from the first variable self._log(f"[LL Polling] Started. Polling {len(self._ll_polling_variables)} variables.") # Start the timer. It will trigger _on_ll_poll_timeout, which starts the cycle. # The first cycle starts immediately, subsequent cycles wait for the interval. self._ll_poll_timer.setInterval(self.ll_selector.spin_interval.value()) self._ll_poll_timer.start() # Start the timer for recurrent cycles # Immediately kick off the first variable read of the first cycle self._start_ll_cycle() def _on_ll_poll_timeout(self): """Вызывается по таймеру для старта нового цикла.""" if self._ll_polling and not self._busy: self._start_ll_cycle() elif self._busy: self._log("[LL Polling] Busy, skip cycle start.") def _start_ll_cycle(self): self._update_interval() """Запускает новый цикл опроса всех переменных.""" if not self._ll_polling or not self._ll_polling_variables: return self._ll_poll_index = 0 self._process_next_ll_variable_in_cycle() def _on_ll_variable_prepared(self, var_info: dict): """Срабатывает при выборе переменной в селекторе.""" self._ll_current_var_info = var_info def _process_next_ll_variable_in_cycle(self): if not self._ll_polling: # Добавим проверку, чтобы избежать вызова, если LL polling отключен return if self._ll_poll_index < len(self._ll_polling_variables): var_info = self._ll_polling_variables[self._ll_poll_index] self._on_ll_variable_prepared(var_info) self._ll_poll_index += 1 frame = self._build_lowlevel_request(var_info) # --- НОВОЕ: Передаем var_info в метаданные транзакции для LL polling --- meta = {'lowlevel': True, 'll_polling': True, 'll_var_info': var_info} self.set_status(f"Polling LL: {var_info.get('name')}", "values") self._enqueue_raw(frame, meta) else: # Цикл завершен, перезапускаем таймер для следующего полного цикла self._ll_poll_index = 0 self._ll_poll_timer.start(self.ll_selector.spin_interval.value()) self.set_status("LL polling cycle done, waiting...", "idle") # ------------------------------ HELPERS -------------------------------- def _toggle_index_base(self, st): # ... (код без изменений) val = self.spin_index.value() if st == QtCore.Qt.Checked: self.spin_index.setDisplayIntegerBase(16); self.spin_index.setPrefix("0x") else: self.spin_index.setDisplayIntegerBase(10); self.spin_index.setPrefix("") self.spin_index.setValue(val) def _set_ui_busy(self, busy: bool): # Блокируем кнопки в зависимости от состояния 'busy' и 'polling' # Watch tab can_use_watch = not busy and not (self._polling or self._ll_polling) #self.btn_update_service.setEnabled(can_use_watch) self.btn_read_values.setEnabled(can_use_watch) # LowLevel tab can_use_ll = not busy and not (self._ll_polling or self._polling) self.ll_selector.btn_read_once.setEnabled(can_use_ll) def _on_serial_error(self, err): # ... (код без изменений) if err == QtSerialPort.QSerialPort.NoError: return self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})") if self._busy: self._end_txn() # ------------------------------ LOGGING -------------------------------- def _select_csv_file(self): """Открывает диалог выбора файла для CSV и обновляет UI.""" if self.csv_logger.select_file(self): # Передаем self как parent для диалога self.lbl_csv_filename.setText(self.csv_logger.filename) self._log(f"CSV file set to: {self.csv_logger.filename}") def _start_csv_logging(self): """Начинает запись данных в CSV. Устанавливает заголовки в зависимости от активной вкладки.""" if not self.serial.isOpen(): self._log("[CSV] Невозможно начать запись: COM порт не открыт.") self.set_status("Port closed", "error") return # Определяем активную вкладку и устанавливаем заголовки current_tab_index = self.tabs.currentIndex() varnames_for_csv = [] if self.tabs.tabText(current_tab_index) == "Watch": # Для вкладки Watch берем имена из кэша, если они есть, иначе используем Index_X base_index = self.spin_index.value() count = self.spin_count.value() for i in range(base_index, base_index + count): if i in self._name_cache and self._name_cache[i][2]: # status, iq_raw, name, is_signed, frac_bits varnames_for_csv.append(self._name_cache[i][2]) else: varnames_for_csv.append(f"Index_{i}") self._log(f"[CSV] Начинается запись для Watch переменных: {varnames_for_csv}") elif self.tabs.tabText(current_tab_index) == "LowLevel": # Для вкладки LowLevel берем имена из ll_selector selected_vars = self.ll_selector.get_selected_variables_and_addresses() varnames_for_csv = [var['name'] for var in selected_vars if 'name' in var] if not varnames_for_csv: self._log("[CSV] Внимание: На вкладке LowLevel не выбраны переменные для записи.") self._log(f"[CSV] Начинается запись для LowLevel переменных: {varnames_for_csv}") else: self._log("[CSV] Неизвестная активная вкладка. Невозможно определить заголовки CSV.") return if not varnames_for_csv: self._log("[CSV] Нет переменных для записи в CSV. Запись не начата.") return self.csv_logger.set_titles(varnames_for_csv) self._csv_logging_active = True self.btn_start_csv_logging.setEnabled(False) self.btn_stop_csv_logging.setEnabled(True) self.set_status("CSV Logging ACTIVE", "values") self._log("[CSV] Запись данных в CSV началась.") def _stop_csv_logging(self): """Останавливает запись данных в CSV.""" self._csv_logging_active = False self.btn_start_csv_logging.setEnabled(True) self.btn_stop_csv_logging.setEnabled(False) self.set_status("CSV Logging STOPPED", "idle") self._log("[CSV] Запись данных в CSV остановлена.") def _save_csv_data(self): """Сохраняет все собранные данные в CSV файл.""" if self._csv_logging_active: self._log("[CSV] Запись активна. Сначала остановите запись.") self.set_status("Stop logging first", "error") return self.csv_logger.write_to_csv() self.set_status("CSV data saved", "idle") def _log(self, msg: str): # ... (код без изменений) if 'ERR' in msg: self.set_status(msg, 'error') if 'OK' in msg: self.set_status('Idle', 'idle') if not self.log_spoiler.getState(): return ts = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3] self.txt_log.append(f"{ts} {msg}") def _log_frame(self, data: bytes, *, tx: bool): # ... (код без изменений) if not self.log_spoiler.getState(): return tag = 'TX' if tx else 'RX' hexs = ' '.join(f"{b:02X}" for b in data) ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data) self._log(f"[{tag}] {hexs} |{ascii_part}|") def _update_interval(self): now = time.perf_counter() if self._last_txn_timestamp is not None: delta_ms = (now - self._last_txn_timestamp) * 1000 # Обновляем UI только если он уже создан if hasattr(self, 'lbl_actual_interval'): self.lbl_actual_interval.setText(f"{delta_ms:.1f} ms") self._last_txn_timestamp = now # ---------------------------------------------------------- Demo harness --- class _DemoWindow(QtWidgets.QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("DebugVar Terminal") self.term = DebugTerminalWidget(self) self.setCentralWidget(self.term) self.resize(1000, 600) def closeEvent(self, event): self.setCentralWidget(None) if self.term: self.term.deleteLater(); self.term = None super().closeEvent(event) # ------------------------------- Demo -------------------------------------- if __name__ == "__main__": import sys app = QtWidgets.QApplication(sys.argv) win = _DemoWindow(); win.show() sys.exit(app.exec_())