import csv import numbers import time from datetime import datetime from PySide2 import QtWidgets class CsvLogger: """ Логгер, совместимый по формату с C-реализацией CSV_AddTitlesLine / CSV_AddLogLine. Публичный API сохранён: set_titles(varnames) set_value(timestamp, varname, varvalue) select_file(parent=None) -> bool write_to_csv() Использование: 1) set_titles([...]) 2) многократно set_value(ts, name, value) 3) select_file() (по желанию) 4) write_to_csv() """ def __init__(self, filename="log.csv", delimiter=';'): self._filename = filename self._delimiter = delimiter # Пользовательские заголовки self.variable_names_ordered = [] # Полные заголовки CSV (Ticks(X), Ticks(Y), Time(Y), ...) self.headers = ['t'] # до вызова set_titles placeholder # Данные: {timestamp_key: {varname: value, ...}} # timestamp_key = то, что передано в set_value (float/int/etc) self.data_rows = {} # Внутренние структуры для генерации CSV-формата С self._row_wall_dt = {} # {timestamp_key: datetime при первой записи} self._base_ts = None # timestamp_key первой строки (число) self._base_ts_val = 0.0 # float значение первой строки (для delta) self._tick_x_start = 0 # начальный тик (можно менять вручную при необходимости) # ---- Свойства ---- @property def filename(self): return self._filename # ---- Публичные методы ---- def set_titles(self, varnames): """ Устанавливает имена переменных. Формирует полные заголовки CSV в формате С-лога. """ if not isinstance(varnames, list): raise TypeError("Varnames must be a list of strings.") if not all(isinstance(name, str) for name in varnames): raise ValueError("All variable names must be strings.") self.variable_names_ordered = varnames self.headers = ["Ticks(X)", "Ticks(Y)", "Time(Y)"] + self.variable_names_ordered # Сброс данных (структура изменилась) self.data_rows.clear() self._row_wall_dt.clear() self._base_ts = None self._base_ts_val = 0.0 def set_value(self, timestamp, varname, varvalue): """ Установить ОДНО значение в ОДНУ колонку для заданного timestamp’а. timestamp — float секунд с эпохи (time.time()). """ if varname not in self.variable_names_ordered: return # игнор, как у тебя было # Новая строка? if timestamp not in self.data_rows: # Инициализируем поля переменных значением None self.data_rows[timestamp] = {vn: None for vn in self.variable_names_ordered} # Дата/время строки из ПЕРЕДАННОГО timestamp (а не datetime.now()!) try: ts_float = float(timestamp) except Exception: # если какая-то дичь прилетела, пусть будет 0 (эпоха) чтобы не упасть ts_float = 0.0 self._row_wall_dt[timestamp] = datetime.fromtimestamp(ts_float) # База для расчёта Ticks(Y) — первая строка if self._base_ts is None: self._base_ts = timestamp self._base_ts_val = ts_float # Записываем значение self.data_rows[timestamp][varname] = varvalue def select_file(self, parent=None) -> bool: """ Диалог выбора файла. """ options = QtWidgets.QFileDialog.Options() filename, _ = QtWidgets.QFileDialog.getSaveFileName( parent, "Сохранить данные CSV", self._filename, "CSV Files (*.csv);;All Files (*)", options=options ) if filename: if not filename.lower().endswith('.csv'): filename += '.csv' self._filename = filename return True else: return False def write_to_csv(self): """ Формирует CSV в формате C: Ticks(X);Ticks(Y);Time(Y);Var1;Var2;... 0;0,000000;22/07/2025 13:45:12:0123;...;... Правила значений: - Тик X: автоинкремент от 0 (или self._tick_x_start) по порядку сортировки timestamp. - Ticks(Y): дельта (секунды,микросекунды) между текущим timestamp и первым timestamp. - Time(Y): wallclock строки (datetime.now() при первом появлении timestamp). - Значение < 0 -> пустая ячейка (как if(raw_data[i] >= 0) else ;) - None -> пустая ячейка. """ if len(self.headers) <= 3: # только служебные поля без переменных print("Ошибка: Заголовки не установлены или не содержат переменных. Вызовите set_titles() перед записью.") return if not self._filename: print("Ошибка: Имя файла не определено. select_file() или задайте при инициализации.") return if not self.data_rows: print("Предупреждение: Нет данных для записи.") # всё равно создадим файл с одними заголовками try: with open(self._filename, 'w', newline='', encoding='utf-8') as csvfile: # QUOTE_NONE + escapechar для чистого формата без кавычек (как в С-строке) writer = csv.writer( csvfile, delimiter=self._delimiter, quoting=csv.QUOTE_NONE, escapechar='\\', lineterminator='\r\n' ) # Пишем заголовки writer.writerow(self.headers) if self.data_rows: sorted_ts = sorted(self.data_rows.keys(), key=self._ts_sort_key) # убедимся, что база была зафиксирована if self._base_ts is None: self._base_ts = sorted_ts[0] self._base_ts_val = self._coerce_ts_to_float(self._base_ts) tick_x = self._tick_x_start for ts in sorted_ts: row_dict = self.data_rows[ts] # delta по timestamp cur_ts_val = self._coerce_ts_to_float(ts) delta_us = int(round((cur_ts_val - self._base_ts_val) * 1_000_000)) if delta_us < 0: delta_us = 0 # защита seconds = delta_us // 1_000_000 micros = delta_us % 1_000_000 # wallclock строки dt = self._row_wall_dt.get(ts, datetime.now()) # Формат DD/MM/YYYY HH:MM:SS:мммм (4 цифры ms, как в C: us/1000) time_str = dt.strftime("%d/%m/%Y %H:%M:%S") + f":{dt.microsecond // 1000:04d}" # Значения row_vals = [] for vn in self.variable_names_ordered: v = row_dict.get(vn) if v is None: row_vals.append("") # нет данных else: # если числовое и <0 -> пусто (как в C: если raw_data[i] >= 0 else ;) if isinstance(v, numbers.Number) and v < 0: row_vals.append("") else: row_vals.append(v) csv_row = [tick_x, f"{seconds},{micros:06d}", time_str] + row_vals writer.writerow(csv_row) tick_x += 1 print(f"Данные успешно записаны в '{self._filename}'") except Exception as e: print(f"Ошибка при записи в файл '{self._filename}': {e}") # ---- Вспомогательные ---- def _coerce_ts_to_float(self, ts): """ Пробуем привести переданный timestamp к float. Разрешаем int/float/str, остальное -> индекс по порядку (0). """ if isinstance(ts, numbers.Number): return float(ts) try: return float(ts) except Exception: # fallback: нечисловой ключ -> используем порядковый индекс # (таких почти не должно быть, но на всякий) return 0.0 def _ts_sort_key(self, ts): """ Ключ сортировки timestamp’ов — сначала попытка float, потом str. """ if isinstance(ts, numbers.Number): return (0, float(ts)) try: return (0, float(ts)) except Exception: return (1, str(ts))