Начата работа над lowlevel терминалкой (по адресам из xml)

This commit is contained in:
Razvalyaev 2025-07-19 18:01:40 +03:00
parent 171f176d63
commit 6830743477
8 changed files with 1802 additions and 698 deletions

252
Src/path_hints.py Normal file
View File

@ -0,0 +1,252 @@
# path_hints.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import re
# ---------------------- tokenization helpers ----------------------
def split_path_tokens(path: str) -> List[str]:
"""
Разбивает строку пути на логические части:
'foo[2].bar[1]->baz' -> ['foo', '[2]', 'bar', '[1]', 'baz']
Аналог твоей split_path(), но оставлена как чистая функция.
"""
tokens: List[str] = []
token = ''
i = 0
L = len(path)
while i < L:
c = path[i]
# '->'
if c == '-' and i + 1 < L and path[i:i+2] == '->':
if token:
tokens.append(token)
token = ''
i += 2
continue
# одиночный '-' в конце
if c == '-' and i == L - 1:
i += 1
continue
# '.'
if c == '.':
if token:
tokens.append(token)
token = ''
i += 1
continue
# '[' ... ']'
if c == '[':
if token:
tokens.append(token)
token = ''
idx = ''
while i < L and path[i] != ']':
idx += path[i]
i += 1
if i < L and path[i] == ']':
idx += ']'
i += 1
tokens.append(idx)
continue
# обычный символ
token += c
i += 1
if token:
tokens.append(token)
return tokens
def canonical_key(path: str) -> str:
"""
Преобразует путь к канонической форме для индекса / поиска:
- '->' -> '.'
- '[' -> '.['
- lower()
"""
p = path.replace('->', '.')
p = p.replace('[', '.[')
return p.lower()
# ---------------------- индекс узлов ----------------------
@dataclass
class PathNode:
"""
Узел в логическом дереве путей.
Храним:
- собственное имя (локальное, напр. 'controller' или '[3]')
- полный путь (оригинальный, как его должен видеть пользователь)
- тип (опционально; widget может хранить отдельно)
- дети
"""
name: str
full_path: str
type_str: str = ''
children: Dict[str, "PathNode"] = field(default_factory=dict)
def add_child(self, child: "PathNode") -> None:
self.children[child.name] = child
class PathHints:
"""
Движок автоподсказок / completion.
Работает с плоским списком ПОЛНЫХ имён (как показываются пользователю).
Сам восстанавливает иерархию и выдаёт подсказки по текущему вводу.
Qt-независим.
"""
def __init__(self) -> None:
self._paths: List[str] = []
self._types: Dict[str, str] = {} # full_path -> type_str (опционально)
self._index: Dict[str, PathNode] = {} # canonical full path -> node
self._root_children: Dict[str, PathNode] = {} # top-level по первому токену
# ------------ Подаём данные ------------
def set_paths(self,
paths: List[Tuple[str, Optional[str]]]
) -> None:
"""
paths: список кортежей (full_path, type_str|None).
Пример: ('project.controller.read.errors.bit.status_er0', 'unsigned int')
Поля могут содержать '->' и индексы, т.е. строки в пользовательском формате.
NOTE: порядок не важен; дерево строится автоматически.
"""
self._paths = []
self._types.clear()
self._index.clear()
self._root_children.clear()
for p, t in paths:
if t is None:
t = ''
self._add_path(p, t)
def _add_path(self, full_path: str, type_str: str) -> None:
self._paths.append(full_path)
self._types[full_path] = type_str
toks = split_path_tokens(full_path)
if not toks:
return
cur_dict = self._root_children
cur_full = ''
parent_node: Optional[PathNode] = None
for i, tok in enumerate(toks):
# Собираем ПОЛНЫЙ путь
if cur_full == '':
cur_full = tok
else:
if tok.startswith('['):
cur_full += tok
else:
cur_full += '.' + tok
# Если узел уже есть
node = cur_dict.get(tok)
if node is None:
# --- ВАЖНО: full_path = cur_full ---
node = PathNode(name=tok, full_path=cur_full)
cur_dict[tok] = node
# Регистрируем все узлы, включая промежуточные
self._index[canonical_key(cur_full)] = node
parent_node = node
cur_dict = node.children
# В последний узел добавляем тип
if parent_node:
parent_node.type_str = type_str
# ------------ Поиск узла ------------
def find_node(self, path: str) -> Optional[PathNode]:
return self._index.get(canonical_key(path))
# ------------ Подсказки ------------
def suggest(self,
text: str,
*,
include_partial: bool = True
) -> List[str]:
"""
Вернёт список *полных имён узлов*, подходящих под ввод.
Правила (упрощённо, повторяя твою update_completions()):
- Если текст пуст top-level.
- Если заканчивается на '.' или '->' или '[' вернуть детей текущего узла.
- Иначе фильтр по последнему фрагменту (prefix substring match).
"""
text = text or ''
stripped = text.strip()
# пусто: top-level
if stripped == '':
return sorted(self._root_full_names())
# Завершение по разделителю?
if stripped.endswith('.') or stripped.endswith('->') or stripped.endswith('['):
base = stripped[:-1] if stripped.endswith('[') else stripped.rstrip('.').rstrip('>').rstrip('-')
node = self.find_node(base)
if node:
return self._children_full_names(node)
# не нашли базу — ничего
return []
# иначе: обычный поиск по последней части
toks = split_path_tokens(stripped)
prefix_last = toks[-1].lower() if toks else ''
parent_toks = toks[:-1]
if not parent_toks:
# фильтр top-level
res = []
for name, node in self._root_children.items():
if prefix_last == '' or prefix_last in name.lower():
res.append(node.full_path)
return sorted(res)
# есть родитель
parent_path = self._join_tokens(parent_toks)
parent_node = self.find_node(parent_path)
if not parent_node:
return []
res = []
for child in parent_node.children.values():
if prefix_last == '' or prefix_last in child.name.lower():
res.append(child.full_path)
return sorted(res)
# ------------ внутренние вспомогательные ------------
def _root_full_names(self) -> List[str]:
return [node.full_path for node in self._root_children.values()]
def _children_full_names(self, node: PathNode) -> List[str]:
return [ch.full_path for ch in node.children.values()]
@staticmethod
def _join_tokens(tokens: List[str]) -> str:
"""
Собираем путь обратно. Для внутренних нужд (поиск), формат не критичен
всё равно canonical_key() нормализует.
"""
if not tokens:
return ''
out = tokens[0]
for t in tokens[1:]:
if t.startswith('['):
out += t
else:
out += '.' + t
return out

View File

