import sys
import socket
import threading
import time

from PySide2.QtWidgets import (
    QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton,
    QMessageBox, QSpinBox
)
from PySide2.QtCore import Qt, Signal, QObject


class LogSignal(QObject):
    """Carrier object for the signal used to pass status text to the GUI."""

    log_msg = Signal(str)


class ESPClientTerminal(QWidget):
    """Small TCP client window that periodically sends numbered PING packets.

    A background thread connects to the configured server, sends one
    ``SEQ:<n> TS:<ms> PAYLOAD:PING`` line per interval, reads the reply and
    reports it through ``log_signal``. On any error the socket is closed and
    the connection is retried after ``RECONNECT_DELAY_S`` seconds.
    """

    SOCKET_TIMEOUT_S = 5    # applied to connect, send and receive
    RECONNECT_DELAY_S = 2   # pause before reconnecting after a failure
    RECV_BUFFER_SIZE = 1024

    def __init__(self):
        super().__init__()
        self.setWindowTitle("ESP32 TCP Client")
        self.resize(400, 200)

        # NOTE(review): this attribute shadows the QWidget.layout() method;
        # kept under the same name so existing users of ``self.layout`` work.
        self.layout = QVBoxLayout(self)

        self.layout.addWidget(QLabel("Server IP:"))
        self.ip_input = QLineEdit("192.168.0.96")
        self.layout.addWidget(self.ip_input)

        self.layout.addWidget(QLabel("Server port:"))
        self.port_input = QLineEdit("1234")
        self.layout.addWidget(self.port_input)

        self.layout.addWidget(QLabel("Packet interval (ms):"))
        self.interval_input = QSpinBox()
        self.interval_input.setRange(50, 10000)
        self.interval_input.setValue(500)
        self.layout.addWidget(self.interval_input)

        self.start_btn = QPushButton("Start Sending")
        self.start_btn.clicked.connect(self.start_sending)
        self.layout.addWidget(self.start_btn)

        self.stop_btn = QPushButton("Stop Sending")
        self.stop_btn.clicked.connect(self.stop_sending)
        self.stop_btn.setEnabled(False)
        self.layout.addWidget(self.stop_btn)

        self.status_label = QLabel("")
        self.status_label.setAlignment(Qt.AlignCenter)
        self.layout.addWidget(self.status_label)

        self.running = False
        self.thread = None
        # Stop flag of the most recently started worker (None before the
        # first start). Each worker gets its own event so that a worker that
        # is still winding down cannot be revived by a later "Start".
        self._stop_event = None

        self.log_signal = LogSignal()
        self.log_signal.log_msg.connect(self.update_status)

    def update_status(self, msg):
        """Show *msg* in the status label."""
        self.status_label.setText(msg)

    @staticmethod
    def _parse_port(text):
        """Return *text* as a TCP port number in 1..65535, or None if invalid."""
        try:
            port = int(text.strip())
        except ValueError:
            return None
        return port if 1 <= port <= 65535 else None

    def start_sending(self):
        """Validate the form and start the background sender thread."""
        server_ip = self.ip_input.text().strip()

        server_port = self._parse_port(self.port_input.text())
        if server_port is None:
            QMessageBox.warning(self, "Error", "Invalid port")
            return

        interval = self.interval_input.value() / 1000.0  # ms -> sec

        if not server_ip:
            QMessageBox.warning(self, "Error", "Enter server IP")
            return

        self.running = True
        self._stop_event = threading.Event()
        self.start_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)

        self.thread = threading.Thread(
            target=self.send_loop,
            args=(server_ip, server_port, interval, self._stop_event),
            daemon=True,
        )
        self.thread.start()

    def stop_sending(self):
        """Ask the sender thread to stop and reset the buttons."""
        self.running = False
        if self._stop_event is not None:
            # Wakes the worker out of its interval / reconnect wait.
            self._stop_event.set()
        self.start_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)
        self.status_label.setText("Stopped sending")

    @staticmethod
    def _close_quietly(sock):
        """Close *sock*, ignoring OS-level errors; accepts None."""
        if sock is None:
            return
        try:
            sock.close()
        except OSError:
            pass

    def send_loop(self, ip, port, interval, stop_event=None):
        """Send PING packets to ``ip:port`` until asked to stop.

        Runs in the worker thread. Status text is reported only through
        ``log_signal``; widgets are never touched from here.

        Args:
            ip: Server host passed to ``socket.connect``.
            port: Server TCP port.
            interval: Delay between packets, in seconds.
            stop_event: Optional ``threading.Event``; when set, the loop
                exits. If omitted, only ``self.running`` controls the loop.
        """
        if stop_event is None:
            stop_event = threading.Event()

        seq = 0
        sock = None

        try:
            while self.running and not stop_event.is_set():
                try:
                    # No socket yet: try to connect.
                    if sock is None:
                        self.log_signal.log_msg.emit(
                            f"Connecting to {ip}:{port}..."
                        )
                        sock = socket.socket(
                            socket.AF_INET, socket.SOCK_STREAM
                        )
                        sock.settimeout(self.SOCKET_TIMEOUT_S)
                        sock.connect((ip, port))
                        self.log_signal.log_msg.emit(
                            f"Connected to {ip}:{port}"
                        )

                    # Send the next packet.
                    seq += 1
                    ts = int(time.time() * 1000)
                    payload = "PING"
                    msg = f"SEQ:{seq} TS:{ts} PAYLOAD:{payload}\n"
                    sock.sendall(msg.encode())

                    # Read the reply. An empty result means the peer closed
                    # the connection, so go through the reconnect path
                    # instead of reporting an empty echo.
                    data = sock.recv(self.RECV_BUFFER_SIZE)
                    if not data:
                        raise ConnectionError("closed by peer")

                    # errors="replace": a non-UTF-8 or split reply must not
                    # be mistaken for a lost connection.
                    echo = data.decode(errors="replace").strip()
                    self.log_signal.log_msg.emit(
                        f"Sent SEQ:{seq} | Echo: {echo}"
                    )

                    # Interruptible sleep: returns early once stop is set.
                    stop_event.wait(interval)

                except Exception as e:
                    # Thread top-level boundary: report the error and retry
                    # so the worker never dies silently.
                    self.log_signal.log_msg.emit(f"Connection lost: {e}")
                    self._close_quietly(sock)
                    sock = None
                    # Wait before reconnecting.
                    stop_event.wait(self.RECONNECT_DELAY_S)
        finally:
            self._close_quietly(sock)

        self.log_signal.log_msg.emit("Stopped / Disconnected")


if __name__ == "__main__":
    app = QApplication(sys.argv)
    w = ESPClientTerminal()
    w.show()
    sys.exit(app.exec_())