276 lines
9.6 KiB
Python
276 lines
9.6 KiB
Python
import sys
|
|
import csv
|
|
import serial
|
|
import serial.tools.list_ports
|
|
import os
|
|
import subprocess
|
|
from PySide2.QtWidgets import (
|
|
QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit,
|
|
QPushButton, QFileDialog, QMessageBox, QComboBox, QHBoxLayout
|
|
)
|
|
from PySide2.QtCore import Qt
|
|
|
|
class ESPLoggerTerminal(QWidget):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setWindowTitle("ESP32 Log Downloader")
|
|
self.resize(500, 300)
|
|
|
|
layout = QVBoxLayout(self)
|
|
|
|
# --- COM-port selection ---
|
|
port_layout = QHBoxLayout()
|
|
port_layout.addWidget(QLabel("Serial port:"))
|
|
self.port_combo = QComboBox()
|
|
port_layout.addWidget(self.port_combo)
|
|
self.refresh_btn = QPushButton("Refresh")
|
|
self.refresh_btn.clicked.connect(self.refresh_ports)
|
|
port_layout.addWidget(self.refresh_btn)
|
|
layout.addLayout(port_layout)
|
|
|
|
layout.addWidget(QLabel("Baud rate:"))
|
|
self.baud_input = QLineEdit("115200")
|
|
layout.addWidget(self.baud_input)
|
|
|
|
layout.addWidget(QLabel("CSV separator:"))
|
|
self.sep_input = QLineEdit(";")
|
|
layout.addWidget(self.sep_input)
|
|
|
|
# --- File selection ---
|
|
file_layout = QHBoxLayout()
|
|
self.file_edit = QLineEdit()
|
|
file_layout.addWidget(self.file_edit)
|
|
self.browse_btn = QPushButton("Browse...")
|
|
self.browse_btn.clicked.connect(self.select_file)
|
|
file_layout.addWidget(self.browse_btn)
|
|
layout.addLayout(file_layout)
|
|
|
|
open_layout = QHBoxLayout()
|
|
self.open_file_btn = QPushButton("Open File")
|
|
self.open_file_btn.clicked.connect(self.open_file)
|
|
open_layout.addWidget(self.open_file_btn)
|
|
|
|
self.open_folder_btn = QPushButton("Open Folder")
|
|
self.open_folder_btn.clicked.connect(self.open_folder)
|
|
open_layout.addWidget(self.open_folder_btn)
|
|
layout.addLayout(open_layout)
|
|
|
|
# --- Buttons ---
|
|
self.get_log_btn = QPushButton("Get Log and Save CSV")
|
|
self.get_log_btn.clicked.connect(self.get_log)
|
|
layout.addWidget(self.get_log_btn)
|
|
|
|
self.clear_log_btn = QPushButton("Clear Logs")
|
|
self.clear_log_btn.clicked.connect(self.clear_logs)
|
|
layout.addWidget(self.clear_log_btn)
|
|
|
|
self.status_label = QLabel("")
|
|
self.status_label.setAlignment(Qt.AlignCenter)
|
|
layout.addWidget(self.status_label)
|
|
|
|
# загрузка списка портов при запуске
|
|
self.refresh_ports()
|
|
|
|
def refresh_ports(self):
|
|
"""Обновить список доступных COM/tty"""
|
|
self.port_combo.clear()
|
|
ports = serial.tools.list_ports.comports()
|
|
for p in ports:
|
|
self.port_combo.addItem(f"{p.device} ({p.description})", p.device)
|
|
if not ports:
|
|
self.port_combo.addItem("No ports found", "")
|
|
|
|
def select_file(self):
|
|
fname, _ = QFileDialog.getSaveFileName(self, "Select CSV File", "", "CSV Files (*.csv)")
|
|
if fname:
|
|
self.file_edit.setText(fname)
|
|
|
|
def open_file(self):
|
|
fname = self.file_edit.text().strip()
|
|
if os.path.isfile(fname):
|
|
os.startfile(fname) # Windows only
|
|
else:
|
|
QMessageBox.warning(self, "Warning", "File does not exist")
|
|
|
|
def open_folder(self):
|
|
fname = self.file_edit.text().strip()
|
|
if os.path.isfile(fname):
|
|
subprocess.run(["explorer", f"/select,{os.path.abspath(fname)}"])
|
|
elif os.path.isdir(fname):
|
|
subprocess.run(["explorer", os.path.abspath(fname)])
|
|
else:
|
|
QMessageBox.warning(self, "Warning", "File does not exist")
|
|
|
|
def get_log(self):
|
|
port = self.port_combo.currentData()
|
|
try:
|
|
baud = int(self.baud_input.text().strip())
|
|
except ValueError:
|
|
QMessageBox.warning(self, "Error", "Invalid baud rate")
|
|
return
|
|
sep = self.sep_input.text() or ","
|
|
|
|
if not port:
|
|
QMessageBox.warning(self, "Error", "Select a serial port")
|
|
return
|
|
|
|
fname = self.file_edit.text().strip()
|
|
if not fname:
|
|
QMessageBox.warning(self, "Error", "Select a CSV file to save logs")
|
|
return
|
|
|
|
try:
|
|
self.status_label.setText("Connecting to ESP32...")
|
|
QApplication.processEvents()
|
|
ser = serial.Serial(port, baud, timeout=2) # короче таймаут
|
|
ser.reset_input_buffer()
|
|
ser.reset_output_buffer()
|
|
|
|
ser.write(b's\n')
|
|
import time
|
|
# Ждем 100 мс
|
|
time.sleep(0.1)
|
|
|
|
ser.write(b'd\n')
|
|
|
|
# Ждём первую реакцию от ESP32 (например "=== LOG DUMP START ===")
|
|
pre_line = ser.readline().decode(errors='ignore').strip()
|
|
first_line = ser.readline().decode(errors='ignore').strip()
|
|
if not first_line or "LOG DUMP" not in (first_line or pre_line):
|
|
ser.close()
|
|
QMessageBox.warning(self, "Error", "Selected port is not responding like ESP32")
|
|
return
|
|
|
|
|
|
self.status_label.setText("Receiving log...")
|
|
QApplication.processEvents()
|
|
|
|
lines = []
|
|
start_time = time.time()
|
|
timeout_sec = 30
|
|
|
|
while True:
|
|
if time.time() - start_time > timeout_sec:
|
|
QMessageBox.warning(self, "Timeout", "No complete log received (timeout).")
|
|
break
|
|
|
|
line = ser.readline().decode(errors='ignore').strip()
|
|
if line:
|
|
start_time = time.time()
|
|
if not line:
|
|
continue
|
|
if line.startswith("=== LOG DUMP END ==="):
|
|
break
|
|
if line.startswith("Entry "):
|
|
lines.append(line)
|
|
self.status_label.setText(f"Received {len(lines)} entries...")
|
|
QApplication.processEvents()
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", f"Serial error: {e}")
|
|
return
|
|
finally:
|
|
if 'ser' in locals() and ser and ser.is_open:
|
|
ser.close()
|
|
|
|
if not lines:
|
|
QMessageBox.information(self, "Info", "No log data received")
|
|
return
|
|
|
|
try:
|
|
with open(fname, "w", newline="") as f:
|
|
writer = csv.writer(f, delimiter=sep)
|
|
writer.writerow(["SEQ", "TS", "EVT", "PAYLOAD"])
|
|
for line in lines:
|
|
parts = line.split()
|
|
seq = parts[2].split(":")[1]
|
|
ts = parts[3].split(":")[1]
|
|
evt = parts[4].split(":")[1]
|
|
payload = ""
|
|
if len(parts) > 5 and parts[5].startswith("PAYLOAD:"):
|
|
payload = line.split("PAYLOAD:", 1)[1]
|
|
writer.writerow([seq, ts, evt, payload])
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", f"CSV write error: {e}")
|
|
return
|
|
|
|
self.status_label.setText(f"Log saved to {fname}")
|
|
QMessageBox.information(self, "Success", f"CSV saved: {fname}")
|
|
|
|
|
|
|
|
def clear_logs(self):
|
|
"""Очистка логов на ESP32"""
|
|
port = self.port_combo.currentData()
|
|
try:
|
|
baud = int(self.baud_input.text().strip())
|
|
except ValueError:
|
|
QMessageBox.warning(self, "Error", "Invalid baud rate")
|
|
return
|
|
|
|
if not port:
|
|
QMessageBox.warning(self, "Error", "Select a serial port")
|
|
return
|
|
|
|
try:
|
|
ser = serial.Serial(port, baud, timeout=2)
|
|
ser.reset_input_buffer()
|
|
ser.reset_output_buffer()
|
|
ser.write(b's\n')
|
|
|
|
import time
|
|
# Ждем 100 мс
|
|
time.sleep(0.1)
|
|
|
|
ser.write(b'c\n')
|
|
|
|
|
|
# Ждём первую реакцию от ESP32 (например "=== LOG ERASING ===")
|
|
pre_line = ser.readline().decode(errors='ignore').strip()
|
|
first_line = ser.readline().decode(errors='ignore').strip()
|
|
if not first_line or "LOG ERASING" not in (first_line or pre_line):
|
|
ser.close()
|
|
QMessageBox.warning(self, "Error", "Selected port is not responding like ESP32")
|
|
return
|
|
|
|
self.status_label.setText("Erasing... May take a few second")
|
|
QApplication.processEvents()
|
|
|
|
|
|
lines = []
|
|
start_time = time.time()
|
|
timeout_sec = 15
|
|
|
|
while True:
|
|
if time.time() - start_time > timeout_sec:
|
|
QMessageBox.warning(self, "Timeout", "No confirmation from ESP32 (timeout).")
|
|
break
|
|
|
|
line = ser.readline().decode(errors='ignore').strip()
|
|
if line:
|
|
start_time = time.time()
|
|
if not line:
|
|
continue
|
|
|
|
if "CLEARED" in line:
|
|
self.status_label.setText("Logs cleared on ESP32")
|
|
QMessageBox.information(self, "Success", "Logs cleared on ESP32")
|
|
QApplication.processEvents()
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", f"Serial error: {e}")
|
|
|
|
finally:
|
|
if 'ser' in locals() and ser and ser.is_open:
|
|
ser.close()
|
|
|
|
if __name__ == "__main__":
|
|
app = QApplication(sys.argv)
|
|
w = ESPLoggerTerminal()
|
|
w.show()
|
|
sys.exit(app.exec_())
|