@ -0,0 +1,739 @@
"""
LowLevelSelectorWidget (PySide2)
--------------------------------
Виджет для:
* Выбора XML файла с описанием переменных (как в примере пользователя)
* Парсинга всех <variable> и их вложенных <member>
* Построения плоского списка путей (имя/подпуть) с расчётом абсолютного адреса (base_address + offset)
* Определения структур с полями даты (year, month, day, hour, minute)
* Выбора переменной и (опционально) переменной даты / ручного ввода даты
* Выбора типов: ptr_type (pt_*), iq_type, return_type
* Форматирования адреса в виде 0x000000 (6 HEX)
* Генерации словаря/кадра для последующей LowLevel-команды (не отправляет сам)
Интеграция:
* Подключите сигнал variablePrepared(dict) к функции, формирующей и отправляющей пакет.
* Содержимое dict:
{
'address': int,
'address_hex': str, # '0x....'
'ptr_type': int, # значение enum pt_*
'iq_type': int,
'return_type': int,
'datetime': {
'year': int,
'month': int,
'day': int,
'hour': int,
'minute': int,
},
'path': str, # полный путь переменной
'type_string': str, # строка типа из XML
}
Зависимости: только PySide2 и стандартная библиотека.
"""
from __future__ import annotations
import sys
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from PySide2 import QtCore, QtGui, QtWidgets
from path_hints import PathHints
# ------------------------------------------------------------ Enumerations --
# Сопоставление строк из XML типу ptr_type (адаптируйте под реальный проект)
PTR_TYPE_MAP = {
'int8': 'pt_int8', 'signed char': 'pt_int8', 'char': 'pt_int8',
'int16': 'pt_int16', 'short': 'pt_int16', 'int': 'pt_int16',
'int32': 'pt_int32', 'long': 'pt_int32',
'int64': 'pt_int64', 'long long': 'pt_int64',
'uint8': 'pt_uint8', 'unsigned char': 'pt_uint8',
'uint16': 'pt_uint16', 'unsigned short': 'pt_uint16', 'unsigned int': 'pt_uint16',
'uint32': 'pt_uint32', 'unsigned long': 'pt_uint32',
'uint64': 'pt_uint64', 'unsigned long long': 'pt_uint64',
'float': 'pt_float', 'floatf': 'pt_float',
'struct': 'pt_struct', 'union': 'pt_union',
}
PT_ENUM_ORDER = [
'pt_unknown','pt_int8','pt_int16','pt_int32','pt_int64',
'pt_uint8','pt_uint16','pt_uint32','pt_uint64','pt_float',
'pt_struct','pt_union'
]
IQ_ENUM_ORDER = [
't_iq_none','t_iq','t_iq1','t_iq2','t_iq3','t_iq4','t_iq5','t_iq6',
't_iq7','t_iq8','t_iq9','t_iq10','t_iq11','t_iq12','t_iq13','t_iq14',
't_iq15','t_iq16','t_iq17','t_iq18','t_iq19','t_iq20','t_iq21','t_iq22',
't_iq23','t_iq24','t_iq25','t_iq26','t_iq27','t_iq28','t_iq29','t_iq30'
]
# Для примера: маппинг имени enum -> числовое значение (индекс по порядку)
PT_ENUM_VALUE = {name: idx for idx, name in enumerate(PT_ENUM_ORDER)}
IQ_ENUM_VALUE = {name: idx for idx, name in enumerate(IQ_ENUM_ORDER)}
# -------------------------------------------------------------- Data types --
DATE_FIELD_SET = {'year','month','day','hour','minute'}
@dataclass
class MemberNode:
name: str
offset: int = 0
type_str: str = ''
size: Optional[int] = None
children: List['MemberNode'] = field(default_factory=list)
# --- новые, но необязательные (совместимость) ---
kind: Optional[str] = None # 'array', 'union', ...
count: Optional[int] = None # size1 (число элементов в массиве)
def is_date_struct(self) -> bool:
if not self.children:
return False
child_names = {c.name for c in self.children}
return DATE_FIELD_SET.issubset(child_names)
@dataclass
class VariableNode:
name: str
address: int
type_str: str
size: Optional[int]
members: List[MemberNode] = field(default_factory=list)
# --- новые, но необязательные ---
kind: Optional[str] = None # 'array'
count: Optional[int] = None # size1
def base_address_hex(self) -> str:
return f"0x{self.address:06X}"
# --------------------------- XML Parser ----------------------------
class VariablesXML:
"""
Читает твой XML и выдаёт плоский список путей:
- Массивы -> name[i], многоуровневые -> name[i][j]
- Указатель на структуру -> дети через '->'
- Обычная структура -> дети через '.'
"""
# предположительные размеры примитивов (под STM/MCU: int=2)
_PRIM_SIZE = {
'char':1, 'signed char':1, 'unsigned char':1, 'uint8_t':1, 'int8_t':1,
'short':2, 'short int':2, 'signed short':2, 'unsigned short':2,
'uint16_t':2, 'int16_t':2,
'int':2, 'signed int':2, 'unsigned int':2,
'long':4, 'unsigned long':4, 'int32_t':4, 'uint32_t':4,
'float':4,
'long long':8, 'unsigned long long':8, 'int64_t':8, 'uint64_t':8, 'double':8,
}
def __init__(self, path: str):
self.path = path
self.timestamp: str = ''
self.variables: List[VariableNode] = []
self._parse()
# ------------------ low helpers ------------------
@staticmethod
def _parse_int_guess(txt: Optional[str]) -> Optional[int]:
if not txt:
return None
txt = txt.strip()
if txt.startswith(('0x','0X')):
return int(txt, 16)
# если в строке есть буквы A-F → возможно hex
if any(c in 'abcdefABCDEF' for c in txt):
try:
return int(txt, 16)
except ValueError:
pass
try:
return int(txt, 10)
except ValueError:
return None
@staticmethod
def _is_pointer_to_struct(t: str) -> bool:
if not t:
return False
low = t.replace('\t',' ').replace('\n',' ')
return 'struct ' in low and '*' in low
@staticmethod
def _is_struct_or_union(t: str) -> bool:
if not t:
return False
low = t.strip()
return low.startswith('struct ') or low.startswith('union ')
@staticmethod
def _strip_array_suffix(t: str) -> str:
return t[:-2].strip() if t.endswith('[]') else t
def _guess_primitive_size(self, type_str: str) -> Optional[int]:
if not type_str:
return None
base = type_str
for tok in ('volatile','const'):
base = base.replace(tok, '')
base = base.replace('*',' ')
base = base.replace('[',' ').replace(']',' ')
base = ' '.join(base.split()).strip()
return self._PRIM_SIZE.get(base)
# ------------------ XML read ------------------
def _parse(self):
tree = ET.parse(self.path)
root = tree.getroot()
ts = root.find('timestamp')
self.timestamp = ts.text.strip() if ts is not None and ts.text else ''
def parse_member(elem) -> MemberNode:
name = elem.get('name','')
offset = int(elem.get('offset','0'),16) if elem.get('offset') else 0
t = elem.get('type','') or ''
size_attr = elem.get('size')
size = int(size_attr,16) if size_attr else None
kind = elem.get('kind')
size1_attr = elem.get('size1')
count = None
if size1_attr:
count = self._parse_int_guess(size1_attr)
node = MemberNode(name=name, offset=offset, type_str=t, size=size,
kind=kind, count=count)
for ch in elem.findall('member'):
node.children.append(parse_member(ch))
return node
for var in root.findall('variable'):
addr = int(var.get('address','0'),16)
name = var.get('name','')
t = var.get('type','') or ''
size_attr = var.get('size')
size = int(size_attr,16) if size_attr else None
kind = var.get('kind')
size1_attr = var.get('size1')
count = None
if size1_attr:
count = self._parse_int_guess(size1_attr)
members = [parse_member(m) for m in var.findall('member')]
self.variables.append(
VariableNode(name=name, address=addr, type_str=t, size=size,
members=members, kind=kind, count=count)
)
# ------------------ flatten (expanded) ------------------
def flattened(self,
max_array_elems: Optional[int] = None
) -> List[Tuple[str,int,str]]:
"""
Вернёт [(path, addr, type_str), ...].
max_array_elems: ограничить разворачивание больших массивов (None = все).
"""
out: List[Tuple[str,int,str]] = []
def add(path: str, addr: int, t: str):
out.append((path, addr, t))
def compute_stride(size_bytes: Optional[int],
count: Optional[int],
base_type: Optional[str],
node_children: Optional[List[MemberNode]]) -> int:
# 1) пробуем size/count
if size_bytes and count and count > 0:
stride = size_bytes // count
if stride * count != size_bytes:
# округлённо вверх
stride = (size_bytes + count - 1) // count
if stride <= 0:
stride = 1
return stride
# 2) размер примитива по типу
if base_type:
gs = self._guess_primitive_size(base_type)
if gs:
return gs
# 3) попытка по детям (структура)
if node_children:
min_off = min(ch.offset for ch in node_children)
max_end = min_off
for ch in node_children:
sz = ch.size
if not sz:
sz = self._guess_primitive_size(ch.type_str) or 1
end = ch.offset + sz
if end > max_end:
max_end = end
stride = max_end - min_off
if stride > 0:
return stride
return 1
def expand_members(prefix_name: str,
base_addr: int,
members: List[MemberNode],
parent_is_ptr_struct: bool):
"""
Разворачиваем список members относительно базового адреса.
parent_is_ptr_struct: если True, то соединение '->' иначе '.'
"""
join = '->' if parent_is_ptr_struct else '.'
for m in members:
path_m = f"{prefix_name}{join}{m.name}" if prefix_name else m.name
addr_m = base_addr + m.offset
add(path_m, addr_m, m.type_str)
# массив?
if (m.kind == 'array') or m.type_str.endswith('[]'):
count = m.count
if count is None:
count = 0 # неизвестно → не разворачиваем
if count <= 0:
continue
base_t = self._strip_array_suffix(m.type_str)
stride = compute_stride(m.size, count, base_t, m.children if m.children else None)
limit = count if max_array_elems is None else min(count, max_array_elems)
for i in range(limit):
path_i = f"{path_m}[{i}]"
addr_i = addr_m + i*stride
add(path_i, addr_i, base_t)
# элемент массива: если структура / union → раскроем поля
if m.children and self._is_struct_or_union(base_t):
expand_members(path_i, addr_i, m.children, parent_is_ptr_struct=False)
# элемент массива: если указатель на структуру
elif self._is_pointer_to_struct(base_t):
# у таких обычно нет children в XML, но если есть — используем
expand_members(path_i, addr_i, m.children, parent_is_ptr_struct=True)
continue
# не массив
if m.children:
is_ptr_struct = self._is_pointer_to_struct(m.type_str)
expand_members(path_m, addr_m, m.children, parent_is_ptr_struct=is_ptr_struct)
# --- top-level ---
for v in self.variables:
add(v.name, v.address, v.type_str)
# top-level массив?
if (v.kind == 'array') or v.type_str.endswith('[]'):
count = v.count
if count is None:
count = 0
if count > 0:
base_t = self._strip_array_suffix(v.type_str)
stride = compute_stride(v.size, count, base_t, v.members if v.members else None)
limit = count if max_array_elems is None else min(count, max_array_elems)
for i in range(limit):
p = f"{v.name}[{i}]"
a = v.address + i*stride
add(p, a, base_t)
# массив структур?
if v.members and self._is_struct_or_union(base_t):
expand_members(p, a, v.members, parent_is_ptr_struct=False)
# массив указателей на структуры?
elif self._is_pointer_to_struct(base_t):
expand_members(p, a, v.members, parent_is_ptr_struct=True)
continue # к след. переменной
# top-level не массив
if v.members:
is_ptr_struct = self._is_pointer_to_struct(v.type_str)
expand_members(v.name, v.address, v.members, parent_is_ptr_struct=is_ptr_struct)
return out
# -------------------- date candidates (как было) --------------------
def date_struct_candidates(self) -> List[Tuple[str,int]]:
cands = []
for v in self.variables:
# верхний уровень (если есть все поля даты)
direct_names = {mm.name for mm in v.members}
if DATE_FIELD_SET.issubset(direct_names):
cands.append((v.name, v.address))
# проверка членов первого уровня
for m in v.members:
if m.is_date_struct():
cands.append((f"{v.name}.{m.name}", v.address + m.offset))
return cands
# ------------------------------------------- Address / validation helpers --
HEX_ADDR_MASK = QtCore.QRegExp(r"0x[0-9A-Fa-f]{0,6}")
class HexAddrValidator(QtGui.QRegExpValidator):
def __init__(self, parent=None):
super().__init__(HEX_ADDR_MASK, parent)
@staticmethod
def normalize(text: str) -> str:
if not text:
return '0x000000'
try:
val = int(text,16)
except ValueError:
return '0x000000'
return f"0x{val & 0xFFFFFF:06X}"
# --------------------------------------------------------- Main Widget ----
class LowLevelSelectorWidget(QtWidgets.QWidget):
variablePrepared = QtCore.Signal(dict)
xmlLoaded = QtCore.Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle('LowLevel Variable Selector')
self._xml: Optional[VariablesXML] = None
self._paths = []
self._path_info = {}
self._addr_index = {}
self._hints = PathHints()
self._build_ui()
self._connect()
def _build_ui(self):
lay = QtWidgets.QVBoxLayout(self)
# --- File chooser ---
file_box = QtWidgets.QHBoxLayout()
self.btn_load = QtWidgets.QPushButton('Load XML...')
self.lbl_file = QtWidgets.QLabel('<no file>')
self.lbl_file.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse)
file_box.addWidget(self.btn_load)
file_box.addWidget(self.lbl_file, 1)
lay.addLayout(file_box)
self.lbl_timestamp = QtWidgets.QLabel('Timestamp: -')
lay.addWidget(self.lbl_timestamp)
form = QtWidgets.QFormLayout()
# --- Search field for variable ---
self.edit_var_search = QtWidgets.QLineEdit()
self.edit_var_search.setPlaceholderText("Введите имя/путь или адрес 0x......")
form.addRow('Variable:', self.edit_var_search)
# Popup list
self._popup = QtWidgets.QListView()
self._popup.setWindowFlags(QtCore.Qt.ToolTip)
self._popup.setEditTriggers(QtWidgets.QAbstractItemView.NoEditTriggers)
self._popup.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff)
self._popup.clicked.connect(self._on_popup_clicked)
self._model_all = QtGui.QStandardItemModel(self)
self._model_filtered = QtGui.QStandardItemModel(self)
# Address
self.edit_address = QtWidgets.QLineEdit('0x000000')
self.edit_address.setValidator(HexAddrValidator(self))
self.edit_address.setMaximumWidth(120)
form.addRow('Address:', self.edit_address)
# Manual date spins
dt_row = QtWidgets.QHBoxLayout()
self.spin_year = QtWidgets.QSpinBox(); self.spin_year.setRange(2000, 2100); self.spin_year.setValue(2025)
self.spin_month = QtWidgets.QSpinBox(); self.spin_month.setRange(1,12)
self.spin_day = QtWidgets.QSpinBox(); self.spin_day.setRange(1,31)
self.spin_hour = QtWidgets.QSpinBox(); self.spin_hour.setRange(0,23)
self.spin_minute = QtWidgets.QSpinBox(); self.spin_minute.setRange(0,59)
for w,label in [(self.spin_year,'Y'),(self.spin_month,'M'),(self.spin_day,'D'),(self.spin_hour,'h'),(self.spin_minute,'m')]:
box = QtWidgets.QVBoxLayout()
box.addWidget(QtWidgets.QLabel(label, alignment=QtCore.Qt.AlignHCenter))
box.addWidget(w)
dt_row.addLayout(box)
form.addRow('Manual Date:', dt_row)
# Types
self.cmb_ptr_type = QtWidgets.QComboBox(); self.cmb_ptr_type.addItems(PT_ENUM_ORDER)
self.cmb_iq_type = QtWidgets.QComboBox(); self.cmb_iq_type.addItems(IQ_ENUM_ORDER)
self.cmb_return_type = QtWidgets.QComboBox(); self.cmb_return_type.addItems(IQ_ENUM_ORDER)
form.addRow('ptr_type:', self.cmb_ptr_type)
form.addRow('iq_type:', self.cmb_iq_type)
form.addRow('return_type:', self.cmb_return_type)
lay.addLayout(form)
self.btn_prepare = QtWidgets.QPushButton('Prepare Variable Dict')
lay.addWidget(self.btn_prepare)
lay.addStretch(1)
self.txt_info = QtWidgets.QPlainTextEdit()
self.txt_info.setReadOnly(True)
self.txt_info.setMaximumHeight(140)
lay.addWidget(QtWidgets.QLabel('Info:'))
lay.addWidget(self.txt_info)
# Event filter for keyboard on search field
self.edit_var_search.installEventFilter(self)
def _connect(self):
self.btn_load.clicked.connect(self._on_load_xml)
self.edit_address.editingFinished.connect(self._normalize_address)
self.btn_prepare.clicked.connect(self._emit_variable)
self.edit_var_search.textEdited.connect(self._on_var_search_edited)
self.edit_var_search.returnPressed.connect(self._activate_current_popup_selection)
# ---------------- XML Load ----------------
def _on_load_xml(self):
path, _ = QtWidgets.QFileDialog.getOpenFileName(
self, 'Select variables XML', '', 'XML Files (*.xml);;All Files (*)')
if not path:
return
try:
self._xml = VariablesXML(path)
except Exception as e:
QtWidgets.QMessageBox.critical(self, 'Parse error', f'Ошибка парсинга:\n{e}')
return
self.lbl_file.setText(path)
self.lbl_timestamp.setText(f'Timestamp: {self._xml.timestamp or "-"}')
self._populate_variables()
self._apply_timestamp_to_date()
self.xmlLoaded.emit(path)
self._log(f'Loaded {path}, variables={len(self._xml.variables)}')
def _apply_timestamp_to_date(self):
if not self._xml.timestamp:
return
import datetime
try:
# Пример: "Sat Jul 19 15:27:59 2025"
dt = datetime.datetime.strptime(self._xml.timestamp, "%a %b %d %H:%M:%S %Y")
self.spin_year.setValue(dt.year)
self.spin_month.setValue(dt.month)
self.spin_day.setValue(dt.day)
self.spin_hour.setValue(dt.hour)
self.spin_minute.setValue(dt.minute)
except Exception as e:
print(f"Ошибка разбора timestamp '{self._xml.timestamp}': {e}")
def _populate_variables(self):
if not self._xml:
return
flat = self._xml.flattened()
# flat: [(path, addr, type_str), ...]
self._paths = []
self._path_info = {}
self._addr_index = {}
self._model_all.clear() # держим «сырой» полный список (можно не показывать)
self._model_filtered.clear() # текущие подсказки
# индексирование
for path, addr, t in flat:
self._paths.append(path)
self._path_info[path] = (addr, t)
if addr in self._addr_index:
self._addr_index[addr] = None
else:
self._addr_index[addr] = path
# наполняем «all» модель (необязательная, но пусть остаётся — не используем напрямую)
it = QtGui.QStandardItem(f"{path} [{addr:06X}]")
it.setData(path, QtCore.Qt.UserRole+1)
it.setData(addr, QtCore.Qt.UserRole+2)
it.setData(t, QtCore.Qt.UserRole+3)
self._model_all.appendRow(it)
# построить подсказки
self._hints.set_paths([(p, self._path_info[p][1]) for p in self._paths])
# начальное состояние попапа (пустой ввод → top-level)
self._update_popup_model(self._hints.suggest(''))
self._log(f"Variables loaded: {len(flat)}")
# --------------- Search mechanics ---------------
def _update_popup_model(self, paths: List[str]):
"""Обновляет модель попапа списком путей (full paths)."""
self._model_filtered.clear()
limit = 400
added = 0
for p in paths:
info = self._path_info.get(p)
if not info:
continue
addr, t = info
it = QtGui.QStandardItem(f"{p} [{addr:06X}]")
it.setData(p, QtCore.Qt.UserRole+1)
it.setData(addr, QtCore.Qt.UserRole+2)
it.setData(t, QtCore.Qt.UserRole+3)
self._model_filtered.appendRow(it)
added += 1
if added >= limit:
break
if added >= limit:
extra = QtGui.QStandardItem("... (more results truncated)")
extra.setEnabled(False)
self._model_filtered.appendRow(extra)
def _show_popup(self):
if self._model_filtered.rowCount() == 0:
self._popup.hide()
return
self._popup.setModel(self._model_filtered)
self._popup.setMinimumWidth(self.edit_var_search.width())
pos = self.edit_var_search.mapToGlobal(QtCore.QPoint(0, self.edit_var_search.height()))
self._popup.move(pos)
self._popup.show()
self._popup.raise_()
self._popup.setFocus()
self._popup.setCurrentIndex(self._model_filtered.index(0,0))
def _hide_popup(self):
self._popup.hide()
def _on_var_search_edited(self, text: str):
t = text.strip()
# адрес?
if t.startswith("0x") and len(t) >= 3:
try:
addr = int(t, 16)
path = self._addr_index.get(addr)
if path:
self._set_current_variable(path, from_address=True)
self._hide_popup()
return
except ValueError:
pass
# подсказки по имени
suggestions = self._hints.suggest(t)
self._update_popup_model(suggestions)
self._show_popup()
def _on_popup_clicked(self, idx: QtCore.QModelIndex):
if not idx.isValid():
return
path = idx.data(QtCore.Qt.UserRole+1)
if path:
self._set_current_variable(path)
self._hide_popup()
def _activate_current_popup_selection(self):
if self._popup.isVisible():
idx = self._popup.currentIndex()
if idx.isValid():
self._on_popup_clicked(idx)
return
# Попытка прямого совпадения
path = self.edit_var_search.text().strip()
if path in self._path_info:
self._set_current_variable(path)
def eventFilter(self, obj, ev):
if obj is self.edit_var_search and ev.type() == QtCore.QEvent.KeyPress:
if ev.key() in (QtCore.Qt.Key_Down, QtCore.Qt.Key_Up):
if not self._popup.isVisible():
self._show_popup()
else:
step = 1 if ev.key()==QtCore.Qt.Key_Down else -1
cur = self._popup.currentIndex()
row = cur.row() + step
if row < 0: row = 0
if row >= self._model_filtered.rowCount():
row = self._model_filtered.rowCount()-1
self._popup.setCurrentIndex(self._model_filtered.index(row,0))
return True
elif ev.key() == QtCore.Qt.Key_Escape:
self._hide_popup()
return True
return super().eventFilter(obj, ev)
def _set_current_variable(self, path: str, from_address=False):
if path not in self._path_info:
return
addr, type_str = self._path_info[path]
self.edit_var_search.setText(path)
self.edit_address.setText(f"0x{addr:06X}")
ptr_enum_name = self._map_type_to_ptr_enum(type_str)
self._select_combo_text(self.cmb_ptr_type, ptr_enum_name)
source = "ADDR" if from_address else "SEARCH"
self._log(f"[{source}] Selected {path} @0x{addr:06X} type={type_str} -> ptr={ptr_enum_name}")
# --------------- Date struct / address / helpers ---------------
def _normalize_address(self):
self.edit_address.setText(HexAddrValidator.normalize(self.edit_address.text()))
def _map_type_to_ptr_enum(self, type_str: str) -> str:
if not type_str:
return 'pt_unknown'
low = type_str.lower()
token = low.replace('*',' ').replace('[',' ').split()[0]
return PTR_TYPE_MAP.get(token, 'pt_unknown')
def _select_combo_text(self, combo: QtWidgets.QComboBox, text: str):
ix = combo.findText(text)
if ix >= 0:
combo.setCurrentIndex(ix)
def _collect_datetime(self) -> Dict[str,int]:
return {
'year': self.spin_year.value(),
'month': self.spin_month.value(),
'day': self.spin_day.value(),
'hour': self.spin_hour.value(),
'minute': self.spin_minute.value(),
}
def _emit_variable(self):
if not self._path_info:
QtWidgets.QMessageBox.warning(self, 'No XML', 'Сначала загрузите XML файл.')
return
path = self.edit_var_search.text().strip()
if path not in self._path_info:
QtWidgets.QMessageBox.warning(self, 'Variable', 'Переменная не выбрана / не найдена.')
return
addr, type_str = self._path_info[path]
ptr_type_name = self.cmb_ptr_type.currentText()
iq_type_name = self.cmb_iq_type.currentText()
ret_type_name = self.cmb_return_type.currentText()
out = {
'address': addr,
'address_hex': f"0x{addr:06X}",
'ptr_type': PT_ENUM_VALUE.get(ptr_type_name, 0),
'iq_type': IQ_ENUM_VALUE.get(iq_type_name, 0),
'return_type': IQ_ENUM_VALUE.get(ret_type_name, 0),
'datetime': self._collect_datetime(),
'path': path,
'type_string': type_str,
'ptr_type_name': ptr_type_name,
'iq_type_name': iq_type_name,
'return_type_name': ret_type_name,
}
self._log(f"Prepared variable: {out}")
self.variablePrepared.emit(out)
def _log(self, msg: str):
self.txt_info.appendPlainText(msg)
# ----------------------------------------------------------- Demo window --
class _DemoWindow(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle('LowLevel Selector Demo')
self.selector = LowLevelSelectorWidget(self)
self.setCentralWidget(self.selector)
self.selector.variablePrepared.connect(self.on_var)
def on_var(self, data: dict):
print('Variable prepared ->', data)
def closeEvent(self, ev):
self.setCentralWidget(None)
super().closeEvent(ev)
# ----------------------------------------------------------------- main ---
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
w = _DemoWindow()
w.resize(640, 520)
w.show()
sys.exit(app.exec_())

View File

@ -1,5 +1,5 @@
from PySide2 import QtCore, QtWidgets, QtSerialPort
from tms_debugvar_lowlevel import LowLevelSelectorWidget
import datetime
# ------------------------------- Константы протокола ------------------------
@ -24,6 +24,7 @@ class Spoiler(QtWidgets.QWidget):
def __init__(self, title="", animationDuration=300, parent=None):
super().__init__(parent)
self._animationDuration = animationDuration
self.state = False
# --- Toggle button ---
self.toggleButton = QtWidgets.QToolButton(self)
@ -72,6 +73,9 @@ class Spoiler(QtWidgets.QWidget):
QtWidgets.QWidget().setLayout(old)
self._contentWidget.setLayout(contentLayout)
def getState(self):
return self.state
def _adjust_parent_size(self, *_):
top = self.window()
if top:
@ -80,6 +84,7 @@ class Spoiler(QtWidgets.QWidget):
top.resize(size) # ширина остаётся прежней
def _on_toggled(self, checked: bool):
self.state = checked
self.toggleButton.setArrowType(QtCore.Qt.DownArrow if checked else QtCore.Qt.RightArrow)
contentHeight = self._contentWidget.sizeHint().height()
@ -97,9 +102,13 @@ class Spoiler(QtWidgets.QWidget):
# --------------------------- DebugTerminalWidget ---------------------------
class DebugTerminalWidget(QtWidgets.QWidget):
nameRead = QtCore.Signal(int, int, int, str) # index, status, iq, name
valueRead = QtCore.Signal(int, int, int, int, float) # для одиночного
valuesRead = QtCore.Signal(int, int, list, list, list, list) # base, count, idx_list, iq_list, raw_list, float_list
# Существующие сигналы (Watch)
nameRead = QtCore.Signal(int, int, int, str)
valueRead = QtCore.Signal(int, int, int, int, float)
valuesRead = QtCore.Signal(int, int, list, list, list, list)
# Новые сигналы (LowLevel)
llValueRead = QtCore.Signal(int, int, int, int, float) # addr, status, rettype_raw, raw16_signed, scaled
portOpened = QtCore.Signal(str)
portClosed = QtCore.Signal(str)
txBytes = QtCore.Signal(bytes)
@ -107,7 +116,8 @@ class DebugTerminalWidget(QtWidgets.QWidget):
def __init__(self, parent=None, *,
start_byte=0x0A,
cmd_byte=0x44,
cmd_byte=0x46,
cmd_lowlevel=0x47,
iq_scaling=None,
read_timeout_ms=250,
auto_crc_check=True,
@ -116,13 +126,14 @@ class DebugTerminalWidget(QtWidgets.QWidget):
super().__init__(parent)
self.device_addr = start_byte
self.cmd_byte = cmd_byte
self.cmd_lowlevel = cmd_lowlevel
self.read_timeout_ms = read_timeout_ms
self.auto_crc_check = auto_crc_check
self._drop_if_busy = drop_if_busy
self._replace_if_busy = replace_if_busy
if iq_scaling is None:
iq_scaling = {n: float(1 << n) for n in range(16)}
iq_scaling = {n: float(1 << n) for n in range(31)}
iq_scaling[0] = 1.0
self.iq_scaling = iq_scaling
@ -136,21 +147,28 @@ class DebugTerminalWidget(QtWidgets.QWidget):
self._rx_buf = bytearray()
self._busy = False
self._pending_cmd = None # (frame, meta)
self._txn_meta = None # {'service':bool,'index':int,'varqnt':int,'chain':...}
self._txn_meta = None # {'service':bool,'index':int,'varqnt':int,'chain':...,'lowlevel':bool}
self._txn_timer = QtCore.QTimer(self)
self._txn_timer.setSingleShot(True)
self._txn_timer.timeout.connect(self._on_txn_timeout)
# Watch polling
self._poll_timer = QtCore.QTimer(self)
self._poll_timer.timeout.connect(self._on_poll_timeout)
self._polling = False
# Кэш: index -> (status, iq, name)
# LowLevel polling
self._ll_poll_timer = QtCore.QTimer(self)
self._ll_poll_timer.timeout.connect(self._on_ll_poll_timeout)
self._ll_polling = False
self._ll_current_var_info = None # Хранит инфо о выбранной LL переменной
# Кэш: index -> (status, iq, name, is_signed, frac_bits)
self._name_cache = {}
# Очередь требуемых service индексов перед чтением блока
self._service_queue = [] # список индексов
# Очередь service индексов
self._service_queue = []
self._pending_data_after_services = None # (base, count)
self._build_ui()
@ -178,15 +196,39 @@ class DebugTerminalWidget(QtWidgets.QWidget):
hs.addWidget(self.cmb_baud)
hs.addWidget(self.btn_open)
# --- Watch group (будет растягиваться) ---
# --- TabWidget ---
self.tabs = QtWidgets.QTabWidget()
self._build_watch_tab()
self._build_lowlevel_tab() # <-- Вызываем новый метод
# --- UART Log ---
self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self)
self.log_spoiler.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Minimum)
log_layout = QtWidgets.QVBoxLayout()
self.txt_log = QtWidgets.QTextEdit(); self.txt_log.setReadOnly(True)
self.txt_log.setFontFamily("Courier")
log_layout.addWidget(self.txt_log)
self.log_spoiler.setContentLayout(log_layout)
layout.addWidget(g_serial, 0)
layout.addWidget(self.tabs, 1)
layout.addWidget(self.log_spoiler, 0)
layout.setStretch(layout.indexOf(g_serial), 0)
layout.setStretch(layout.indexOf(self.tabs), 1)
layout.setStretch(layout.indexOf(self.log_spoiler), 0)
def _build_watch_tab(self):
# ... (код для вкладки Watch остаётся без изменений)
tab = QtWidgets.QWidget()
vtab = QtWidgets.QVBoxLayout(tab)
g_watch = QtWidgets.QGroupBox("Watch Variables")
grid = QtWidgets.QGridLayout(g_watch)
grid.setHorizontalSpacing(8)
grid.setVerticalSpacing(4)
self.spin_index = QtWidgets.QSpinBox()
self.spin_index.setRange(0, 0x7FFF)
self.spin_index.setAccelerated(True)
self.spin_index = QtWidgets.QSpinBox(); self.spin_index.setRange(0, 0x7FFF); self.spin_index.setAccelerated(True)
self.chk_hex_index = QtWidgets.QCheckBox("Hex")
self.spin_count = QtWidgets.QSpinBox(); self.spin_count.setRange(1,255); self.spin_count.setValue(1)
self.btn_read_service = QtWidgets.QPushButton("Read Name/Type")
@ -199,92 +241,94 @@ class DebugTerminalWidget(QtWidgets.QWidget):
self.lbl_iq = QtWidgets.QLabel("-")
self.edit_single_value = QtWidgets.QLineEdit(); self.edit_single_value.setReadOnly(True)
# --- Таблица: теперь 5 столбцов (если уже поменяли) ---
self.tbl_values = QtWidgets.QTableWidget(0, 5)
self.tbl_values.setHorizontalHeaderLabels(["Index","Name","IQ","Raw","Scaled"])
self.tbl_values.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Expanding)
hh = self.tbl_values.horizontalHeader()
hh.setSectionResizeMode(0, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(2, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(3, QtWidgets.QHeaderView.ResizeToContents)
hh.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch)
vh = self.tbl_values.verticalHeader()
vh.setVisible(False)
self.tbl_values.verticalHeader().setVisible(False)
r = 0
grid.addWidget(QtWidgets.QLabel("Base Index:"), r, 0)
grid.addWidget(self.spin_index, r, 1)
grid.addWidget(self.chk_hex_index, r, 2); r += 1
grid.addWidget(QtWidgets.QLabel("Base Index:"), r, 0); grid.addWidget(self.spin_index, r, 1); grid.addWidget(self.chk_hex_index, r, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Count:"), r, 0); grid.addWidget(self.spin_count, r, 1); r+=1
grid.addWidget(self.btn_read_service, r, 0); grid.addWidget(self.btn_read_values, r, 1); grid.addWidget(self.btn_poll, r, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Interval:"), r, 0); grid.addWidget(self.spin_interval, r, 1); grid.addWidget(self.chk_auto_service, r, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Name:"), r, 0); grid.addWidget(self.lbl_name, r, 1, 1, 2); r+=1
grid.addWidget(QtWidgets.QLabel("IQ:"), r, 0); grid.addWidget(self.lbl_iq, r, 1); grid.addWidget(self.chk_raw, r, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Single:"), r, 0); grid.addWidget(self.edit_single_value, r, 1, 1, 2); r+=1
grid.addWidget(QtWidgets.QLabel("Array Values:"), r, 0); r+=1
grid.addWidget(self.tbl_values, r, 0, 1, 3); grid.setRowStretch(r, 1)
grid.addWidget(QtWidgets.QLabel("Count:"), r, 0)
grid.addWidget(self.spin_count, r, 1); r += 1
vtab.addWidget(g_watch, 1)
self.tabs.addTab(tab, "Watch")
grid.addWidget(self.btn_read_service, r, 0)
grid.addWidget(self.btn_read_values, r, 1)
grid.addWidget(self.btn_poll, r, 2); r += 1
def _build_lowlevel_tab(self):
tab = QtWidgets.QWidget()
main_layout = QtWidgets.QVBoxLayout(tab)
grid.addWidget(QtWidgets.QLabel("Interval:"), r, 0)
grid.addWidget(self.spin_interval, r, 1)
grid.addWidget(self.chk_auto_service, r, 2); r += 1
# --- Селектор переменной ---
self.ll_selector = LowLevelSelectorWidget(tab)
main_layout.addWidget(self.ll_selector, 1) # Даём ему растягиваться
grid.addWidget(QtWidgets.QLabel("Name:"), r, 0)
grid.addWidget(self.lbl_name, r, 1, 1, 2); r += 1
# --- Панель управления и статуса ---
g_controls = QtWidgets.QGroupBox("Controls & Status")
grid = QtWidgets.QGridLayout(g_controls)
grid.addWidget(QtWidgets.QLabel("IQ:"), r, 0)
grid.addWidget(self.lbl_iq, r, 1)
grid.addWidget(self.chk_raw, r, 2); r += 1
self.btn_ll_read = QtWidgets.QPushButton("Read Once")
self.btn_ll_poll = QtWidgets.QPushButton("Start Polling")
self.spin_ll_interval = QtWidgets.QSpinBox()
self.spin_ll_interval.setRange(50, 10000)
self.spin_ll_interval.setValue(500)
self.spin_ll_interval.setSuffix(" ms")
self.chk_ll_raw = QtWidgets.QCheckBox("Raw (no IQ scaling)")
grid.addWidget(QtWidgets.QLabel("Single:"), r, 0)
grid.addWidget(self.edit_single_value, r, 1, 1, 2); r += 1
self.ll_val_status = QtWidgets.QLabel("-")
self.ll_val_rettype = QtWidgets.QLabel("-")
self.ll_val_raw = QtWidgets.QLabel("-")
self.ll_val_scaled = QtWidgets.QLabel("-")
# Размещение виджетов
grid.addWidget(self.btn_ll_read, 0, 0)
grid.addWidget(self.btn_ll_poll, 0, 1)
grid.addWidget(QtWidgets.QLabel("Interval:"), 1, 0)
grid.addWidget(self.spin_ll_interval, 1, 1)
grid.addWidget(self.chk_ll_raw, 0, 2, 2, 1) # Растянем на 2 строки
grid.addWidget(QtWidgets.QLabel("Array Values:"), r, 0); r += 1
# Результаты в виде формы
form_layout = QtWidgets.QFormLayout()
form_layout.addRow("Status:", self.ll_val_status)
form_layout.addRow("Return Type:", self.ll_val_rettype)
form_layout.addRow("Raw Value:", self.ll_val_raw)
form_layout.addRow("Scaled Value:", self.ll_val_scaled)
grid.addLayout(form_layout, 2, 0, 1, 3)
grid.setColumnStretch(2, 1)
main_layout.addWidget(g_controls)
main_layout.setStretchFactor(g_controls, 0) # Не растягивать
# --- Строка с таблицей, назначаем stretch ---
grid.addWidget(self.tbl_values, r, 0, 1, 3)
grid.setRowStretch(r, 1) # таблица тянется
# Все предыдущие строки по умолчанию имеют stretch=0
# Можно явно grid.setRowStretch(i, 0) при желании
# --- Добавляем группы в главный layout ---
layout.addWidget(g_serial, 0) # не растягивается (stretch=0)
layout.addWidget(g_watch, 1) # растягивается (stretch=1)
# --- UART Log (минимальная высота) ---
self.log_spoiler = Spoiler("UART Log", animationDuration=300, parent=self)
self.log_spoiler.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Minimum)
log_layout = QtWidgets.QVBoxLayout()
self.txt_log = QtWidgets.QTextEdit()
self.txt_log.setReadOnly(True)
self.txt_log.setFontFamily("Courier")
self.txt_log.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Minimum)
log_layout.addWidget(self.txt_log)
self.log_spoiler.setContentLayout(log_layout)
# Добавляем лог последним, но без stretch (0)
layout.addWidget(self.log_spoiler, 0)
# Строчки распределения: g_watch = 1, остальное = 0
# Если хочешь принудительно:
# layout.setStretchFactor(g_serial, 0) # PySide2: нет прямого метода, можно:
layout.setStretch(layout.indexOf(g_serial), 0)
layout.setStretch(layout.indexOf(g_watch), 1)
layout.setStretch(layout.indexOf(self.log_spoiler), 0)
self.tabs.addTab(tab, "LowLevel")
def _connect_ui(self):
# Watch
self.btn_refresh.clicked.connect(self.set_available_ports)
self.btn_open.clicked.connect(self._open_close_port)
self.btn_read_service.clicked.connect(self.request_service_single)
self.btn_read_values.clicked.connect(self.request_values)
self.btn_poll.clicked.connect(self._toggle_polling)
self.chk_hex_index.stateChanged.connect(self._toggle_index_base)
# LowLevel (новые и переделанные)
self.ll_selector.variablePrepared.connect(self._on_ll_variable_prepared)
self.ll_selector.xmlLoaded.connect(lambda p: self._log(f"[LL] XML loaded: {p}"))
self.btn_ll_read.clicked.connect(self.request_lowlevel_once)
self.btn_ll_poll.clicked.connect(self._toggle_ll_polling)
# ----------------------------- SERIAL MGMT ----------------------------
# ... (код без изменений)
def set_available_ports(self):
cur = self.cmb_port.currentText()
self.cmb_port.blockSignals(True)
@ -330,6 +374,46 @@ class DebugTerminalWidget(QtWidgets.QWidget):
crc = crc16_ibm(payload)
return payload + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def _build_lowlevel_request(self, var_info: dict) -> bytes:
# Формат: [adr][cmd_lowlevel][year_hi][year_lo][month][day][hour][minute][addr2][addr1][addr0][pt_type][iq_type][return_type]
# Пытаемся получить время из переданной информации
dt_info = var_info.get('datetime')
if dt_info:
# Используем время из var_info
year = dt_info.get('year', 2000)
month = dt_info.get('month', 1)
day = dt_info.get('day', 1)
hour = dt_info.get('hour', 0)
minute = dt_info.get('minute', 0)
self._log("[LL] Using time from selector.")
else:
# Если в var_info времени нет, используем текущее системное время (старое поведение)
now = QtCore.QDateTime.currentDateTime()
year = now.date().year()
month = now.date().month()
day = now.date().day()
hour = now.time().hour()
minute = now.time().minute()
self._log("[LL] Fallback to current system time.")
addr = var_info.get('address', 0)
addr2 = (addr >> 16) & 0xFF
addr1 = (addr >> 8) & 0xFF
addr0 = addr & 0xFF
pt_type = var_info.get('ptr_type', 0) & 0xFF
iq_type = var_info.get('iq_type', 0) & 0xFF
ret_type = var_info.get('return_type', 0) & 0xFF
frame_wo_crc = bytes([
self.device_addr & 0xFF, self.cmd_lowlevel & 0xFF,
(year >> 8) & 0xFF, year & 0xFF,
month & 0xFF, day & 0xFF, hour & 0xFF, minute & 0xFF,
addr2, addr1, addr0, pt_type, iq_type, ret_type
])
crc = crc16_ibm(frame_wo_crc)
return frame_wo_crc + bytes([crc & 0xFF, (crc >> 8) & 0xFF])
# ----------------------------- PUBLIC API -----------------------------
def request_service_single(self):
idx = int(self.spin_index.value())
@ -344,30 +428,62 @@ class DebugTerminalWidget(QtWidgets.QWidget):
if i not in self._name_cache:
needed.append(i)
if needed:
self._service_queue = needed[:] # копия
self._service_queue = needed[:]
self._pending_data_after_services = (base, count)
self._log(f"[AUTO] Need service for {len(needed)} indices: {needed}")
self._kick_service_queue()
else:
self._enqueue_or_start(base, service=False, varqnt=count)
def request_lowlevel_once(self):
"""Запрашивает чтение выбранной LowLevel переменной."""
if not self.serial.isOpen():
self._log("[LL] Port is not open.")
return
if self._busy:
self._log("[LL] Busy, request dropped.")
return
if not self._ll_current_var_info:
self._log("[LL] No variable selected!")
if self._ll_polling: # Если поллинг активен, но переменная пропала - стоп
self._toggle_ll_polling()
return
frame = self._build_lowlevel_request(self._ll_current_var_info)
meta = {'lowlevel': True}
self._enqueue_raw(frame, meta)
# -------------------------- SERVICE QUEUE FLOW ------------------------
# ... (код без изменений)
def _kick_service_queue(self):
if self._busy:
return # дождёмся завершения
return
if self._service_queue:
nxt = self._service_queue.pop(0)
# не используем chain, просто по завершению снова вызовем _kick_service_queue
self._enqueue_or_start(nxt, service=True, varqnt=0, queue_mode=True)
elif self._pending_data_after_services:
base, count = self._pending_data_after_services
self._pending_data_after_services = None
self._enqueue_or_start(base, service=False, varqnt=count)
# ------------------------ TRANSACTION SCHEDULER -----------------------
# ... (код без изменений)
def _enqueue_raw(self, frame: bytes, meta: dict):
if self._busy:
if self._drop_if_busy and not self._replace_if_busy:
self._log("[LOCKSTEP] Busy -> drop")
return
if self._replace_if_busy:
self._pending_cmd = (frame, meta)
self._log("[LOCKSTEP] Busy -> replaced pending")
else:
self._log("[LOCKSTEP] Busy -> ignore")
return
self._start_txn(frame, meta)
def _enqueue_or_start(self, index, service: bool, varqnt: int, chain_after=None, queue_mode=False):
frame = self._build_request(index, service=service, varqnt=varqnt)
meta = {'service': service, 'index': index, 'varqnt': varqnt, 'chain': chain_after, 'queue_mode': queue_mode}
meta = {'service': service, 'index': index, 'varqnt': varqnt, 'chain': chain_after, 'queue_mode': queue_mode, 'lowlevel': False}
if self._busy:
if self._drop_if_busy and not self._replace_if_busy:
self._log("[LOCKSTEP] Busy -> drop")
@ -392,40 +508,38 @@ class DebugTerminalWidget(QtWidgets.QWidget):
self._txn_timer.stop()
queue_mode = False
chain = None
if self._txn_meta:
queue_mode = self._txn_meta.get('queue_mode', False)
chain = self._txn_meta.get('chain')
meta = self._txn_meta
if meta:
queue_mode = meta.get('queue_mode', False)
chain = meta.get('chain')
self._txn_meta = None
self._busy = False
self._rx_buf.clear()
self._set_ui_busy(False)
# Если был chain -> запустить его
if chain:
base, serv, q = chain
self._enqueue_or_start(base, service=serv, varqnt=q)
return
if self._pending_cmd is not None:
frame, meta = self._pending_cmd
self._pending_cmd = None
frame, meta = self._pending_cmd; self._pending_cmd = None
QtCore.QTimer.singleShot(0, lambda f=frame,m=meta: self._start_txn(f,m))
return
# Если это был элемент очереди service -> continue
if queue_mode:
QtCore.QTimer.singleShot(0, self._kick_service_queue)
return
def _on_txn_timeout(self):
if not self._busy:
return
self._log("[TIMEOUT] No response")
if not self._busy: return
is_ll = self._txn_meta.get('lowlevel', False) if self._txn_meta else False
log_prefix = "[LL TIMEOUT]" if is_ll else "[TIMEOUT]"
self._log(f"{log_prefix} No response")
if self._rx_buf:
self._log_frame(bytes(self._rx_buf), tx=False)
self._end_txn()
# ------------------------------- TX/RX ---------------------------------
# ... (код без изменений)
def _send(self, data: bytes):
w = self.serial.write(data)
if w != len(data):
@ -442,53 +556,76 @@ class DebugTerminalWidget(QtWidgets.QWidget):
self._rx_buf.clear()
return
self._try_parse()
# ------------------------------- PARSING -------------------------------
def _try_parse(self):
if not self._txn_meta:
return
if self._txn_meta.get('lowlevel', False):
self._try_parse_lowlevel()
else:
self._try_parse_watch()
def _try_parse_watch(self):
# ... (код без изменений)
service = self._txn_meta['service']
buf = self._rx_buf
trailer_len = 4
if service:
if len(buf) < 7 + trailer_len: # adr cmd vhi vlo status iq nameLen + trailer
if len(buf) < 7 + trailer_len:
return
name_len = buf[6]
expected = 7 + name_len + trailer_len
if len(buf) < expected:
return
frame = bytes(buf[:expected])
del buf[:expected]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_service_frame(frame)
self._end_txn()
else:
if len(buf) < 6 + trailer_len: # adr cmd vhi vlo varqnt status + trailer
if len(buf) < 6 + trailer_len:
return
varqnt = buf[4]
status = buf[5]
varqnt = buf[4]; status = buf[5]
if status != DEBUG_OK:
expected = 8 + trailer_len # + errIndex(2)
if len(buf) < expected:
return
frame = bytes(buf[:expected])
del buf[:expected]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
expected = 8 + trailer_len
if len(buf) < expected: return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=True)
self._end_txn()
else:
expected = 6 + varqnt*2 + trailer_len
if len(buf) < expected:
return
frame = bytes(buf[:expected])
del buf[:expected]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
if len(buf) < expected: return
frame = bytes(buf[:expected]); del buf[:expected]
self.rxBytes.emit(frame); self._log_frame(frame, tx=False)
self._parse_data_frame(frame, error_mode=False)
self._end_txn()
def _try_parse_lowlevel(self):
# Ожидаемая длина: Успех=13, Ошибка=10
buf = self._rx_buf
if len(buf) < 10: # Минимальная длина (ошибка)
return
# Проверяем, что ответ для нас
if buf[1] != self.cmd_lowlevel:
self._log("[LL] Unexpected cmd in lowlevel parser, flushing.")
self._log_frame(bytes(self._rx_buf), tx=False)
self._rx_buf.clear()
# Не завершаем транзакцию, ждём таймаута
return
status = buf[2]
expected_len = 13 if status == DEBUG_OK else 10
if len(buf) >= expected_len:
frame = bytes(buf[:expected_len])
del buf[:expected_len]
self.rxBytes.emit(frame)
self._log_frame(frame, tx=False)
self._parse_lowlevel_frame(frame, success=(status == DEBUG_OK))
self._end_txn()
def _check_crc(self, payload: bytes, crc_lo: int, crc_hi: int):
if not self.auto_crc_check:
return True
@ -499,135 +636,136 @@ class DebugTerminalWidget(QtWidgets.QWidget):
return False
self._log("[CRC OK]")
return True
@staticmethod
def _clear_service_bit(vhi, vlo):
return ((vhi & 0x7F) << 8) | vlo
def _parse_service_frame(self, frame: bytes):
payload = frame[:-4]
crc_lo, crc_hi = frame[-4], frame[-3]
# ... (код без изменений)
payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 7:
self._log("[ERR] Service frame too short")
return
self._log("[ERR] Service frame too short"); return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, status, iq_raw, name_len = payload[:7]
index = self._clear_service_bit(vhi, vlo)
if len(payload) < 7 + name_len:
self._log("[ERR] Service name truncated")
return
name_bytes = payload[7:7+name_len]
name = name_bytes.decode(errors='replace')
# ### PATCH: извлекаем признаки
self._log("[ERR] Service name truncated"); return
name_bytes = payload[7:7+name_len]; name = name_bytes.decode(errors='replace')
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
frac_bits = iq_raw & FRAC_MASK_FULL
if status == DEBUG_OK:
# Кэшируем расширенный кортеж
self._name_cache[index] = (status, iq_raw, name, is_signed, frac_bits)
self.nameRead.emit(index, status, iq_raw, name)
if self.spin_count.value() == 1 and index == self.spin_index.value():
if status == DEBUG_OK:
self.lbl_name.setText(name)
# Отображаем IQ как «числоробных_бит + s/u»
self.lbl_iq.setText(f"{frac_bits}{'s' if is_signed else 'u'}")
self.lbl_name.setText(name); self.lbl_iq.setText(f"{frac_bits}{'s' if is_signed else 'u'}")
else:
self.lbl_name.setText('<err>')
self.lbl_iq.setText('-')
self._log(f"[SERVICE] idx={index} status={status} iq_raw=0x{iq_raw:02X} "
f"sign={'S' if is_signed else 'U'} frac={frac_bits} name='{name}'")
self.lbl_name.setText('<err>'); self.lbl_iq.setText('-')
self._log(f"[SERVICE] idx={index} status={status} iq_raw=0x{iq_raw:02X} sign={'S' if is_signed else 'U'} frac={frac_bits} name='{name}'")
def _parse_data_frame(self, frame: bytes, *, error_mode: bool):
payload = frame[:-4]
crc_lo, crc_hi = frame[-4], frame[-3]
# ... (код без изменений)
payload = frame[:-4]; crc_lo, crc_hi = frame[-4], frame[-3]
if len(payload) < 6:
self._log("[ERR] Data frame too short")
return
self._log("[ERR] Data frame too short"); return
self._check_crc(payload, crc_lo, crc_hi)
adr, cmd, vhi, vlo, varqnt, status = payload[:6]
base = self._clear_service_bit(vhi, vlo)
if error_mode:
if len(payload) < 8:
self._log("[ERR] Error frame truncated")
return
err_hi, err_lo = payload[6:8]
bad_index = (err_hi << 8) | err_lo
self._log("[ERR] Error frame truncated"); return
err_hi, err_lo = payload[6:8]; bad_index = (err_hi << 8) | err_lo
self._log(f"[DATA] ERROR status={status} bad_index={bad_index}")
self.valueRead.emit(bad_index, status, 0, 0, float('nan'))
self.valuesRead.emit(base, 0, [], [], [], [])
return
# Нормальный кадр
if len(payload) < 6 + varqnt*2:
self._log("[ERR] Data payload truncated")
return
self._log("[ERR] Data payload truncated"); return
raw_vals = []
pos = 6
for _ in range(varqnt):
hi = payload[pos]; lo = payload[pos+1]; pos += 2
raw16 = (hi << 8) | lo
raw_vals.append(raw16) # пока храним как 0..65535
idx_list = []
iq_list = []
name_list = []
scaled_list = []
display_raw_list = [] # для таблицы Raw (с учётом знака, если знак есть)
raw_vals.append(raw16)
idx_list = []; iq_list = []; name_list = []; scaled_list = []; display_raw_list = []
for ofs, raw16 in enumerate(raw_vals):
idx = base + ofs
# В кэше теперь 5 элементов
status_i, iq_raw, name_i, is_signed, frac_bits = self._name_cache.get(
idx,
(DEBUG_OK, 0, '', False, 0)
)
# Приведение знака
status_i, iq_raw, name_i, is_signed, frac_bits = self._name_cache.get(idx, (DEBUG_OK, 0, '', False, 0))
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000 # signed 16-bit
value_int = raw16 - 0x10000
else:
value_int = raw16
# Масштаб
if self.chk_raw.isChecked():
scale = 1.0
else:
# scale берём: если в словаре нет — вычисляем 2**frac_bits
scale = self.iq_scaling.get(frac_bits, 2.0 ** frac_bits)
scaled = value_int / scale
idx_list.append(idx)
iq_list.append(iq_raw) # сырой байт (с битом знака)
name_list.append(name_i)
scaled_list.append(scaled)
display_raw_list.append(value_int)
# Populate table
scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits))
scaled = float(value_int) / scale if frac_bits > 0 else float(value_int)
idx_list.append(idx); iq_list.append(iq_raw); name_list.append(name_i)
scaled_list.append(scaled); display_raw_list.append(value_int)
self._populate_table(idx_list, name_list, iq_list, display_raw_list, scaled_list)
if varqnt == 1:
self.edit_single_value.setText(
str(display_raw_list[0]) if self.chk_raw.isChecked() else f"{scaled_list[0]:.6g}"
)
self.edit_single_value.setText(str(display_raw_list[0]) if self.chk_raw.isChecked() else f"{scaled_list[0]:.6g}")
if idx_list[0] == self.spin_index.value():
# Отобразим мета
# Достаём из кэша снова (или можно из name_list/iq_list + пересчитать)
_, iq_raw0, name0, is_signed0, frac0 = self._name_cache.get(idx_list[0], (DEBUG_OK, 0, '', False, 0))
self.lbl_name.setText(name0)
self.lbl_iq.setText(f"{frac0}{'s' if is_signed0 else 'u'}")
self.lbl_name.setText(name0); self.lbl_iq.setText(f"{frac0}{'s' if is_signed0 else 'u'}")
self.valueRead.emit(idx_list[0], status, iq_list[0], display_raw_list[0], scaled_list[0])
else:
self.edit_single_value.setText("")
self.valuesRead.emit(base, varqnt, idx_list, iq_list, display_raw_list, scaled_list)
self._log(f"[DATA] base={base} q={varqnt} values={[f'{v:.6g}' for v in scaled_list] if not self.chk_raw.isChecked() else raw_vals}")
def _parse_lowlevel_frame(self, frame: bytes, success: bool):
payload_len = 9 if success else 6
crc_pos = payload_len
payload = frame[:payload_len]
crc_lo, crc_hi = frame[crc_pos], frame[crc_pos+1]
self._check_crc(payload, crc_lo, crc_hi)
status = payload[2]
addr2, addr1, addr0 = payload[3], payload[4], payload[5]
addr24 = (addr2 << 16) | (addr1 << 8) | addr0
self.ll_val_status.setText(f"0x{status:02X} ({'OK' if status == DEBUG_OK else 'ERR'})")
if not success:
self.ll_val_rettype.setText('-')
self.ll_val_raw.setText('-')
self.ll_val_scaled.setText('<ERROR>')
self.llValueRead.emit(addr24, status, 0, 0, float('nan'))
self._log(f"[LL] ERROR status=0x{status:02X} addr=0x{addr24:06X}")
return
return_type = payload[6]
data_hi, data_lo = payload[7], payload[8]
raw16 = (data_hi << 8) | data_lo
is_signed = (return_type & SIGN_BIT_MASK) != 0
frac_bits = return_type & FRAC_MASK_FULL
if is_signed and (raw16 & 0x8000):
value_int = raw16 - 0x10000
else:
value_int = raw16
if self.chk_ll_raw.isChecked():
scale = 1.0
else:
scale = self.iq_scaling.get(frac_bits, 1.0 / (1 << frac_bits)) # 1 / 2^N
scaled = float(value_int) / scale
# Обновляем UI
self.ll_val_rettype.setText(f"0x{return_type:02X} ({frac_bits}{'s' if is_signed else 'u'})")
self.ll_val_raw.setText(str(value_int))
self.ll_val_scaled.setText(f"{scaled:.6g}")
self.llValueRead.emit(addr24, status, return_type, value_int, scaled)
self._log(f"[LL] OK addr=0x{addr24:06X} type=0x{return_type:02X} raw={value_int} scaled={scaled:.6g}")
def _populate_table(self, idxs, names, iqs, raws, scaled):
# ... (код без изменений)
self.tbl_values.setRowCount(len(idxs))
for row, (idx, nm, iq_raw, rv, sv) in enumerate(zip(idxs, names, iqs, raws, scaled)):
is_signed = (iq_raw & SIGN_BIT_MASK) != 0
@ -650,21 +788,56 @@ class DebugTerminalWidget(QtWidgets.QWidget):
# ------------------------------ POLLING --------------------------------
def _toggle_polling(self):
if self._polling:
self._poll_timer.stop(); self._polling=False; self.btn_poll.setText("Start Polling"); self._log("[POLL] Stopped")
self.btn_read_service.setEnabled(True)
self.btn_read_values.setEnabled(True)
self._poll_timer.stop()
self._polling = False
self.btn_poll.setText("Start Polling")
self._log("[POLL] Stopped")
else:
self._poll_timer.start(self.spin_interval.value()); self._polling=True; self.btn_poll.setText("Stop Polling"); self._log(f"[POLL] Started interval={self.spin_interval.value()}ms")
self.btn_read_service.setEnabled(False)
self.btn_read_values.setEnabled(False)
interval = self.spin_interval.value()
self._poll_timer.start(interval)
self._polling = True
self.btn_poll.setText("Stop Polling")
self._log(f"[POLL] Started interval={interval}ms")
self._set_ui_busy(False) # Обновить доступность кнопок
def _on_poll_timeout(self):
if not self.serial.isOpen() or self._busy:
return
self.request_values()
def _toggle_ll_polling(self):
"""Включает и выключает поллинг для LowLevel вкладки."""
if self._ll_polling:
self._ll_poll_timer.stop()
self._ll_polling = False
self.btn_ll_poll.setText("Start Polling")
self._log("[LL POLL] Stopped")
else:
if not self._ll_current_var_info:
self._log("[LL POLL] Cannot start: no variable selected.")
return
interval = self.spin_ll_interval.value()
self._ll_poll_timer.start(interval)
self._ll_polling = True
self.btn_ll_poll.setText("Stop Polling")
self._log(f"[LL POLL] Started interval={interval}ms")
self._set_ui_busy(False) # Обновить доступность кнопок
def _on_ll_poll_timeout(self):
"""Слот таймера поллинга для LowLevel."""
self.request_lowlevel_once()
def _on_ll_variable_prepared(self, var_info: dict):
"""Срабатывает при выборе переменной в селекторе."""
self._ll_current_var_info = var_info
self._log(f"[LL] Selected variable '{var_info['path']}' @ {var_info['address_hex']}")
# Сбрасываем старые значения
self.ll_val_status.setText("-")
self.ll_val_rettype.setText("-")
self.ll_val_raw.setText("-")
self.ll_val_scaled.setText("-")
# ------------------------------ HELPERS --------------------------------
def _toggle_index_base(self, st):
# ... (код без изменений)
val = self.spin_index.value()
if st == QtCore.Qt.Checked:
self.spin_index.setDisplayIntegerBase(16); self.spin_index.setPrefix("0x")
@ -673,28 +846,41 @@ class DebugTerminalWidget(QtWidgets.QWidget):
self.spin_index.setValue(val)
def _set_ui_busy(self, busy: bool):
if self._polling == False:
self.btn_read_service.setEnabled(not busy)
self.btn_read_values.setEnabled(not busy)
# Блокируем кнопки в зависимости от состояния 'busy' и 'polling'
# Watch tab
can_use_watch = not busy and not self._polling
self.btn_read_service.setEnabled(can_use_watch)
self.btn_read_values.setEnabled(can_use_watch)
# LowLevel tab
can_use_ll = not busy and not self._ll_polling
self.btn_ll_read.setEnabled(can_use_ll)
def _on_serial_error(self, err):
if err == QtSerialPort.QSerialPort.NoError:
return
# ... (код без изменений)
if err == QtSerialPort.QSerialPort.NoError: return
self._log(f"[SERIAL ERR] {self.serial.errorString()} ({err})")
if self._busy:
self._end_txn()
if self._busy: self._end_txn()
# ------------------------------ LOGGING --------------------------------
def _log(self, msg: str):
# ... (код без изменений)
if not self.log_spoiler.getState():
return
ts = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
self.txt_log.append(f"{ts} {msg}")
def _log_frame(self, data: bytes, *, tx: bool):
# ... (код без изменений)
if not self.log_spoiler.getState():
return
tag = 'TX' if tx else 'RX'
hexs = ' '.join(f"{b:02X}" for b in data)
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)
self._log(f"[{tag}] {hexs} |{ascii_part}|")
# ---------------------------------------------------------- Demo harness ---
class _DemoWindow(QtWidgets.QMainWindow):
def __init__(self):
@ -702,28 +888,38 @@ class _DemoWindow(QtWidgets.QMainWindow):
self.setWindowTitle("DebugVar Terminal")
self.term = DebugTerminalWidget(self)
self.setCentralWidget(self.term)
# connect sample signals -> print
self.term.nameRead.connect(self._on_name)
self.term.valueRead.connect(self._on_value)
self.term.llValueRead.connect(self._on_ll_value)
def _on_name(self, index, status, iq, name):
return
print(f"Name idx={index} status={status} iq={iq} name='{name}'")
def _on_value(self, index, status, iq, raw16, floatVal):
return
print(f"Value idx={index} status={status} iq={iq} raw={raw16} val={floatVal}")
def _on_ll_value(self, addr, status, rettype_raw, raw16, scaled):
return
print(f"LL addr=0x{addr:06X} status={status} type=0x{rettype_raw:02X} raw={raw16} scaled={scaled}")
def format_address(addr_text: str) -> str:
try:
value = int(addr_text, 16)
except ValueError:
value = 0
return f"0x{value:06X}"
def closeEvent(self, event):
"""Вызывается при закрытии окна."""
# Явно удаляем центральный виджет
self.setCentralWidget(None)
if self.term:
self.term.deleteLater()
self.term = None
self.term.deleteLater(); self.term = None
super().closeEvent(event)
# ------------------------------- Demo --------------------------------------
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
win = _DemoWindow()
win.show()
sys.exit(app.exec_())
win = _DemoWindow(); win.show()
sys.exit(app.exec_())

