# gpu_monitoring_system / gpu_fan_controller.py
# Uploaded by meccatronis with huggingface_hub (revision 1fd6502, verified).
#!/usr/bin/env python3
"""
Advanced GPU Fan Controller
Provides sophisticated fan control with multiple profiles, safety features,
and comprehensive logging. Supports temperature-based curves, manual override,
and automatic fallback modes.
"""
import time
import os
import sys
import json
import logging
import signal
import argparse
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
from pathlib import Path
from gpu_monitoring import GPUManager, GPUStatus
logger = logging.getLogger(__name__)
class FanMode(Enum):
    """Operating modes the fan controller can be in."""

    # PWM is derived from the temperature curve of the active profile.
    AUTO = "auto"
    # PWM is a fixed, user-supplied value.
    MANUAL = "manual"
    # Fan control is switched off.
    OFF = "off"
    # A safety limit was exceeded and the emergency PWM is being applied.
    EMERGENCY = "emergency"
class ProfileType(Enum):
    """Categories a fan control profile can belong to."""

    SILENT = "silent"
    BALANCED = "balanced"
    PERFORMANCE = "performance"
    # Any profile that is not one of the predefined categories above.
    CUSTOM = "custom"
@dataclass
class FanProfile:
    """Configuration of a single fan control profile.

    ``curve`` must provide ``min_temp``, ``max_temp``, ``min_pwm`` and
    ``max_pwm``; ``safety`` must provide ``emergency_temp``,
    ``emergency_pwm`` and ``max_fan_time``.

    Raises:
        ValueError: if a required key is absent from ``curve`` or ``safety``.
    """

    name: str
    profile_type: ProfileType
    description: str
    curve: Dict[str, float]
    safety: Dict[str, float]
    enabled: bool = True

    def __post_init__(self):
        # (label used in the error message, mapping to inspect, required keys).
        # Curve keys are checked before safety keys, each in declaration order,
        # so the first missing key reported is deterministic.
        requirements = (
            ("curve", self.curve,
             ('min_temp', 'max_temp', 'min_pwm', 'max_pwm')),
            ("safety", self.safety,
             ('emergency_temp', 'emergency_pwm', 'max_fan_time')),
        )
        for label, mapping, required_keys in requirements:
            missing = next((k for k in required_keys if k not in mapping), None)
            if missing is not None:
                raise ValueError(f"Missing required {label} parameter: {missing}")
@dataclass
class FanStatus:
    """Snapshot of the fan state produced by one control-loop update."""

    # Control mode in effect when the snapshot was taken.
    mode: FanMode
    # Name of the active profile.
    profile: str
    # PWM value applied to the fan.
    current_pwm: int
    # PWM value the controller aimed for.
    target_pwm: int
    # GPU temperature the decision was based on.
    temperature: float
    # time.time() timestamp of the update.
    last_update: float
    # True while the controller is in manual mode.
    manual_override: bool = False
    # True when a safety limit forced the emergency PWM.
    emergency_mode: bool = False
