Автор не я взял с Англ сайта
Приветствую, друзья! Мне очень понравилось собирать свой сервер Corsair, и после нескольких месяцев игры я наконец-то закончил. Это было превосходно. Я решил поделиться проделанной работой, так как этот форум мне очень помог.
Сначала мой менеджер сервера. Мне нравится, что можно запускать/останавливать его нажатием кнопки, а не запускать скрипты.
1. Измените icon.png на icon.ico (он прикреплён)
. А. Мне пришлось изменить тип файла, чтобы он загружался сюда.
2. Поместите фон и значок в ту же папку, что и программа. (Можно менять любой из вариантов, просто оставьте прежнее имя).
3. Установите Python. (Сервер Corsair уже использует Python 3.9.5, поэтому я все построил на его основе.
4. Я намеревался упаковать это как .exe, поэтому есть много комментариев, связанных с этим. Часть программирования также связана с этим. (Вы можете проигнорировать это и просто запустить скрипт).
5. Вам понадобятся некоторые знания по использованию pip для установки необходимых компонентов.
6. Совет от профессионала: ChatGPT вам поможет. Вставьте весь код туда и спросите, как использовать pip для установки необходимых элементов. На самом деле это до смешного просто.
а. пример: pip install psutil
7. Вам нужно обновить пути к файлам. На строках 115, 121, 133 или около них. Я настроил свой сервер на запуск 2 полей (Primal Desert = Season. Black Sands = Hadum/Elvia).
8. Я заметил, что мобы перестают появляться, поэтому я настроил ежедневный перезапуск в 4 утра. Если вы не хотите этого делать, добавьте # в начале строки 150 или около того. Это выглядит как self._schedule_daily_restart(). Если поставить перед ним #, он будет закомментирован (не будет использоваться).
Примечание: я не профессионал. Я ничего не гарантирую. Я не поддерживаю их. Я просто делюсь тем, что у меня есть. Делайте с этим, что хотите.
Приветствую, друзья! Мне очень понравилось собирать свой сервер Corsair, и после нескольких месяцев игры я наконец-то закончил. Это было превосходно. Я решил поделиться проделанной работой, так как этот форум мне очень помог.

Сначала мой менеджер сервера. Мне нравится, что можно запускать/останавливать его нажатием кнопки, а не запускать скрипты.
1. Измените icon.png на icon.ico (он прикреплён)
. А. Мне пришлось изменить тип файла, чтобы он загружался сюда.
2. Поместите фон и значок в ту же папку, что и программа. (Можно менять любой из вариантов, просто оставьте прежнее имя).
3. Установите Python. (Сервер Corsair уже использует Python 3.9.5, поэтому я все построил на его основе.
4. Я намеревался упаковать это как .exe, поэтому есть много комментариев, связанных с этим. Часть программирования также связана с этим. (Вы можете проигнорировать это и просто запустить скрипт).
5. Вам понадобятся некоторые знания по использованию pip для установки необходимых компонентов.
6. Совет от профессионала: ChatGPT вам поможет. Вставьте весь код туда и спросите, как использовать pip для установки необходимых элементов. На самом деле это до смешного просто.
а. пример: pip install psutil
7. Вам нужно обновить пути к файлам. На строках 115, 121, 133 или около них. Я настроил свой сервер на запуск 2 полей (Primal Desert = Season. Black Sands = Hadum/Elvia).
8. Я заметил, что мобы перестают появляться, поэтому я настроил ежедневный перезапуск в 4 утра. Если вы не хотите этого делать, добавьте # в начале строки 150 или около того. Это выглядит как self._schedule_daily_restart(). Если поставить перед ним #, он будет закомментирован (не будет использоваться).
Примечание: я не профессионал. Я ничего не гарантирую. Я не поддерживаю их. Я просто делюсь тем, что у меня есть. Делайте с этим, что хотите.
Код:
# Server Manager for Black Desert Corsair Server
# Author: Njinir
#
# This application allows you to monitor and manage the status of your
# Black Desert Corsair game servers, including IIS, SQL Server, and custom game executables.
# It also provides real-time system resource usage (CPU, RAM, Disk, Network).
#
# --- PyInstaller Instructions ---
# To create a standalone executable (.exe) of this application, follow these steps:
# 1. Ensure you have PyInstaller installed: `pip install pyinstaller`
# 2. Ensure you have Pillow installed: `pip install Pillow`
# 3. Make sure 'icon.ico' and 'bg.png' are in the same directory as this Python script.
# 4. Open your command prompt or terminal in that directory.
# 5. Run the following command:
# pyinstaller --onefile --windowed --add-data "icon.ico;." --add-data "bg.png;." server_manager.py
# (Replace 'server_manager.py' with the actual name of your Python script if different)
# 6. After the command finishes, look for the 'dist' folder in your directory.
# Your executable will be inside, e.g., 'dist\server_manager.exe'.
import tkinter as tk
from tkinter import messagebox, PhotoImage, ttk
import subprocess
import os
import sys
import psutil
import time
# Import Pillow for image manipulation
try:
from PIL import Image, ImageTk
except ImportError:
print("Error: 'Pillow' library not found.")
print("Please install it using: pip install Pillow")
print("Exiting.")
sys.exit()
# --- Helper for PyInstaller resource paths ---
def resource_path(relative_path):
"""
Get the absolute path to a resource, working correctly for both
development environments and PyInstaller bundled executables.
PyInstaller creates a temporary folder and stores the path in _MEIPASS.
"""
try:
# Check if the application is running as a PyInstaller bundle
base_path = sys._MEIPASS
except Exception:
# If not bundled, assume it's running from the script's directory
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
class ServerManagerApp:
def __init__(self, root):
self.root = root
self.root.title("Black Desert Corsair Server Manager")
# Adjusted window size: width 600, height 800 (taller than wider)
self.root.geometry("600x800")
self.root.resizable(False, False)
self.root.configure(bg="black")
# Configure root grid to allow main_frame to be centered with background visible
self.root.grid_rowconfigure(0, weight=1) # Top padding row
self.root.grid_rowconfigure(1, weight=0) # Row for main_frame (will not expand vertically)
self.root.grid_rowconfigure(2, weight=1) # Bottom padding row
self.root.grid_columnconfigure(0, weight=1) # Left padding column
self.root.grid_columnconfigure(1, weight=0) # Column for main_frame (will not expand horizontally)
self.root.grid_columnconfigure(2, weight=1) # Right padding column
# --- Set Application Icon ---
try:
self.root.iconbitmap(resource_path("icon.ico"))
except tk.TclError:
print("Warning: icon.ico not found or invalid. Skipping application icon.")
except Exception as e:
print(f"An unexpected error occurred while setting icon: {e}")
# --- Set Background Image ---
try:
self.bg_image_pil = Image.open(resource_path("bg.png"))
self.bg_image_tk = ImageTk.PhotoImage(self.bg_image_pil) # Initial Tkinter PhotoImage
self.bg_label = tk.Label(self.root, image=self.bg_image_tk)
# Place the background label to cover the entire root window
self.bg_label.place(x=0, y=0, relwidth=1, relheight=1)
self.bg_label.lower() # Send to back so other widgets are on top
# Bind the configure event to resize the background image when the window size changes
self.root.bind("<Configure>", self._resize_background)
except FileNotFoundError:
print("Warning: bg.png not found. Skipping background image.")
except Exception as e:
print(f"An unexpected error occurred while setting background: {e}")
# --- Combined Server Definitions ---
self.servers = {
"IIS Server": {
"type": "service",
"service_name": "W3SVC",
"process_name": "w3wp.exe",
"path": "C:\\Windows\\System32\\inetsrv\\w3wp.exe",
"start_command": "net start W3SVC",
"stop_command": "net stop W3SVC"
},
"SQL Server": {
"type": "service",
"service_name": "MSSQLSERVER",
"process_name": "sqlservr.exe",
"path": "C:\\Program Files\\Microsoft SQL Server\\MSSQL15.SQLEXPRESS\\MSSQL\\Binn\\sqlservr.exe",
"start_command": "net start MSSQLSERVER",
"stop_command": "net stop MSSQLSERVER"
},
"Authentication Server": {
"type": "executable",
"process_name": "Authentication_ReleaseOp_x64_unpack.exe",
"path": r"D:\1_Primal_Desert\bin64\Authentication_ReleaseOp_x64_unpack.exe",
"args": ["NotDaemon"]
},
"Server Manager": {
"type": "executable",
"process_name": "CrimsonDesertServerManager_ReleaseOp_x64.exe",
"path": r"D:\1_Primal_Desert\bin64\CrimsonDesertServerManager_ReleaseOp_x64.exe",
"args": ["NotDaemon"]
},
"Primal Desert": {
"type": "executable",
"process_name": "CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
"path": r"D:\1_Primal_Desert\bin64\CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
"args": ["NotDaemon"] # Changed from "-param" to "NotDaemon"
},
"Black Sands": {
"type": "executable",
"process_name": "CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
"path": r"D:\2_Black_Sands\bin64\CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
"args": ["NotDaemon"] # Changed from "-param" to "NotDaemon"
}
}
self.running_game_processes = {}
self.orb_canvas_items = {}
self.status_labels = {}
self.toggle_buttons = {}
self.last_net_io_counters = psutil.net_io_counters()
self.last_net_time = time.time()
self._create_widgets()
self._initial_status_check()
self._schedule_status_update()
self._schedule_system_metrics_update()
self._schedule_daily_restart() # DELETE or COMMENT OUT this line if you do not want to do daily restart.
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def _resize_background(self, event):
"""Resizes the background image to fit the current window dimensions using Pillow."""
if event.widget == self.root:
new_width = event.width
new_height = event.height
if new_width > 0 and new_height > 0:
try:
# Resize the PIL image to the new window dimensions
resized_image = self.bg_image_pil.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Convert the resized PIL image to a Tkinter PhotoImage
self.bg_image_tk = ImageTk.PhotoImage(resized_image)
# Update the label's image
self.bg_label.config(image=self.bg_image_tk)
except Exception as e:
print(f"Error resizing background image with Pillow: {e}")
def _create_widgets(self):
"""Creates all the UI elements for the application."""
style = ttk.Style()
style.theme_use('default')
# Lighter black for the main container
lighter_black = "#222222"
style.configure('TFrame', background=lighter_black) # Apply lighter background to TFrame
style.configure('TLabel', background=lighter_black, foreground='white') # Ensure labels match
style.configure('TButton', background='#333333', foreground='white', font=('Inter', 10, 'bold'))
style.map('TButton',
background=[('active', '#555555')],
foreground=[('disabled', '#888888')])
main_frame = ttk.Frame(self.root, padding="20", style='TFrame')
# Place main_frame in the center cell of the root's grid
main_frame.grid(row=1, column=1) # No sticky here, let padding rows/cols handle centering
# Configure main_frame's internal grid columns and rows to expand
main_frame.grid_columnconfigure(0, weight=0) # Orb column
main_frame.grid_columnconfigure(1, weight=1) # Label column (expands)
main_frame.grid_columnconfigure(2, weight=0) # Button column
current_row = 0 # Start row counter for widgets within main_frame
# --- System Status Section ---
status_frame = ttk.LabelFrame(main_frame, text="System Status", style='TFrame', labelanchor="nw")
status_frame.grid(row=current_row, column=0, columnspan=3, pady=10, padx=10, sticky="ew")
current_row += 1
self.cpu_label = ttk.Label(status_frame, text="CPU Usage: --%", font=("Arial", 10), style='TLabel')
self.cpu_label.pack(anchor="w", padx=5, pady=2)
self.ram_label = ttk.Label(status_frame, text="RAM Usage: --%", font=("Arial", 10), style='TLabel')
self.ram_label.pack(anchor="w", padx=5, pady=2)
self.c_drive_label = ttk.Label(status_frame, text="C:\\ Usage: --%", font=("Arial", 10), style='TLabel')
self.c_drive_label.pack(anchor="w", padx=5, pady=2)
self.d_drive_label = ttk.Label(status_frame, text="D:\\ Usage: --%", font=("Arial", 10), style='TLabel')
self.d_drive_label.pack(anchor="w", padx=5, pady=2)
self.network_label = ttk.Label(status_frame, text="Network: Rx -- KB/s, Tx -- KB/s", font=("Arial", 10), style='TLabel')
self.network_label.pack(anchor="w", padx=5, pady=2)
ttk.Separator(main_frame, orient='horizontal').grid(row=current_row, columnspan=3, pady=10, padx=10, sticky="ew")
current_row += 1
# --- Server Control Sections ---
for server_name, config in self.servers.items():
self._add_server_row(main_frame, server_name, server_name, current_row)
current_row += 1
def _add_server_row(self, parent_frame, display_name, identifier, row_num):
"""Helper function to add a row for a server (orb, label, and toggle button)."""
# Orb Canvas - set background to match the main_frame's lighter black
orb_canvas = tk.Canvas(parent_frame, width=20, height=20, bg="#222222", highlightthickness=0)
orb_canvas.grid(row=row_num, column=0, padx=5, pady=5, sticky="w")
orb_item = orb_canvas.create_oval(5, 5, 15, 15, fill="gray", outline="gray")
self.orb_canvas_items[identifier] = (orb_canvas, orb_item)
label = ttk.Label(parent_frame, text=display_name, font=("Inter", 10), style='TLabel')
label.grid(row=row_num, column=1, padx=5, pady=5, sticky="w")
self.status_labels[identifier] = label
button = ttk.Button(parent_frame, text="Checking...")
button.config(command=lambda id=identifier: self._toggle_server(id))
button.grid(row=row_num, column=2, padx=5, pady=5, sticky="e")
self.toggle_buttons[identifier] = button
button["state"] = "disabled"
def _set_orb_color(self, identifier, color):
"""Updates the color of a specific orb."""
if identifier in self.orb_canvas_items:
canvas, item_id = self.orb_canvas_items[identifier]
canvas.itemconfig(item_id, fill=color, outline=color)
def _get_service_status(self, service_name):
"""
Checks the status of a Windows service using 'sc query'.
Returns True if running, False if stopped/not found, None if error.
"""
try:
result = subprocess.run(
["sc", "query", service_name],
capture_output=True,
text=True,
check=False,
creationflags=subprocess.CREATE_NO_WINDOW
)
if "STATE : 4 RUNNING" in result.stdout:
return True
elif "STATE : 1 STOPPED" in result.stdout or \
f"[SC] EnumQueryServicesStatus:OpenService FAILED 1060" in result.stderr:
return False
else:
print(f"Warning: Could not determine status for service '{service_name}'. Output:\n{result.stdout}\n{result.stderr}")
return None
except FileNotFoundError:
messagebox.showerror("Error", "The 'sc' command was not found. Ensure it's in your system's PATH.")
return None
except Exception as e:
print(f"Error checking service '{service_name}': {e}")
return None
def _check_executable_status(self, exe_path):
"""
Checks if a process with the given executable path is currently running.
Uses psutil for robust process checking.
"""
normalized_exe_path = os.path.normcase(exe_path)
for proc in psutil.process_iter(['exe']):
try:
if proc.info['exe'] is not None and os.path.normcase(proc.info['exe']) == normalized_exe_path:
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
except Exception as e:
print(f"Error accessing process info for PID {proc.pid}: {e}")
continue
return False
def _start_server_process(self, server_name):
"""Starts a server process (executable or service)."""
config = self.servers[server_name]
server_type = config["type"]
# Prevent starting if already running
if (server_type == "executable" and self._check_executable_status(config["path"])) or \
(server_type == "service" and self._get_service_status(config["service_name"])):
messagebox.showinfo("Status", f"{server_name} is already running.")
return
try:
if server_type == "service":
subprocess.Popen(config["start_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
messagebox.showinfo("Status", f"Attempting to start {server_name} service. Please check its status in a moment.")
elif server_type == "executable":
full_command = [config["path"]] + config.get("args", [])
working_directory = os.path.dirname(config["path"])
process = subprocess.Popen(full_command, cwd=working_directory, creationflags=subprocess.CREATE_NO_WINDOW)
self.running_game_processes[config["path"]] = process # Track executable processes
messagebox.showinfo("Status", f"Attempting to start {server_name}. Please check its status in a moment.")
# Small delay and re-check to see if it immediately exited
self.root.after(1000, lambda: self._check_immediate_exit(server_name, config["path"], process))
except FileNotFoundError:
messagebox.showerror("Error", f"Executable or command not found for {server_name}. Please verify the path/command.")
except Exception as e:
messagebox.showerror("Error", f"Failed to start {server_name}: {e}")
finally:
# Schedule a general status update, which will re-evaluate all servers
self.root.after(1000, self._update_server_status)
def _check_immediate_exit(self, server_name, path, process):
"""Checks if an executable process terminated immediately after launch."""
if process.poll() is not None: # Process has terminated
print(f"Warning: '{server_name}' process terminated immediately after launch. Exit code: {process.poll()}")
messagebox.showerror("Server Launch Failed", f"'{server_name}' terminated immediately after launch. Exit code: {process.poll()}. Please check server logs.")
if path in self.running_game_processes:
del self.running_game_processes[path]
# Force UI update for this specific server immediately after crash detection
self._update_single_server_ui(server_name)
def _stop_server_process(self, server_name):
"""Stops a server process (executable or service)."""
config = self.servers[server_name]
server_type = config["type"]
# Prevent stopping if already stopped
if not ((server_type == "executable" and self._check_executable_status(config["path"])) or \
(server_type == "service" and self._get_service_status(config["service_name"]))):
messagebox.showinfo("Status", f"{server_name} is already stopped.")
return
try:
if server_type == "service":
subprocess.Popen(config["stop_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
messagebox.showinfo("Status", f"Attempting to stop {server_name} service. Please check its status in a moment.")
elif server_type == "executable":
path = config["path"]
target_psutil_process = None
# 1. Check if we have a Popen object and convert it to psutil.Process
if path in self.running_game_processes:
popen_obj = self.running_game_processes[path]
if popen_obj.poll() is None: # It's still running
try:
target_psutil_process = psutil.Process(popen_obj.pid)
except psutil.NoSuchProcess:
# Process might have just exited, or PID is invalid
pass
# Remove from tracking regardless, as we're now handling it via psutil
del self.running_game_processes[path]
# 2. If not found via Popen, search all processes by path
if target_psutil_process is None:
normalized_path = os.path.normcase(path)
for proc in psutil.process_iter(['pid', 'exe']):
try:
if proc.info['exe'] is not None and os.path.normcase(proc.info['exe']) == normalized_path:
target_psutil_process = proc
break
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if target_psutil_process:
try:
if target_psutil_process.is_running(): # Use psutil's method
target_psutil_process.terminate()
time_start = time.time()
# Wait for up to 60 seconds for graceful termination
while target_psutil_process.is_running() and (time.time() - time_start) < 60:
time.sleep(0.5) # Check every half second
if target_psutil_process.is_running():
# If still running after timeout, force kill
target_psutil_process.kill()
time_start = time.time()
# Wait for up to 60 seconds for kill to complete
while target_psutil_process.is_running() and (time.time() - time_start) < 60:
time.sleep(0.5)
if not target_psutil_process.is_running(): # Final check after attempts
messagebox.showinfo("Status", f"{server_name} stopped.")
else:
messagebox.showerror("Error", f"Failed to stop {server_name} after multiple attempts within timeout.")
except psutil.NoSuchProcess:
messagebox.showinfo("Status", f"{server_name} was already stopped or disappeared.")
except psutil.AccessDenied:
messagebox.showerror("Error", f"Access denied to stop {server_name}. Run as administrator.")
except Exception as e:
messagebox.showerror("Error", f"Error stopping {server_name}: {e}")
else:
messagebox.showinfo("Status", f"Could not find a running process for {server_name} at the specified path to stop.")
except Exception as e:
messagebox.showerror("Error", f"Failed to stop {server_name}: {e}")
finally:
# Schedule a general status update, which will re-evaluate all servers
self.root.after(1000, self._update_server_status)
def _toggle_server(self, server_name):
"""Handles the toggle button click for any server (service or executable)."""
config = self.servers[server_name]
server_type = config["type"]
# Disable button during operation
self.toggle_buttons[server_name]["state"] = "disabled"
self.toggle_buttons[server_name].config(text="Working...")
is_running = False
if server_type == "service":
is_running = self._get_service_status(config["service_name"])
elif server_type == "executable":
is_running = self._check_executable_status(config["path"])
if is_running:
self._stop_server_process(server_name)
else:
self._start_server_process(server_name)
# Re-enable button after operation (status update will set text)
# The _update_single_server_ui or _update_server_status will handle re-enabling
# based on the actual status, so no need to explicitly enable here.
# self.toggle_buttons[server_name]["state"] = "normal"
def _initial_status_check(self):
"""Performs an immediate status check on app startup."""
print("Performing initial server status check...")
# Call the general update, which now uses _update_single_server_ui for each
self._update_server_status(initial=True)
def _update_single_server_ui(self, server_name):
"""
Updates the UI elements (orb, label, button) for a single specified server.
"""
config = self.servers[server_name]
is_running = False
if config["type"] == "service":
is_running = self._get_service_status(config["service_name"])
elif config["type"] == "executable":
is_running = self._check_executable_status(config["path"])
button = self.toggle_buttons[server_name]
label = self.status_labels[server_name]
if is_running:
self._set_orb_color(server_name, "green")
label.config(text=f"{server_name} (Running)", foreground="green")
button.config(text="Stop")
else:
self._set_orb_color(server_name, "red")
label.config(text=f"{server_name} (Stopped)", foreground="red")
button.config(text="Start")
# Ensure button is enabled after update, unless it's in a "Working..." state
if button["text"] != "Working...": # Don't re-enable if it's explicitly disabled for an ongoing operation
button["state"] = "normal"
def _update_server_status(self, initial=False):
"""
Updates the status of all servers and their corresponding UI elements.
Called periodically.
"""
for server_name in self.servers.keys():
self._update_single_server_ui(server_name)
def _schedule_status_update(self):
"""Schedules the next call to _update_server_status."""
self.root.after(2000, self._update_server_status)
def _update_system_metrics(self):
"""
Fetches and updates the display for CPU, RAM, Disk, and Network usage.
This function is scheduled to run periodically.
"""
try:
# --- CPU Usage ---
cpu_percent = psutil.cpu_percent(interval=0.5)
self.cpu_label.config(text=f"CPU Usage: {cpu_percent:.1f}%")
# --- RAM Usage ---
ram = psutil.virtual_memory()
self.ram_label.config(text=f"RAM Usage: {ram.percent:.1f}% ({ram.used / (1024**3):.2f} GB / {ram.total / (1024**3):.2f} GB)")
# --- Disk Usage (C: and D:) ---
try:
c_disk = psutil.disk_usage('C:')
self.c_drive_label.config(text=f"C:\\ Usage: {c_disk.percent:.1f}% ({c_disk.used / (1024**3):.2f} GB / {c_disk.total / (1024**3):.2f} GB)")
except Exception:
self.c_drive_label.config(text="C:\\ Usage: N/A (Drive not found or accessible)")
try:
d_disk = psutil.disk_usage('D:')
self.d_drive_label.config(text=f"D:\\ Usage: {d_disk.percent:.1f}% ({d_disk.used / (1024**3):.2f} GB / {d_disk.total / (1024**3):.2f} GB)")
except Exception:
self.d_drive_label.config(text="D:\\ Usage: N/A (Drive not found or accessible)")
# --- Network Usage ---
current_net_io_counters = psutil.net_io_counters()
current_net_time = time.time()
time_diff = current_net_time - self.last_net_time
if time_diff > 0:
bytes_sent_diff = current_net_io_counters.bytes_sent - self.last_net_io_counters.bytes_sent
bytes_recv_diff = current_net_io_counters.bytes_recv - self.last_net_io_counters.bytes_recv
tx_speed_kbps = (bytes_sent_diff / time_diff) / 1024
rx_speed_kbps = (bytes_recv_diff / time_diff) / 1024
self.network_label.config(text=f"Network: Rx {rx_speed_kbps:.2f} KB/s, Tx {tx_speed_kbps:.2f} KB/s")
else:
self.network_label.config(text="Network: Calculating...")
self.last_net_io_counters = current_net_io_counters
self.last_net_time = current_net_time
except Exception as e:
print(f"Error updating system metrics: {e}")
self.cpu_label.config(text="CPU Usage: Error")
self.ram_label.config(text="RAM Usage: Error")
self.c_drive_label.config(text="C:\\ Usage: Error")
self.d_drive_label.config(text="D:\\ Usage: Error")
self.network_label.config(text="Network: Error")
# Schedule the next update for system metrics after 5000 milliseconds (5 seconds)
self.root.after(5000, self._update_system_metrics)
def _schedule_system_metrics_update(self):
"""Schedules the initial and subsequent calls to _update_system_metrics."""
self.root.after(0, self._update_system_metrics)
def on_closing(self):
"""Handles closing the application window."""
print("Closing application. Attempting to terminate any tracked game processes.")
for path, process_popen in list(self.running_game_processes.items()): # Iterate a copy
if process_popen.poll() is None: # Still running via Popen object
try:
# Get psutil.Process object from Popen's PID
proc_psutil = psutil.Process(process_popen.pid)
if proc_psutil.is_running():
proc_psutil.terminate()
proc_psutil.wait(timeout=2)
if proc_psutil.is_running():
proc_psutil.kill()
proc_psutil.wait(timeout=2)
print(f"Terminated process for: {path}")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
print(f"Process for {path} already gone or access denied on close: {e}")
except Exception as e:
print(f"Error terminating process {path} on close: {e}")
# Remove from tracking regardless, as it's either terminated or already gone
if path in self.running_game_processes:
del self.running_game_processes[path]
self.root.destroy()
# --- Silent control helpers (no popups) ---
def _start_server_process_silent(self, server_name):
config = self.servers[server_name]
t = config["type"]
# already running? skip
if (t == "executable" and self._check_executable_status(config["path"])) or \
(t == "service" and self._get_service_status(config["service_name"])):
return
try:
if t == "service":
subprocess.Popen(config["start_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
else:
full_command = [config["path"]] + config.get("args", [])
working_directory = os.path.dirname(config["path"])
p = subprocess.Popen(full_command, cwd=working_directory, creationflags=subprocess.CREATE_NO_WINDOW)
self.running_game_processes[config["path"]] = p
except Exception as e:
print(f"[AUTO] Start failed for {server_name}: {e}")
finally:
self.root.after(1500, lambda: self._update_single_server_ui(server_name))
def _stop_server_process_silent(self, server_name):
config = self.servers[server_name]
t = config["type"]
# already stopped? skip
if not ((t == "executable" and self._check_executable_status(config["path"])) or \
(t == "service" and self._get_service_status(config["service_name"]))):
return
try:
if t == "service":
subprocess.Popen(config["stop_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
else:
path = config["path"]
target = None
# Prefer tracked Popen
if path in self.running_game_processes:
po = self.running_game_processes[path]
if po.poll() is None:
try:
target = psutil.Process(po.pid)
except psutil.NoSuchProcess:
pass
del self.running_game_processes[path]
# Fallback: locate by exe path
if target is None:
npath = os.path.normcase(path)
for proc in psutil.process_iter(['pid','exe']):
try:
if proc.info['exe'] and os.path.normcase(proc.info['exe']) == npath:
target = proc
break
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if target:
try:
if target.is_running():
target.terminate()
target.wait(timeout=60)
if target.is_running():
target.kill()
target.wait(timeout=60)
except Exception as e:
print(f"[AUTO] Stop error for {server_name}: {e}")
except Exception as e:
print(f"[AUTO] Stop failed for {server_name}: {e}")
finally:
self.root.after(1500, lambda: self._update_single_server_ui(server_name))
# --- Daily restart scheduler ---
def _schedule_daily_restart(self):
"""Schedule the next 4:00 AM local restart."""
from datetime import datetime, timedelta, time as dtime
now = datetime.now()
target = datetime.combine(now.date(), dtime(4, 0))
if now >= target:
target = target + timedelta(days=1)
delay_ms = int((target - now).total_seconds() * 1000)
print(f"[AUTO] Next daily restart scheduled at {target} (in {delay_ms/1000:.0f}s)")
self.root.after(delay_ms, self._run_daily_restart)
def _run_daily_restart(self):
"""Stop all, wait 5 minutes, then start with 1-minute gaps; reschedule tomorrow."""
stop_order = ["Authentication Server", "Server Manager", "Primal Desert", "Black Sands"]
start_order = ["Authentication Server", "Server Manager", "Primal Desert", "Black Sands"]
print("[AUTO] Daily restart: stopping services/processes...")
for name in stop_order:
self._stop_server_process_silent(name)
# after 5 minutes, begin staggered starts
self.root.after(5 * 60 * 1000, lambda: self._start_staggered(start_order, index=0))
# schedule next day right away so it persists even if app stays up
self._schedule_daily_restart()
def _start_staggered(self, names, index):
if index >= len(names):
print("[AUTO] Daily restart: completed staggered starts.")
return
name = names[index]
print(f"[AUTO] Starting '{name}' (step {index+1}/{len(names)})...")
self._start_server_process_silent(name)
# wait 1 minute, then start next
self.root.after(60 * 1000, lambda: self._start_staggered(names, index + 1))
if __name__ == "__main__":
# Check for psutil
try:
import psutil
except ImportError:
print("Error: 'psutil' library not found.")
print("Please install it using: pip install psutil")
print("Exiting.")
sys.exit()
root = tk.Tk()
app = ServerManagerApp(root)
root.mainloop()