View File

@ -1,86 +1,64 @@
import re
# variable_select_widget.py
import pickle
import hashlib
from typing import List, Dict, Any, Optional
from PySide2.QtWidgets import (
QWidget, QTreeWidget, QTreeWidgetItem, QVBoxLayout, QLineEdit,
QWidget, QTreeWidget, QTreeWidgetItem, QVBoxLayout, QLineEdit,
QHeaderView, QCompleter
)
from PySide2.QtGui import QKeyEvent
from PySide2.QtCore import Qt, QStringListModel
import pickle
import time
import hashlib
from path_hints import PathHints, canonical_key, split_path_tokens
# ------------------------------------------------------------------
# utils
# ------------------------------------------------------------------
def compute_vars_hash(vars_list):
return hashlib.sha1(pickle.dumps(vars_list)).hexdigest()
# Вспомогательные функции, которые теперь будут использоваться виджетом
def split_path(path):
"""
Разбивает путь на компоненты:
- 'foo[2].bar[1]->baz' ['foo', '[2]', 'bar', '[1]', 'baz']
Если видит '-' в конце строки (без '>' после) обрезает этот '-'
"""
tokens = []
token = ''
i = 0
length = len(path)
while i < length:
c = path[i]
# Разделители: '->' и '.'
if c == '-' and i + 1 < length and path[i:i+2] == '->':
if token:
tokens.append(token)
token = ''
i += 2
continue
elif c == '-' and i == length - 1:
# '-' на конце строки без '>' после — просто пропускаем его
i += 1
continue
elif c == '.':
if token:
tokens.append(token)
token = ''
i += 1
continue
elif c == '[':
if token:
tokens.append(token)
token = ''
idx = ''
while i < length and path[i] != ']':
idx += path[i]
i += 1
if i < length and path[i] == ']':
idx += ']'
i += 1
tokens.append(idx)
continue
else:
token += c
i += 1
if token:
tokens.append(token)
return tokens
def is_lazy_item(item):
def is_lazy_item(item: QTreeWidgetItem) -> bool:
return item.childCount() == 1 and item.child(0).text(0) == 'lazy_marker'
# ------------------------------------------------------------------
# VariableSelectWidget
# ------------------------------------------------------------------
class VariableSelectWidget(QWidget):
"""
Виджет выбора переменных с деревом + строкой поиска + автодополнением.
Подсказки полностью через PathHints.
ВАЖНО: ожидается, что в данных (vars_list) каждое var['name'] ПОЛНЫЙ ПУТЬ
(например: 'project.adc.status'), даже внутри children.
"""
ROLE_NAME = Qt.UserRole # локальный хвост (display)
ROLE_VAR_DICT = Qt.UserRole + 100 # исходный dict
ROLE_FULLPATH = Qt.UserRole + 200 # полный путь
def __init__(self, parent=None):
super().__init__(parent)
self.expanded_vars = []
self.node_index = {}
self.is_autocomplete_on = True # <--- ДОБАВИТЬ ЭТУ СТРОКУ
self._bckspc_pressed = False
self.manual_completion_active = False
self._vars_hash = None
# --- UI Элементы ---
# данные
self.expanded_vars: List[Dict[str, Any]] = []
self.is_autocomplete_on = True
self.manual_completion_active = False
self._bckspc_pressed = False
self._vars_hash: Optional[str] = None
# индекс: canonical_full_path -> item
self._item_by_canon: Dict[str, QTreeWidgetItem] = {}
# подсказки
self.hints = PathHints()
# --- UI ---
self.search_input = QLineEdit(self)
self.search_input.setPlaceholderText("Поиск...")
self.tree = QTreeWidget(self)
self.tree.setHeaderLabels(["Имя переменной", "Тип"])
self.tree.setSelectionMode(QTreeWidget.ExtendedSelection)
@ -97,32 +75,58 @@ class VariableSelectWidget(QWidget):
self.completer.setCaseSensitivity(Qt.CaseInsensitive)
self.completer.setFilterMode(Qt.MatchContains)
self.completer.setWidget(self.search_input)
self.completer.activated[str].connect(self.insert_completion)
# --- Layout ---
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.addWidget(self.search_input)
layout.addWidget(self.tree)
# --- Соединения ---
#self.search_input.textChanged.connect(self.on_search_text_changed)
self.search_input.textChanged.connect(lambda text: self.on_search_text_changed(text))
# layout
lay = QVBoxLayout(self)
lay.setContentsMargins(0, 0, 0, 0)
lay.addWidget(self.search_input)
lay.addWidget(self.tree)
# signals
self.search_input.textChanged.connect(self.on_search_text_changed)
self.search_input.installEventFilter(self)
self.completer.activated[str].connect(lambda text: self.insert_completion(text))
# --- Публичные методы для управления виджетом снаружи ---
# ------------------------------------------------------------------
# public api
# ------------------------------------------------------------------
def set_autocomplete(self, enabled: bool):
"""Включает или выключает режим автодополнения."""
self.is_autocomplete_on = enabled
def set_data(self, vars_list):
"""Основной метод для загрузки данных в виджет."""
def set_data(self, vars_list: List[Dict[str, Any]]):
"""
Загружаем список переменных (формат: см. класс docstring).
"""
# deepcopy
self.expanded_vars = pickle.loads(pickle.dumps(vars_list, protocol=pickle.HIGHEST_PROTOCOL))
# self.build_completion_list() # Если нужна полная перестройка списка
self.populate_tree()
# rebuild hints из полного списка узлов (каждый узел уже с full_path)
self._rebuild_hints_from_vars(self.expanded_vars)
# rebuild tree
self.populate_tree(self.expanded_vars)
# ------------------------------------------------------------------
# hints builder: дети уже содержат ПОЛНЫЙ ПУТЬ
# ------------------------------------------------------------------
def _rebuild_hints_from_vars(self, vars_list: List[Dict[str, Any]]):
paths: List[tuple] = []
def walk(node: Dict[str, Any]):
full = node.get('name', '')
if full:
paths.append((full, node.get('type')))
for ch in node.get('children', []) or []:
walk(ch)
for v in vars_list:
walk(v)
self.hints.set_paths(paths)
# ------------------------------------------------------------------
# tree building
# ------------------------------------------------------------------
def populate_tree(self, vars_list=None):
if vars_list is None:
vars_list = self.expanded_vars
@ -130,477 +134,311 @@ class VariableSelectWidget(QWidget):
new_hash = compute_vars_hash(vars_list)
if self._vars_hash == new_hash:
return
self._vars_hash = new_hash
self.tree.setUpdatesEnabled(False)
self.tree.blockSignals(True)
self.tree.clear()
self.node_index.clear()
self._item_by_canon.clear()
for var in vars_list:
self.add_tree_item_lazy(None, var)
# построим top-level из входного списка: определяем по глубине токенов
# (vars_list может содержать и глубокие узлы; выберем корни = те, чей full_path не имеет родителя в списке)
full_to_node = {v['name']: v for v in vars_list}
# но safer: просто добавляем все как top-level, если ты уже передаёшь только корни.
# Если в твоих данных vars_list == корни, просто сделаем:
for v in vars_list:
self._add_tree_item_lazy(None, v)
self.tree.setUpdatesEnabled(True)
self.tree.blockSignals(False)
header = self.tree.header()
header.setSectionResizeMode(QHeaderView.Interactive)
header.setSectionResizeMode(1, QHeaderView.Stretch)
self.tree.setColumnWidth(0, 400)
def on_item_expanded(self, item):
def on_item_expanded(self, item: QTreeWidgetItem):
if is_lazy_item(item):
item.removeChild(item.child(0))
var = item.data(0, Qt.UserRole + 100)
var = item.data(0, self.ROLE_VAR_DICT)
if var:
for child_var in var.get('children', []):
self.add_tree_item_lazy(item, child_var)
for ch in var.get('children', []) or []:
self._add_tree_item_lazy(item, ch)
def get_full_item_name(self, item):
fullname = item.text(0)
# Заменяем '->' на '.'
fullname = fullname.replace('->', '.')
fullname = fullname.replace('[', '.[')
return fullname
def add_tree_item_lazy(self, parent, var):
name = var['name']
# ------------------------------------------------------------------
# item creation (var['name'] — ПОЛНЫЙ ПУТЬ)
# ------------------------------------------------------------------
def _add_tree_item_lazy(self, parent: Optional[QTreeWidgetItem], var: Dict[str, Any]):
full_path = var.get('name', '')
type_str = var.get('type', '')
item = QTreeWidgetItem([name, type_str])
item.setData(0, Qt.UserRole, name)
full_name = self.get_full_item_name(item)
self.node_index[full_name.lower()] = item
# здесь оставляем полный путь для отображения
item = QTreeWidgetItem([full_path, type_str])
item.setData(0, self.ROLE_NAME, full_path) # теперь ROLE_NAME = полный путь
item.setData(0, self.ROLE_VAR_DICT, var)
item.setData(0, self.ROLE_FULLPATH, full_path)
if "(bitfield:" in type_str:
item.setDisabled(True)
self.set_tool(item, "Битовые поля недоступны для выбора")
self._set_tool(item, "Битовые поля недоступны для выбора")
# метаданные
for i, attr in enumerate(['file', 'extern', 'static']):
item.setData(0, Qt.UserRole + 1 + i, var.get(attr))
# в дерево
if parent is None:
self.tree.addTopLevelItem(item)
else:
parent.addChild(item)
# Если есть дети — добавляем заглушку (чтобы можно было раскрыть)
# lazy children
if var.get('children'):
dummy = QTreeWidgetItem(["lazy_marker"])
item.addChild(dummy)
# Кэшируем детей для подгрузки по событию
item.setData(0, Qt.UserRole + 100, var) # Сохраняем var целиком
# индекс
self._item_by_canon[canonical_key(full_path)] = item
def show_matching_path(self, item, path_parts, level=0):
node_name = item.text(0).lower()
node_parts = split_path(node_name)
@staticmethod
def _tail_token(full_path: str) -> str:
toks = split_path_tokens(full_path)
return toks[-1] if toks else full_path
if 'project' in node_name:
a = 1
# ------------------------------------------------------------------
# filtering
# ------------------------------------------------------------------
def filter_tree(self):
"""
Быстрый фильтр:
- без разделителей substring по ЛОКАЛЬНОМУ имени top-level
- с разделителями структурный (по токенам full_path)
"""
text = (self.search_input.text() or '').strip()
low = text.lower()
parts = split_path_tokens(low) if low else []
# простой режим (нет ., ->, [):
if low and all(x not in low for x in ('.', '->', '[')):
for i in range(self.tree.topLevelItemCount()):
it = self.tree.topLevelItem(i)
full = (it.data(0, self.ROLE_FULLPATH) or '').lower()
it.setHidden(low not in full)
return
# структурный
for i in range(self.tree.topLevelItemCount()):
it = self.tree.topLevelItem(i)
self._show_matching_path(it, parts, 0)
def _show_matching_path(self, item: QTreeWidgetItem, path_parts: List[str], level: int = 0):
"""
Сравниваем введённый путь (разбитый на токены) с ПОЛНЫМ ПУТЁМ узла.
Алгоритм: берём полный путь узла, разбиваем в токены, берём уровень level,
и сравниваем с соответствующим токеном path_parts[level].
"""
full = (item.data(0, self.ROLE_FULLPATH) or '').lower()
node_parts = split_path_tokens(full)
if level >= len(path_parts):
# Путь полностью пройден — показываем только этот узел (без раскрытия всех детей)
item.setHidden(False)
item.setExpanded(False)
return True
if level >= len(node_parts):
# Уровень поиска больше длины пути узла — скрываем
item.setHidden(False)
item.setHidden(True)
return False
search_part = path_parts[level]
node_part = node_parts[level]
if search_part == node_part:
# Точное совпадение — показываем узел, идём вглубь только по совпадениям
item.setHidden(False)
matched_any = False
self.on_item_expanded(item)
for i in range(item.childCount()):
child = item.child(i)
if self.show_matching_path(child, path_parts, level + 1):
ch = item.child(i)
if self._show_matching_path(ch, path_parts, level + 1):
matched_any = True
item.setExpanded(matched_any)
return matched_any or item.childCount() == 0
elif node_part.startswith(search_part):
# Неполное совпадение — показываем только этот узел, детей скрываем, не раскрываем
item.setHidden(False)
item.setExpanded(False)
return True
elif search_part in node_part and (level == len(path_parts)-1):
# Неполное совпадение — показываем только этот узел, детей скрываем, не раскрываем
elif search_part in node_part and (level == len(path_parts) - 1):
item.setHidden(False)
item.setExpanded(False)
return True
else:
# Несовпадение — скрываем
item.setHidden(True)
return False
def filter_tree(self):
text = self.search_input.text().strip().lower()
path_parts = split_path(text) if text else []
if '.' not in text and '->' not in text and '[' not in text and text != '':
for i in range(self.tree.topLevelItemCount()):
item = self.tree.topLevelItem(i)
name = item.text(0).lower()
if text in name:
item.setHidden(False)
# Не сбрасываем expanded, чтобы можно было раскрывать вручную
else:
item.setHidden(True)
else:
for i in range(self.tree.topLevelItemCount()):
item = self.tree.topLevelItem(i)
self.show_matching_path(item, path_parts, 0)
def find_node_by_path(self, root_vars, path_list):
current_level = root_vars
node = None
for part in path_list:
node = None
for var in current_level:
if var['name'] == part:
node = var
break
if node is None:
return None
current_level = node.get('children', [])
return node
def update_completions(self, text=None):
# ------------------------------------------------------------------
# completions (ONLY PathHints)
# ------------------------------------------------------------------
def update_completions(self, text: Optional[str] = None) -> List[str]:
if text is None:
text = self.search_input.text().strip()
text = self.search_input.text()
suggestions = self.hints.suggest(text)
self.completer.setModel(QStringListModel(suggestions))
if suggestions:
self.completer.complete()
else:
text = text.strip()
self.completer.popup().hide()
return suggestions
normalized_text = text.replace('->', '.')
parts = split_path(text)
path_parts = parts[:-1] if parts else []
prefix = parts[-1].lower() if parts else ''
ends_with_sep = text.endswith('.') or text.endswith('->') or text.endswith('[')
is_index_suggestion = text.endswith('[')
def insert_completion(self, full_path: str):
"""
Пользователь выбрал подсказку (full_path).
Если у узла есть дети и пользователь не поставил разделитель
добавим '.'. Для массивного токена ('[0]') добавим '.' тоже.
(Позже допилим '->' при наличии метаданных.)
"""
node = self.hints.find_node(full_path)
text = full_path
completions = []
def find_exact_node(parts):
if not parts:
return None
fullname = parts[0]
for p in parts[1:]:
fullname += '.' + p
return self.node_index.get(fullname.lower())
if is_index_suggestion:
base_text = text[:-1] # убираем '['
parent_node = self.find_node_by_fullname(base_text)
if not parent_node:
base_text_clean = re.sub(r'\[\d+\]$', '', base_text)
parent_node = self.find_node_by_fullname(base_text_clean)
if parent_node:
seen = set()
for i in range(parent_node.childCount()):
child = parent_node.child(i)
if child.isHidden():
continue
cname = child.text(0)
m = re.match(rf'^{re.escape(base_text)}\[(\d+)\]$', cname)
if m and cname not in seen:
completions.append(cname)
seen.add(cname)
self.completer.setModel(QStringListModel(completions))
return completions
if ends_with_sep:
node = self.find_node_by_fullname(text[:-1])
if node:
for i in range(node.childCount()):
child = node.child(i)
if child.isHidden():
continue
completions.append(child.text(0))
elif not path_parts:
# Первый уровень — только если имя начинается с prefix
for i in range(self.tree.topLevelItemCount()):
item = self.tree.topLevelItem(i)
if item.isHidden():
continue
name = item.text(0)
if name.lower().startswith(prefix):
completions.append(name)
else:
node = find_exact_node(path_parts)
if node:
for i in range(node.childCount()):
child = node.child(i)
if child.isHidden():
continue
name = child.text(0)
name_parts = child.data(0, Qt.UserRole + 10)
if name_parts is None:
name_parts = split_path(name)
child.setData(0, Qt.UserRole + 10, name_parts)
if not name_parts:
continue
last_part = name_parts[-1].lower()
if prefix == '' or prefix in last_part: # ← строго startswith
completions.append(name)
self.completer.setModel(QStringListModel(completions))
self.completer.complete()
return completions
# Функция для поиска узла с полным именем
def find_node_by_fullname(self, name):
if name is None:
return None
normalized_name = name.replace('->', '.').lower()
normalized_name = normalized_name.replace('[', '.[').lower()
return self.node_index.get(normalized_name)
def insert_completion(self, text):
node = self.find_node_by_fullname(text)
if node and node.childCount() > 0 and not (text.endswith('.') or text.endswith('->') or text.endswith('[')):
# Определяем разделитель по имени первого ребёнка
child_name = node.child(0).text(0)
if child_name.startswith(text + '->'):
text += '->'
elif child_name.startswith(text + '.'):
text += '.'
elif '[' in child_name:
text += '[' # для массивов
if node and node.children and not (
text.endswith('.') or text.endswith('->') or text.endswith('[')
):
first_child = next(iter(node.children.values()))
if first_child.name.startswith('['):
text += '[' # пользователь сразу начнёт ввод индекса
else:
text += '.' # fallback
if not self._bckspc_pressed:
self.search_input.setText(text)
self.search_input.setCursorPosition(len(text))
self.run_completions(text)
else:
text += '.' # обычный переход
if not self._bckspc_pressed:
self.search_input.setText(text)
self.search_input.setCursorPosition(len(text))
self.run_completions(text)
# ------------------------------------------------------------------
# events
# ------------------------------------------------------------------
def eventFilter(self, obj, event):
if obj == self.search_input and isinstance(event, QKeyEvent):
if event.key() == Qt.Key_Space and event.modifiers() & Qt.ControlModifier:
self.manual_completion_active = True
text = self.search_input.text().strip()
self.run_completions(text)
self.run_completions(self.search_input.text())
elif event.key() == Qt.Key_Escape:
# Esc — выключаем ручной режим и скрываем подсказки, если autocomplete выключен
if not self.is_autocomplete_on:
self.manual_completion_active = False
self.completer.popup().hide()
return True
if event.key() == Qt.Key_Backspace:
self._bckspc_pressed = True
self._bckspc_pressed = True
else:
self._bckspc_pressed = False
return super().eventFilter(obj, event)
def run_completions(self, text):
completions = self.update_completions(text)
if not self.is_autocomplete_on and self._bckspc_pressed:
text = text[:-1]
if len(completions) == 1 and completions[0].lower() == text.lower():
# Найдем узел с таким именем
def find_exact_item(name):
stack = [self.tree.topLevelItem(i) for i in range(self.tree.topLevelItemCount())]
while stack:
node = stack.pop()
if node.text(0).lower() == name.lower():
return node
for i in range(node.childCount()):
stack.append(node.child(i))
return None
def run_completions(self, text: str):
if not self.is_autocomplete_on and not self.manual_completion_active:
self.completer.popup().hide()
return
self.update_completions(text)
node = find_exact_item(completions[0])
if node and node.childCount() > 0:
# Используем первую подсказку, чтобы определить нужный разделитель
completions = self.update_completions(text + '.')
if not completions:
return
suggestion = completions[0]
# Ищем, какой символ идёт после текущего текста
separator = '.'
if suggestion.startswith(text):
rest = suggestion[len(text):]
if rest.startswith(text + '->'):
separator += '->'
elif rest.startswith(text + '.'):
separator += '.'
elif '[' in rest:
separator += '[' # для массивов
else:
separator += '.' # fallback
if not self._bckspc_pressed:
self.search_input.setText(text + separator)
completions = self.update_completions(text)
self.completer.setModel(QStringListModel(completions))
self.completer.complete()
return True
# Иначе просто показываем подсказки
self.completer.setModel(QStringListModel(completions))
if completions:
self.completer.complete()
return True
def on_search_text_changed(self, text):
sender_widget = self.sender()
sender_name = sender_widget.objectName() if sender_widget else "Unknown Sender"
self.completer.setWidget(self.search_input)
def on_search_text_changed(self, text: str):
self.completer.setWidget(self.search_input)
self.filter_tree()
if text == None:
text = self.search_input.text().strip()
if text is None:
text = self.search_input.text()
if self.is_autocomplete_on:
self.run_completions(text)
else:
# Если выключено, показываем подсказки только если флаг ручного вызова True
if self.manual_completion_active:
self.run_completions(text)
else:
self.completer.popup().hide()
def focusInEvent(self, event):
if self.completer.widget() != self.search_input:
self.completer.setWidget(self.search_input)
super().focusInEvent(event)
def _custom_focus_in_event(self, event):
# Принудительно установить виджет для completer при получении фокуса
if self.completer.widget() != self.search_input:
self.completer.setWidget(self.search_input)
super(QLineEdit, self.search_input).focusInEvent(event) # Вызвать оригинальный обработчик
def build_completion_list(self):
completions = []
def recurse(var, prefix=''):
fullname = f"{prefix}.{var['name']}" if prefix else var['name']
completions.append(fullname)
for child in var.get('children', []):
recurse(child, fullname)
for v in self.expanded_vars:
recurse(v)
self.all_completions = completions
def set_tool(self, item, text):
item.setToolTip(0, text)
item.setToolTip(1, text)
def get_all_items(self):
"""Возвращает все конечные (leaf) элементы, исключая битовые поля и элементы с детьми (реальными)."""
def collect_leaf_items(parent):
leaf_items = []
for i in range(parent.childCount()):
child = parent.child(i)
if child.isHidden():
continue
# Если есть заглушка — раскрываем
self.on_item_expanded(child)
if child.childCount() == 0:
item_type = child.text(1)
if item_type and 'bitfield' in str(item_type).lower():
continue
leaf_items.append(child)
else:
leaf_items.extend(collect_leaf_items(child))
return leaf_items
all_leaf_items = []
for i in range(self.tree.topLevelItemCount()):
top = self.tree.topLevelItem(i)
# Раскрываем lazy, если надо
self.on_item_expanded(top)
if top.childCount() == 0:
item_type = top.text(1)
if item_type and 'bitfield' in str(item_type).lower():
continue
all_leaf_items.append(top)
else:
all_leaf_items.extend(collect_leaf_items(top))
return all_leaf_items
def _get_internal_selected_items(self):
"""Возвращает выделенные элементы и всех их потомков, включая lazy."""
selected = self.tree.selectedItems()
all_items = []
def collect_children(item):
# Раскрываем при необходимости
# Раскрываем lazy, если надо
self.on_item_expanded(item)
items = [item]
for i in range(item.childCount()):
child = item.child(i)
items.extend(collect_children(child))
return items
for item in selected:
all_items.extend(collect_children(item))
return all_items
def get_selected_items(self):
"""Возвращает только конечные (leaf) выделенные элементы, исключая bitfield."""
selected = self.tree.selectedItems()
leaf_items = []
for item in selected:
# Раскрываем lazy, если надо
self.on_item_expanded(item)
# Если у узла нет видимых/выделенных детей — он лист
if all(item.child(i).isHidden() or not item.child(i).isSelected() for i in range(item.childCount())):
item_type = item.data(0, Qt.UserRole)
if item_type and 'bitfield' in str(item_type).lower():
continue
leaf_items.append(item)
return leaf_items
def get_all_var_names(self):
"""Возвращает имена всех конечных (leaf) переменных, исключая битовые поля и группы."""
return [item.text(0) for item in self.get_all_items() if item.text(0)]
def _get_internal_selected_var_names(self):
"""Возвращает имена выделенных переменных."""
return [item.text(0) for item in self._get_internal_selected_items() if item.text(0)]
def get_selected_var_names(self):
"""Возвращает имена только конечных (leaf) переменных из выделенных."""
return [item.text(0) for item in self.get_selected_items() if item.text(0)]
def closeEvent(self, event):
self.completer.setWidget(None)
self.completer.deleteLater()
super().closeEvent(event)
super().closeEvent(event)
# ------------------------------------------------------------------
# lookup by full path
# ------------------------------------------------------------------
def find_item_by_fullpath(self, path: str) -> Optional[QTreeWidgetItem]:
return self._item_by_canon.get(canonical_key(path))
# ------------------------------------------------------------------
# tooltips
# ------------------------------------------------------------------
def _set_tool(self, item: QTreeWidgetItem, text: str):
item.setToolTip(0, text)
item.setToolTip(1, text)
# ------------------------------------------------------------------
# selection helpers
# ------------------------------------------------------------------
def get_all_items(self):
"""Все leaf-узлы (подгружаем lazy)."""
def collect_leaf(parent):
leaves = []
for i in range(parent.childCount()):
ch = parent.child(i)
if ch.isHidden():
continue
self.on_item_expanded(ch)
if ch.childCount() == 0:
t = ch.text(1)
if t and 'bitfield' in t.lower():
continue
leaves.append(ch)
else:
leaves.extend(collect_leaf(ch))
return leaves
out = []
for i in range(self.tree.topLevelItemCount()):
top = self.tree.topLevelItem(i)
self.on_item_expanded(top)
if top.childCount() == 0:
t = top.text(1)
if t and 'bitfield' in t.lower():
continue
out.append(top)
else:
out.extend(collect_leaf(top))
return out
def _get_internal_selected_items(self):
selected = self.tree.selectedItems()
all_items = []
def collect(item):
self.on_item_expanded(item)
res = [item]
for i in range(item.childCount()):
res.extend(collect(item.child(i)))
return res
for it in selected:
all_items.extend(collect(it))
return all_items
def get_selected_items(self):
selected = self.tree.selectedItems()
leaves = []
for it in selected:
self.on_item_expanded(it)
if all(it.child(i).isHidden() or not it.child(i).isSelected() for i in range(it.childCount())):
t = it.data(0, self.ROLE_NAME)
if t and isinstance(t, str) and 'bitfield' in t.lower():
continue
leaves.append(it)
return leaves
def get_all_var_names(self):
return [it.data(0, self.ROLE_FULLPATH) for it in self.get_all_items() if it.data(0, self.ROLE_FULLPATH)]
def _get_internal_selected_var_names(self):
return [it.data(0, self.ROLE_FULLPATH) for it in self._get_internal_selected_items() if it.data(0, self.ROLE_FULLPATH)]
def get_selected_var_names(self):
return [it.data(0, self.ROLE_FULLPATH) for it in self.get_selected_items() if it.data(0, self.ROLE_FULLPATH)]