class FanController:
    """Advanced GPU fan controller with multiple profiles and safety features.

    The controller reads the GPU temperature through ``GPUManager``, maps it
    to a PWM value (0-255) using the active ``FanProfile`` and writes the
    result to the ``pwm1`` file in the GPU's hwmon directory.
    """

    def __init__(self, config_file: str = "config/fan_profiles.json"):
        """Create a controller and load its profiles from ``config_file``."""
        self.config_file = config_file
        self.profiles = {}
        self.current_profile = None
        self.current_mode = FanMode.AUTO
        self.manual_pwm = 0
        self.running = False
        # Re-entrant so that stop(), when invoked from a signal handler that
        # interrupts the control loop on the same thread, cannot deadlock on
        # a lock the interrupted code already holds.
        self.lock = threading.RLock()
        # GPU management
        self.gpu_manager = GPUManager()
        self.gpu_name = None
        # Status tracking
        self.status = None
        self.last_status_update = 0
        # Safety features
        self.emergency_temp = 85.0
        self.emergency_pwm = 255
        self.max_fan_time = 300  # 5 minutes
        self.fan_on_time = 0
        # Configuration
        self.update_interval = 2.0
        self.log_interval = 30.0
        self.last_log_time = 0
        # Callbacks
        self.status_callbacks = []
        # Load configuration
        self.load_profiles()

    def load_profiles(self):
        """Load fan control profiles from the configuration file.

        If the file does not exist, default profiles are created and saved.
        If it cannot be parsed, or contains no profiles, default profiles are
        used without touching the file. In every case a default profile is
        activated afterwards so AUTO mode never runs without a curve.
        """
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r') as f:
                    config_data = json.load(f)
                loaded = {
                    profile_name: FanProfile(
                        name=profile_data['name'],
                        profile_type=ProfileType(profile_data['profile_type']),
                        description=profile_data['description'],
                        curve=profile_data['curve'],
                        safety=profile_data['safety'],
                        enabled=profile_data.get('enabled', True),
                    )
                    for profile_name, profile_data in config_data.items()
                }
                self.profiles.update(loaded)
                logger.info(f"Loaded {len(self.profiles)} fan profiles")
                if not self.profiles:
                    logger.warning("Configuration file contains no profiles; using defaults")
                    self.create_default_profiles()
            else:
                # Create default profiles
                self.create_default_profiles()
                self.save_profiles()
        except Exception as e:
            logger.error(f"Error loading profiles: {e}")
            self.create_default_profiles()
        self._select_default_profile()

    def _select_default_profile(self) -> bool:
        """Activate the first enabled profile; return True on success.

        Profiles are addressed by their dictionary key, which may differ from
        the profile's display name (e.g. key ``"silent"``, name ``"Silent"``).
        """
        for key, profile in self.profiles.items():
            if profile.enabled and self.set_profile(key):
                logger.info(f"Set default profile: {profile.name}")
                return True
        logger.error("No enabled fan profile available")
        return False

    def create_default_profiles(self):
        """Replace all profiles with the built-in silent/balanced/performance set."""
        self.profiles = {
            "silent": FanProfile(
                name="Silent",
                profile_type=ProfileType.SILENT,
                description="Quiet operation with lower fan speeds",
                curve={
                    "min_temp": 40.0,
                    "max_temp": 65.0,
                    "min_pwm": 120,
                    "max_pwm": 220,
                },
                safety={
                    "emergency_temp": 85.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300,
                },
            ),
            "balanced": FanProfile(
                name="Balanced",
                profile_type=ProfileType.BALANCED,
                description="Balanced performance and noise",
                curve={
                    "min_temp": 38.0,
                    "max_temp": 60.0,
                    "min_pwm": 155,
                    "max_pwm": 255,
                },
                safety={
                    "emergency_temp": 80.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300,
                },
            ),
            "performance": FanProfile(
                name="Performance",
                profile_type=ProfileType.PERFORMANCE,
                description="Maximum cooling for high performance",
                curve={
                    "min_temp": 35.0,
                    "max_temp": 55.0,
                    "min_pwm": 180,
                    "max_pwm": 255,
                },
                safety={
                    "emergency_temp": 75.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300,
                },
            ),
        }
        logger.info("Created default fan profiles")

    def save_profiles(self):
        """Save current profiles to the configuration file as JSON.

        Errors are logged, not raised.
        """
        try:
            # A bare file name has an empty dirname, and os.makedirs("")
            # raises, so only create the directory when there is one.
            config_dir = os.path.dirname(self.config_file)
            if config_dir:
                os.makedirs(config_dir, exist_ok=True)
            config_data = {
                name: {
                    'name': profile.name,
                    'profile_type': profile.profile_type.value,
                    'description': profile.description,
                    'curve': profile.curve,
                    'safety': profile.safety,
                    'enabled': profile.enabled,
                }
                for name, profile in self.profiles.items()
            }
            with open(self.config_file, 'w') as f:
                json.dump(config_data, f, indent=2)
            logger.info("Saved fan profiles to configuration file")
        except Exception as e:
            logger.error(f"Error saving profiles: {e}")

    def initialize(self) -> bool:
        """Initialize the GPU manager, pick the first GPU and take fan control.

        Returns:
            True when the controller is ready, False when the GPU manager
            fails to initialize, no GPU is found, or the fan control files
            are not writable.
        """
        logger.info("Initializing fan controller...")
        # Initialize GPU manager
        if not self.gpu_manager.initialize():
            logger.error("Failed to initialize GPU manager")
            return False
        # Get first GPU
        gpus = self.gpu_manager.get_gpu_list()
        if not gpus:
            logger.error("No GPUs detected")
            return False
        self.gpu_name = gpus[0]
        logger.info(f"Using GPU: {self.gpu_name}")
        # Check permissions
        if not self.check_permissions():
            logger.error("Insufficient permissions for fan control")
            return False
        # Initialize fan; run() replaces this PWM on its first update.
        self.set_fan_mode(FanMode.AUTO)
        self.set_pwm(0)
        logger.info("Fan controller initialized successfully")
        return True

    def _hwmon_path(self):
        """Return the hwmon directory of the selected GPU, or None if unknown."""
        gpu_info = self.gpu_manager.get_gpu_info(self.gpu_name)
        if not gpu_info:
            return None
        return gpu_info[0]['hwmon_path']

    def check_permissions(self) -> bool:
        """Check if we have write permissions to the fan control files.

        The probe is a real write: it puts ``1`` into ``pwm1_enable`` and
        ``0`` into ``pwm1``, so it changes the fan state as a side effect.
        """
        try:
            hwmon_path = self._hwmon_path()
            if hwmon_path is None:
                return False
            # Test write permissions
            with open(os.path.join(hwmon_path, "pwm1_enable"), 'w') as f:
                f.write('1')
            with open(os.path.join(hwmon_path, "pwm1"), 'w') as f:
                f.write('0')
            return True
        except Exception as e:
            logger.debug(f"Permission check failed: {e}")
            return False

    def set_profile(self, profile_name: str) -> bool:
        """Activate the profile stored under key ``profile_name``.

        Returns:
            False if the key is unknown or the profile is disabled.
        """
        with self.lock:
            profile = self.profiles.get(profile_name)
            if profile is None:
                logger.error(f"Profile '{profile_name}' not found")
                return False
            if not profile.enabled:
                logger.error(f"Profile '{profile_name}' is disabled")
                return False
            self.current_profile = profile
        logger.info(f"Switched to profile: {profile.name}")
        return True

    def set_mode(self, mode: "FanMode"):
        """Set the software control mode used by the update loop."""
        with self.lock:
            self.current_mode = mode
        logger.info(f"Set fan mode to: {mode.value}")

    def set_manual_pwm(self, pwm: int):
        """Set the manual PWM value (clamped to 0-255) and switch to MANUAL mode."""
        pwm = max(0, min(255, pwm))  # Clamp to valid range
        # Both fields are updated in one critical section. The mode is
        # assigned directly instead of through set_mode(), which would try
        # to take the lock a second time.
        with self.lock:
            self.manual_pwm = pwm
            self.current_mode = FanMode.MANUAL
        logger.info(f"Set fan mode to: {FanMode.MANUAL.value}")
        logger.info(f"Set manual PWM to: {pwm}")

    def set_fan_mode(self, mode: "FanMode"):
        """Write the enable flags for ``mode`` to the hwmon files.

        ``0`` is written to ``fan1_enable`` and ``pwm1_enable`` for
        ``FanMode.OFF`` and ``1`` for every other mode.

        Returns:
            True on success, False if the GPU is unknown or a write fails.
        """
        flag = '0' if mode == FanMode.OFF else '1'
        try:
            hwmon_path = self._hwmon_path()
            if hwmon_path is None:
                return False
            for filename in ("fan1_enable", "pwm1_enable"):
                with open(os.path.join(hwmon_path, filename), 'w') as f:
                    f.write(flag)
            return True
        except Exception as e:
            logger.error(f"Error setting fan mode: {e}")
            return False

    def set_pwm(self, pwm: int):
        """Write a PWM value, clamped to 0-255, to the ``pwm1`` file.

        Returns:
            True on success, False if the GPU is unknown or the write fails.
        """
        try:
            hwmon_path = self._hwmon_path()
            if hwmon_path is None:
                return False
            pwm = max(0, min(255, pwm))  # Clamp to valid range
            with open(os.path.join(hwmon_path, "pwm1"), 'w') as f:
                f.write(str(int(pwm)))
            return True
        except Exception as e:
            logger.error(f"Error setting PWM: {e}")
            return False

    def calculate_target_pwm(self, temperature: float) -> int:
        """Calculate target PWM based on temperature and current profile.

        At or above the profile's emergency temperature the emergency PWM is
        returned. Otherwise the result is ``min_pwm`` up to ``min_temp``,
        ``max_pwm`` from ``max_temp`` on, and a linear interpolation between
        them. Returns 0 when no profile is active.
        """
        if not self.current_profile:
            return 0
        curve = self.current_profile.curve
        safety = self.current_profile.safety
        # Emergency temperature handling
        if temperature >= safety['emergency_temp']:
            return int(safety['emergency_pwm'])
        min_temp = curve['min_temp']
        max_temp = curve['max_temp']
        min_pwm = curve['min_pwm']
        max_pwm = curve['max_pwm']
        if temperature <= min_temp:
            return int(min_pwm)
        if temperature >= max_temp:
            return int(max_pwm)
        # Linear interpolation; min_temp < temperature < max_temp here, so the
        # temperature range is strictly positive.
        fraction = (temperature - min_temp) / (max_temp - min_temp)
        return int(min_pwm + fraction * (max_pwm - min_pwm))

    def check_safety_limits(self, temperature: float, current_pwm: int) -> bool:
        """Check if safety limits are exceeded.

        Returns True when the temperature reached the profile's emergency
        temperature, or when the PWM has been at 250 or above for at least
        ``max_fan_time`` seconds of accumulated update intervals.
        """
        if not self.current_profile:
            return False
        safety = self.current_profile.safety
        # Emergency temperature check
        if temperature >= safety['emergency_temp']:
            return True
        # Maximum fan time check
        if current_pwm >= 250:  # High fan speed threshold
            self.fan_on_time += self.update_interval
            if self.fan_on_time >= safety['max_fan_time']:
                logger.warning(f"Fan has been at high speed for {safety['max_fan_time']} seconds")
                return True
        else:
            self.fan_on_time = 0
        return False

    def update_fan_control(self):
        """Run one control step: read temperature, decide PWM, apply it.

        Returns:
            True when the PWM was applied, False when the GPU status is
            unavailable, the PWM write failed, or an error occurred.
        """
        try:
            # Get current GPU status
            status_dict = self.gpu_manager.get_status(self.gpu_name)
            gpu_status = status_dict.get(self.gpu_name)
            if not gpu_status:
                logger.warning("Failed to get GPU status")
                return False
            temperature = gpu_status.temperature
            current_time = time.time()
            emergency_mode = False
            with self.lock:
                if self.current_mode == FanMode.MANUAL:
                    target_pwm = self.manual_pwm
                elif self.current_mode == FanMode.OFF:
                    target_pwm = 0
                else:  # AUTO (or a previous EMERGENCY) mode
                    target_pwm = self.calculate_target_pwm(temperature)
                    # Check safety limits
                    if self.check_safety_limits(temperature, target_pwm):
                        target_pwm = int(self.current_profile.safety['emergency_pwm'])
                        emergency_mode = True
                        self.current_mode = FanMode.EMERGENCY
                    elif self.current_mode == FanMode.EMERGENCY:
                        # Limits are respected again; otherwise the mode
                        # would report EMERGENCY forever.
                        self.current_mode = FanMode.AUTO
                mode = self.current_mode
                profile_name = self.current_profile.name if self.current_profile else "unknown"
            # Apply PWM
            if not self.set_pwm(target_pwm):
                return False
            # Update status
            self.status = FanStatus(
                mode=mode,
                profile=profile_name,
                current_pwm=target_pwm,
                target_pwm=target_pwm,
                temperature=temperature,
                last_update=current_time,
                manual_override=(mode == FanMode.MANUAL),
                emergency_mode=emergency_mode,
            )
            # Log status periodically
            if current_time - self.last_log_time >= self.log_interval:
                pwm_percent = int(target_pwm * 100 / 255)
                logger.info(f"Temp: {temperature:.1f}°C | PWM: {target_pwm} ({pwm_percent}%) | Mode: {mode.value}")
                self.last_log_time = current_time
            # Notify callbacks
            self._notify_status_callbacks()
            return True
        except Exception as e:
            logger.error(f"Error updating fan control: {e}")
            return False

    def add_status_callback(self, callback: "Callable[[FanStatus], None]"):
        """Add a callback function to be called when status updates."""
        self.status_callbacks.append(callback)

    def _notify_status_callbacks(self):
        """Call every registered callback with the latest status.

        An exception raised by one callback is logged and does not prevent
        the remaining callbacks from running.
        """
        if not self.status:
            return
        for callback in self.status_callbacks:
            try:
                callback(self.status)
            except Exception as e:
                logger.error(f"Error in status callback: {e}")

    def run(self):
        """Main control loop; runs until ``running`` becomes False."""
        logger.info("Starting fan controller...")
        self.running = True
        try:
            while self.running:
                self.update_fan_control()
                time.sleep(self.update_interval)
        except KeyboardInterrupt:
            logger.info("Stopping fan controller...")
            self.running = False
        except Exception as e:
            logger.error(f"Fatal error in fan controller: {e}")
            self.running = False

    def _restore_automatic_control(self) -> bool:
        """Hand fan control back to the driver by writing ``2`` to ``pwm1_enable``.

        In the Linux hwmon sysfs interface a ``pwmN_enable`` value of 2
        selects automatic fan speed control.
        """
        try:
            hwmon_path = self._hwmon_path()
            if hwmon_path is None:
                return False
            with open(os.path.join(hwmon_path, "pwm1_enable"), 'w') as f:
                f.write('2')
            return True
        except Exception as e:
            logger.error(f"Error restoring automatic fan control: {e}")
            return False

    def stop(self):
        """Stop the control loop and leave the fan in a safe state.

        Leaving the fan at PWM 0 under manual control would let the GPU heat
        up unattended once this process exits. Control is returned to the
        driver instead; if that fails the fan is driven at full speed.
        """
        logger.info("Stopping fan controller...")
        self.running = False
        self.set_mode(FanMode.OFF)
        if not self._restore_automatic_control():
            self.set_pwm(255)

    def get_status(self) -> "Optional[FanStatus]":
        """Get current fan status (None before the first successful update)."""
        return self.status

    def get_profiles(self) -> "Dict[str, FanProfile]":
        """Get a shallow copy of all available profiles."""
        return self.profiles.copy()

    def add_profile(self, profile: "FanProfile"):
        """Add a new fan profile, keyed by its ``name``, and persist all profiles."""
        with self.lock:
            self.profiles[profile.name] = profile
            self.save_profiles()
        logger.info(f"Added profile: {profile.name}")

    def remove_profile(self, profile_name: str):
        """Remove the profile stored under ``profile_name``, if present, and persist."""
        with self.lock:
            if profile_name not in self.profiles:
                return
            del self.profiles[profile_name]
            self.save_profiles()
        logger.info(f"Removed profile: {profile_name}")
