"""Simplified performance predictor for MLPerf configurations using XGBoost.""" import logging import random from collections import Counter, defaultdict import numpy as np import pandas as pd import xgboost as xgb from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score from sklearn.model_selection import train_test_split from utils import FEATURE_TYPES logger = logging.getLogger(__name__) class PerformancePredictor: """Predicts performance for hardware configurations.""" def __init__(self, dataset: pd.DataFrame, test_size: float = 0.2): """Initialize with benchmark dataset.""" self.df = dataset self.model = None self.target = "metrics.result_per_accelerator" self.features = [] self.test_size = test_size self.evaluation_data = pd.DataFrame() self.evaluation_metrics = {} self.feature_importance = pd.DataFrame(columns=["Feature", "Importance"]) self.excluded_features = { "model.name", "model.mlperf_name", "software.framework", "system.name", } self.excluded_features.update( { col for col in dataset.columns if col.startswith("submission.") or col.startswith("metrics.") } ) self.distributions = {} self.max_accelerators = int(dataset["system.accelerator.total_count"].max()) self.max_gpu_memory = float(dataset["system.accelerator.memory_capacity"].max()) self.max_cpu_memory = float(dataset["system.memory.capacity"].max()) self.frameworks = sorted( list( set( col.replace("software.framework.", "") for col in dataset.columns if col.startswith("software.framework.") and col != "software.framework" ) ) ) logger.info( f"Found {len(self.frameworks)} unique frameworks: {', '.join(self.frameworks)}" ) self._identify_features() self._analyze_data_distributions() self._train_model() def _identify_features(self): """Identify features for model training.""" all_columns = set(self.df.columns) available_features = all_columns - self.excluded_features - {self.target} self.features = [f for f in available_features if not self.df[f].isna().all()] logger.info(f"Identified {len(self.features)} features for model training") def _analyze_data_distributions(self): """Analyze feature distributions for realistic data generation.""" categorical_features = { col for col in self.df.columns if self.df[col].dtype == "object" or col in FEATURE_TYPES.get("categorical", []) } for feature in categorical_features: values = self.df[feature].dropna().tolist() if values: counter = Counter(values) total = sum(counter.values()) self.distributions[feature] = { value: count / total for value, count in counter.items() } continuous_features = { col for col in self.df.columns if col in FEATURE_TYPES.get("continuous", []) or pd.api.types.is_numeric_dtype(self.df[col].dtype) if col not in categorical_features and not col.startswith("metrics.") } for feature in continuous_features: values = self.df[feature].dropna() if len(values) > 0: self.distributions[feature] = { "min": float(values.min()), "max": float(values.max()), "mean": float(values.mean()), "std": float(values.std()), "median": float(values.median()), "values": sorted(values.unique().tolist()), } self._analyze_feature_relationships() logger.info(f"Analyzed distributions for {len(self.distributions)} features") def _analyze_feature_relationships(self): """Analyze relationships between related features.""" self._analyze_vendor_accelerator_relations() self._analyze_vendor_cpu_relations() self._analyze_accelerator_memory_relations() self._analyze_interconnect_relations() self._analyze_software_relations() self._analyze_device_counts() def 
    def _analyze_vendor_accelerator_relations(self):
        """Map vendors to their accelerators."""
        vendor_accelerators = defaultdict(list)
        for _, row in self.df.iterrows():
            vendor = row.get("system.accelerator.vendor")
            acc = row.get("system.accelerator.name")
            # pd.notna() is required here: missing values come back as NaN,
            # which is truthy, so a plain `if vendor and acc` would not skip them.
            if pd.notna(vendor) and pd.notna(acc):
                vendor_accelerators[vendor].append(acc)
        self.distributions["vendor_accelerators"] = {}
        for vendor, accelerators in vendor_accelerators.items():
            counter = Counter(accelerators)
            total = sum(counter.values())
            self.distributions["vendor_accelerators"][vendor] = {
                acc: count / total for acc, count in counter.items()
            }

    def _analyze_vendor_cpu_relations(self):
        """Map CPU vendors to their models."""
        vendor_cpus = defaultdict(list)
        for _, row in self.df.iterrows():
            vendor = row.get("system.cpu.vendor")
            model = row.get("system.cpu.model")
            if pd.notna(vendor) and pd.notna(model):
                vendor_cpus[vendor].append(model)
        self.distributions["vendor_cpus"] = {}
        for vendor, models in vendor_cpus.items():
            counter = Counter(models)
            total = sum(counter.values())
            self.distributions["vendor_cpus"][vendor] = {
                model: count / total for model, count in counter.items()
            }

    def _analyze_accelerator_memory_relations(self):
        """Map accelerator models to memory capacities."""
        acc_memory = defaultdict(list)
        for _, row in self.df.iterrows():
            acc = row.get("system.accelerator.name")
            memory = row.get("system.accelerator.memory_capacity")
            if pd.notna(acc) and pd.notna(memory):
                acc_memory[acc].append(memory)
        self.distributions["accelerator_memory"] = {}
        for acc, memories in acc_memory.items():
            if memories:
                counter = Counter(memories)
                most_common = counter.most_common(1)[0][0]
                self.distributions["accelerator_memory"][acc] = {
                    "min": min(memories),
                    "max": max(memories),
                    "mean": sum(memories) / len(memories),
                    "most_common": most_common,
                    "values": sorted(set(memories)),
                }

    def _analyze_interconnect_relations(self):
        """Map vendors to interconnect types."""
        vendor_interconnects = defaultdict(list)
        for _, row in self.df.iterrows():
            vendor = row.get("system.accelerator.vendor")
            interconnect = row.get("system.interconnect.accelerator")
            if pd.notna(vendor) and pd.notna(interconnect):
                vendor_interconnects[vendor].append(interconnect)
        self.distributions["vendor_interconnects"] = {}
        for vendor, interconnects in vendor_interconnects.items():
            counter = Counter(interconnects)
            total = sum(counter.values())
            self.distributions["vendor_interconnects"][vendor] = {
                ic: count / total for ic, count in counter.items()
            }

    def _analyze_software_relations(self):
        """Map vendors to software stacks."""
        vendor_software = defaultdict(lambda: defaultdict(list))
        for _, row in self.df.iterrows():
            vendor = row.get("system.accelerator.vendor")
            if pd.isna(vendor):
                continue
            os = row.get("software.operating_system")
            if pd.notna(os):
                vendor_software[vendor]["os"].append(os)
            for col in self.df.columns:
                if (
                    col.startswith("software.framework.")
                    and col != "software.framework"
                ):
                    framework = col.replace("software.framework.", "")
                    version = row.get(col)
                    if pd.notna(version):
                        vendor_software[vendor][framework].append(version)
        self.distributions["vendor_software"] = {}
        for vendor, software_dict in vendor_software.items():
            self.distributions["vendor_software"][vendor] = {}
            for software_type, values in software_dict.items():
                counter = Counter(values)
                total = sum(counter.values())
                self.distributions["vendor_software"][vendor][software_type] = {
                    value: count / total for value, count in counter.items()
                }

    def _analyze_device_counts(self):
        """Analyze distribution of device counts."""
        counts = (
            self.df["system.accelerator.total_count"].dropna().astype(int).tolist()
        )
        if counts:
            counter = Counter(counts)
            total = sum(counter.values())
            self.distributions["device_count"] = {
                count: freq / total for count, freq in counter.items()
            }
            self.distributions["device_count_values"] = sorted(set(counts))

        node_counts = self.df["system.number_of_nodes"].dropna().astype(int).tolist()
        if node_counts:
            counter = Counter(node_counts)
            total = sum(counter.values())
            self.distributions["node_count"] = {
                count: freq / total for count, freq in counter.items()
            }
            self.distributions["node_count_values"] = sorted(set(node_counts))

        device_node_pairs = [
            (
                int(row["system.number_of_nodes"]),
                int(row["system.accelerator.total_count"]),
            )
            for _, row in self.df.iterrows()
            # pd.notna() guards against NaN, which is truthy and would crash int().
            if pd.notna(row.get("system.number_of_nodes"))
            and pd.notna(row.get("system.accelerator.total_count"))
        ]
        node_to_devices = defaultdict(list)
        for nodes, devices in device_node_pairs:
            node_to_devices[nodes].append(devices)
        self.distributions["node_device_relation"] = {}
        for node_count, device_counts in node_to_devices.items():
            counter = Counter(device_counts)
            total = sum(counter.values())
            self.distributions["node_device_relation"][node_count] = {
                count: freq / total for count, freq in counter.items()
            }

    def _train_model(self):
        """Train XGBoost model on available data with train/test split."""
        df_clean = self.df.dropna(subset=[self.target])
        X = df_clean[self.features].copy()
        y = df_clean[self.target]
        # XGBoost handles categorical features natively once dtype is "category";
        # working on a copy avoids chained-assignment warnings.
        for col in X.select_dtypes(include=["object"]).columns:
            X[col] = X[col].astype("category")

        try:
            strat_column = df_clean["system.accelerator.name"].fillna("unknown")
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=self.test_size, stratify=strat_column, random_state=42
            )
            logger.info(
                f"Created stratified train/test split "
                f"({100 - self.test_size * 100:.0f}%/{self.test_size * 100:.0f}%) "
                f"with {len(X_train)} training and {len(X_test)} test samples"
            )
        except ValueError:
            # Stratification fails when some accelerator appears only once.
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=self.test_size, random_state=42
            )
            logger.info(
                f"Created regular train/test split with {len(X_train)} training "
                f"and {len(X_test)} test samples"
            )

        self.model = xgb.XGBRegressor(
            objective="reg:squarederror",
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            subsample=0.8,
            enable_categorical=True,
        )
        self.model.fit(X_train, y_train)
        logger.info(f"Trained XGBoost model on {len(X_train)} rows")
        self._evaluate_model(X_test, y_test, df_clean.loc[X_test.index])

    def _evaluate_model(self, X_test, y_test, test_df):
        """Evaluate model performance on test set."""
        if X_test.empty:
            logger.warning("No test data available for evaluation")
            return

        y_pred = self.model.predict(X_test)
        rmse = np.sqrt(mean_squared_error(y_test, y_pred))
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        # MAPE assumes no zero targets, which holds for throughput metrics.
        mape = np.mean(np.abs((y_test - y_pred) / y_test)) * 100
        self.evaluation_metrics = {
            "rmse": rmse,
            "mae": mae,
            "r2": r2,
            "mape": mape,
            "test_size": len(y_test),
            # Assumes every row carries a target value; rows dropped by
            # dropna() in _train_model are not subtracted here.
            "training_size": len(self.df) - len(y_test),
        }

        eval_data = test_df[
            [
                "system.accelerator.name",
                "system.accelerator.vendor",
                "system.accelerator.total_count",
            ]
        ].copy()
        eval_data["actual"] = y_test
        eval_data["predicted"] = y_pred
        eval_data["error"] = y_pred - y_test
        eval_data["error_percent"] = (y_pred - y_test) / y_test * 100
        self.evaluation_data = eval_data.copy()

        logger.info(
            f"Model evaluation - RMSE: {rmse:.2f}, MAE: {mae:.2f}, "
            f"R²: {r2:.3f}, MAPE: {mape:.2f}%"
        )
        logger.info(
            f"Evaluation data shape: {eval_data.shape}, "
            f"with columns: {list(eval_data.columns)}"
        )
{eval_data.head(2).to_dict()}") logger.info( f"Evaluation data stored as class attribute with shape: {self.evaluation_data.shape}" ) importance = self.model.feature_importances_ feature_importance = pd.DataFrame( {"Feature": self.model.feature_names_in_, "Importance": importance} ).sort_values("Importance", ascending=False) self.feature_importance = feature_importance.head(10).copy() logger.info( f"Top 5 important features: {', '.join(self.feature_importance['Feature'].head(5).tolist())}" ) def get_evaluation_metrics(self) -> dict: """Return model evaluation metrics.""" logger.info(f"Getting evaluation metrics: {self.evaluation_metrics}") return self.evaluation_metrics.copy() if self.evaluation_metrics else {} def get_evaluation_data(self) -> pd.DataFrame: """Return evaluation data for visualization.""" data_shape = ( "empty" if self.evaluation_data.empty else self.evaluation_data.shape ) logger.info(f"Getting evaluation data with shape: {data_shape}") return self.evaluation_data.copy() if not self.evaluation_data.empty else None def get_feature_importance(self) -> pd.DataFrame: """Return feature importance data.""" logger.info( f"Getting feature importance with shape: {self.feature_importance.shape}" ) return ( self.feature_importance.copy() if not self.feature_importance.empty else pd.DataFrame(columns=["Feature", "Importance"]) ) def generate_predictions( self, architecture: str, parameters: float, constraints: dict = None, num_configs: int = 10, ) -> pd.DataFrame: """Generate and predict performance for hardware configurations.""" constraints = constraints or {} logger.info( f"Generating {num_configs} predictions for {architecture} model with {parameters}B parameters" ) configs = self._generate_configs( architecture, parameters, constraints, num_configs ) if not configs: return pd.DataFrame() configs_df = pd.DataFrame(configs) model_features = getattr(self.model, "feature_names_in_", self.features) for feature in model_features: if feature not in configs_df.columns: configs_df[feature] = None X_pred = configs_df[model_features] for col in X_pred.select_dtypes(include=["object"]).columns: with pd.option_context("mode.chained_assignment", None): X_pred[col] = X_pred[col].astype("category") configs_df[self.target] = self.model.predict(X_pred) configs_df["predicted"] = True configs_df["metrics.result"] = ( configs_df[self.target] * configs_df["system.accelerator.total_count"] ) configs_df["system.name"] = "Hypothetical system - ongoing work" logger.info( f"Performance range: {configs_df[self.target].min():.2f} - {configs_df[self.target].max():.2f} tokens/s per accelerator" ) return configs_df def _sample_from_distribution(self, distribution: dict) -> any: """Sample a value from a categorical distribution.""" items = list(distribution.keys()) probabilities = list(distribution.values()) return np.random.choice(items, p=probabilities) def _sample_continuous_value(self, feature: str) -> float: """Sample a continuous value from the feature distribution.""" dist = self.distributions[feature] if "values" in dist and dist["values"]: if len(dist["values"]) > 3: value = np.random.normal(dist["mean"], max(dist["std"], 1.0)) value = max(dist["min"], min(dist["max"], value)) closest_idx = min( range(len(dist["values"])), key=lambda i: abs(dist["values"][i] - value), ) return dist["values"][closest_idx] else: return random.choice(dist["values"]) elif all(k in dist for k in ["min", "max", "mean", "std"]): value = np.random.normal(dist["mean"], max(dist["std"], 1.0)) return max(dist["min"], 
min(dist["max"], value)) return np.random.uniform(dist["min"], dist["max"]) def _get_device_count(self, min_devices=None, max_devices=None) -> int: """Get a realistic device count based on distribution and constraints.""" valid_counts = [ count for count in self.distributions["device_count_values"] if (min_devices is None or count >= min_devices) and (max_devices is None or count <= max_devices) ] if valid_counts: probs = { count: self.distributions["device_count"][count] for count in valid_counts if count in self.distributions["device_count"] } if probs: total = sum(probs.values()) items = list(probs.keys()) weights = [probs[item] / total for item in items] return np.random.choice(items, p=weights) return random.choice(valid_counts) if min_devices is not None and max_devices is not None: valid_powers = [ 2**i for i in range(10) if min_devices <= 2**i <= max_devices ] if valid_powers: return random.choice(valid_powers) return random.randint(min_devices, max_devices) return random.choice([1, 2, 4, 8, 16]) def _get_vendor_accelerator(self, vendor=None) -> tuple: """Get a vendor and accelerator pair.""" if vendor is None or vendor == "Any": vendor = self._sample_from_distribution( self.distributions["system.accelerator.vendor"] ) if vendor in self.distributions["vendor_accelerators"]: accelerator = self._sample_from_distribution( self.distributions["vendor_accelerators"][vendor] ) else: accelerator = self._sample_from_distribution( self.distributions["system.accelerator.name"] ) return vendor, accelerator def _get_memory_for_accelerator( self, vendor: str, accelerator: str, min_memory=None, max_memory=None ) -> float: """Get appropriate memory capacity for a given accelerator.""" if accelerator in self.distributions["accelerator_memory"]: mem_dist = self.distributions["accelerator_memory"][accelerator] if "values" in mem_dist: valid_values = [ m for m in mem_dist["values"] if (min_memory is None or m >= min_memory) and (max_memory is None or m <= max_memory) ] if valid_values: return random.choice(valid_values) if "most_common" in mem_dist: most_common = mem_dist["most_common"] if (min_memory is None or most_common >= min_memory) and ( max_memory is None or most_common <= max_memory ): return most_common dist = self.distributions["system.accelerator.memory_capacity"] valid_values = [ m for m in dist["values"] if (min_memory is None or m >= min_memory) and (max_memory is None or m <= max_memory) ] if valid_values: return random.choice(valid_values) min_val = max(dist["min"], min_memory or dist["min"]) max_val = min(dist["max"], max_memory or dist["max"]) if min_val <= max_val: mean = min(max(dist["mean"], min_val), max_val) std = max(dist["std"], 1.0) for _ in range(5): value = np.random.normal(mean, std) if min_val <= value <= max_val: return value return np.random.uniform(min_val, max_val) return None def _get_node_config(self, total_devices: int) -> tuple: """Determine number of nodes and devices per node.""" VALID_GPUS_PER_NODE = [1, 2, 4, 8] for gpus_per_node in sorted(VALID_GPUS_PER_NODE, reverse=True): if total_devices % gpus_per_node == 0: return total_devices // gpus_per_node, gpus_per_node for gpus_per_node in sorted(VALID_GPUS_PER_NODE, reverse=True): if gpus_per_node <= total_devices: nodes = total_devices // gpus_per_node return nodes, gpus_per_node return 1, 1 def _get_cpu_config(self) -> dict: """Generate a CPU configuration.""" cpu_config = {} cpu_config["system.cpu.vendor"] = self._sample_from_distribution( self.distributions["system.cpu.vendor"] ) cpu_vendor = 
cpu_config["system.cpu.vendor"] if cpu_vendor in self.distributions["vendor_cpus"]: cpu_config["system.cpu.model"] = self._sample_from_distribution( self.distributions["vendor_cpus"][cpu_vendor] ) else: cpu_config["system.cpu.model"] = self._sample_from_distribution( self.distributions["system.cpu.model"] ) for feature in [ "system.cpu.core_count", "system.cpu.count_per_node", "system.cpu.frequency", ]: value = self._sample_continuous_value(feature) if value is not None: if feature in ["system.cpu.core_count", "system.cpu.count_per_node"]: value = int(value) cpu_config[feature] = value if "system.cpu.caches" in self.distributions: cpu_config["system.cpu.caches"] = self._sample_from_distribution( self.distributions["system.cpu.caches"] ) return cpu_config def _get_software_config(self, vendor: str, constraints=None) -> dict: """Generate a software configuration based on hardware vendor.""" constraints = constraints or {} software_config = {} if vendor in self.distributions["vendor_software"]: vendor_sw = self.distributions["vendor_software"][vendor] if "os" in vendor_sw: os_constraint = constraints.get("software.operating_system") if os_constraint and os_constraint != "Any": software_config["software.operating_system"] = os_constraint else: software_config["software.operating_system"] = ( self._sample_from_distribution(vendor_sw["os"]) ) for framework, versions in vendor_sw.items(): if framework != "os": framework_key = f"software.framework.{framework}" version_constraint = constraints.get(framework_key) if version_constraint and version_constraint != "Any": software_config[framework_key] = version_constraint else: software_config[framework_key] = self._sample_from_distribution( versions ) if ( "software.operating_system" not in software_config and "software.operating_system" in self.distributions ): os_constraint = constraints.get("software.operating_system") if os_constraint and os_constraint != "Any": software_config["software.operating_system"] = os_constraint else: software_config["software.operating_system"] = ( self._sample_from_distribution( self.distributions["software.operating_system"] ) ) for framework in self.frameworks: framework_key = f"software.framework.{framework}" if ( framework_key not in software_config and framework_key in self.distributions ): version_constraint = constraints.get(framework_key) if version_constraint and version_constraint != "Any": software_config[framework_key] = version_constraint else: software_config[framework_key] = self._sample_from_distribution( self.distributions[framework_key] ) return software_config def _get_memory_config(self, min_memory=None, max_memory=None) -> dict: """Generate a memory configuration.""" memory_config = {} dist = self.distributions["system.memory.capacity"] if "values" in dist: valid_values = [ m for m in dist["values"] if (min_memory is None or m >= min_memory) and (max_memory is None or m <= max_memory) ] if valid_values: memory_config["system.memory.capacity"] = random.choice(valid_values) if "system.memory.capacity" not in memory_config: min_val = max(dist["min"], min_memory or dist["min"]) max_val = min(dist["max"], max_memory or dist["max"]) if min_val <= max_val: mean = min(max(dist["mean"], min_val), max_val) std = max(dist["std"], (max_val - min_val) / 6.0) value = np.random.normal(mean, std) if min_val <= value <= max_val: memory_config["system.memory.capacity"] = value else: memory_config["system.memory.capacity"] = np.random.uniform( min_val, max_val ) if "system.memory.configuration" in self.distributions: 
memory_config["system.memory.configuration"] = ( self._sample_from_distribution( self.distributions["system.memory.configuration"] ) ) return memory_config def _get_interconnect_config(self, vendor: str) -> dict: """Generate interconnect configuration based on vendor.""" interconnect_config = {} if vendor in self.distributions["vendor_interconnects"]: interconnect_config["system.interconnect.accelerator"] = ( self._sample_from_distribution( self.distributions["vendor_interconnects"][vendor] ) ) elif "system.interconnect.accelerator" in self.distributions: interconnect_config["system.interconnect.accelerator"] = ( self._sample_from_distribution( self.distributions["system.interconnect.accelerator"] ) ) if "system.interconnect.accelerator_host" in self.distributions: interconnect_config["system.interconnect.accelerator_host"] = ( self._sample_from_distribution( self.distributions["system.interconnect.accelerator_host"] ) ) return interconnect_config def _generate_configs( self, architecture: str, parameters: float, constraints=None, count: int = 10 ) -> list: """Generate configurations respecting user constraints.""" constraints = constraints or {} configs = [] vendor = constraints.get("system.accelerator.vendor") acc_name = constraints.get("system.accelerator.name") def apply_margin(value, is_min=True): if value is None or not isinstance(value, (int, float)) or value == "Any": return None return value * (0.9 if is_min else 1.1) min_gpu_memory = apply_margin(constraints.get("min_gpu_memory"), is_min=True) max_gpu_memory = apply_margin( constraints.get("max_gpu_memory"), is_min=False ) or (self.max_gpu_memory * 1.1) min_cpu_memory = apply_margin(constraints.get("min_cpu_memory"), is_min=True) max_cpu_memory = apply_margin( constraints.get("max_cpu_memory"), is_min=False ) or (self.max_cpu_memory * 1.1) min_devices = apply_margin(constraints.get("min_accelerators"), is_min=True) max_devices = ( apply_margin(constraints.get("max_accelerators"), is_min=False) or self.max_accelerators ) interconnect = constraints.get("system.interconnect.accelerator") nodes = constraints.get("system.number_of_nodes") VALID_GPUS_PER_NODE = [1, 2, 4, 8] for _ in range(count * 3): if len(configs) >= count: break device_count = self._get_device_count(min_devices, max_devices) acc_vendor, acc_model = self._get_vendor_accelerator(vendor) if acc_name and acc_name != "Any": acc_model = acc_name if nodes and nodes != "Any": node_count = int(nodes) valid_device_counts = [] for gpus in VALID_GPUS_PER_NODE: if node_count * gpus >= ( min_devices or 1 ) and node_count * gpus <= (max_devices or float("inf")): valid_device_counts.append(gpus) if not valid_device_counts: continue devices_per_node = random.choice(valid_device_counts) device_count = node_count * devices_per_node else: valid_count = False for gpus_per_node in sorted(VALID_GPUS_PER_NODE, reverse=True): if device_count % gpus_per_node == 0: valid_count = True break if not valid_count: node_count, devices_per_node = self._get_node_config(device_count) device_count = node_count * devices_per_node else: node_count, devices_per_node = ( device_count // gpus_per_node, gpus_per_node, ) config = { "model.architecture": architecture, "model.number_of_parameters": parameters, "system.accelerator.vendor": acc_vendor, "system.accelerator.name": acc_model, "system.accelerator.total_count": device_count, "system.number_of_nodes": node_count, "system.accelerator.count_per_node": devices_per_node, } gpu_memory = self._get_memory_for_accelerator( acc_vendor, acc_model, 
                min_memory=min_gpu_memory,
                max_memory=max_gpu_memory,
            )
            if gpu_memory is None:
                continue
            config["system.accelerator.memory_capacity"] = gpu_memory

            if "system.accelerator.memory_config" in self.distributions:
                config["system.accelerator.memory_config"] = (
                    self._sample_from_distribution(
                        self.distributions["system.accelerator.memory_config"]
                    )
                )

            interconnect_config = self._get_interconnect_config(acc_vendor)
            if interconnect and interconnect != "Any":
                interconnect_config["system.interconnect.accelerator"] = interconnect
            config.update(interconnect_config)

            config.update(self._get_cpu_config())

            memory_config = self._get_memory_config(
                min_memory=min_cpu_memory, max_memory=max_cpu_memory
            )
            if "system.memory.capacity" not in memory_config:
                continue
            config.update(memory_config)

            for feature_name in [
                "system.type",
                "system.cooling",
                "model.weight_data_types",
            ]:
                if feature_name in self.distributions:
                    config[feature_name] = self._sample_from_distribution(
                        self.distributions[feature_name]
                    )

            config.update(self._get_software_config(acc_vendor, constraints))

            # Pass through any remaining user constraints verbatim.
            for key, value in constraints.items():
                if (
                    not key.startswith("software.framework.")
                    and key != "software.operating_system"
                    and key
                    not in [
                        "min_gpu_memory",
                        "max_gpu_memory",
                        "min_cpu_memory",
                        "max_cpu_memory",
                        "min_accelerators",
                        "max_accelerators",
                    ]
                    and key not in config
                    and value != "Any"
                    and value is not None
                ):
                    config[key] = value

            configs.append(config)

        return configs[:count]
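

# A minimal usage sketch, assuming a flattened MLPerf results table whose
# columns match the feature names used above. "benchmarks.csv", the
# "transformer" architecture label, and the constraint values are
# hypothetical placeholders, not part of this module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    dataset = pd.read_csv("benchmarks.csv")  # hypothetical input path
    predictor = PerformancePredictor(dataset, test_size=0.2)

    print(predictor.get_evaluation_metrics())
    print(predictor.get_feature_importance())

    # Predict per-accelerator throughput for ten hypothetical systems
    # running a 70B-parameter model on NVIDIA accelerators.
    predictions = predictor.generate_predictions(
        architecture="transformer",
        parameters=70.0,
        constraints={"system.accelerator.vendor": "NVIDIA"},
        num_configs=10,
    )
    print(predictions[["system.accelerator.name", predictor.target]])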