ESP_WifiTest/esp_client_emu.py

132 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import socket
import threading
import time
from PySide2.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit,
QPushButton, QMessageBox, QSpinBox
)
from PySide2.QtCore import Qt, Signal, QObject
class LogSignal(QObject):
    """Carrier object for the status-text signal.

    Qt signals must be declared on a ``QObject`` subclass; this class exists
    only to own ``log_msg``.
    """

    # Emitted with one ``str`` argument: the status text to display.
    log_msg = Signal(str)
class ESPClientTerminal(QWidget):
    """Window that periodically sends numbered PING lines to a TCP server.

    The user enters the server address, port and packet interval. "Start
    Sending" launches a daemon worker thread that connects, sends one line
    per interval, reads the reply and reconnects after failures. Status
    text produced by the worker is delivered to the label through
    ``log_signal`` instead of touching widgets from the worker thread.
    """

    CONNECT_TIMEOUT_S = 5   # socket timeout used for connect/send/recv
    RECONNECT_DELAY_S = 2   # pause after a failure before reconnecting
    RECV_BUFFER_SIZE = 1024  # maximum bytes read per reply

    def __init__(self):
        """Build the widgets and initialise the (idle) worker state."""
        super().__init__()
        self.setWindowTitle("ESP32 TCP Client")
        self.resize(400, 200)

        # NOTE(review): this attribute shadows the QWidget.layout() method.
        # The name is kept so existing users of ``self.layout`` keep working.
        self.layout = QVBoxLayout(self)

        self.layout.addWidget(QLabel("Server IP:"))
        self.ip_input = QLineEdit("192.168.0.96")
        self.layout.addWidget(self.ip_input)

        self.layout.addWidget(QLabel("Server port:"))
        self.port_input = QLineEdit("1234")
        self.layout.addWidget(self.port_input)

        self.layout.addWidget(QLabel("Packet interval (ms):"))
        self.interval_input = QSpinBox()
        self.interval_input.setRange(50, 10000)
        self.interval_input.setValue(500)
        self.layout.addWidget(self.interval_input)

        self.start_btn = QPushButton("Start Sending")
        self.start_btn.clicked.connect(self.start_sending)
        self.layout.addWidget(self.start_btn)

        self.stop_btn = QPushButton("Stop Sending")
        self.stop_btn.clicked.connect(self.stop_sending)
        self.stop_btn.setEnabled(False)
        self.layout.addWidget(self.stop_btn)

        self.status_label = QLabel("")
        self.status_label.setAlignment(Qt.AlignCenter)
        self.layout.addWidget(self.status_label)

        self.running = False
        self.thread = None
        # Stop request for the *current* run. Every run gets its own event so
        # that a worker from a previous run can never be revived by a later
        # Start (see start_sending / send_loop).
        self._stop_event = threading.Event()

        self.log_signal = LogSignal()
        self.log_signal.log_msg.connect(self.update_status)

    def update_status(self, msg):
        """Show ``msg`` in the status label (slot for ``log_signal.log_msg``)."""
        self.status_label.setText(msg)

    def start_sending(self):
        """Validate the inputs and start the worker thread.

        Shows a warning box and returns without starting anything when the
        port is not an integer in 1..65535 or the server address is empty.
        """
        server_ip = self.ip_input.text().strip()
        try:
            server_port = int(self.port_input.text().strip())
        except ValueError:
            QMessageBox.warning(self, "Error", "Invalid port")
            return
        # An out-of-range port can never connect; reject it here instead of
        # letting the worker fail and retry forever.
        if not 1 <= server_port <= 65535:
            QMessageBox.warning(self, "Error", "Invalid port")
            return
        if not server_ip:
            QMessageBox.warning(self, "Error", "Enter server IP")
            return

        interval = self.interval_input.value() / 1000.0  # ms -> sec

        # Fresh event per run: the event of a previous run stays set, so a
        # worker that is still winding down exits even though ``running``
        # becomes True again below.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.running = True
        self.start_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)

        self.thread = threading.Thread(
            target=self.send_loop,
            args=(server_ip, server_port, interval, stop_event),
            daemon=True,
        )
        self.thread.start()

    def stop_sending(self):
        """Ask the worker to stop and switch the buttons back to idle."""
        self.running = False
        # Wakes the worker immediately if it is waiting between packets.
        self._stop_event.set()
        self.start_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)
        self.status_label.setText("Stopped sending")

    @staticmethod
    def _close_quietly(sock):
        """Close ``sock`` if it is not None, ignoring socket-level errors."""
        if sock is None:
            return
        try:
            sock.close()
        except OSError:
            pass

    def send_loop(self, ip, port, interval, stop_event=None):
        """Worker body: connect, send one line per interval, read the reply.

        Runs until ``self.running`` is cleared or ``stop_event`` is set.
        Any failure is reported, the socket is dropped, and a reconnect is
        attempted after ``RECONNECT_DELAY_S`` seconds.

        Args:
            ip: Server host passed to ``socket.connect``.
            port: Server TCP port.
            interval: Pause between packets, in seconds.
            stop_event: ``threading.Event`` that ends this run when set.
                Defaults to the event of the current run.
        """
        if stop_event is None:
            stop_event = self._stop_event

        def report(message):
            # A run that was stopped and then replaced by a newer one stays
            # silent so it cannot overwrite the newer run's status text.
            superseded = (
                stop_event.is_set() and stop_event is not self._stop_event
            )
            if not superseded:
                self.log_signal.log_msg.emit(message)

        seq = 0
        sock = None
        try:
            while self.running and not stop_event.is_set():
                try:
                    # No socket yet (first pass or after a failure): connect.
                    if sock is None:
                        report(f"Connecting to {ip}:{port}...")
                        sock = socket.socket(
                            socket.AF_INET, socket.SOCK_STREAM
                        )
                        sock.settimeout(self.CONNECT_TIMEOUT_S)
                        sock.connect((ip, port))
                        report(f"Connected to {ip}:{port}")

                    # Send the next numbered packet.
                    seq += 1
                    ts = int(time.time() * 1000)
                    payload = "PING"
                    msg = f"SEQ:{seq} TS:{ts} PAYLOAD:{payload}\n"
                    sock.sendall(msg.encode())

                    # Read the reply. An empty read means the peer closed
                    # the connection, which is a failure, not an echo.
                    data = sock.recv(self.RECV_BUFFER_SIZE)
                    if not data:
                        raise ConnectionError("server closed the connection")
                    # errors="replace": a reply cut in the middle of a
                    # multi-byte character must not tear the connection down.
                    echo = data.decode(errors="replace").strip()
                    report(f"Sent SEQ:{seq} | Echo: {echo}")

                    # Event.wait instead of time.sleep so Stop acts at once.
                    stop_event.wait(interval)
                except Exception as e:
                    # Thread boundary: whatever went wrong (network error,
                    # timeout, unresolvable host) is reported and answered
                    # with a reconnect rather than killing the worker.
                    report(f"Connection lost: {e}")
                    self._close_quietly(sock)
                    sock = None
                    # Wait before reconnecting; returns early on Stop.
                    stop_event.wait(self.RECONNECT_DELAY_S)
        finally:
            self._close_quietly(sock)
        report("Stopped / Disconnected")
# Script entry point: create the Qt application, show the client window and
# hand control to the Qt event loop until the window is closed.
if __name__ == "__main__":
    application = QApplication(sys.argv)
    window = ESPClientTerminal()
    window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)