View File

@ -10,6 +10,21 @@ DebugLowLevel_t debug_ll = DEBUG_LOWLEVEL_INIT; ///<
static int getDebugVar(DebugVar_t *var, int32_t *int_var, float *float_var);
static int convertDebugVarToIQx(DebugVar_t *var, int32_t *ret_var);
static int iqTypeToQ(DebugVarIQType_t t);
static int is_addr_in_allowed_ranges(uint32_t addr_val, const AddrRange_t *ranges, int count);
/**
* @brief Ìàññèâ äîïóñòèìûõ äèàïàçîíîâ àäðåñîâ äëÿ îòëàäî÷íîãî ÷òåíèÿ
*
* Âêëþ÷àåò â ñåáÿ íàáîð äèàïàçîíîâ ïàìÿòè, ðàçðåø¸ííûõ äëÿ äîñòóïà
* ôóíêöèåé Debug_LowLevel_ReadVar.
*/
static const AddrRange_t debug_allowed_ranges[] = ALLOWED_ADDRESS_RANGES;
/**
* @brief Êîëè÷åñòâî ýëåìåíòîâ â ìàññèâå debug_allowed_ranges
*/
static const int debug_allowed_ranges_count = sizeof(debug_allowed_ranges) / sizeof(debug_allowed_ranges[0]);
///////////////////////////----EXAPLE-----//////////////////////////////
int var_numb = 1; ///< Ïðèìåð ïåðåìåííîé äëÿ îòëàäêè
DebugVarName_t var_name; ///< Èìÿ ïåðåìåííîé
@ -151,7 +166,6 @@ int Debug_ReadVarName(int var_ind, DebugVarName_t name_ptr, int *length)
return 0;
}
/**
* @brief ×èòàåò çíà÷åíèå ïåðåìåííîé îòëàäêè ñ íèæíåãî óðîâíÿ.
* @param return_32b óêàçàòåëü íà ïåðåìåííóþ, êóäà çàïèñûâàåòñÿ ðåçóëüòàò.
@ -168,18 +182,8 @@ int Debug_LowLevel_ReadVar(int32_t *return_32b)
if (debug_ll.isVerified == 0)
return DEBUG_ERR_DATATIME;
// Ðàçðåø¸ííûå äèàïàçîíû ïàìÿòè (èç .cmd ôàéëà)
if (!(
(addr_val <= 0x0007FF) || // RAMM0 + RAMM1
(addr_val >= 0x008120 && addr_val <= 0x009FFC) || // L0 + L1 SARAM
(addr_val >= 0x3F8000 && addr_val <= 0x3F9FFF) || // PRAMH0 + DRAMH0
(addr_val >= 0x3FF000 && addr_val <= 0x3FFFFF) || // BOOTROM + RESET
(addr_val >= 0x080002 && addr_val <= 0x09FFFF) || // RAMEX1
(addr_val >= 0x0F0000 && addr_val <= 0x0FFEFF) || // RAMEX4
(addr_val >= 0x100002 && addr_val <= 0x103FFF) || // RAMEX0 + RAMEX2 + RAMEX01
(addr_val >= 0x102000 && addr_val <= 0x103FFF) // RAMEX2
)) {
if (is_addr_in_allowed_ranges(addr_val, debug_allowed_ranges, debug_allowed_ranges_count) != 0)
{
return DEBUG_ERR_ADDR; // Çàïðåù¸ííûé àäðåñ — íåëüçÿ ÷èòàòü
}
@ -215,6 +219,60 @@ int Debug_LowLevel_Initialize(DateTime_t* external_date)
}
/**
* @brief ×èòàåò âîçâðàùàåìûé òèï (IQ) íèçêîóðîâíåíî çàäàííîé ïåðåìåííîé.
* @param var_ind èíäåêñ ïåðåìåííîé.
* @param vartype óêàçàòåëü äëÿ âîçâðàòà òèïà.
* @return int 0: óñïåõ, 1: îøèáêà.
* @details Èñïîëüçóåòñÿ äëÿ ÷òåíèÿ âîçâðàùàåìîãî òèïà (IQ) ïåðåìåííûõ ïî èõ èíäåêñó.
*/
int Debug_LowLevel_ReadVarReturnType(int *vartype)
{
int rettype;
if(vartype == NULL)
return DEBUG_ERR_INTERNAL;
if((debug_ll.dbg_var.ptr_type == pt_struct) || (debug_ll.dbg_var.ptr_type == pt_union) ||
(debug_ll.dbg_var.ptr_type == pt_unknown))
return DEBUG_ERR_INVALID_VAR;
*vartype = iqTypeToQ(debug_ll.dbg_var.return_type);
return 0;
}
/**
* @brief ×èòàåò òèï íèçêîóðîâíåíî çàäàííîé ïåðåìåííîé.
* @param var_ind èíäåêñ ïåðåìåííîé.
* @param vartype óêàçàòåëü äëÿ âîçâðàòà òèïà.
* @return int 0: óñïåõ, 1: îøèáêà.
*/
int Debug_LowLevel_ReadVarType(int *vartype)
{
int rettype;
if(vartype == NULL)
return DEBUG_ERR_INTERNAL;
if((debug_ll.dbg_var.ptr_type == pt_struct) || (debug_ll.dbg_var.ptr_type == pt_union) ||
(debug_ll.dbg_var.ptr_type == pt_unknown))
return DEBUG_ERR_INVALID_VAR;
*vartype = debug_ll.dbg_var.ptr_type;
switch(debug_ll.dbg_var.ptr_type)
{
case pt_int8:
case pt_int16:
case pt_int32:
case pt_float:
*vartype = debug_ll.dbg_var.ptr_type | DEBUG_SIGNED_VAR;
break;
default:
*vartype = debug_ll.dbg_var.ptr_type;
break;
}
return 0;
}
@ -371,3 +429,23 @@ static int getDebugVar(DebugVar_t *var, int32_t *int_var, float *float_var)
return 0; // óñïåõ
}
/**
* @brief Ïðîâåðÿåò, âõîäèò ëè àäðåñ â îäèí èç äîïóñòèìûõ äèàïàçîíîâ
*
* @param addr_val - Çíà÷åíèå àäðåñà äëÿ ïðîâåðêè
* @param ranges - Óêàçàòåëü íà ìàññèâ äèàïàçîíîâ AddrRange_t
* @param count - Êîëè÷åñòâî äèàïàçîíîâ â ìàññèâå
* @return 0 åñëè àäðåñ íàõîäèòñÿ â îäíîì èç äèàïàçîíîâ, èíà÷å 1
*/
static int is_addr_in_allowed_ranges(uint32_t addr_val, const AddrRange_t *ranges, int count)
{
int i;
for (i = 0; i < count; i++) {
if (addr_val >= ranges[i].start && addr_val <= ranges[i].end) {
return 0;
}
}
return 1;
}

