"""Qt (PySide2) debug terminal for reading "watch" variables over a serial port.

The module provides:

* ``crc16_ibm`` - CRC helper used to sign requests and verify responses.
* ``Spoiler`` - a collapsible container widget with an animated content area.
* ``DebugTerminalWidget`` - the terminal itself: serial port management,
  lock-step request/response transactions, frame parsing and a small UI.
* ``_DemoWindow`` / ``main`` - a minimal demo harness.
"""

import datetime

from PySide2 import QtCore, QtWidgets, QtSerialPort

# ------------------------------- Protocol constants -------------------------
WATCH_SERVICE_BIT = 0x8000
DEBUG_OK = 0  # status code expected for a successful read
SIGN_BIT_MASK = 0x80
FRAC_MASK_FULL = 0x7F  # used when 7 bits carry the fractional-bit count

# Value Qt uses for "no maximum size" (QWIDGETSIZE_MAX).
_QWIDGETSIZE_MAX = 16777215


# ---------------------------------------------------------------- CRC util ---
def crc16_ibm(data: bytes, *, init=0xFFFF) -> int:
    """CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected).

    Args:
        data: bytes to run the CRC over.
        init: initial register value (keyword-only).

    Returns:
        The 16-bit CRC as an int in ``0..0xFFFF``.
    """
    crc = init
    for b in data:
        crc ^= b
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc & 0xFFFF


class Spoiler(QtWidgets.QWidget):
    """Collapsible section: a toggle button, a header line and a content area.

    The content area's ``maximumHeight`` is animated between 0 and the content
    widget's size hint when the toggle button is clicked.
    """

    def __init__(self, title="", animationDuration=300, parent=None):
        super().__init__(parent)
        self._animationDuration = animationDuration

        # --- Toggle button ---
        self.toggleButton = QtWidgets.QToolButton(self)
        self.toggleButton.setStyleSheet("QToolButton { border: none; }")
        self.toggleButton.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon)
        self.toggleButton.setArrowType(QtCore.Qt.RightArrow)
        self.toggleButton.setText(title)
        self.toggleButton.setCheckable(True)

        # --- Header line ---
        self.headerLine = QtWidgets.QFrame(self)
        self.headerLine.setFrameShape(QtWidgets.QFrame.HLine)
        self.headerLine.setFrameShadow(QtWidgets.QFrame.Sunken)

        # --- Content area ---
        self.contentArea = QtWidgets.QScrollArea(self)
        self.contentArea.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
                                       QtWidgets.QSizePolicy.Fixed)
        self.contentArea.setFrameShape(QtWidgets.QFrame.NoFrame)
        self.contentArea.setWidgetResizable(True)
        self._contentWidget = QtWidgets.QWidget()
        self.contentArea.setWidget(self._contentWidget)
        self.contentArea.setMaximumHeight(0)

        # --- Animation of the content area only ---
        self._ani_content = QtCore.QPropertyAnimation(self.contentArea, b"maximumHeight")
        self._ani_content.setDuration(animationDuration)
        self._ani_content.setEasingCurve(QtCore.QEasingCurve.InOutCubic)
        # Follow every animation step so the top-level window is resized too.
        self._ani_content.valueChanged.connect(self._adjust_parent_size)
        # Connected once here (not on every toggle) so handlers do not pile up.
        self._ani_content.finished.connect(self._release_width)

        # --- Layout ---
        self.mainLayout = QtWidgets.QGridLayout(self)
        self.mainLayout.setVerticalSpacing(0)
        self.mainLayout.setContentsMargins(0, 0, 0, 0)
        self.mainLayout.addWidget(self.toggleButton, 0, 0, 1, 1)
        self.mainLayout.addWidget(self.headerLine, 0, 1, 1, 1)
        self.mainLayout.addWidget(self.contentArea, 1, 0, 1, 2)

        # --- Signals ---
        self.toggleButton.clicked.connect(self._on_toggled)

    def setContentLayout(self, contentLayout):
        """Install ``contentLayout`` as the layout of the collapsible content."""
        old = self._contentWidget.layout()
        if old is not None:
            # Hand the previous layout to a throw-away widget so that the
            # content widget is free to accept a new one.
            QtWidgets.QWidget().setLayout(old)
        self._contentWidget.setLayout(contentLayout)

    def _adjust_parent_size(self, *_):
        """Resize the top-level window to its size-hint height, keeping width."""
        top = self.window()
        if top is not None:
            size = top.size()
            size.setHeight(top.sizeHint().height())  # take the new height
            top.resize(size)  # width stays as it was

    def _release_width(self):
        """Undo the temporary ``setFixedWidth`` applied while animating.

        ``setFixedWidth`` pins both the minimum and the maximum width, so both
        have to be restored; resetting only the maximum would leave the widget
        unable to shrink.
        """
        self.setMinimumWidth(0)
        self.setMaximumWidth(_QWIDGETSIZE_MAX)

    def _on_toggled(self, checked: bool):
        """Start the expand (``checked``) or collapse animation."""
        self.toggleButton.setArrowType(
            QtCore.Qt.DownArrow if checked else QtCore.Qt.RightArrow
        )
        contentHeight = self._contentWidget.sizeHint().height()

        self._ani_content.stop()
        self._ani_content.setStartValue(self.contentArea.maximumHeight())
        self._ani_content.setEndValue(contentHeight if checked else 0)

        # Freeze the width for the duration of the animation; it is released
        # by _release_width when the animation finishes.
        self.setFixedWidth(self.width())
        self._ani_content.start()