class FanControllerCLI:
    """Command-line interface for fan controller."""

    def __init__(self):
        self.controller = None

    def setup_logging(self, log_level: str):
        """Configure logging to stdout and, when possible, to the log file.

        Opening ``/var/log/gpu_fan_control.log`` fails for users who cannot
        write there; in that case logging continues on stdout only instead of
        aborting the program.
        """
        numeric_level = getattr(logging, log_level.upper(), logging.INFO)
        handlers = [logging.StreamHandler(sys.stdout)]
        file_error = None
        try:
            handlers.insert(0, logging.FileHandler('/var/log/gpu_fan_control.log'))
        except OSError as e:
            file_error = e
        logging.basicConfig(
            level=numeric_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=handlers,
        )
        if file_error is not None:
            logger.warning(f"File logging disabled: {file_error}")

    def run(self):
        """Parse the command line and run the fan controller accordingly.

        Exits with status 1 when the controller cannot be initialized or the
        requested profile cannot be activated.
        """
        parser = argparse.ArgumentParser(description='Advanced GPU Fan Controller')
        parser.add_argument('--profile', type=str, help='Fan profile to use')
        # metavar keeps the usage text from listing all 256 choices.
        parser.add_argument('--manual-pwm', type=int, choices=range(0, 256), metavar='PWM',
                            help='Manual PWM value (0-255)')
        parser.add_argument('--config', type=str, default='config/fan_profiles.json',
                            help='Configuration file path')
        parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                            default='INFO', help='Log level')
        parser.add_argument('--list-profiles', action='store_true', help='List available profiles')
        parser.add_argument('--daemon', action='store_true', help='Run as daemon')
        args = parser.parse_args()
        # Setup logging
        self.setup_logging(args.log_level)
        # Initialize controller
        self.controller = FanController(args.config)
        if args.list_profiles:
            self.list_profiles()
            return
        if not self.controller.initialize():
            logger.error("Failed to initialize fan controller")
            sys.exit(1)
        # Apply command-line settings
        if args.profile:
            if not self.controller.set_profile(args.profile):
                logger.error(f"Failed to set profile: {args.profile}")
                sys.exit(1)
        if args.manual_pwm is not None:
            self.controller.set_manual_pwm(args.manual_pwm)
        # Setup signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        # Run controller
        if args.daemon:
            logger.info("Running as daemon...")
        try:
            self.controller.run()
        except KeyboardInterrupt:
            logger.info("Received interrupt signal")

    def list_profiles(self):
        """Print the available fan profiles.

        Uses the controller created by ``run()`` so the ``--config`` option is
        honoured; a controller with the default configuration is created only
        when none exists yet.
        """
        controller = self.controller if self.controller is not None else FanController()
        print("Available fan profiles:")
        for name, profile in controller.profiles.items():
            status = "✓" if profile.enabled else "✗"
            print(f" {status} {name}: {profile.description}")

    def signal_handler(self, signum, frame):
        """Handle shutdown signals: stop the controller and exit with status 0."""
        logger.info(f"Received signal {signum}, shutting down...")
        if self.controller:
            self.controller.stop()
        sys.exit(0)
if __name__ == "__main__":
    # Script entry point: build the command-line front end and run it.
    FanControllerCLI().run()