from huggingface_hub import HfFileSystem import pandas as pd from utils import logger from datetime import datetime, timedelta import threading import traceback import json import re import random from typing import List, Tuple, Optional, Dict # NOTE: if caching is an issue, try adding `use_listings_cache=False` fs = HfFileSystem() IMPORTANT_MODELS = [ "auto", "bert", # old but dominant (encoder only) "gpt2", # old (decoder) "t5", # old (encoder-decoder) "modernbert", # (encoder only) "vit", # old (vision) - fixed comma "clip", # old but dominant (vision) "detr", # objection detection, segmentation (vision) "table_transformer", # objection detection (visioin) - maybe just detr? "got_ocr2", # ocr (vision) "whisper", # old but dominant (audio) "wav2vec2", # old (audio) "qwen2_audio", # (audio) "speech_t5", # (audio) "csm", # (audio) "llama", # new and dominant (meta) "gemma3", # new (google) "qwen2", # new (Alibaba) "mistral3", # new (Mistral) - added missing comma "qwen2_5_vl", # new (vision) "llava", # many models from it (vision) "smolvlm", # new (video) "internvl", # new (video) "gemma3n", # new (omnimodal models) "qwen2_5_omni", # new (omnimodal models) # "gpt_oss", # new (quite used) "qwen2_5_omni", # new (omnimodal models) ] KEYS_TO_KEEP = [ "success_amd", "success_nvidia", "skipped_amd", "skipped_nvidia", "failed_multi_no_amd", "failed_multi_no_nvidia", "failed_single_no_amd", "failed_single_no_nvidia", "failures_amd", "failures_nvidia", "job_link_amd", "job_link_nvidia", ] # ============================================================================ # HELPER FUNCTIONS # ============================================================================ def generate_fake_dates(num_days: int = 7) -> List[str]: """Generate fake dates for the last N days.""" today = datetime.now() return [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(num_days)] def parse_json_field(value) -> dict: """Safely parse a JSON field that might be a string or dict.""" if value is None or pd.isna(value): return {} if isinstance(value, str): try: return json.loads(value) except: return {} # Handle dict-like objects (including pandas Series/dict) if isinstance(value, dict): return value # Try to convert to dict if possible try: return dict(value) if hasattr(value, '__iter__') else {} except: return {} def extract_date_from_path(path: str, pattern: str) -> Optional[str]: """Extract date from file path using regex pattern.""" match = re.search(pattern, path) return match.group(1) if match else None def get_test_names(tests: list) -> set: """Extract test names from a list of test dictionaries.""" return {test.get('line', '') for test in tests} def safe_extract(row: pd.Series, key: str) -> int: """Safely extract an integer value from a DataFrame row.""" return int(row.get(key, 0)) if pd.notna(row.get(key, 0)) else 0 # ============================================================================ # DATA LOADING FUNCTIONS # ============================================================================ def log_dataframe_link(link: str) -> str: """ Adds the link to the dataset in the logs, modifies it to get a clockable link and then returns the date of the report. """ if link.startswith("sample_"): return "9999-99-99" logger.info(f"Reading df located at {link}") # Make sure the links starts with an http adress if link.startswith("hf://"): link = "https://huggingface.co/" + link.removeprefix("hf://") # Pattern to match transformers_daily_ci followed by any path, then a date (YYYY-MM-DD format) pattern = r'transformers_daily_ci(.*?)/(\d{4}-\d{2}-\d{2})' match = re.search(pattern, link) # Failure case: if not match: logger.error("Could not find transformers_daily_ci and.or date in the link") return "9999-99-99" # Replace the path between with blob/main path_between = match.group(1) link = link.replace("transformers_daily_ci" + path_between, "transformers_daily_ci/blob/main") logger.info(f"Link to data source: {link}") # Return the date return match.group(2) def infer_latest_update_msg(date_df_amd: str, date_df_nvidia: str) -> str: # Early return if one of the dates is invalid if date_df_amd.startswith("9999") and date_df_nvidia.startswith("9999"): return "could not find last update time" # Warn if dates are not the same if date_df_amd != date_df_nvidia: logger.warning(f"Different dates found: {date_df_amd} (AMD) vs {date_df_nvidia} (NVIDIA)") # Take the latest date and format it try: latest_date = max(date_df_amd, date_df_nvidia) yyyy, mm, dd = latest_date.split("-") return f"last updated {mm}/{dd}/{yyyy}" except Exception as e: logger.error(f"When trying to infer latest date, got error {e}") return "could not find last update time" def read_one_dataframe(json_path: str, device_label: str) -> tuple[pd.DataFrame, str]: df_upload_date = log_dataframe_link(json_path) df = pd.read_json(json_path, orient="index") df.index.name = "model_name" df[f"failed_multi_no_{device_label}"] = df["failures"].apply(lambda x: len(x["multi"]) if "multi" in x else 0) df[f"failed_single_no_{device_label}"] = df["failures"].apply(lambda x: len(x["single"]) if "single" in x else 0) return df, df_upload_date def get_available_dates() -> List[str]: """Get list of available dates from both AMD and NVIDIA datasets.""" try: # Get file lists amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json" nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json" files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True) files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True) logger.info(f"Found {len(files_amd)} AMD files, {len(files_nvidia)} NVIDIA files") # Extract dates using patterns amd_pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/runs/[^/]+/ci_results_run_models_gpu/model_results\.json' nvidia_pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/ci_results_run_models_gpu/model_results\.json' amd_dates = {extract_date_from_path(f, amd_pattern) for f in files_amd} amd_dates.discard(None) # Remove None values nvidia_dates = {extract_date_from_path(f, nvidia_pattern) for f in files_nvidia} nvidia_dates.discard(None) logger.info(f"AMD dates: {sorted(amd_dates, reverse=True)[:5]}...") logger.info(f"NVIDIA dates: {sorted(nvidia_dates, reverse=True)[:5]}...") # Return intersection of both datasets common_dates = sorted(amd_dates.intersection(nvidia_dates), reverse=True) logger.info(f"Common dates: {len(common_dates)} dates where both AMD and NVIDIA have data") if common_dates: return common_dates[:30] # Limit to last 30 days # No real dates available - log warning and return empty list # This will allow the system to fall back to sample data properly logger.warning("No common dates found between AMD and NVIDIA datasets") return [] except Exception as e: logger.error(f"Error getting available dates: {e}") return [] def get_data_for_date(target_date: str) -> tuple[pd.DataFrame, str]: """Get data for a specific date.""" try: # For AMD, we need to find the specific run file for the date # AMD structure: YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json amd_src = f"hf://datasets/optimum-amd/transformers_daily_ci/{target_date}/runs/*/ci_results_run_models_gpu/model_results.json" amd_files = fs.glob(amd_src, refresh=True) if not amd_files: raise FileNotFoundError(f"No AMD data found for date {target_date}") # Use the first (most recent) run for the date amd_file = amd_files[0] # Ensure the AMD file path has the hf:// prefix if not amd_file.startswith("hf://"): amd_file = f"hf://{amd_file}" # NVIDIA structure: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json nvidia_src = f"hf://datasets/hf-internal-testing/transformers_daily_ci/{target_date}/ci_results_run_models_gpu/model_results.json" # Read dataframes - try each platform independently df_amd = pd.DataFrame() df_nvidia = pd.DataFrame() try: df_amd, _ = read_one_dataframe(amd_file, "amd") logger.info(f"Successfully loaded AMD data for {target_date}") except Exception as e: logger.warning(f"Failed to load AMD data for {target_date}: {e}") try: df_nvidia, _ = read_one_dataframe(nvidia_src, "nvidia") logger.info(f"Successfully loaded NVIDIA data for {target_date}") except Exception as e: logger.warning(f"Failed to load NVIDIA data for {target_date}: {e}") # If both failed, return empty dataframe if df_amd.empty and df_nvidia.empty: logger.warning(f"No data available for either platform on {target_date}") return pd.DataFrame(), target_date # Join both dataframes (outer join to include data from either platform) if not df_amd.empty and not df_nvidia.empty: joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") elif not df_amd.empty: joined = df_amd.copy() else: joined = df_nvidia.copy() joined = joined[KEYS_TO_KEEP] joined.index = joined.index.str.replace("^models_", "", regex=True) # Filter out all but important models important_models_lower = [model.lower() for model in IMPORTANT_MODELS] filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] return filtered_joined, target_date except Exception as e: logger.error(f"Error getting data for date {target_date}: {e}") # Return empty dataframe instead of sample data for historical functionality return pd.DataFrame(), target_date def get_historical_data(start_date: str, end_date: str, sample_data = False) -> pd.DataFrame: """Get historical data for a date range.""" if sample_data: return get_fake_historical_data(start_date, end_date) try: start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") historical_data = [] # Load data for each day in range current_dt = start_dt while current_dt <= end_dt: date_str = current_dt.strftime("%Y-%m-%d") try: df, _ = get_data_for_date(date_str) if not df.empty: df['date'] = date_str historical_data.append(df) logger.info(f"Loaded data for {date_str}") except Exception as e: logger.warning(f"Could not load data for {date_str}: {e}") current_dt += timedelta(days=1) return pd.concat(historical_data, ignore_index=False) if historical_data else pd.DataFrame() except Exception as e: logger.error(f"Error getting historical data: {e}") return get_fake_historical_data(start_date, end_date) def get_distant_data() -> tuple[pd.DataFrame, str]: # Retrieve AMD dataframe amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json" files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True) df_amd, date_df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd") # Retrieve NVIDIA dataframe, which pattern should be: # hf://datasets/hf-internal-testing`/transformers_daily_ci/raw/main/YYYY-MM-DD/ci_results_run_models_gpu/model_results.json nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json" files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True) # NOTE: should this be removeprefix instead of lstrip? nvidia_path = files_nvidia[0].lstrip('datasets/hf-internal-testing/transformers_daily_ci/') nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path df_nvidia, date_df_nvidia = read_one_dataframe(nvidia_path, "nvidia") # Infer and format the latest df date latest_update_msg = infer_latest_update_msg(date_df_amd, date_df_nvidia) # Join both dataframes joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") joined = joined[KEYS_TO_KEEP] joined.index = joined.index.str.replace("^models_", "", regex=True) # Fitler out all but important models important_models_lower = [model.lower() for model in IMPORTANT_MODELS] filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] # Warn for ach missing important models for model in IMPORTANT_MODELS: if model not in filtered_joined.index: print(f"[WARNING] Model {model} was missing from index.") return filtered_joined, latest_update_msg def get_sample_data() -> tuple[pd.DataFrame, str]: # Retrieve sample dataframes df_amd, _ = read_one_dataframe("sample_amd.json", "amd") df_nvidia, _ = read_one_dataframe("sample_nvidia.json", "nvidia") # Join both dataframes joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") joined = joined[KEYS_TO_KEEP] joined.index = joined.index.str.replace("^models_", "", regex=True) # Fitler out all but important models important_models_lower = [model.lower() for model in IMPORTANT_MODELS] filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] # Prefix all model names with "sample_" filtered_joined.index = "sample_" + filtered_joined.index return filtered_joined, "sample data was loaded" def get_fake_historical_data(start_date: str, end_date: str) -> pd.DataFrame: """Generate fake historical data for a date range when real data loading fails.""" try: start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") sample_df, _ = get_sample_data() historical_data = [] # Generate data for each date current_dt = start_dt while current_dt <= end_dt: date_df = sample_df.copy() date_df['date'] = current_dt.strftime("%Y-%m-%d") # Add random variations to make it realistic for idx in date_df.index: # Vary success/skipped counts (±20%) for col in ['success_amd', 'success_nvidia', 'skipped_amd', 'skipped_nvidia']: if col in date_df.columns and pd.notna(date_df.loc[idx, col]): val = date_df.loc[idx, col] if val > 0: date_df.loc[idx, col] = max(0, int(val * random.uniform(0.8, 1.2))) # Vary failure counts more dramatically (±50-100%) for col in ['failed_multi_no_amd', 'failed_multi_no_nvidia', 'failed_single_no_amd', 'failed_single_no_nvidia']: if col in date_df.columns and pd.notna(date_df.loc[idx, col]): val = date_df.loc[idx, col] date_df.loc[idx, col] = max(0, int(val * random.uniform(0.5, 2.0))) historical_data.append(date_df) current_dt += timedelta(days=1) if not historical_data: return pd.DataFrame() combined_df = pd.concat(historical_data, ignore_index=False) logger.info(f"Generated fake historical data: {len(combined_df)} records from {start_date} to {end_date}") return combined_df except Exception as e: logger.error(f"Error generating fake historical data: {e}") return pd.DataFrame() def find_failure_first_seen(historical_df: pd.DataFrame, model_name: str, test_name: str, device: str, gpu_type: str) -> Optional[str]: """Find the first date when a specific test failure appeared in historical data.""" if historical_df is None or historical_df.empty: return None try: model_name_lower = model_name.lower() # Filter by model name (case-insensitive) model_data = historical_df[historical_df.index.str.lower() == model_name_lower].copy() if model_data.empty: return None # Ensure we have a 'date' column if 'date' not in model_data.columns: return None # Check each date (oldest first) for this failure for _, row in model_data.sort_values('date').iterrows(): failures_raw = row.get(f'failures_{device}') if failures_raw is None or pd.isna(failures_raw): continue # Parse failures (could be dict, string, or already parsed) failures = parse_json_field(failures_raw) if not isinstance(failures, dict) or gpu_type not in failures: continue # Check each test in this gpu_type for test in failures.get(gpu_type, []): if isinstance(test, dict) and test.get('line', '') == test_name: date_value = row.get('date') return date_value if date_value else None return None except Exception as e: logger.error(f"Error finding first seen date for {test_name}: {e}") return None def _find_device_regressions(model_name: str, current_failures: dict, yesterday_failures: dict, device: str) -> list[dict]: """Helper to find regressions for a specific device.""" regressions = [] for gpu_type in ['single', 'multi']: current_tests = get_test_names(current_failures.get(gpu_type, [])) yesterday_tests = get_test_names(yesterday_failures.get(gpu_type, [])) # Find NEW failures: failing NOW but NOT yesterday new_tests = current_tests - yesterday_tests for test_name in new_tests: if test_name: # Skip empty names regressions.append({ 'model': model_name, 'test': test_name.split('::')[-1], # Short name 'test_full': test_name, # Full name 'device': device, 'gpu_type': gpu_type }) return regressions def find_new_regressions(current_df: pd.DataFrame, historical_df: pd.DataFrame) -> list[dict]: """Compare current failures against previous day's failures to find new regressions.""" if current_df.empty or historical_df.empty: return [] # Get yesterday's data available_dates = sorted(historical_df['date'].unique(), reverse=True) if not available_dates: return [] yesterday_data = historical_df[historical_df['date'] == available_dates[0]] new_regressions = [] # For each model, compare current vs yesterday for model_name in current_df.index: current_row = current_df.loc[model_name] yesterday_row = yesterday_data[yesterday_data.index == model_name.lower()] # Parse current failures current_amd = parse_json_field(current_row.get('failures_amd', {})) current_nvidia = parse_json_field(current_row.get('failures_nvidia', {})) # Parse yesterday failures yesterday_amd = {} yesterday_nvidia = {} if not yesterday_row.empty: yesterday_row = yesterday_row.iloc[0] yesterday_amd = parse_json_field(yesterday_row.get('failures_amd', {})) yesterday_nvidia = parse_json_field(yesterday_row.get('failures_nvidia', {})) # Find regressions for both devices new_regressions.extend(_find_device_regressions(model_name, current_amd, yesterday_amd, 'amd')) new_regressions.extend(_find_device_regressions(model_name, current_nvidia, yesterday_nvidia, 'nvidia')) return new_regressions def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]: """Extract and process model data from DataFrame row.""" # Extract all counts counts = {key: safe_extract(row, key) for key in [ 'success_amd', 'success_nvidia', 'skipped_amd', 'skipped_nvidia', 'failed_multi_no_amd', 'failed_multi_no_nvidia', 'failed_single_no_amd', 'failed_single_no_nvidia' ]} # Create stats dictionaries amd_stats = { 'passed': counts['success_amd'], 'failed': counts['failed_multi_no_amd'] + counts['failed_single_no_amd'], 'skipped': counts['skipped_amd'], 'error': 0 } nvidia_stats = { 'passed': counts['success_nvidia'], 'failed': counts['failed_multi_no_nvidia'] + counts['failed_single_no_nvidia'], 'skipped': counts['skipped_nvidia'], 'error': 0 } return (amd_stats, nvidia_stats, counts['failed_multi_no_amd'], counts['failed_single_no_amd'], counts['failed_multi_no_nvidia'], counts['failed_single_no_nvidia']) class CIResults: def __init__(self): self.df = pd.DataFrame() self.available_models = [] self.latest_update_msg = "" self.available_dates = [] self.historical_df = pd.DataFrame() self.all_historical_data = pd.DataFrame() # Store all historical data at startup self.sample_data = False def load_data(self) -> None: """Load data from the data source.""" # Try loading the distant data, and fall back on sample data for local tinkering try: logger.info("Loading distant data...") new_df, latest_update_msg = get_distant_data() self.latest_update_msg = latest_update_msg self.available_dates = get_available_dates() logger.info(f"Available dates: {len(self.available_dates)} dates") if self.available_dates: logger.info(f"Date range: {self.available_dates[-1]} to {self.available_dates[0]}") else: logger.warning("No available dates found") self.available_dates = [] except Exception as e: error_msg = [ "Loading data failed:", "-" * 120, traceback.format_exc(), "-" * 120, "Falling back on sample data." ] logger.error("\n".join(error_msg)) self.sample_data = True new_df, latest_update_msg = get_sample_data() self.latest_update_msg = latest_update_msg # Generate fake dates for sample data historical functionality self.available_dates = generate_fake_dates() # Update attributes self.df = new_df self.available_models = new_df.index.tolist() # Load all historical data at startup self.load_all_historical_data() # Log and return distant load status logger.info(f"Data loaded successfully: {len(self.available_models)} models") logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}") logger.info(f"Latest update message: {self.latest_update_msg}") # Log a preview of the df msg = {} for model in self.available_models[:3]: msg[model] = {} for col in self.df.columns: value = self.df.loc[model, col] if not isinstance(value, int): value = str(value) if len(value) > 10: value = value[:10] + "..." msg[model][col] = value logger.info(json.dumps(msg, indent=4)) def load_all_historical_data(self) -> None: """Load all available historical data at startup.""" try: if not self.available_dates: logger.warning("No available dates found, skipping historical data load") self.all_historical_data = pd.DataFrame() return logger.info(f"Loading all historical data for {len(self.available_dates)} dates...") start_date, end_date = self.available_dates[-1], self.available_dates[0] self.all_historical_data = get_historical_data(start_date, end_date, self.sample_data) logger.info(f"All historical data loaded: {len(self.all_historical_data)} records") except Exception as e: logger.error(f"Error loading all historical data: {e}") self.all_historical_data = pd.DataFrame() def load_historical_data(self, start_date: str, end_date: str) -> None: """Load historical data for a date range from pre-loaded data.""" try: logger.info(f"Filtering historical data from {start_date} to {end_date}") if self.all_historical_data.empty: logger.warning("No pre-loaded historical data available") self.historical_df = pd.DataFrame() return # Filter by date range start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") filtered_data = [ self.all_historical_data[self.all_historical_data['date'] == date_str] for date_str in self.all_historical_data['date'].unique() if start_dt <= datetime.strptime(date_str, "%Y-%m-%d") <= end_dt ] if filtered_data: self.historical_df = pd.concat(filtered_data, ignore_index=False) logger.info(f"Historical data filtered: {len(self.historical_df)} records for {start_date} to {end_date}") else: self.historical_df = pd.DataFrame() logger.warning(f"No historical data found for date range {start_date} to {end_date}") except Exception as e: logger.error(f"Error filtering historical data: {e}") self.historical_df = pd.DataFrame() def schedule_data_reload(self): """Schedule the next data reload.""" def reload_data(): self.load_data() # Schedule the next reload in 15 minutes (900 seconds) timer = threading.Timer(900.0, reload_data) timer.daemon = True # Dies when main thread dies timer.start() logger.info("Next data reload scheduled in 15 minutes") # Start the first reload timer timer = threading.Timer(900.0, reload_data) timer.daemon = True timer.start() logger.info("Data auto-reload scheduled every 15 minutes")