Greetings friends. I have really enjoyed building my own Corsair server and after playing for several months, I'm done. It was excellent. I decided to share the work that I made, since this forum has helped me so much.
First up is my server manager. I like being able to start/stop by button pressing and not launching scripts.
1. change icon.png to icon.ico (its attached)
a. I had to change the file type so it would upload here.
2. place the bg and icon in the same folder as the program. (feel free to change either. just keep the same name)
3. Install python. (The Corsair server already uses python 3.9.5 so that is what I have built everything around.
4. I had intended to package this as an .exe so there are many comments related to that. Some of the programming is related to that as well. (you can ignore that and just run the script).
5. You will need a little bit of knowledge on using pip to install the prerequisites.
6. Pro Tip: ChatGPT is your friend. Paste the whole code in there and ask how to use pip to install the needed items. Its actually stupid easy.
a. example: pip install psutil
7. You will want to update the file paths. On or around line 115, 121, 133. I've set mine up to run 2 fields (Primal Desert = Season. Black Sands = Hadum/Elvia).
8. I noticed that mobs will stop spawning so I set up a daily restart at 4am. If you do not want to do that, add a # in front of line 150 or so. It reads self._schedule_daily_restart(). If you put a # in front of it, its commented out (not used).
Note: I'm not a professional. I dont warranty anything. I dont support them. I just share what I got. Feel free to do whatever you like with it.
# Server Manager for Black Desert Corsair Server
# Author: Njinir
#
# This application allows you to monitor and manage the status of your
# Black Desert Corsair game servers, including IIS, SQL Server, and custom game executables.
# It also provides real-time system resource usage (CPU, RAM, Disk, Network).
#
# --- PyInstaller Instructions ---
# To create a standalone executable (.exe) of this application, follow these steps:
# 1. Ensure you have PyInstaller installed: `pip install pyinstaller`
# 2. Ensure you have Pillow installed: `pip install Pillow`
# 3. Make sure 'icon.ico' and 'bg.png' are in the same directory as this Python script.
# 4. Open your command prompt or terminal in that directory.
# 5. Run the following command:
# pyinstaller --onefile --windowed --add-data "icon.ico;." --add-data "bg.png;." server_manager.py
# (Replace 'server_manager.py' with the actual name of your Python script if different)
# 6. After the command finishes, look for the 'dist' folder in your directory.
# Your executable will be inside, e.g., 'dist\server_manager.exe'.
import tkinter as tk
from tkinter import messagebox, PhotoImage, ttk
import subprocess
import os
import sys
import psutil
import time
# Import Pillow for image manipulation
try:
from PIL import Image, ImageTk
except ImportError:
print("Error: 'Pillow' library not found.")
print("Please install it using: pip install Pillow")
print("Exiting.")
sys.exit()
# --- Helper for PyInstaller resource paths ---
def resource_path(relative_path):
"""
Get the absolute path to a resource, working correctly for both
development environments and PyInstaller bundled executables.
PyInstaller creates a temporary folder and stores the path in _MEIPASS.
"""
try:
# Check if the application is running as a PyInstaller bundle
base_path = sys._MEIPASS
except Exception:
# If not bundled, assume it's running from the script's directory
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
class ServerManagerApp:
def __init__(self, root):
self.root = root
self.root.title("Black Desert Corsair Server Manager")
# Adjusted window size: width 600, height 800 (taller than wider)
self.root.geometry("600x800")
self.root.resizable(False, False)
self.root.configure(bg="black")
# Configure root grid to allow main_frame to be centered with background visible
self.root.grid_rowconfigure(0, weight=1) # Top padding row
self.root.grid_rowconfigure(1, weight=0) # Row for main_frame (will not expand vertically)
self.root.grid_rowconfigure(2, weight=1) # Bottom padding row
self.root.grid_columnconfigure(0, weight=1) # Left padding column
self.root.grid_columnconfigure(1, weight=0) # Column for main_frame (will not expand horizontally)
self.root.grid_columnconfigure(2, weight=1) # Right padding column
# --- Set Application Icon ---
try:
self.root.iconbitmap(resource_path("icon.ico"))
except tk.TclError:
print("Warning: icon.ico not found or invalid. Skipping application icon.")
except Exception as e:
print(f"An unexpected error occurred while setting icon: {e}")
# --- Set Background Image ---
try:
self.bg_image_pil = Image.open(resource_path("bg.png"))
self.bg_image_tk = ImageTk.PhotoImage(self.bg_image_pil) # Initial Tkinter PhotoImage
self.bg_label = tk.Label(self.root, image=self.bg_image_tk)
# Place the background label to cover the entire root window
self.bg_label.place(x=0, y=0, relwidth=1, relheight=1)
self.bg_label.lower() # Send to back so other widgets are on top
# Bind the configure event to resize the background image when the window size changes
self.root.bind("<Configure>", self._resize_background)
except FileNotFoundError:
print("Warning: bg.png not found. Skipping background image.")
except Exception as e:
print(f"An unexpected error occurred while setting background: {e}")
# --- Combined Server Definitions ---
self.servers = {
"IIS Server": {
"type": "service",
"service_name": "W3SVC",
"process_name": "w3wp.exe",
"path": "C:\\Windows\\System32\\inetsrv\\w3wp.exe",
"start_command": "net start W3SVC",
"stop_command": "net stop W3SVC"
},
"SQL Server": {
"type": "service",
"service_name": "MSSQLSERVER",
"process_name": "sqlservr.exe",
"path": "C:\\Program Files\\Microsoft SQL Server\\MSSQL15.SQLEXPRESS\\MSSQL\\Binn\\sqlservr.exe",
"start_command": "net start MSSQLSERVER",
"stop_command": "net stop MSSQLSERVER"
},
"Authentication Server": {
"type": "executable",
"process_name": "Authentication_ReleaseOp_x64_unpack.exe",
"path": r"D:\1_Primal_Desert\bin64\Authentication_ReleaseOp_x64_unpack.exe",
"args": ["NotDaemon"]
},
"Server Manager": {
"type": "executable",
"process_name": "CrimsonDesertServerManager_ReleaseOp_x64.exe",
"path": r"D:\1_Primal_Desert\bin64\CrimsonDesertServerManager_ReleaseOp_x64.exe",
"args": ["NotDaemon"]
},
"Primal Desert": {
"type": "executable",
"process_name": "CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
"path": r"D:\1_Primal_Desert\bin64\CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
"args": ["NotDaemon"] # Changed from "-param" to "NotDaemon"
},
"Black Sands": {
"type": "executable",
"process_name": "CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
"path": r"D:\2_Black_Sands\bin64\CrimsonDesertServer_ReleaseOp_x64_Corsair_Ida.exe",
"args": ["NotDaemon"] # Changed from "-param" to "NotDaemon"
}
}
self.running_game_processes = {}
self.orb_canvas_items = {}
self.status_labels = {}
self.toggle_buttons = {}
self.last_net_io_counters = psutil.net_io_counters()
self.last_net_time = time.time()
self._create_widgets()
self._initial_status_check()
self._schedule_status_update()
self._schedule_system_metrics_update()
self._schedule_daily_restart() # DELETE or COMMENT OUT this line if you do not want to do daily restart.
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def _resize_background(self, event):
"""Resizes the background image to fit the current window dimensions using Pillow."""
if event.widget == self.root:
new_width = event.width
new_height = event.height
if new_width > 0 and new_height > 0:
try:
# Resize the PIL image to the new window dimensions
resized_image = self.bg_image_pil.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Convert the resized PIL image to a Tkinter PhotoImage
self.bg_image_tk = ImageTk.PhotoImage(resized_image)
# Update the label's image
self.bg_label.config(image=self.bg_image_tk)
except Exception as e:
print(f"Error resizing background image with Pillow: {e}")
def _create_widgets(self):
"""Creates all the UI elements for the application."""
style = ttk.Style()
style.theme_use('default')
# Lighter black for the main container
lighter_black = "#222222"
style.configure('TFrame', background=lighter_black) # Apply lighter background to TFrame
style.configure('TLabel', background=lighter_black, foreground='white') # Ensure labels match
style.configure('TButton', background='#333333', foreground='white', font=('Inter', 10, 'bold'))
style.map('TButton',
background=[('active', '#555555')],
foreground=[('disabled', '#888888')])
main_frame = ttk.Frame(self.root, padding="20", style='TFrame')
# Place main_frame in the center cell of the root's grid
main_frame.grid(row=1, column=1) # No sticky here, let padding rows/cols handle centering
# Configure main_frame's internal grid columns and rows to expand
main_frame.grid_columnconfigure(0, weight=0) # Orb column
main_frame.grid_columnconfigure(1, weight=1) # Label column (expands)
main_frame.grid_columnconfigure(2, weight=0) # Button column
current_row = 0 # Start row counter for widgets within main_frame
# --- System Status Section ---
status_frame = ttk.LabelFrame(main_frame, text="System Status", style='TFrame', labelanchor="nw")
status_frame.grid(row=current_row, column=0, columnspan=3, pady=10, padx=10, sticky="ew")
current_row += 1
self.cpu_label = ttk.Label(status_frame, text="CPU Usage: --%", font=("Arial", 10), style='TLabel')
self.cpu_label.pack(anchor="w", padx=5, pady=2)
self.ram_label = ttk.Label(status_frame, text="RAM Usage: --%", font=("Arial", 10), style='TLabel')
self.ram_label.pack(anchor="w", padx=5, pady=2)
self.c_drive_label = ttk.Label(status_frame, text="C:\\ Usage: --%", font=("Arial", 10), style='TLabel')
self.c_drive_label.pack(anchor="w", padx=5, pady=2)
self.d_drive_label = ttk.Label(status_frame, text="D:\\ Usage: --%", font=("Arial", 10), style='TLabel')
self.d_drive_label.pack(anchor="w", padx=5, pady=2)
self.network_label = ttk.Label(status_frame, text="Network: Rx -- KB/s, Tx -- KB/s", font=("Arial", 10), style='TLabel')
self.network_label.pack(anchor="w", padx=5, pady=2)
ttk.Separator(main_frame, orient='horizontal').grid(row=current_row, columnspan=3, pady=10, padx=10, sticky="ew")
current_row += 1
# --- Server Control Sections ---
for server_name, config in self.servers.items():
self._add_server_row(main_frame, server_name, server_name, current_row)
current_row += 1
def _add_server_row(self, parent_frame, display_name, identifier, row_num):
"""Helper function to add a row for a server (orb, label, and toggle button)."""
# Orb Canvas - set background to match the main_frame's lighter black
orb_canvas = tk.Canvas(parent_frame, width=20, height=20, bg="#222222", highlightthickness=0)
orb_canvas.grid(row=row_num, column=0, padx=5, pady=5, sticky="w")
orb_item = orb_canvas.create_oval(5, 5, 15, 15, fill="gray", outline="gray")
self.orb_canvas_items[identifier] = (orb_canvas, orb_item)
label = ttk.Label(parent_frame, text=display_name, font=("Inter", 10), style='TLabel')
label.grid(row=row_num, column=1, padx=5, pady=5, sticky="w")
self.status_labels[identifier] = label
button = ttk.Button(parent_frame, text="Checking...")
button.config(command=lambda id=identifier: self._toggle_server(id))
button.grid(row=row_num, column=2, padx=5, pady=5, sticky="e")
self.toggle_buttons[identifier] = button
button["state"] = "disabled"
def _set_orb_color(self, identifier, color):
"""Updates the color of a specific orb."""
if identifier in self.orb_canvas_items:
canvas, item_id = self.orb_canvas_items[identifier]
canvas.itemconfig(item_id, fill=color, outline=color)
def _get_service_status(self, service_name):
"""
Checks the status of a Windows service using 'sc query'.
Returns True if running, False if stopped/not found, None if error.
"""
try:
result = subprocess.run(
["sc", "query", service_name],
capture_output=True,
text=True,
check=False,
creationflags=subprocess.CREATE_NO_WINDOW
)
if "STATE : 4 RUNNING" in result.stdout:
return True
elif "STATE : 1 STOPPED" in result.stdout or \
f"[SC] EnumQueryServicesStatus:OpenService FAILED 1060" in result.stderr:
return False
else:
print(f"Warning: Could not determine status for service '{service_name}'. Output:\n{result.stdout}\n{result.stderr}")
return None
except FileNotFoundError:
messagebox.showerror("Error", "The 'sc' command was not found. Ensure it's in your system's PATH.")
return None
except Exception as e:
print(f"Error checking service '{service_name}': {e}")
return None
def _check_executable_status(self, exe_path):
"""
Checks if a process with the given executable path is currently running.
Uses psutil for robust process checking.
"""
normalized_exe_path = os.path.normcase(exe_path)
for proc in psutil.process_iter(['exe']):
try:
if proc.info['exe'] is not None and os.path.normcase(proc.info['exe']) == normalized_exe_path:
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
except Exception as e:
print(f"Error accessing process info for PID {proc.pid}: {e}")
continue
return False
def _start_server_process(self, server_name):
"""Starts a server process (executable or service)."""
config = self.servers[server_name]
server_type = config["type"]
# Prevent starting if already running
if (server_type == "executable" and self._check_executable_status(config["path"])) or \
(server_type == "service" and self._get_service_status(config["service_name"])):
messagebox.showinfo("Status", f"{server_name} is already running.")
return
try:
if server_type == "service":
subprocess.Popen(config["start_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
messagebox.showinfo("Status", f"Attempting to start {server_name} service. Please check its status in a moment.")
elif server_type == "executable":
full_command = [config["path"]] + config.get("args", [])
working_directory = os.path.dirname(config["path"])
process = subprocess.Popen(full_command, cwd=working_directory, creationflags=subprocess.CREATE_NO_WINDOW)
self.running_game_processes[config["path"]] = process # Track executable processes
messagebox.showinfo("Status", f"Attempting to start {server_name}. Please check its status in a moment.")
# Small delay and re-check to see if it immediately exited
self.root.after(1000, lambda: self._check_immediate_exit(server_name, config["path"], process))
except FileNotFoundError:
messagebox.showerror("Error", f"Executable or command not found for {server_name}. Please verify the path/command.")
except Exception as e:
messagebox.showerror("Error", f"Failed to start {server_name}: {e}")
finally:
# Schedule a general status update, which will re-evaluate all servers
self.root.after(1000, self._update_server_status)
def _check_immediate_exit(self, server_name, path, process):
"""Checks if an executable process terminated immediately after launch."""
if process.poll() is not None: # Process has terminated
print(f"Warning: '{server_name}' process terminated immediately after launch. Exit code: {process.poll()}")
messagebox.showerror("Server Launch Failed", f"'{server_name}' terminated immediately after launch. Exit code: {process.poll()}. Please check server logs.")
if path in self.running_game_processes:
del self.running_game_processes[path]
# Force UI update for this specific server immediately after crash detection
self._update_single_server_ui(server_name)
def _stop_server_process(self, server_name):
"""Stops a server process (executable or service)."""
config = self.servers[server_name]
server_type = config["type"]
# Prevent stopping if already stopped
if not ((server_type == "executable" and self._check_executable_status(config["path"])) or \
(server_type == "service" and self._get_service_status(config["service_name"]))):
messagebox.showinfo("Status", f"{server_name} is already stopped.")
return
try:
if server_type == "service":
subprocess.Popen(config["stop_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
messagebox.showinfo("Status", f"Attempting to stop {server_name} service. Please check its status in a moment.")
elif server_type == "executable":
path = config["path"]
target_psutil_process = None
# 1. Check if we have a Popen object and convert it to psutil.Process
if path in self.running_game_processes:
popen_obj = self.running_game_processes[path]
if popen_obj.poll() is None: # It's still running
try:
target_psutil_process = psutil.Process(popen_obj.pid)
except psutil.NoSuchProcess:
# Process might have just exited, or PID is invalid
pass
# Remove from tracking regardless, as we're now handling it via psutil
del self.running_game_processes[path]
# 2. If not found via Popen, search all processes by path
if target_psutil_process is None:
normalized_path = os.path.normcase(path)
for proc in psutil.process_iter(['pid', 'exe']):
try:
if proc.info['exe'] is not None and os.path.normcase(proc.info['exe']) == normalized_path:
target_psutil_process = proc
break
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if target_psutil_process:
try:
if target_psutil_process.is_running(): # Use psutil's method
target_psutil_process.terminate()
time_start = time.time()
# Wait for up to 60 seconds for graceful termination
while target_psutil_process.is_running() and (time.time() - time_start) < 60:
time.sleep(0.5) # Check every half second
if target_psutil_process.is_running():
# If still running after timeout, force kill
target_psutil_process.kill()
time_start = time.time()
# Wait for up to 60 seconds for kill to complete
while target_psutil_process.is_running() and (time.time() - time_start) < 60:
time.sleep(0.5)
if not target_psutil_process.is_running(): # Final check after attempts
messagebox.showinfo("Status", f"{server_name} stopped.")
else:
messagebox.showerror("Error", f"Failed to stop {server_name} after multiple attempts within timeout.")
except psutil.NoSuchProcess:
messagebox.showinfo("Status", f"{server_name} was already stopped or disappeared.")
except psutil.AccessDenied:
messagebox.showerror("Error", f"Access denied to stop {server_name}. Run as administrator.")
except Exception as e:
messagebox.showerror("Error", f"Error stopping {server_name}: {e}")
else:
messagebox.showinfo("Status", f"Could not find a running process for {server_name} at the specified path to stop.")
except Exception as e:
messagebox.showerror("Error", f"Failed to stop {server_name}: {e}")
finally:
# Schedule a general status update, which will re-evaluate all servers
self.root.after(1000, self._update_server_status)
def _toggle_server(self, server_name):
"""Handles the toggle button click for any server (service or executable)."""
config = self.servers[server_name]
server_type = config["type"]
# Disable button during operation
self.toggle_buttons[server_name]["state"] = "disabled"
self.toggle_buttons[server_name].config(text="Working...")
is_running = False
if server_type == "service":
is_running = self._get_service_status(config["service_name"])
elif server_type == "executable":
is_running = self._check_executable_status(config["path"])
if is_running:
self._stop_server_process(server_name)
else:
self._start_server_process(server_name)
# Re-enable button after operation (status update will set text)
# The _update_single_server_ui or _update_server_status will handle re-enabling
# based on the actual status, so no need to explicitly enable here.
# self.toggle_buttons[server_name]["state"] = "normal"
def _initial_status_check(self):
"""Performs an immediate status check on app startup."""
print("Performing initial server status check...")
# Call the general update, which now uses _update_single_server_ui for each
self._update_server_status(initial=True)
def _update_single_server_ui(self, server_name):
"""
Updates the UI elements (orb, label, button) for a single specified server.
"""
config = self.servers[server_name]
is_running = False
if config["type"] == "service":
is_running = self._get_service_status(config["service_name"])
elif config["type"] == "executable":
is_running = self._check_executable_status(config["path"])
button = self.toggle_buttons[server_name]
label = self.status_labels[server_name]
if is_running:
self._set_orb_color(server_name, "green")
label.config(text=f"{server_name} (Running)", foreground="green")
button.config(text="Stop")
else:
self._set_orb_color(server_name, "red")
label.config(text=f"{server_name} (Stopped)", foreground="red")
button.config(text="Start")
# Ensure button is enabled after update, unless it's in a "Working..." state
if button["text"] != "Working...": # Don't re-enable if it's explicitly disabled for an ongoing operation
button["state"] = "normal"
def _update_server_status(self, initial=False):
"""
Updates the status of all servers and their corresponding UI elements.
Called periodically.
"""
for server_name in self.servers.keys():
self._update_single_server_ui(server_name)
def _schedule_status_update(self):
"""Schedules the next call to _update_server_status."""
self.root.after(2000, self._update_server_status)
def _update_system_metrics(self):
"""
Fetches and updates the display for CPU, RAM, Disk, and Network usage.
This function is scheduled to run periodically.
"""
try:
# --- CPU Usage ---
cpu_percent = psutil.cpu_percent(interval=0.5)
self.cpu_label.config(text=f"CPU Usage: {cpu_percent:.1f}%")
# --- RAM Usage ---
ram = psutil.virtual_memory()
self.ram_label.config(text=f"RAM Usage: {ram.percent:.1f}% ({ram.used / (1024**3):.2f} GB / {ram.total / (1024**3):.2f} GB)")
# --- Disk Usage (C: and D:) ---
try:
c_disk = psutil.disk_usage('C:')
self.c_drive_label.config(text=f"C:\\ Usage: {c_disk.percent:.1f}% ({c_disk.used / (1024**3):.2f} GB / {c_disk.total / (1024**3):.2f} GB)")
except Exception:
self.c_drive_label.config(text="C:\\ Usage: N/A (Drive not found or accessible)")
try:
d_disk = psutil.disk_usage('D:')
self.d_drive_label.config(text=f"D:\\ Usage: {d_disk.percent:.1f}% ({d_disk.used / (1024**3):.2f} GB / {d_disk.total / (1024**3):.2f} GB)")
except Exception:
self.d_drive_label.config(text="D:\\ Usage: N/A (Drive not found or accessible)")
# --- Network Usage ---
current_net_io_counters = psutil.net_io_counters()
current_net_time = time.time()
time_diff = current_net_time - self.last_net_time
if time_diff > 0:
bytes_sent_diff = current_net_io_counters.bytes_sent - self.last_net_io_counters.bytes_sent
bytes_recv_diff = current_net_io_counters.bytes_recv - self.last_net_io_counters.bytes_recv
tx_speed_kbps = (bytes_sent_diff / time_diff) / 1024
rx_speed_kbps = (bytes_recv_diff / time_diff) / 1024
self.network_label.config(text=f"Network: Rx {rx_speed_kbps:.2f} KB/s, Tx {tx_speed_kbps:.2f} KB/s")
else:
self.network_label.config(text="Network: Calculating...")
self.last_net_io_counters = current_net_io_counters
self.last_net_time = current_net_time
except Exception as e:
print(f"Error updating system metrics: {e}")
self.cpu_label.config(text="CPU Usage: Error")
self.ram_label.config(text="RAM Usage: Error")
self.c_drive_label.config(text="C:\\ Usage: Error")
self.d_drive_label.config(text="D:\\ Usage: Error")
self.network_label.config(text="Network: Error")
# Schedule the next update for system metrics after 5000 milliseconds (5 seconds)
self.root.after(5000, self._update_system_metrics)
def _schedule_system_metrics_update(self):
"""Schedules the initial and subsequent calls to _update_system_metrics."""
self.root.after(0, self._update_system_metrics)
def on_closing(self):
"""Handles closing the application window."""
print("Closing application. Attempting to terminate any tracked game processes.")
for path, process_popen in list(self.running_game_processes.items()): # Iterate a copy
if process_popen.poll() is None: # Still running via Popen object
try:
# Get psutil.Process object from Popen's PID
proc_psutil = psutil.Process(process_popen.pid)
if proc_psutil.is_running():
proc_psutil.terminate()
proc_psutil.wait(timeout=2)
if proc_psutil.is_running():
proc_psutil.kill()
proc_psutil.wait(timeout=2)
print(f"Terminated process for: {path}")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
print(f"Process for {path} already gone or access denied on close: {e}")
except Exception as e:
print(f"Error terminating process {path} on close: {e}")
# Remove from tracking regardless, as it's either terminated or already gone
if path in self.running_game_processes:
del self.running_game_processes[path]
self.root.destroy()
# --- Silent control helpers (no popups) ---
def _start_server_process_silent(self, server_name):
config = self.servers[server_name]
t = config["type"]
# already running? skip
if (t == "executable" and self._check_executable_status(config["path"])) or \
(t == "service" and self._get_service_status(config["service_name"])):
return
try:
if t == "service":
subprocess.Popen(config["start_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
else:
full_command = [config["path"]] + config.get("args", [])
working_directory = os.path.dirname(config["path"])
p = subprocess.Popen(full_command, cwd=working_directory, creationflags=subprocess.CREATE_NO_WINDOW)
self.running_game_processes[config["path"]] = p
except Exception as e:
print(f"[AUTO] Start failed for {server_name}: {e}")
finally:
self.root.after(1500, lambda: self._update_single_server_ui(server_name))
def _stop_server_process_silent(self, server_name):
config = self.servers[server_name]
t = config["type"]
# already stopped? skip
if not ((t == "executable" and self._check_executable_status(config["path"])) or \
(t == "service" and self._get_service_status(config["service_name"]))):
return
try:
if t == "service":
subprocess.Popen(config["stop_command"], shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
else:
path = config["path"]
target = None
# Prefer tracked Popen
if path in self.running_game_processes:
po = self.running_game_processes[path]
if po.poll() is None:
try:
target = psutil.Process(po.pid)
except psutil.NoSuchProcess:
pass
del self.running_game_processes[path]
# Fallback: locate by exe path
if target is None:
npath = os.path.normcase(path)
for proc in psutil.process_iter(['pid','exe']):
try:
if proc.info['exe'] and os.path.normcase(proc.info['exe']) == npath:
target = proc
break
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if target:
try:
if target.is_running():
target.terminate()
target.wait(timeout=60)
if target.is_running():
target.kill()
target.wait(timeout=60)
except Exception as e:
print(f"[AUTO] Stop error for {server_name}: {e}")
except Exception as e:
print(f"[AUTO] Stop failed for {server_name}: {e}")
finally:
self.root.after(1500, lambda: self._update_single_server_ui(server_name))
# --- Daily restart scheduler ---
def _schedule_daily_restart(self):
"""Schedule the next 4:00 AM local restart."""
from datetime import datetime, timedelta, time as dtime
now = datetime.now()
target = datetime.combine(now.date(), dtime(4, 0))
if now >= target:
target = target + timedelta(days=1)
delay_ms = int((target - now).total_seconds() * 1000)
print(f"[AUTO] Next daily restart scheduled at {target} (in {delay_ms/1000:.0f}s)")
self.root.after(delay_ms, self._run_daily_restart)
def _run_daily_restart(self):
"""Stop all, wait 5 minutes, then start with 1-minute gaps; reschedule tomorrow."""
stop_order = ["Authentication Server", "Server Manager", "Primal Desert", "Black Sands"]
start_order = ["Authentication Server", "Server Manager", "Primal Desert", "Black Sands"]
print("[AUTO] Daily restart: stopping services/processes...")
for name in stop_order:
self._stop_server_process_silent(name)
# after 5 minutes, begin staggered starts
self.root.after(5 * 60 * 1000, lambda: self._start_staggered(start_order, index=0))
# schedule next day right away so it persists even if app stays up
self._schedule_daily_restart()
def _start_staggered(self, names, index):
if index >= len(names):
print("[AUTO] Daily restart: completed staggered starts.")
return
name = names[index]
print(f"[AUTO] Starting '{name}' (step {index+1}/{len(names)})...")
self._start_server_process_silent(name)
# wait 1 minute, then start next
self.root.after(60 * 1000, lambda: self._start_staggered(names, index + 1))
if __name__ == "__main__":
# Check for psutil
try:
import psutil
except ImportError:
print("Error: 'psutil' library not found.")
print("Please install it using: pip install psutil")
print("Exiting.")
sys.exit()
root = tk.Tk()
app = ServerManagerApp(root)
root.mainloop()
Нет комментариев.
Операция выполнена успешно.