debugVarTool/Src/auto_updater.py

239 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import shutil
import tempfile
import requests
import subprocess
from packaging.version import Version, InvalidVersion
from bs4 import BeautifulSoup
from PySide2.QtWidgets import (
QApplication, QMessageBox, QProgressDialog
)
from PySide2.QtCore import QThread, Signal
class DownloadThread(QThread):
    """Background worker that streams a URL into a local file.

    Signals:
        progress(int): Download percentage in the range 0-100. Emitted only
            when the response carries a usable ``Content-Length`` header and
            only when the integer percentage actually changes.
        finished(bool, str): Emitted once when the worker is done:
            ``(True, "")`` on success, ``(False, message)`` on failure or
            when the thread was asked to stop via ``requestInterruption()``.

    Args:
        url: Address of the file to download.
        dest_path: Local path the response body is written to.
        timeout: Timeout in seconds handed to ``requests.get`` so that a
            stalled server cannot block the worker forever.
    """

    progress = Signal(int)
    # NOTE: this deliberately keeps the name callers already connect to, even
    # though it shadows QThread's own argument-less ``finished`` signal.
    finished = Signal(bool, str)

    def __init__(self, url, dest_path, timeout=10):
        super().__init__()
        self.url = url
        self.dest_path = dest_path
        self.timeout = timeout

    def run(self):
        """Download ``self.url`` to ``self.dest_path`` and report the outcome."""
        try:
            interrupted = False
            with requests.get(self.url, stream=True, timeout=self.timeout) as r:
                r.raise_for_status()
                total = int(r.headers.get("content-length", 0))
                downloaded = 0
                last_percent = -1
                with open(self.dest_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        # Cooperative cancellation: stop as soon as the owner
                        # of the thread has requested an interruption.
                        if self.isInterruptionRequested():
                            interrupted = True
                            break
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total > 0:
                            # Clamp: the decoded body may be larger than the
                            # advertised (possibly compressed) length.
                            percent = min(100, downloaded * 100 // total)
                            if percent != last_percent:
                                last_percent = percent
                                self.progress.emit(percent)
            if interrupted:
                self.finished.emit(False, "Скачивание отменено.")
            else:
                self.finished.emit(True, "")
        except Exception as e:
            # Thread boundary: any failure is reported through the signal
            # instead of propagating out of run().
            self.finished.emit(False, str(e))
def check_and_update(
    current_version: str,
    git_releases_url: str,
    exe_name: str,
    zip_name: str = None,
    parent_widget=None
):
    """Check for a newer release and, with the user's consent, install it.

    Steps: look up the newest version tag, compare it with
    ``current_version``, ask the user for confirmation, locate a downloadable
    asset on the release page and, when that asset is the executable itself,
    download it and hand it over to the self-replacement step.

    Args:
        current_version: Version string of the running application.
        git_releases_url: URL of the repository's releases page.
        exe_name: File name of the executable asset to look for.
        zip_name: Optional archive asset name used as a fallback lookup.
        parent_widget: Optional Qt parent for the dialogs that are shown.

    Returns:
        None. Every outcome is reported through dialogs or console output.
    """
    print(f"[Updater] Текущая версия: {current_version}")

    latest_ver, rel_page = _get_latest_release_info(git_releases_url)
    if latest_ver is None:
        return

    try:
        current_ver_obj = Version(current_version)
    except InvalidVersion:
        # Without a parseable current version no comparison is possible;
        # log it instead of returning silently.
        print(f"[Updater] Некорректная текущая версия: {current_version}")
        return

    if latest_ver <= current_ver_obj:
        print(f"[Updater] Приложение актуально (v{current_version}).")
        return

    if not _ask_update_dialog(latest_ver, parent_widget):
        return

    file_url = _get_download_link(rel_page, exe_name, zip_name)
    if not file_url:
        _show_error("Не удалось найти файл для скачивания.", parent_widget)
        return

    # Anything that is not the executable itself is treated as an archive, so
    # an archive with an unexpected extension is never installed as the exe.
    is_archive = (
        file_url.endswith((".zip", ".rar"))
        or not file_url.endswith(exe_name)
    )
    if is_archive:
        # Archive extraction is currently disabled in this module. Tell the
        # user instead of doing nothing after they agreed to update.
        _show_error(
            "Автоматическое обновление из архива пока не поддерживается.",
            parent_widget,
        )
        return

    temp_exe = os.path.join(tempfile.gettempdir(), f"new_{exe_name}")
    _start_download_gui(
        file_url,
        temp_exe,
        parent_widget,
        on_finished=lambda ok, err: _handle_exe_download(
            ok, err, temp_exe, parent_widget
        ),
    )
# Disabled code: the archive-download handler below is wrapped in a bare
# string literal, so it is never defined or executed. It would report a
# download failure, otherwise extract the executable from a .zip or .rar
# archive and pass it to _update_self. The extraction helpers it calls are
# disabled in the same way elsewhere in this file.
'''def _handle_archive_download(success, error, archive_path, exe_name, exe_dest, parent):
if not success:
_show_error(f"Ошибка при скачивании архива: {error}", parent)
return
ok = False
if archive_path.endswith(".zip"):
ok = _extract_exe_from_zip(archive_path, exe_name, exe_dest)
elif archive_path.endswith(".rar"):
ok = _extract_exe_from_rar(archive_path, exe_name, exe_dest)
if not ok:
_show_error(f"Не удалось извлечь {exe_name} из архива.", parent)
return
_update_self(exe_dest)'''
# Disabled code: the RAR extraction helper below is wrapped in a bare string
# literal, so it is never defined or executed. It would extract the first
# archive member whose name ends with exe_name and move it to dest_path.
# NOTE(review): it relies on `rarfile`, which is not among the imports
# visible at the top of this file — confirm before re-enabling.
'''def _extract_exe_from_rar(rar_path, exe_name, dest_path):
try:
with rarfile.RarFile(rar_path) as rar_ref:
for file in rar_ref.namelist():
if file.endswith(exe_name):
rar_ref.extract(file, os.path.dirname(dest_path))
src = os.path.join(os.path.dirname(dest_path), file)
shutil.move(src, dest_path)
return True
except Exception as e:
print(f"[Updater] Ошибка при распаковке RAR архива: {e}")
return False'''
def _handle_exe_download(success, error, exe_path, parent):
    """React to the end of an executable download.

    On success the downloaded file is handed to the self-replacement step;
    otherwise an error dialog with the failure text is shown.
    """
    if success:
        _update_self(exe_path)
    else:
        _show_error(f"Ошибка при скачивании файла: {error}", parent)
def _start_download_gui(url, dest_path, parent, on_finished):
    """Download ``url`` to ``dest_path`` behind a modal progress dialog.

    Args:
        url: Address of the file to download.
        dest_path: Local path the file is written to.
        parent: Optional Qt parent widget for the progress dialog.
        on_finished: Callable ``(ok: bool, err: str)`` invoked after the
            dialog has closed, with the result reported by the worker. It is
            NOT invoked when the user cancels the dialog.
    """
    dialog = QProgressDialog("Скачивание обновления...", "Отмена", 0, 100, parent)
    dialog.setWindowTitle("Обновление")
    dialog.setMinimumDuration(0)
    dialog.setAutoClose(False)
    dialog.setAutoReset(False)

    thread = DownloadThread(url, dest_path)
    outcome = []  # receives a single (ok, err) tuple when the worker reports

    def _on_download_finished(ok, err):
        # Record the result first: an empty ``outcome`` after exec_() is how
        # a user cancellation is told apart from a completed download.
        outcome.append((ok, err))
        dialog.close()

    thread.progress.connect(dialog.setValue)
    thread.finished.connect(_on_download_finished)
    thread.start()
    dialog.exec_()

    if not outcome:
        # The dialog was dismissed before the worker reported back: ask the
        # worker to stop and do not install anything the user cancelled.
        thread.requestInterruption()
        thread.wait()
        return

    # Make sure the worker has fully terminated before the QThread object
    # goes out of scope; destroying a running QThread aborts the process.
    thread.wait()
    ok, err = outcome[0]
    on_finished(ok, err)
def _ask_update_dialog(latest_ver, parent):
    """Ask the user whether the newer version should be installed.

    Returns:
        True when the user pressed "Yes", False otherwise.
    """
    prompt = QMessageBox(parent)
    prompt.setWindowTitle("Доступно обновление")
    prompt.setIcon(QMessageBox.Information)
    prompt.setText(f"Доступна новая версия: v{latest_ver}\nЖелаете обновиться?")
    prompt.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
    answer = prompt.exec_()
    return answer == QMessageBox.Yes
def _show_error(text, parent):
    """Show a modal, critical-level message box containing ``text``."""
    box = QMessageBox(parent)
    box.setWindowTitle("Ошибка обновления")
    box.setIcon(QMessageBox.Critical)
    box.setText(text)
    box.exec_()
def _get_latest_release_info(base_url):
    """Find the highest version tag of the repository behind ``base_url``.

    The owner and repository names are taken from the third-to-last and
    second-to-last path segments of ``base_url``; the tag list is then
    requested from the server's tags API.

    Returns:
        ``(version, release_page_url)`` for the highest parseable tag, or
        ``(None, None)`` when no tag parses as a version or any error occurs.
    """
    try:
        segments = base_url.strip("/").split("/")
        owner = segments[-3]
        repo = segments[-2]

        api_url = f"https://git.arktika.cyou/api/v1/repos/{owner}/{repo}/tags"
        reply = requests.get(api_url, timeout=10)
        reply.raise_for_status()

        candidates = []
        for entry in reply.json():
            tag_name = entry.get("name", "")
            try:
                candidates.append((Version(tag_name.lstrip("v")), tag_name))
            except InvalidVersion:
                # Tags that are not version numbers are simply ignored.
                continue

        if candidates:
            newest_ver, newest_tag = max(candidates)
            page_url = f"https://git.arktika.cyou/{owner}/{repo}/releases/tag/{newest_tag}"
            return newest_ver, page_url
        return None, None
    except Exception as e:
        print(f"[Updater] Ошибка при получении тега через API: {e}")
        return None, None
def _get_download_link(release_page_url, exe_name, zip_name=None):
    """Find the download URL of the update asset on a release page.

    Every link on the page is checked for ``exe_name`` first; only if none
    matches are the links checked for ``zip_name``. An empty or missing name
    is skipped rather than matching every link.

    Args:
        release_page_url: URL of the HTML release page to scan.
        exe_name: File name the executable link must end with.
        zip_name: Optional archive file name used as a fallback.

    Returns:
        The absolute URL of the first matching link, or None when nothing
        matches or the page could not be fetched or parsed.
    """
    # Local import: urljoin is standard library and used only here.
    from urllib.parse import urljoin

    try:
        resp = requests.get(release_page_url, timeout=10)
        # Do not scan an HTTP error page as if it were the release page.
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        hrefs = [anchor["href"] for anchor in soup.select("a[href]")]

        for wanted in (exe_name, zip_name):
            if not wanted:
                continue
            for href in hrefs:
                if href.endswith(wanted):
                    # Resolve against the page URL so absolute, root-relative,
                    # relative and protocol-relative hrefs all work.
                    return urljoin(release_page_url, href)
    except Exception as e:
        print(f"[Updater] Ошибка при поиске файла обновления: {e}")
    return None
# Disabled code: the ZIP extraction helper below is wrapped in a bare string
# literal, so it is never defined or executed. It would extract the first
# archive member whose name ends with exe_name and move it to dest_path.
# NOTE(review): it relies on `zipfile`, which is not among the imports
# visible at the top of this file — confirm before re-enabling.
'''def _extract_exe_from_zip(zip_path, exe_name, dest_path):
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for file in zip_ref.namelist():
if file.endswith(exe_name):
zip_ref.extract(file, os.path.dirname(dest_path))
src = os.path.join(os.path.dirname(dest_path), file)
shutil.move(src, dest_path)
return True
except Exception as e:
print(f"[Updater] Ошибка при распаковке архива: {e}")
return False'''
def _update_self(new_exe_path):
    """Replace the running program with ``new_exe_path`` and restart it.

    A batch script is written to the temp directory and launched through
    ``cmd``; the current process then exits. The script waits two seconds,
    force-kills processes with the current image name, moves the new file
    over the current one, starts it and finally deletes itself.

    NOTE(review): when the application is not frozen, the target path is
    ``sys.argv[0]``, so the new executable would be moved over the launching
    script — confirm this is intended for development runs.
    """
    if getattr(sys, 'frozen', False):
        running = sys.executable
    else:
        running = sys.argv[0]
    cur_path = os.path.abspath(running)
    image_name = os.path.basename(cur_path)

    script_lines = [
        "@echo off",
        "timeout /t 2 > nul",
        f'taskkill /F /IM "{image_name}" > nul 2>&1',
        f'move /Y "{new_exe_path}" "{cur_path}" > nul',
        f'start "" "{cur_path}"',
        'del "%~f0"',
    ]

    bat_path = os.path.join(tempfile.gettempdir(), "update.bat")
    with open(bat_path, "w") as bat:
        bat.write("\n".join(script_lines) + "\n")

    subprocess.Popen(["cmd", "/c", bat_path])
    sys.exit(0)