# --------------------------- DebugTerminalWidget ---------------------------
class DebugTerminalWidget(QtWidgets.QWidget):
    """Serial terminal widget that reads variable names/types and values.

    Requests are sent one at a time (lock-step): a transaction starts with a
    request frame and ends when a complete response is parsed, the read
    timeout fires, or the serial port reports an error.

    Args:
        parent: parent widget.
        start_byte: first byte of every request (stored as ``device_addr``).
        cmd_byte: command byte of every request.
        iq_scaling: optional mapping ``fractional bits -> divisor``; defaults
            to ``2 ** n`` for ``n`` in ``0..15``.
        read_timeout_ms: how long to wait for a complete response.
        auto_crc_check: verify the CRC of received frames; frames that fail
            the check are logged and discarded.
        drop_if_busy: drop a new request when a transaction is in flight
            (only effective when ``replace_if_busy`` is false).
        replace_if_busy: remember the newest request while busy and run it
            when the current transaction ends.
    """

    nameRead = QtCore.Signal(int, int, int, str)  # index, status, iq, name
    valueRead = QtCore.Signal(int, int, int, int, float)  # single value
    # base, count, idx_list, iq_list, raw_list, float_list
    valuesRead = QtCore.Signal(int, int, list, list, list, list)
    portOpened = QtCore.Signal(str)
    portClosed = QtCore.Signal(str)
    txBytes = QtCore.Signal(bytes)
    rxBytes = QtCore.Signal(bytes)

    def __init__(self, parent=None, *, start_byte=0x0A, cmd_byte=0x44,
                 iq_scaling=None, read_timeout_ms=250, auto_crc_check=True,
                 drop_if_busy=False, replace_if_busy=True):
        super().__init__(parent)
        self.device_addr = start_byte
        self.cmd_byte = cmd_byte
        self.read_timeout_ms = read_timeout_ms
        self.auto_crc_check = auto_crc_check
        self._drop_if_busy = drop_if_busy
        self._replace_if_busy = replace_if_busy
        if iq_scaling is None:
            iq_scaling = {n: float(1 << n) for n in range(16)}
        self.iq_scaling = iq_scaling

        # Serial
        self.serial = QtSerialPort.QSerialPort(self)
        self.serial.setBaudRate(115200)
        self.serial.readyRead.connect(self._on_ready_read)
        self.serial.errorOccurred.connect(self._on_serial_error)

        # State
        self._rx_buf = bytearray()
        self._busy = False
        self._pending_cmd = None  # (frame, meta)
        # {'service': bool, 'index': int, 'varqnt': int, 'chain': ..., 'queue_mode': bool}
        self._txn_meta = None
        self._txn_timer = QtCore.QTimer(self)
        self._txn_timer.setSingleShot(True)
        self._txn_timer.timeout.connect(self._on_txn_timeout)
        self._poll_timer = QtCore.QTimer(self)
        self._poll_timer.timeout.connect(self._on_poll_timeout)
        self._polling = False

        # Cache: index -> (status, iq_raw, name, is_signed, frac_bits)
        self._name_cache = {}
        # Indices whose service info must be read before the data block.
        self._service_queue = []
        self._pending_data_after_services = None  # (base, count)

        self._build_ui()
        self._connect_ui()
        self.set_available_ports()

    # ------------------------------ UI ----------------------------------
    def _build_ui(self):
        """Create all child widgets and lay them out."""
        layout = QtWidgets.QVBoxLayout(self)

        # --- Serial group ---
        g_serial = QtWidgets.QGroupBox("Serial Port")
        hs = QtWidgets.QHBoxLayout(g_serial)
        self.cmb_port = QtWidgets.QComboBox()
        self.btn_refresh = QtWidgets.QPushButton("Refresh")
        self.cmb_baud = QtWidgets.QComboBox()
        self.cmb_baud.addItems(["9600", "19200", "38400", "57600", "115200", "230400"])
        self.cmb_baud.setCurrentText("115200")
        self.btn_open = QtWidgets.QPushButton("Open")
        hs.addWidget(QtWidgets.QLabel("Port:"))
        hs.addWidget(self.cmb_port, 1)
        hs.addWidget(self.btn_refresh)
        hs.addSpacing(10)
        hs.addWidget(QtWidgets.QLabel("Baud:"))
        hs.addWidget(self.cmb_baud)
        hs.addWidget(self.btn_open)

        # --- Watch group (this one stretches) ---
        g_watch = QtWidgets.QGroupBox("Watch Variables")
        grid = QtWidgets.QGridLayout(g_watch)
        grid.setHorizontalSpacing(8)
        grid.setVerticalSpacing(4)

        self.spin_index = QtWidgets.QSpinBox()
        self.spin_index.setRange(0, 0x7FFF)
        self.spin_index.setAccelerated(True)
        self.chk_hex_index = QtWidgets.QCheckBox("Hex")
        self.spin_count = QtWidgets.QSpinBox()
        self.spin_count.setRange(1, 255)
        self.spin_count.setValue(1)

        self.btn_read_service = QtWidgets.QPushButton("Read Name/Type")
        self.btn_read_values = QtWidgets.QPushButton("Read Value(s)")
        self.btn_poll = QtWidgets.QPushButton("Start Polling")
        self.spin_interval = QtWidgets.QSpinBox()
        self.spin_interval.setRange(50, 10000)
        self.spin_interval.setValue(500)
        self.spin_interval.setSuffix(" ms")

        self.chk_auto_service = QtWidgets.QCheckBox("Auto service before values if miss cache")
        self.chk_auto_service.setChecked(True)
        self.chk_raw = QtWidgets.QCheckBox("Raw (no IQ scaling)")

        self.lbl_name = QtWidgets.QLineEdit()
        self.lbl_name.setReadOnly(True)
        self.lbl_iq = QtWidgets.QLabel("-")
        self.edit_single_value = QtWidgets.QLineEdit()
        self.edit_single_value.setReadOnly(True)

        # --- Values table (5 columns) ---
        self.tbl_values = QtWidgets.QTableWidget(0, 5)
        self.tbl_values.setHorizontalHeaderLabels(["Index", "Name", "IQ", "Raw", "Scaled"])
        self.tbl_values.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
                                      QtWidgets.QSizePolicy.Expanding)
        hh = self.tbl_values.horizontalHeader()
        hh.setSectionResizeMode(0, QtWidgets.QHeaderView.ResizeToContents)
        hh.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents)
        hh.setSectionResizeMode(2, QtWidgets.QHeaderView.ResizeToContents)
        hh.setSectionResizeMode(3, QtWidgets.QHeaderView.ResizeToContents)
        hh.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch)
        vh = self.tbl_values.verticalHeader()
        vh.setVisible(False)

        r = 0
        grid.addWidget(QtWidgets.QLabel("Base Index:"), r, 0)
        grid.addWidget(self.spin_index, r, 1)
        grid.addWidget(self.chk_hex_index, r, 2)
        r += 1
        grid.addWidget(QtWidgets.QLabel("Count:"), r, 0)
        grid.addWidget(self.spin_count, r, 1)
        r += 1
        grid.addWidget(self.btn_read_service, r, 0)
        grid.addWidget(self.btn_read_values, r, 1)
        grid.addWidget(self.btn_poll, r, 2)
        r += 1
        grid.addWidget(QtWidgets.QLabel("Interval:"), r, 0)
        grid.addWidget(self.spin_interval, r, 1)
        grid.addWidget(self.chk_auto_service, r, 2)
        r += 1
        grid.addWidget(QtWidgets.QLabel("Name:"), r, 0)
        grid.addWidget(self.lbl_name, r, 1, 1, 2)
        r += 1
        grid.addWidget(QtWidgets.QLabel("IQ:"), r, 0)
        grid.addWidget(self.lbl_iq, r, 1)
        grid.addWidget(self.chk_raw, r, 2)
        r += 1
        grid.addWidget(QtWidgets.QLabel("Single:"), r, 0)
        grid.addWidget(self.edit_single_value, r, 1, 1, 2)
        r += 1
        grid.addWidget(QtWidgets.QLabel("Array Values:"), r, 0)
        r += 1

        # --- Table row gets the stretch; all rows above keep stretch 0 ---
        grid.addWidget(self.tbl_values, r, 0, 1, 3)
        grid.setRowStretch(r, 1)

        # --- Add the groups to the main layout ---
        layout.addWidget(g_serial, 0)  # does not stretch
        layout.addWidget(g_watch, 1)   # stretches

        # --- UART log (minimal height) ---
        self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self)
        self.log_spoiler.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
                                       QtWidgets.QSizePolicy.Minimum)

        log_layout = QtWidgets.QVBoxLayout()
        self.txt_log = QtWidgets.QTextEdit()
        self.txt_log.setReadOnly(True)
        self.txt_log.setFontFamily("Courier")
        self.txt_log.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
                                   QtWidgets.QSizePolicy.Minimum)
        log_layout.addWidget(self.txt_log)
        self.log_spoiler.setContentLayout(log_layout)

        # The log goes last, without stretch.
        layout.addWidget(self.log_spoiler, 0)

        # Explicit stretch distribution: g_watch = 1, everything else = 0.
        layout.setStretch(layout.indexOf(g_serial), 0)
        layout.setStretch(layout.indexOf(g_watch), 1)
        layout.setStretch(layout.indexOf(self.log_spoiler), 0)

    def _connect_ui(self):
        """Wire button / checkbox signals to their handlers."""
        self.btn_refresh.clicked.connect(self.set_available_ports)
        self.btn_open.clicked.connect(self._open_close_port)
        self.btn_read_service.clicked.connect(self.request_service_single)
        self.btn_read_values.clicked.connect(self.request_values)
        self.btn_poll.clicked.connect(self._toggle_polling)
        self.chk_hex_index.stateChanged.connect(self._toggle_index_base)

    # ----------------------------- SERIAL MGMT ----------------------------
    def set_available_ports(self):
        """Refill the port combo box, keeping the current selection if possible."""
        cur = self.cmb_port.currentText()
        self.cmb_port.blockSignals(True)
        self.cmb_port.clear()
        for info in QtSerialPort.QSerialPortInfo.availablePorts():
            self.cmb_port.addItem(info.portName())
        if cur:
            ix = self.cmb_port.findText(cur)
            if ix >= 0:
                self.cmb_port.setCurrentIndex(ix)
        self.cmb_port.blockSignals(False)

    def _open_close_port(self):
        """Close the port if it is open, otherwise open the selected one."""
        if self.serial.isOpen():
            name = self.serial.portName()
            self.serial.close()
            self.btn_open.setText("Open")
            self._log(f"[PORT] Closed {name}")
            self.portClosed.emit(name)
            return
        port = self.cmb_port.currentText()
        if not port:
            self._log("[ERR] No port selected")
            return
        self.serial.setPortName(port)
        self.serial.setBaudRate(int(self.cmb_baud.currentText()))
        if not self.serial.open(QtCore.QIODevice.ReadWrite):
            self._log(f"[ERR] Open fail {port}: {self.serial.errorString()}")
            return
        self.btn_open.setText("Close")
        self._log(f"[PORT] Opened {port}")
        self.portOpened.emit(port)

    # ---------------------------- FRAME BUILD -----------------------------
    def _build_request(self, index: int, *, service: bool, varqnt: int) -> bytes:
        """Build a request frame: addr, cmd, index hi, index lo, count, CRC lo, CRC hi.

        The index is limited to 15 bits; the top bit is set for service
        (name/type) requests.
        """
        dbg = index & 0x7FFF
        if service:
            dbg |= WATCH_SERVICE_BIT
        hi = (dbg >> 8) & 0xFF
        lo = dbg & 0xFF
        q = varqnt & 0xFF
        payload = bytes([self.device_addr & 0xFF, self.cmd_byte & 0xFF, hi, lo, q])
        crc = crc16_ibm(payload)
        return payload + bytes([crc & 0xFF, (crc >> 8) & 0xFF])

    # ----------------------------- PUBLIC API -----------------------------
    def request_service_single(self):
        """Request name/type info for the index shown in the index spin box."""
        idx = int(self.spin_index.value())
        self._enqueue_or_start(idx, service=True, varqnt=0)

    def request_values(self):
        """Request ``count`` values starting at the base index.

        When auto-service is enabled, service info is fetched first for every
        index in the range that is not in the name cache.
        """
        base = int(self.spin_index.value())
        count = int(self.spin_count.value())
        needed = []
        if self.chk_auto_service.isChecked():
            needed = [i for i in range(base, base + count) if i not in self._name_cache]
        if needed:
            self._service_queue = needed
            self._pending_data_after_services = (base, count)
            self._log(f"[AUTO] Need service for {len(needed)} indices: {needed}")
            self._kick_service_queue()
        else:
            self._enqueue_or_start(base, service=False, varqnt=count)

    # -------------------------- SERVICE QUEUE FLOW ------------------------
    def _kick_service_queue(self):
        """Start the next queued service request, or the deferred data request."""
        if self._busy:
            return  # _end_txn calls us again once the transaction is over
        if self._service_queue:
            nxt = self._service_queue.pop(0)
            # No chaining here: _end_txn simply kicks the queue again.
            self._enqueue_or_start(nxt, service=True, varqnt=0, queue_mode=True)
        elif self._pending_data_after_services:
            base, count = self._pending_data_after_services
            self._pending_data_after_services = None
            self._enqueue_or_start(base, service=False, varqnt=count)

    # ------------------------ TRANSACTION SCHEDULER -----------------------
    def _enqueue_or_start(self, index, service: bool, varqnt: int,
                          chain_after=None, queue_mode=False):
        """Start a transaction now, or apply the busy policy if one is running.

        Args:
            index: variable index to request.
            service: request name/type info instead of values.
            varqnt: number of values to request.
            chain_after: optional ``(base, service, varqnt)`` request to start
                as soon as this transaction ends.
            queue_mode: marks the request as an element of the service queue.
        """
        frame = self._build_request(index, service=service, varqnt=varqnt)
        meta = {'service': service, 'index': index, 'varqnt': varqnt,
                'chain': chain_after, 'queue_mode': queue_mode}
        if self._busy:
            if self._drop_if_busy and not self._replace_if_busy:
                self._log("[LOCKSTEP] Busy -> drop")
                return
            if self._replace_if_busy:
                self._pending_cmd = (frame, meta)
                self._log("[LOCKSTEP] Busy -> replaced pending")
            else:
                self._log("[LOCKSTEP] Busy -> ignore")
            return
        self._start_txn(frame, meta)

    def _start_txn(self, frame: bytes, meta: dict):
        """Send ``frame`` and arm the response timeout."""
        self._busy = True
        self._txn_meta = meta
        self._rx_buf.clear()
        self._set_ui_busy(True)
        self._send(frame)
        self._txn_timer.start(self.read_timeout_ms)

    def _end_txn(self):
        """Finish the current transaction and schedule whatever comes next.

        Priority: chained request, then the pending (replaced) command, then
        the service queue / deferred data request.
        """
        self._txn_timer.stop()
        queue_mode = False
        chain = None
        if self._txn_meta:
            queue_mode = self._txn_meta.get('queue_mode', False)
            chain = self._txn_meta.get('chain')
        self._txn_meta = None
        self._busy = False
        self._rx_buf.clear()
        self._set_ui_busy(False)

        # A chained request runs immediately.
        if chain:
            base, serv, q = chain
            self._enqueue_or_start(base, service=serv, varqnt=q)
            return

        if self._pending_cmd is not None:
            frame, meta = self._pending_cmd
            self._pending_cmd = None
            QtCore.QTimer.singleShot(0, lambda f=frame, m=meta: self._start_txn(f, m))
            return

        # Continue the service queue. Checking the queue itself (not only the
        # queue_mode flag) keeps it from stalling when it was armed while an
        # unrelated transaction was still in flight.
        if (queue_mode or self._service_queue
                or self._pending_data_after_services is not None):
            QtCore.QTimer.singleShot(0, self._kick_service_queue)

    def _on_txn_timeout(self):
        """Abort the current transaction when no complete response arrived."""
        if not self._busy:
            return
        self._log("[TIMEOUT] No response")
        if self._rx_buf:
            self._log_frame(bytes(self._rx_buf), tx=False)
        self._end_txn()

    # ------------------------------- TX/RX ---------------------------------
    def _send(self, data: bytes):
        """Write ``data`` to the port, then emit ``txBytes`` and log the frame."""
        w = self.serial.write(data)
        if w != len(data):
            self._log(f"[ERR] Write short {w}/{len(data)}")
        self.txBytes.emit(data)
        self._log_frame(data, tx=True)

    def _on_ready_read(self):
        """Collect received bytes; parse while busy, log and drop while idle."""
        self._rx_buf.extend(self.serial.readAll().data())
        if not self._busy:
            if self._rx_buf:
                self._log("[WARN] Data while idle -> drop")
                self._log_frame(bytes(self._rx_buf), tx=False)
                self._rx_buf.clear()
            return
        self._try_parse()

    # ------------------------------- PARSING -------------------------------
    def _take_frame(self, expected: int):
        """Cut ``expected`` bytes off the receive buffer; announce and log them.

        Returns:
            The frame as ``bytes``, or ``None`` if the buffer is still short.
        """
        buf = self._rx_buf
        if len(buf) < expected:
            return None
        frame = bytes(buf[:expected])
        del buf[:expected]
        self.rxBytes.emit(frame)
        self._log_frame(frame, tx=False)
        return frame

    def _try_parse(self):
        """Parse the receive buffer once a complete response frame is present."""
        if not self._txn_meta:
            return
        buf = self._rx_buf
        trailer_len = 4

        if self._txn_meta['service']:
            # adr cmd vhi vlo status iq nameLen + trailer
            if len(buf) < 7 + trailer_len:
                return
            frame = self._take_frame(7 + buf[6] + trailer_len)
            if frame is None:
                return
            self._parse_service_frame(frame)
            self._end_txn()
            return

        # adr cmd vhi vlo varqnt status + trailer
        if len(buf) < 6 + trailer_len:
            return
        varqnt = buf[4]
        error_mode = buf[5] != DEBUG_OK
        if error_mode:
            expected = 8 + trailer_len  # + errIndex(2)
        else:
            expected = 6 + varqnt * 2 + trailer_len
        frame = self._take_frame(expected)
        if frame is None:
            return
        self._parse_data_frame(frame, error_mode=error_mode)
        self._end_txn()

    def _check_crc(self, payload: bytes, crc_lo: int, crc_hi: int):
        """Return True if the CRC matches, or if CRC checking is disabled."""
        if not self.auto_crc_check:
            return True
        crc_rx = (crc_hi << 8) | crc_lo
        crc_calc = crc16_ibm(payload)
        if crc_calc != crc_rx:
            self._log(f"[CRC FAIL] calc=0x{crc_calc:04X} rx=0x{crc_rx:04X}")
            return False
        self._log("[CRC OK]")
        return True

    @staticmethod
    def _clear_service_bit(vhi, vlo):
        """Combine the two index bytes into a 15-bit index (top bit dropped)."""
        return ((vhi & 0x7F) << 8) | vlo

    def _parse_service_frame(self, frame: bytes):
        """Decode a service (name/type) response, cache it and update the UI."""
        payload = frame[:-4]
        crc_lo, crc_hi = frame[-4], frame[-3]
        if len(payload) < 7:
            self._log("[ERR] Service frame too short")
            return
        if not self._check_crc(payload, crc_lo, crc_hi):
            return  # corrupted frame: do not cache or display it
        adr, cmd, vhi, vlo, status, iq_raw, name_len = payload[:7]
        index = self._clear_service_bit(vhi, vlo)
        if len(payload) < 7 + name_len:
            self._log("[ERR] Service name truncated")
            return
        name_bytes = payload[7:7 + name_len]
        name = name_bytes.decode(errors='replace')

        # Split the IQ byte into the sign flag and the fractional-bit count.
        is_signed = (iq_raw & SIGN_BIT_MASK) != 0
        frac_bits = iq_raw & FRAC_MASK_FULL

        if status == DEBUG_OK:
            # Cache the extended tuple.
            self._name_cache[index] = (status, iq_raw, name, is_signed, frac_bits)

        self.nameRead.emit(index, status, iq_raw, name)

        if self.spin_count.value() == 1 and index == self.spin_index.value():
            if status == DEBUG_OK:
                self.lbl_name.setText(name)
                # IQ is shown as "<fractional bits>" + "s"/"u".
                self.lbl_iq.setText(f"{frac_bits}{'s' if is_signed else 'u'}")
            else:
                self.lbl_name.setText('')
                self.lbl_iq.setText('-')
        self._log(f"[SERVICE] idx={index} status={status} iq_raw=0x{iq_raw:02X} "
                  f"sign={'S' if is_signed else 'U'} frac={frac_bits} name='{name}'")

    def _parse_data_frame(self, frame: bytes, *, error_mode: bool):
        """Decode a data response (values or an error report) and publish it.

        Args:
            frame: the complete received frame including the 4-byte trailer.
            error_mode: the frame carries a failing index instead of values.
        """
        payload = frame[:-4]
        crc_lo, crc_hi = frame[-4], frame[-3]
        if len(payload) < 6:
            self._log("[ERR] Data frame too short")
            return
        if not self._check_crc(payload, crc_lo, crc_hi):
            return  # corrupted frame: do not display or emit its contents
        adr, cmd, vhi, vlo, varqnt, status = payload[:6]
        base = self._clear_service_bit(vhi, vlo)

        if error_mode:
            if len(payload) < 8:
                self._log("[ERR] Error frame truncated")
                return
            err_hi, err_lo = payload[6:8]
            bad_index = (err_hi << 8) | err_lo
            self._log(f"[DATA] ERROR status={status} bad_index={bad_index}")
            self.valueRead.emit(bad_index, status, 0, 0, float('nan'))
            self.valuesRead.emit(base, 0, [], [], [], [])
            return

        # Regular frame
        if len(payload) < 6 + varqnt * 2:
            self._log("[ERR] Data payload truncated")
            return

        # Big-endian 16-bit words, kept unsigned (0..65535) at this stage.
        raw_vals = [
            (payload[pos] << 8) | payload[pos + 1]
            for pos in range(6, 6 + varqnt * 2, 2)
        ]

        raw_mode = self.chk_raw.isChecked()
        idx_list = []
        iq_list = []
        name_list = []
        scaled_list = []
        display_raw_list = []  # raw values for the table, sign applied if any

        for ofs, raw16 in enumerate(raw_vals):
            idx = base + ofs
            status_i, iq_raw, name_i, is_signed, frac_bits = self._name_cache.get(
                idx, (DEBUG_OK, 0, '', False, 0)
            )

            # Sign conversion (two's complement, 16 bit)
            if is_signed and (raw16 & 0x8000):
                value_int = raw16 - 0x10000
            else:
                value_int = raw16

            # Scale: from the table, else 2 ** frac_bits
            if raw_mode:
                scale = 1.0
            else:
                scale = self.iq_scaling.get(frac_bits, 2.0 ** frac_bits)

            idx_list.append(idx)
            iq_list.append(iq_raw)  # raw byte, sign bit included
            name_list.append(name_i)
            scaled_list.append(value_int / scale)
            display_raw_list.append(value_int)

        # Populate table
        self._populate_table(idx_list, name_list, iq_list, display_raw_list, scaled_list)

        if varqnt == 1:
            self.edit_single_value.setText(
                str(display_raw_list[0]) if raw_mode else f"{scaled_list[0]:.6g}"
            )
            if idx_list[0] == self.spin_index.value():
                # Show the cached metadata for the selected index.
                _, iq_raw0, name0, is_signed0, frac0 = self._name_cache.get(
                    idx_list[0], (DEBUG_OK, 0, '', False, 0)
                )
                self.lbl_name.setText(name0)
                self.lbl_iq.setText(f"{frac0}{'s' if is_signed0 else 'u'}")
            self.valueRead.emit(idx_list[0], status, iq_list[0],
                                display_raw_list[0], scaled_list[0])
        else:
            self.edit_single_value.setText("")

        self.valuesRead.emit(base, varqnt, idx_list, iq_list, display_raw_list, scaled_list)
        self._log(f"[DATA] base={base} q={varqnt} values={[f'{v:.6g}' for v in scaled_list] if not self.chk_raw.isChecked() else raw_vals}")

    def _populate_table(self, idxs, names, iqs, raws, scaled):
        """Fill the values table with one read-only row per variable."""
        raw_mode = self.chk_raw.isChecked()
        self.tbl_values.setRowCount(len(idxs))
        for row, (idx, nm, iq_raw, rv, sv) in enumerate(zip(idxs, names, iqs, raws, scaled)):
            is_signed = (iq_raw & SIGN_BIT_MASK) != 0
            frac_bits = iq_raw & FRAC_MASK_FULL
            iq_disp = f"{frac_bits}{'s' if is_signed else 'u'}"

            raw_display = str(rv)
            scaled_display = raw_display if raw_mode else f"{sv:.6g}"

            texts = (str(idx), nm, iq_disp, raw_display, scaled_display)
            for col, text in enumerate(texts):
                item = QtWidgets.QTableWidgetItem(text)
                item.setFlags(item.flags() & ~QtCore.Qt.ItemIsEditable)
                self.tbl_values.setItem(row, col, item)

    # ------------------------------ POLLING --------------------------------
    def _toggle_polling(self):
        """Start or stop periodic value requests."""
        if self._polling:
            self._poll_timer.stop()
            self._polling = False
            self.btn_poll.setText("Start Polling")
            self._log("[POLL] Stopped")
            # A transaction may still be in flight; keep the buttons disabled
            # until _end_txn re-enables them.
            self.btn_read_service.setEnabled(not self._busy)
            self.btn_read_values.setEnabled(not self._busy)
        else:
            self._poll_timer.start(self.spin_interval.value())
            self._polling = True
            self.btn_poll.setText("Stop Polling")
            self._log(f"[POLL] Started interval={self.spin_interval.value()}ms")
            self.btn_read_service.setEnabled(False)
            self.btn_read_values.setEnabled(False)

    def _on_poll_timeout(self):
        """Poll tick: request values unless the port is closed or busy."""
        if not self.serial.isOpen() or self._busy:
            return
        self.request_values()

    # ------------------------------ HELPERS --------------------------------
    def _toggle_index_base(self, st):
        """Switch the index spin box between decimal and hexadecimal display."""
        val = self.spin_index.value()
        if st == QtCore.Qt.Checked:
            self.spin_index.setDisplayIntegerBase(16)
            self.spin_index.setPrefix("0x")
        else:
            self.spin_index.setDisplayIntegerBase(10)
            self.spin_index.setPrefix("")
        self.spin_index.setValue(val)

    def _set_ui_busy(self, busy: bool):
        """Enable/disable the read buttons; left untouched while polling."""
        if not self._polling:
            self.btn_read_service.setEnabled(not busy)
            self.btn_read_values.setEnabled(not busy)

    def _on_serial_error(self, err):
        """Log a serial port error and abort the transaction in flight."""
        if err == QtSerialPort.QSerialPort.NoError:
            return
        self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})")
        if self._busy:
            self._end_txn()

    # ------------------------------ LOGGING --------------------------------
    def _log(self, msg: str):
        """Append ``msg`` to the log view with a millisecond timestamp."""
        ts = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
        self.txt_log.append(f"{ts} {msg}")

    def _log_frame(self, data: bytes, *, tx: bool):
        """Log ``data`` as a hex dump followed by its printable-ASCII view."""
        tag = 'TX' if tx else 'RX'
        hexs = ' '.join(f"{b:02X}" for b in data)
        ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
        self._log(f"[{tag}] {hexs} |{ascii_part}|")


# ---------------------------------------------------------- Demo harness ---
class _DemoWindow(QtWidgets.QMainWindow):
    """Main window hosting a DebugTerminalWidget; prints read results."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("DebugVar Terminal")
        self.term = DebugTerminalWidget(self)
        self.setCentralWidget(self.term)
        # connect sample signals -> print
        self.term.nameRead.connect(self._on_name)
        self.term.valueRead.connect(self._on_value)

    def _on_name(self, index, status, iq, name):
        print(f"Name idx={index} status={status} iq={iq} name='{name}'")

    def _on_value(self, index, status, iq, raw16, floatVal):
        print(f"Value idx={index} status={status} iq={iq} raw={raw16} val={floatVal}")

    def closeEvent(self, event):
        """Called when the window is being closed."""
        # Explicitly detach and dispose of the central widget.
        self.setCentralWidget(None)
        if self.term is not None:
            self.term.deleteLater()
            self.term = None
        super().closeEvent(event)


# ------------------------------- Demo --------------------------------------
def main():
    """Run the demo application."""
    import sys

    app = QtWidgets.QApplication(sys.argv)
    win = _DemoWindow()
    win.show()
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()