Мануал Corsair Custom Server Manager

orohimaru2

Выдающийся
Местный
Сообщения
57
Розыгрыши
0
Репутация
114
Реакции
199
Баллы
1 388
Автор не я взял с Англ сайта
Приветствую, друзья! Мне очень понравилось собирать свой сервер Corsair, и после нескольких месяцев игры я наконец-то закончил. Это было превосходно. Я решил поделиться проделанной работой, так как этот форум мне очень помог.
1.webp
Сначала мой менеджер сервера. Мне нравится, что можно запускать/останавливать его нажатием кнопки, а не запускать скрипты.

1. Измените icon.png на icon.ico (он прикреплён)
. А. Мне пришлось изменить тип файла, чтобы он загружался сюда.
2. Поместите фон и значок в ту же папку, что и программа. (Можно менять любой из вариантов, просто оставьте прежнее имя).
3. Установите Python. (Сервер Corsair уже использует Python 3.9.5, поэтому я все построил на его основе.
4. Я намеревался упаковать это как .exe, поэтому есть много комментариев, связанных с этим. Часть программирования также связана с этим. (Вы можете проигнорировать это и просто запустить скрипт).
5. Вам понадобятся некоторые знания по использованию pip для установки необходимых компонентов.
6. Совет от профессионала: ChatGPT вам поможет. Вставьте весь код туда и спросите, как использовать pip для установки необходимых элементов. На самом деле это до смешного просто.
а. пример: pip install psutil
7. Вам нужно обновить пути к файлам. На строках 115, 121, 133 или около них. Я настроил свой сервер на запуск 2 полей (Primal Desert = Season. Black Sands = Hadum/Elvia).
8. Я заметил, что мобы перестают появляться, поэтому я настроил ежедневный перезапуск в 4 утра. Если вы не хотите этого делать, добавьте # в начале строки 150 или около того. Это выглядит как self._schedule_daily_restart(). Если поставить перед ним #, он будет закомментирован (не будет использоваться).

Примечание: я не профессионал. Я ничего не гарантирую. Я не поддерживаю их. Я просто делюсь тем, что у меня есть. Делайте с этим, что хотите.
Код:
# Server Manager for Black Desert Corsair Server
# Author: Njinir
#
# This application allows you to monitor and manage the status of your
# Black Desert Corsair game servers, including IIS, SQL Server, and custom game executables.
# It also provides real-time system resource usage (CPU, RAM, Disk, Network).
#
# --- PyInstaller Instructions ---
# To create a standalone executable (.exe) of this application, follow these steps:
# 1. Ensure you have PyInstaller installed: `pip install pyinstaller`
# 2. Ensure you have Pillow installed: `pip install Pillow`
# 3. Make sure 'icon.ico' and 'bg.png' are in the same directory as this Python script.
# 4. Open your command prompt or terminal in that directory.
# 5. Run the following command:
#    pyinstaller --onefile --windowed --add-data "icon.ico;." --add-data "bg.png;." server_manager.py
#    (Replace 'server_manager.py' with the actual name of your Python script if different)
# 6. After the command finishes, look for the 'dist' folder in your directory.
#    Your executable will be inside, e.g., 'dist\server_manager.exe'.

import tkinter as tk
from tkinter import messagebox, PhotoImage, ttk
import subprocess
import os
import sys
import psutil
import time

# Import Pillow for image manipulation
try:
    from PIL import Image, ImageTk
except ImportError:
    print("Error: 'Pillow' library not found.")
    print("Please install it using: pip install Pillow")
    print("Exiting.")
    sys.exit()

# --- Helper for PyInstaller resource paths ---
def resource_path(relative_path):
    """
    Get the absolute path to a resource, working correctly for both
    development environments and PyInstaller bundled executables.
    PyInstaller creates a temporary folder and stores the path in _MEIPASS.
    """
    try:
        # Check if the application is running as a PyInstaller bundle
        base_path = sys._MEIPASS
    except Exception:
        # If not bundled, assume it's running from the script's directory
        base_path = os.path.abspath(".")
    return os.path.join(base_path, relative_path)

class ServerManagerApp:
    def __init__(self, root):
        self.root = root
        self.root.title("Black Desert Corsair Server Manager")
        # Adjusted window size: width 600, height 800 (taller than wider)
        self.root.geometry("600x800")
        self.root.resizable(False, False)
        self.root.configure(bg="black")

        # Configure root grid to allow main_frame to be centered with background visible
        self.root.grid_rowconfigure(0, weight=1) # Top padding row
        self.root.grid_rowconfigure(1, weight=0) # Row for main_frame (will not expand vertically)
        self.root.grid_rowconfigure(2, weight=1) # Bottom padding row
        self.root.grid_columnconfigure(0, weight=1) # Left padding column
        self.root.grid_columnconfigure(1, weight=0) # Column for main_frame (will not expand horizontally)
        self.root.grid_columnconfigure(2, weight=1) # Right padding column

        # --- Set Application Icon ---
        try:
            self.root.iconbitmap(resource_path("icon.ico"))
        except tk.TclError:
            print("Warning: icon.ico not found or invalid. Skipping application icon.")
        except Exception as e:
            print(f"An unexpected error occurred while setting icon: {e}")

        # --- Set Background Image ---
        try:
            self.bg_image_pil = Image.open(resource_path("bg.png"))
            self.bg_image_tk = ImageTk.PhotoImage(self.bg_image_pil) # Initial Tkinter PhotoImage

            self.bg_label = tk.Label(self.root, image=self.bg_image_tk)
            # Place the background label to cover the entire root window
            self.bg_label.place(x=0, y=0, relwidth=1, relheight=1)
            self.bg_label.lower() # Send to back so other widgets are on top

            # Bind the configure event to resize the background image when the window size changes
            self.root.bind("<Configure>", self._resize_background)
        except FileNotFoundError:
            print("Warning: bg.png not found. Skipping background image.")
        except Exception as e:
            print(f"An unexpected error occurred while setting background: {e}")

        # --- Combined Server Definitions ---
        self.servers = {
            "IIS Server": {
                "type": "service",
                "service_name": "W3SVC",
                "process_name": "w3wp.exe",
                "path": "C:\\Windows\\System32\\inetsrv\\w3wp.exe",
                "start_command": "net start W3SVC",
                "stop_command": "net stop W3SVC"
            },
            "SQL Server": {
                "type": "service",
                "service_name": "MSSQLSERVER",
                "process_name": "sqlservr.exe",
                "path": "C:\\Program Files\\Microsoft SQL Server\\MSSQL15.SQLEXPRESS\\MSSQL\\Binn\\sqlservr.exe",
                "start_command": "net start MSSQLSERVER",
                "stop_command": "net stop MSSQLSERVER"
            },
            "Authentication Server": {
                "type": "executable",
                "process_name": "Authentication_ReleaseOp_x64_unpack.exe",
                "path": r"D:\1_Primal_Desert\bin64\Authentication_ReleaseOp_x64_unpack.exe",
                "args": ["NotDaemon"]
            },
            "Server Manager": {
                "type": "executable",
                "process_name": "CrimsonDesertServerManager_ReleaseOp_x64.exe",
                "path": r"D:\1_Primal_Desert\bin64\CrimsonDesertServerManager_ReleaseOp_x64.exe",
                "args": ["NotDaemon"]
            },
            "Primal Desert": {
                "type": "executable",
                "process_name": "CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
                "path": r"D:\1_Primal_Desert\bin64\CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
                "args": ["NotDaemon"] # Changed from "-param" to "NotDaemon"
            },
            "Black Sands": {
                "type": "executable",
                "process_name": "CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
                "path": r"D:\2_Black_Sands\bin64\CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
                "args": ["NotDaemon"] # Changed from "-param" to "NotDaemon"
            }
        }

        self.running_game_processes = {}
        self.orb_canvas_items = {}
        self.status_labels = {}
        self.toggle_buttons = {}

        self.last_net_io_counters = psutil.net_io_counters()
        self.last_net_time = time.time()

        self._create_widgets()
        self._initial_status_check()
        self._schedule_status_update()
        self._schedule_system_metrics_update()
        self._schedule_daily_restart() # DELETE or COMMENT OUT this line if you do not want to do daily restart.

        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

    def _resize_background(self, event):
        """Resizes the background image to fit the current window dimensions using Pillow."""
        if event.widget == self.root:
            new_width = event.width
            new_height = event.height

            if new_width > 0 and new_height > 0:
                try:
                    # Resize the PIL image to the new window dimensions
                    resized_image = self.bg_image_pil.resize((new_width, new_height), Image.Resampling.LANCZOS)
                    # Convert the resized PIL image to a Tkinter PhotoImage
                    self.bg_image_tk = ImageTk.PhotoImage(resized_image)
                    # Update the label's image
                    self.bg_label.config(image=self.bg_image_tk)
                except Exception as e:
                    print(f"Error resizing background image with Pillow: {e}")

    def _create_widgets(self):
        """Creates all the UI elements for the application."""
        style = ttk.Style()
        style.theme_use('default')
        # Lighter black for the main container
        lighter_black = "#222222"
        style.configure('TFrame', background=lighter_black) # Apply lighter background to TFrame
        style.configure('TLabel', background=lighter_black, foreground='white') # Ensure labels match
        style.configure('TButton', background='#333333', foreground='white', font=('Inter', 10, 'bold'))
        style.map('TButton',
                  background=[('active', '#555555')],
                  foreground=[('disabled', '#888888')])

        main_frame = ttk.Frame(self.root, padding="20", style='TFrame')
        # Place main_frame in the center cell of the root's grid
        main_frame.grid(row=1, column=1) # No sticky here, let padding rows/cols handle centering

        # Configure main_frame's internal grid columns and rows to expand
        main_frame.grid_columnconfigure(0, weight=0) # Orb column
        main_frame.grid_columnconfigure(1, weight=1) # Label column (expands)
        main_frame.grid_columnconfigure(2, weight=0) # Button column

        current_row = 0 # Start row counter for widgets within main_frame

        # --- System Status Section ---
        status_frame = ttk.LabelFrame(main_frame, text="System Status", style='TFrame', labelanchor="nw")
        status_frame.grid(row=current_row, column=0, columnspan=3, pady=10, padx=10, sticky="ew")
        current_row += 1

        self.cpu_label = ttk.Label(status_frame, text="CPU Usage: --%", font=("Arial", 10), style='TLabel')
        self.cpu_label.pack(anchor="w", padx=5, pady=2)
        self.ram_label = ttk.Label(status_frame, text="RAM Usage: --%", font=("Arial", 10), style='TLabel')
        self.ram_label.pack(anchor="w", padx=5, pady=2)
        self.c_drive_label = ttk.Label(status_frame, text="C:\\ Usage: --%", font=("Arial", 10), style='TLabel')
        self.c_drive_label.pack(anchor="w", padx=5, pady=2)
        self.d_drive_label = ttk.Label(status_frame, text="D:\\ Usage: --%", font=("Arial", 10), style='TLabel')
        self.d_drive_label.pack(anchor="w", padx=5, pady=2)
        self.network_label = ttk.Label(status_frame, text="Network: Rx -- KB/s, Tx -- KB/s", font=("Arial", 10), style='TLabel')
        self.network_label.pack(anchor="w", padx=5, pady=2)

        ttk.Separator(main_frame, orient='horizontal').grid(row=current_row, columnspan=3, pady=10, padx=10, sticky="ew")
        current_row += 1

        # --- Server Control Sections ---
        for server_name, config in self.servers.items():
            self._add_server_row(main_frame, server_name, server_name, current_row)
            current_row += 1

    def _add_server_row(self, parent_frame, display_name, identifier, row_num):
        """Helper function to add a row for a server (orb, label, and toggle button)."""
        # Orb Canvas - set background to match the main_frame's lighter black
        orb_canvas = tk.Canvas(parent_frame, width=20, height=20, bg="#222222", highlightthickness=0)
        orb_canvas.grid(row=row_num, column=0, padx=5, pady=5, sticky="w")
        orb_item = orb_canvas.create_oval(5, 5, 15, 15, fill="gray", outline="gray")
        self.orb_canvas_items[identifier] = (orb_canvas, orb_item)

        label = ttk.Label(parent_frame, text=display_name, font=("Inter", 10), style='TLabel')
        label.grid(row=row_num, column=1, padx=5, pady=5, sticky="w")
        self.status_labels[identifier] = label

        button = ttk.Button(parent_frame, text="Checking...")
        button.config(command=lambda id=identifier: self._toggle_server(id))
        button.grid(row=row_num, column=2, padx=5, pady=5, sticky="e")
        self.toggle_buttons[identifier] = button
        button["state"] = "disabled"

    def _set_orb_color(self, identifier, color):
        """Updates the color of a specific orb."""
        if identifier in self.orb_canvas_items:
            canvas, item_id = self.orb_canvas_items[identifier]
            canvas.itemconfig(item_id, fill=color, outline=color)

    def _get_service_status(self, service_name):
        """
        Checks the status of a Windows service using 'sc query'.
        Returns True if running, False if stopped/not found, None if error.
        """
        try:
            result = subprocess.run(
                ["sc", "query", service_name],
                capture_output=True,
                text=True,
                check=False,
                creationflags=subprocess.CREATE_NO_WINDOW
            )
            if "STATE              : 4  RUNNING" in result.stdout:
                return True
            elif "STATE              : 1  STOPPED" in result.stdout or \
                 f"[SC] EnumQueryServicesStatus:OpenService FAILED 1060" in result.stderr:
                return False
            else:
                print(f"Warning: Could not determine status for service '{service_name}'. Output:\n{result.stdout}\n{result.stderr}")
                return None
        except FileNotFoundError:
            messagebox.showerror("Error", "The 'sc' command was not found. Ensure it's in your system's PATH.")
            return None
        except Exception as e:
            print(f"Error checking service '{service_name}': {e}")
            return None

    def _check_executable_status(self, exe_path):
        """
        Checks if a process with the given executable path is currently running.
        Uses psutil for robust process checking.
        """
        normalized_exe_path = os.path.normcase(exe_path)
        for proc in psutil.process_iter(['exe']):
            try:
                if proc.info['exe'] is not None and os.path.normcase(proc.info['exe']) == normalized_exe_path:
                    return True
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
            except Exception as e:
                print(f"Error accessing process info for PID {proc.pid}: {e}")
                continue
        return False

    def _start_server_process(self, server_name):
        """Starts a server process (executable or service)."""
        config = self.servers[server_name]
        server_type = config["type"]

        # Prevent starting if already running
        if (server_type == "executable" and self._check_executable_status(config["path"])) or \
           (server_type == "service" and self._get_service_status(config["service_name"])):
            messagebox.showinfo("Status", f"{server_name} is already running.")
            return

        try:
            if server_type == "service":
                subprocess.Popen(config["start_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
                messagebox.showinfo("Status", f"Attempting to start {server_name} service. Please check its status in a moment.")
            elif server_type == "executable":
                full_command = [config["path"]] + config.get("args", [])
                working_directory = os.path.dirname(config["path"])
                process = subprocess.Popen(full_command, cwd=working_directory, creationflags=subprocess.CREATE_NO_WINDOW)
                self.running_game_processes[config["path"]] = process # Track executable processes
                messagebox.showinfo("Status", f"Attempting to start {server_name}. Please check its status in a moment.")
                # Small delay and re-check to see if it immediately exited
                self.root.after(1000, lambda: self._check_immediate_exit(server_name, config["path"], process))
        except FileNotFoundError:
            messagebox.showerror("Error", f"Executable or command not found for {server_name}. Please verify the path/command.")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to start {server_name}: {e}")
        finally:
            # Schedule a general status update, which will re-evaluate all servers
            self.root.after(1000, self._update_server_status)


    def _check_immediate_exit(self, server_name, path, process):
        """Checks if an executable process terminated immediately after launch."""
        if process.poll() is not None: # Process has terminated
            print(f"Warning: '{server_name}' process terminated immediately after launch. Exit code: {process.poll()}")
            messagebox.showerror("Server Launch Failed", f"'{server_name}' terminated immediately after launch. Exit code: {process.poll()}. Please check server logs.")
            if path in self.running_game_processes:
                del self.running_game_processes[path]
            # Force UI update for this specific server immediately after crash detection
            self._update_single_server_ui(server_name)

    def _stop_server_process(self, server_name):
        """Stops a server process (executable or service)."""
        config = self.servers[server_name]
        server_type = config["type"]

        # Prevent stopping if already stopped
        if not ((server_type == "executable" and self._check_executable_status(config["path"])) or \
                (server_type == "service" and self._get_service_status(config["service_name"]))):
            messagebox.showinfo("Status", f"{server_name} is already stopped.")
            return

        try:
            if server_type == "service":
                subprocess.Popen(config["stop_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
                messagebox.showinfo("Status", f"Attempting to stop {server_name} service. Please check its status in a moment.")
            elif server_type == "executable":
                path = config["path"]
                target_psutil_process = None

                # 1. Check if we have a Popen object and convert it to psutil.Process
                if path in self.running_game_processes:
                    popen_obj = self.running_game_processes[path]
                    if popen_obj.poll() is None: # It's still running
                        try:
                            target_psutil_process = psutil.Process(popen_obj.pid)
                        except psutil.NoSuchProcess:
                            # Process might have just exited, or PID is invalid
                            pass
                    # Remove from tracking regardless, as we're now handling it via psutil
                    del self.running_game_processes[path]

                # 2. If not found via Popen, search all processes by path
                if target_psutil_process is None:
                    normalized_path = os.path.normcase(path)
                    for proc in psutil.process_iter(['pid', 'exe']):
                        try:
                            if proc.info['exe'] is not None and os.path.normcase(proc.info['exe']) == normalized_path:
                                target_psutil_process = proc
                                break
                        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                            continue

                if target_psutil_process:
                    try:
                        if target_psutil_process.is_running(): # Use psutil's method
                            target_psutil_process.terminate()
                            time_start = time.time()
                            # Wait for up to 60 seconds for graceful termination
                            while target_psutil_process.is_running() and (time.time() - time_start) < 60:
                                time.sleep(0.5) # Check every half second

                            if target_psutil_process.is_running():
                                # If still running after timeout, force kill
                                target_psutil_process.kill()
                                time_start = time.time()
                                # Wait for up to 60 seconds for kill to complete
                                while target_psutil_process.is_running() and (time.time() - time_start) < 60:
                                    time.sleep(0.5)

                        if not target_psutil_process.is_running(): # Final check after attempts
                            messagebox.showinfo("Status", f"{server_name} stopped.")
                        else:
                             messagebox.showerror("Error", f"Failed to stop {server_name} after multiple attempts within timeout.")

                    except psutil.NoSuchProcess:
                        messagebox.showinfo("Status", f"{server_name} was already stopped or disappeared.")
                    except psutil.AccessDenied:
                        messagebox.showerror("Error", f"Access denied to stop {server_name}. Run as administrator.")
                    except Exception as e:
                        messagebox.showerror("Error", f"Error stopping {server_name}: {e}")
                else:
                    messagebox.showinfo("Status", f"Could not find a running process for {server_name} at the specified path to stop.")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to stop {server_name}: {e}")
        finally:
            # Schedule a general status update, which will re-evaluate all servers
            self.root.after(1000, self._update_server_status)

    def _toggle_server(self, server_name):
        """Handles the toggle button click for any server (service or executable)."""
        config = self.servers[server_name]
        server_type = config["type"]

        # Disable button during operation
        self.toggle_buttons[server_name]["state"] = "disabled"
        self.toggle_buttons[server_name].config(text="Working...")

        is_running = False
        if server_type == "service":
            is_running = self._get_service_status(config["service_name"])
        elif server_type == "executable":
            is_running = self._check_executable_status(config["path"])

        if is_running:
            self._stop_server_process(server_name)
        else:
            self._start_server_process(server_name)

        # Re-enable button after operation (status update will set text)
        # The _update_single_server_ui or _update_server_status will handle re-enabling
        # based on the actual status, so no need to explicitly enable here.
        # self.toggle_buttons[server_name]["state"] = "normal"


    def _initial_status_check(self):
        """Performs an immediate status check on app startup."""
        print("Performing initial server status check...")
        # Call the general update, which now uses _update_single_server_ui for each
        self._update_server_status(initial=True)

    def _update_single_server_ui(self, server_name):
        """
        Updates the UI elements (orb, label, button) for a single specified server.
        """
        config = self.servers[server_name]
        is_running = False
        if config["type"] == "service":
            is_running = self._get_service_status(config["service_name"])
        elif config["type"] == "executable":
            is_running = self._check_executable_status(config["path"])

        button = self.toggle_buttons[server_name]
        label = self.status_labels[server_name]

        if is_running:
            self._set_orb_color(server_name, "green")
            label.config(text=f"{server_name} (Running)", foreground="green")
            button.config(text="Stop")
        else:
            self._set_orb_color(server_name, "red")
            label.config(text=f"{server_name} (Stopped)", foreground="red")
            button.config(text="Start")

        # Ensure button is enabled after update, unless it's in a "Working..." state
        if button["text"] != "Working...": # Don't re-enable if it's explicitly disabled for an ongoing operation
             button["state"] = "normal"


    def _update_server_status(self, initial=False):
        """
        Updates the status of all servers and their corresponding UI elements.
        Called periodically.
        """
        for server_name in self.servers.keys():
            self._update_single_server_ui(server_name)

    def _schedule_status_update(self):
        """Schedules the next call to _update_server_status."""
        self.root.after(2000, self._update_server_status)

    def _update_system_metrics(self):
        """
        Fetches and updates the display for CPU, RAM, Disk, and Network usage.
        This function is scheduled to run periodically.
        """
        try:
            # --- CPU Usage ---
            cpu_percent = psutil.cpu_percent(interval=0.5)
            self.cpu_label.config(text=f"CPU Usage: {cpu_percent:.1f}%")

            # --- RAM Usage ---
            ram = psutil.virtual_memory()
            self.ram_label.config(text=f"RAM Usage: {ram.percent:.1f}% ({ram.used / (1024**3):.2f} GB / {ram.total / (1024**3):.2f} GB)")

            # --- Disk Usage (C: and D:) ---
            try:
                c_disk = psutil.disk_usage('C:')
                self.c_drive_label.config(text=f"C:\\ Usage: {c_disk.percent:.1f}% ({c_disk.used / (1024**3):.2f} GB / {c_disk.total / (1024**3):.2f} GB)")
            except Exception:
                self.c_drive_label.config(text="C:\\ Usage: N/A (Drive not found or accessible)")

            try:
                d_disk = psutil.disk_usage('D:')
                self.d_drive_label.config(text=f"D:\\ Usage: {d_disk.percent:.1f}% ({d_disk.used / (1024**3):.2f} GB / {d_disk.total / (1024**3):.2f} GB)")
            except Exception:
                self.d_drive_label.config(text="D:\\ Usage: N/A (Drive not found or accessible)")

            # --- Network Usage ---
            current_net_io_counters = psutil.net_io_counters()
            current_net_time = time.time()

            time_diff = current_net_time - self.last_net_time
            if time_diff > 0:
                bytes_sent_diff = current_net_io_counters.bytes_sent - self.last_net_io_counters.bytes_sent
                bytes_recv_diff = current_net_io_counters.bytes_recv - self.last_net_io_counters.bytes_recv

                tx_speed_kbps = (bytes_sent_diff / time_diff) / 1024
                rx_speed_kbps = (bytes_recv_diff / time_diff) / 1024

                self.network_label.config(text=f"Network: Rx {rx_speed_kbps:.2f} KB/s, Tx {tx_speed_kbps:.2f} KB/s")
            else:
                self.network_label.config(text="Network: Calculating...")

            self.last_net_io_counters = current_net_io_counters
            self.last_net_time = current_net_time

        except Exception as e:
            print(f"Error updating system metrics: {e}")
            self.cpu_label.config(text="CPU Usage: Error")
            self.ram_label.config(text="RAM Usage: Error")
            self.c_drive_label.config(text="C:\\ Usage: Error")
            self.d_drive_label.config(text="D:\\ Usage: Error")
            self.network_label.config(text="Network: Error")

        # Schedule the next update for system metrics after 5000 milliseconds (5 seconds)
        self.root.after(5000, self._update_system_metrics)

    def _schedule_system_metrics_update(self):
        """Schedules the initial and subsequent calls to _update_system_metrics."""
        self.root.after(0, self._update_system_metrics)

    def on_closing(self):
        """Handles closing the application window."""
        print("Closing application. Attempting to terminate any tracked game processes.")
        for path, process_popen in list(self.running_game_processes.items()): # Iterate a copy
            if process_popen.poll() is None: # Still running via Popen object
                try:
                    # Get psutil.Process object from Popen's PID
                    proc_psutil = psutil.Process(process_popen.pid)
                    if proc_psutil.is_running():
                        proc_psutil.terminate()
                        proc_psutil.wait(timeout=2)
                        if proc_psutil.is_running():
                            proc_psutil.kill()
                            proc_psutil.wait(timeout=2)
                        print(f"Terminated process for: {path}")
                except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
                    print(f"Process for {path} already gone or access denied on close: {e}")
                except Exception as e:
                    print(f"Error terminating process {path} on close: {e}")
            # Remove from tracking regardless, as it's either terminated or already gone
            if path in self.running_game_processes:
                del self.running_game_processes[path]
        self.root.destroy()
   
        # --- Silent control helpers (no popups) ---
    def _start_server_process_silent(self, server_name):
        config = self.servers[server_name]
        t = config["type"]

        # already running? skip
        if (t == "executable" and self._check_executable_status(config["path"])) or \
           (t == "service" and self._get_service_status(config["service_name"])):
            return

        try:
            if t == "service":
                subprocess.Popen(config["start_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
            else:
                full_command = [config["path"]] + config.get("args", [])
                working_directory = os.path.dirname(config["path"])
                p = subprocess.Popen(full_command, cwd=working_directory, creationflags=subprocess.CREATE_NO_WINDOW)
                self.running_game_processes[config["path"]] = p
        except Exception as e:
            print(f"[AUTO] Start failed for {server_name}: {e}")
        finally:
            self.root.after(1500, lambda: self._update_single_server_ui(server_name))

    def _stop_server_process_silent(self, server_name):
        config = self.servers[server_name]
        t = config["type"]

        # already stopped? skip
        if not ((t == "executable" and self._check_executable_status(config["path"])) or \
                (t == "service" and self._get_service_status(config["service_name"]))):
            return

        try:
            if t == "service":
                subprocess.Popen(config["stop_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
            else:
                path = config["path"]
                target = None

                # Prefer tracked Popen
                if path in self.running_game_processes:
                    po = self.running_game_processes[path]
                    if po.poll() is None:
                        try:
                            target = psutil.Process(po.pid)
                        except psutil.NoSuchProcess:
                            pass
                    del self.running_game_processes[path]

                # Fallback: locate by exe path
                if target is None:
                    npath = os.path.normcase(path)
                    for proc in psutil.process_iter(['pid','exe']):
                        try:
                            if proc.info['exe'] and os.path.normcase(proc.info['exe']) == npath:
                                target = proc
                                break
                        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                            continue

                if target:
                    try:
                        if target.is_running():
                            target.terminate()
                            target.wait(timeout=60)
                            if target.is_running():
                                target.kill()
                                target.wait(timeout=60)
                    except Exception as e:
                        print(f"[AUTO] Stop error for {server_name}: {e}")
        except Exception as e:
            print(f"[AUTO] Stop failed for {server_name}: {e}")
        finally:
            self.root.after(1500, lambda: self._update_single_server_ui(server_name))

    # --- Daily restart scheduler ---
    def _schedule_daily_restart(self):
        """Schedule the next 4:00 AM local restart."""
        from datetime import datetime, timedelta, time as dtime

        now = datetime.now()
        target = datetime.combine(now.date(), dtime(4, 0))
        if now >= target:
            target = target + timedelta(days=1)
        delay_ms = int((target - now).total_seconds() * 1000)
        print(f"[AUTO] Next daily restart scheduled at {target} (in {delay_ms/1000:.0f}s)")
        self.root.after(delay_ms, self._run_daily_restart)

    def _run_daily_restart(self):
        """Stop all, wait 5 minutes, then start with 1-minute gaps; reschedule tomorrow."""
        stop_order = ["Authentication Server", "Server Manager", "Primal Desert", "Black Sands"]
        start_order = ["Authentication Server", "Server Manager", "Primal Desert", "Black Sands"]

        print("[AUTO] Daily restart: stopping services/processes...")
        for name in stop_order:
            self._stop_server_process_silent(name)

        # after 5 minutes, begin staggered starts
        self.root.after(5 * 60 * 1000, lambda: self._start_staggered(start_order, index=0))

        # schedule next day right away so it persists even if app stays up
        self._schedule_daily_restart()

    def _start_staggered(self, names, index):
        if index >= len(names):
            print("[AUTO] Daily restart: completed staggered starts.")
            return
        name = names[index]
        print(f"[AUTO] Starting '{name}' (step {index+1}/{len(names)})...")
        self._start_server_process_silent(name)
        # wait 1 minute, then start next
        self.root.after(60 * 1000, lambda: self._start_staggered(names, index + 1))

if __name__ == "__main__":
    # Check for psutil
    try:
        import psutil
    except ImportError:
        print("Error: 'psutil' library not found.")
        print("Please install it using: pip install psutil")
        print("Exiting.")
        sys.exit()

    root = tk.Tk()
    app = ServerManagerApp(root)
    root.mainloop()
 

Вложения

  • bg.webp
    bg.webp
    6,5 КБ · Просмотры: 1
  • icon.webp
    icon.webp
    7,8 КБ · Просмотры: 1

Назад
Сверху