View File

@ -5,6 +5,18 @@
#define ALLOWED_ADDRESS_RANGES { \
{0x000000, 0x0007FF}, \
{0x008120, 0x009FFC}, \
{0x3F8000, 0x3F9FFF}, \
{0x3FF000, 0x3FFFFF}, \
{0x080002, 0x09FFFF}, \
{0x0F0000, 0x0FFEFF}, \
{0x100002, 0x103FFF}, \
{0x102000, 0x103FFF} \
}
#if UINT8_MAX // Åñëè åñòü òèï 8 áèò - çí÷à÷èò àäðåñàöèÿ ïî 8 áèò
#define ALIGN_8BIT 0x0 ///< Âûðàâíèâàíèå áåç îãðàíè÷åíèé (ëþáîé àäðåñ)
@ -141,6 +153,13 @@ typedef struct {
uint8_t minute; ///< Ìèíóòû (0-59)
} DateTime_t;
/**
* @brief Ñòðóêòóðà, îïèñûâàþùàÿ äèàïàçîí àäðåñîâ ïàìÿòè.
*/
typedef struct {
uint32_t start; ///< Íà÷àëüíûé àäðåñ äèàïàçîíà
uint32_t end; ///< Êîíå÷íûé àäðåñ äèàïàçîíà (âêëþ÷èòåëüíî)
} AddrRange_t;
/**
* @brief Ñòðóêòóðà íèæíåãî óðîâíÿ îòëàäêè.
*/
@ -176,9 +195,14 @@ int Debug_ReadVarName(int var_ind, DebugVarName_t name_ptr, int *length);
int Debug_ReadVarReturnType(int var_ind, int *vartype);
/* ×èòàåò òèï ïåðåìåííîé ïî èíäåêñó */
int Debug_ReadVarType(int var_ind, int *vartype);
/* ×èòàåò çíà÷åíèå ïåðåìåííîé ñ íèæíåãî óðîâíÿ */
int Debug_LowLevel_ReadVar(int32_t *return_long);
/* Èíèöèàëèçèðóåò îòëàäêó íèæíåãî óðîâíÿ */
int Debug_LowLevel_Initialize(DateTime_t *external_date);
/* ×èòàåò âîçâðàùàåìûé òèï (IQ) íèçêîóðîâíåíî çàäàííîé ïåðåìåííîé */
int Debug_LowLevel_ReadVarReturnType(int *vartype);
/* ×èòàåò òèï íèçêîóðîâíåíî çàäàííîé ïåðåìåííîé.*/
int Debug_LowLevel_ReadVarType(int *vartype);
#endif //DEBUG_TOOLS

View File

@ -1,23 +0,0 @@
#include "debug_tools.h"
// Инклюды для доступа к переменным
#include "bender.h"
// Экстерны для доступа к переменным
extern int ADC0finishAddr;
// Определение массива с указателями на переменные для отладки
int DebugVar_Qnt = 5;
#pragma DATA_SECTION(dbg_vars,".dbgvar_info")
// pointer_type iq_type return_iq_type short_name
DebugVar_t dbg_vars[] = {\
{(uint8_t *)&freqTerm, pt_float, t_iq_none, t_iq10, "freqT" }, \
{(uint8_t *)&ADC_sf[0][0], pt_int16, t_iq_none, t_iq_none, "ADC_sf00" }, \
{(uint8_t *)&ADC_sf[0][1], pt_int16, t_iq_none, t_iq_none, "ADC_sf01" }, \
{(uint8_t *)&ADC_sf[0][2], pt_int16, t_iq_none, t_iq_none, "ADC_sf02" }, \
{(uint8_t *)&ADC_sf[0][3], pt_int16, t_iq_none, t_iq_none, "ADC_sf03" }, \
{(uint8_t *)&Bender[0].KOhms, pt_uint16, t_iq, t_iq10, "Bend0.KOhm" }, \
{(uint8_t *)&Bender[0].Times, pt_uint16, t_iq_none, t_iq_none, "Bend0.Time" }, \
};

View File

@ -1,5 +1,5 @@
# pyinstaller --onefile --distpath . --workpath ./build --specpath ./build parse_xml.py
# python -m nuitka --standalone --onefile --output-dir=. --output-dir=./build parse_xml.py
# python -m nuitka --standalone --onefile --output-dir=./build parse_xml.py
import xml.etree.ElementTree as ET
import xml.dom.minidom
import sys
@ -348,5 +348,5 @@ with open(output_path, "w", encoding="utf-8") as f:
f.write(pretty_xml)
os.remove(input_path)
#os.remove(info_path)
os.remove(info_path)
print(f"Simplified and formatted XML saved to: {output_path}")