# Hugging Face Spaces app (status when captured: Running)
import ast
import logging
import re
from typing import Dict, List, Optional, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from datasets import load_dataset
from matplotlib.figure import Figure
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
# Emit INFO-level records so dataset loads are visible in the application log.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Heading and tagline rendered at the top of the page.
APP_TITLE = "CNOT Count Regression"
APP_SUBTITLE = "Predict the number of CNOT gates (cx_count) from circuit topology and gate structure."

# Dropdown label -> Hugging Face dataset repository id.
REPO_CONFIG = {
    "Core (Clean)": "QSBench/QSBench-Core-v1.0.0-demo",
    "Depolarizing Noise": "QSBench/QSBench-Depolarizing-Demo-v1.0.0",
    "Amplitude Damping": "QSBench/QSBench-Amplitude-v1.0.0-demo",
    "Transpilation (10q)": "QSBench/QSBench-Transpilation-v1.0.0-demo",
}

# Column the regressor is trained to predict.
TARGET_COL = "cx_count"

# Columns that are never offered as model inputs (the target is included so it
# cannot be selected as its own predictor).
NON_FEATURE_COLS = {
    "sample_id",
    "sample_seed",
    "circuit_hash",
    "split",
    "circuit_qasm",
    "qasm_raw",
    "qasm_transpiled",
    "circuit_type_resolved",
    "circuit_type_requested",
    "noise_type",
    "noise_prob",
    "observable_bases",
    "observable_mode",
    "backend_device",
    "precision_mode",
    "circuit_signature",
    "entanglement",
    TARGET_COL,
}

# A numeric column whose name contains any of these substrings is hidden from
# the feature picker as well.
SOFT_EXCLUDE_PATTERNS = ["ideal_", "noisy_", "error_", "sign_ideal_", "sign_noisy_"]

# In-memory cache of enriched DataFrames, keyed by REPO_CONFIG label.
_ASSET_CACHE: Dict[str, pd.DataFrame] = {}
def load_dataset_df(dataset_key: str) -> pd.DataFrame:
    """Return the enriched DataFrame for ``dataset_key``, loading it on first use.

    ``dataset_key`` is a label from ``REPO_CONFIG``. On a cache miss the
    repository's ``"train"`` split is fetched with ``load_dataset``, converted
    to a DataFrame, passed through ``enrich_dataframe`` and stored in
    ``_ASSET_CACHE``; later calls return the stored object itself (not a copy).

    Raises:
        KeyError: if ``dataset_key`` is not a key of ``REPO_CONFIG``.
    """
    cached = _ASSET_CACHE.get(dataset_key)
    if cached is not None:
        return cached

    logger.info("Loading dataset from Hugging Face: %s", dataset_key)
    hub_dataset = load_dataset(REPO_CONFIG[dataset_key])
    frame = enrich_dataframe(pd.DataFrame(hub_dataset["train"]))
    _ASSET_CACHE[dataset_key] = frame
    return frame
def safe_parse(value):
    """Parse a stringified Python literal, returning the input on failure.

    Args:
        value: Any object. Only ``str`` inputs are parsed.

    Returns:
        The result of ``ast.literal_eval(value)`` when ``value`` is a string
        holding a valid literal; otherwise ``value`` unchanged.
    """
    if not isinstance(value, str):
        return value
    try:
        return ast.literal_eval(value)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # These are the exceptions literal_eval is documented to raise for
        # malformed input; anything else is a genuine bug and should surface.
        return value
def adjacency_features(adj_value) -> Dict[str, float]:
    """Derive basic graph features from an adjacency matrix.

    Args:
        adj_value: A square matrix given as nested lists/tuples, a NumPy
            array, or the string form of such a literal (parsed with
            ``safe_parse``).

    Returns:
        A dict with keys ``adj_edge_count`` (sum of the strict upper
        triangle), ``adj_density`` (edge count divided by ``n * (n - 1) / 2``,
        NaN when that is zero), ``adj_degree_mean`` and ``adj_degree_std``
        (mean / population std of the row sums). Every value is NaN when the
        input is empty, not numeric, or not a square 2-D matrix.
    """
    missing = {
        "adj_edge_count": np.nan,
        "adj_density": np.nan,
        "adj_degree_mean": np.nan,
        "adj_degree_std": np.nan,
    }

    parsed = safe_parse(adj_value)
    if not isinstance(parsed, (list, tuple, np.ndarray)) or len(parsed) == 0:
        return dict(missing)

    try:
        # list(...) lets an object array of row arrays stack into a 2-D matrix.
        arr = np.array(list(parsed), dtype=float)
    except (TypeError, ValueError):
        # Ragged rows or non-numeric entries.
        return dict(missing)

    # Edge/degree statistics are only meaningful for a square matrix; 1-D,
    # rectangular, or higher-rank input would otherwise yield misleading values.
    if arr.ndim != 2 or arr.shape[0] != arr.shape[1]:
        return dict(missing)

    n = arr.shape[0]
    edge_count = float(np.triu(arr, k=1).sum())
    possible_edges = float(n * (n - 1) / 2)
    density = edge_count / possible_edges if possible_edges > 0 else np.nan
    degrees = arr.sum(axis=1)
    return {
        "adj_edge_count": edge_count,
        "adj_density": density,
        "adj_degree_mean": float(np.mean(degrees)),
        "adj_degree_std": float(np.std(degrees)),
    }
def qasm_features(qasm_value) -> Dict[str, float]:
    """Compute simple text statistics for a QASM program.

    Returns a dict with the total character count, the number of non-blank
    lines, the number of gate-keyword matches (case-insensitive, whole word),
    the number of ``measure`` occurrences, and the number of non-blank lines
    that start with ``//``. All values are NaN when ``qasm_value`` is not a
    string or contains only whitespace.
    """
    feature_names = (
        "qasm_length",
        "qasm_line_count",
        "qasm_gate_keyword_count",
        "qasm_measure_count",
        "qasm_comment_count",
    )
    if not (isinstance(qasm_value, str) and qasm_value.strip()):
        return dict.fromkeys(feature_names, np.nan)

    nonblank = [ln for ln in qasm_value.splitlines() if ln.strip()]
    gate_hits = re.findall(
        r"\b(cx|h|x|y|z|rx|ry|rz|u1|u2|u3|u|swap|cz|ccx|rxx|ryy|rzz)\b",
        qasm_value,
        flags=re.IGNORECASE,
    )
    measure_hits = re.findall(r"\bmeasure\b", qasm_value, flags=re.IGNORECASE)
    comment_lines = [ln for ln in nonblank if ln.lstrip().startswith("//")]

    counts = (
        len(qasm_value),
        len(nonblank),
        len(gate_hits),
        len(measure_hits),
        len(comment_lines),
    )
    return {name: float(count) for name, count in zip(feature_names, counts)}
def enrich_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with derived numeric feature columns appended.

    * If an ``adjacency`` column exists, the columns produced by
      ``adjacency_features`` are added.
    * QASM statistics from ``qasm_features`` are computed from
      ``qasm_transpiled`` when that column exists, otherwise from ``qasm_raw``
      (skipped when neither is present).

    A derived column is only added when ``df`` does not already have a column
    of that name, so the result never contains duplicate column labels. The
    input frame is not modified.
    """
    enriched = df.copy()
    qasm_source = "qasm_transpiled" if "qasm_transpiled" in enriched.columns else "qasm_raw"
    extractors = (
        ("adjacency", adjacency_features),
        (qasm_source, qasm_features),
    )

    for source_col, extractor in extractors:
        if source_col not in enriched.columns:
            continue
        # Build the feature table in one shot; expanding row by row with
        # .apply(pd.Series) constructs a Series per row and is far slower.
        derived = pd.DataFrame(
            [extractor(cell) for cell in enriched[source_col]],
            index=enriched.index,
        )
        new_cols = [col for col in derived.columns if col not in enriched.columns]
        enriched = pd.concat([enriched, derived[new_cols]], axis=1)

    return enriched
def load_guide_content() -> str:
    """Return the text of ``GUIDE.md`` from the working directory.

    When the file does not exist, a short placeholder markdown document is
    returned instead.
    """
    placeholder = "# Guide\n\nGuide file not found."
    try:
        guide_file = open("GUIDE.md", "r", encoding="utf-8")
    except FileNotFoundError:
        return placeholder
    with guide_file:
        return guide_file.read()
def get_available_feature_columns(df: pd.DataFrame) -> List[str]:
    """List the numeric columns of ``df`` that may be used as model inputs.

    A numeric column is dropped when its name is in ``NON_FEATURE_COLS`` or
    contains any substring from ``SOFT_EXCLUDE_PATTERNS``. The surviving names
    are returned in sorted order.
    """

    def _is_candidate(name) -> bool:
        if name in NON_FEATURE_COLS:
            return False
        return not any(fragment in name for fragment in SOFT_EXCLUDE_PATTERNS)

    numeric_names = df.select_dtypes(include=[np.number]).columns.tolist()
    return sorted(name for name in numeric_names if _is_candidate(name))
def default_feature_selection(features: List[str]) -> List[str]:
    """Pick the features that are pre-selected in the picker.

    Preferred feature names that occur in ``features`` are kept in preference
    order and capped at eight. When none of the preferred names is available,
    the first eight entries of ``features`` are returned instead.
    """
    preference_order = (
        "gate_entropy",
        "adj_density",
        "adj_degree_mean",
        "adj_degree_std",
        "depth",
        "total_gates",
        "single_qubit_gates",
        "two_qubit_gates",
        "qasm_length",
        "qasm_line_count",
        "qasm_gate_keyword_count",
    )
    matches = [name for name in preference_order if name in features]
    if matches:
        return matches[:8]
    return features[:8]
def make_regression_figure(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    feature_names: Optional[List[str]] = None,
    importances: Optional[np.ndarray] = None,
) -> plt.Figure:
    """Create a three-panel regression summary figure.

    Panels, left to right: actual-vs-predicted scatter with a dashed identity
    line, a 20-bin histogram of the residuals (``y_true - y_pred``), and a
    horizontal bar chart of the ten largest feature importances.

    Args:
        y_true: Observed target values.
        y_pred: Predicted target values, same length as ``y_true``.
        feature_names: Names matching ``importances`` position by position.
        importances: One importance score per feature. When this or
            ``feature_names`` is missing, or their lengths differ, the third
            panel shows a placeholder message instead.

    Returns:
        The figure. It is built as a standalone ``Figure`` rather than through
        ``plt.figure`` so it is not registered in pyplot's global figure list;
        in a long-running server those registered figures are never released
        unless explicitly closed.
    """
    actual = np.asarray(y_true).ravel()
    predicted = np.asarray(y_pred).ravel()

    fig = Figure(figsize=(20, 6))
    ax_scatter, ax_resid, ax_importance = fig.subplots(1, 3)

    ax_scatter.scatter(actual, predicted, alpha=0.75)
    if actual.size and predicted.size:
        # Identity line spanning the joint range; skipped for empty input,
        # where min/max would raise.
        low = min(float(np.min(actual)), float(np.min(predicted)))
        high = max(float(np.max(actual)), float(np.max(predicted)))
        ax_scatter.plot([low, high], [low, high], linestyle="--")
    ax_scatter.set_title("Actual vs Predicted")
    ax_scatter.set_xlabel("Actual cx_count")
    ax_scatter.set_ylabel("Predicted cx_count")

    ax_resid.hist(actual - predicted, bins=20)
    ax_resid.set_title("Residual Distribution")
    ax_resid.set_xlabel("Residual")
    ax_resid.set_ylabel("Count")

    # Accept plain sequences as well as arrays for the fancy indexing below.
    scores = None if importances is None else np.asarray(importances)
    if scores is not None and feature_names is not None and len(scores) == len(feature_names):
        # argsort is ascending, so the tail holds the largest scores and the
        # biggest bar ends up at the top of the chart.
        top = np.argsort(scores)[-10:]
        ax_importance.barh([feature_names[i] for i in top], scores[top])
        ax_importance.set_title("Top-10 Feature Importances")
        ax_importance.set_xlabel("Importance")
    else:
        ax_importance.text(0.5, 0.5, "Feature importances are unavailable.", ha="center", va="center")
        ax_importance.set_axis_off()

    fig.tight_layout()
    return fig
def build_dataset_profile(df: pd.DataFrame) -> str:
    """Build a short markdown summary of ``df`` for the explorer tab.

    Returns:
        Markdown with the row count, column count and the mean, standard
        deviation and min/max of the ``TARGET_COL`` column, one item per
        rendered line. When ``TARGET_COL`` is not a column of ``df``, a short
        "Target column not found." message is returned instead.
    """
    if TARGET_COL not in df.columns:
        return "### Dataset profile\n\nTarget column not found."

    target = df[TARGET_COL]
    rows = [
        f"**Rows:** {len(df):,}",
        f"**Columns:** {len(df.columns):,}",
        f"**{TARGET_COL} mean:** {target.mean():.4f}",
        f"**{TARGET_COL} std:** {target.std():.4f}",
        f"**{TARGET_COL} min/max:** {target.min():.4f} / {target.max():.4f}",
    ]
    # A Markdown hard line break needs two trailing spaces before the newline;
    # joining with an explicit separator keeps them from being lost to
    # whitespace trimming, which would merge the rows into one paragraph.
    return "### Dataset profile\n\n" + "  \n".join(rows)
def refresh_explorer(dataset_key: str, split_name: str) -> Tuple[gr.update, pd.DataFrame, str, str, str, str]:
    """Recompute every explorer-tab output for the chosen dataset and split.

    Args:
        dataset_key: Label from ``REPO_CONFIG``.
        split_name: Requested split. When it is not one of the dataset's
            splits, the first available split is used instead.

    Returns:
        A 6-tuple: a dropdown update (split choices and selected value), the
        first 12 rows of the selected split, the raw and transpiled QASM text
        of the first previewed row (``"// N/A"`` when unavailable), the
        dataset profile markdown (computed over the whole dataset, not just
        the split), and the split summary markdown.
    """
    df = load_dataset_df(dataset_key)
    has_split_col = "split" in df.columns

    splits = df["split"].dropna().unique().tolist() if has_split_col else []
    if not splits:
        splits = ["train"]
    if split_name not in splits:
        split_name = splits[0]

    filtered = df[df["split"] == split_name] if has_split_col else df
    display_df = filtered.head(12).copy()

    def _first_qasm(column: str) -> str:
        """Return the first previewed row's text in ``column``, or a placeholder."""
        if column not in display_df.columns or display_df.empty:
            return "// N/A"
        cell = display_df[column].iloc[0]
        # A missing cell (None/NaN) is not displayable code; use the placeholder.
        return cell if isinstance(cell, str) else "// N/A"

    raw_qasm = _first_qasm("qasm_raw")
    transpiled_qasm = _first_qasm("qasm_transpiled")
    profile_box = build_dataset_profile(df)

    summary_rows = [
        f"**Dataset:** `{dataset_key}`",
        # str() keeps the join working if split labels are not strings.
        f"**Available splits:** {', '.join(str(s) for s in splits)}",
        f"**Preview rows:** {len(display_df)}",
    ]
    # Two trailing spaces + newline is the Markdown hard line break; with a
    # single space the three rows render as one paragraph.
    summary_box = "### Split summary\n\n" + "  \n".join(summary_rows)

    return (
        gr.update(choices=splits, value=split_name),
        display_df,
        raw_qasm,
        transpiled_qasm,
        profile_box,
        summary_box,
    )
def sync_feature_picker(dataset_key: str) -> gr.update:
    """Build the feature-picker update for the selected dataset.

    The choices are the dataset's available feature columns and the selected
    value is the default subset chosen by ``default_feature_selection``.
    """
    available = get_available_feature_columns(load_dataset_df(dataset_key))
    return gr.update(choices=available, value=default_feature_selection(available))
def train_regressor(
    dataset_key: str,
    feature_columns: List[str],
    test_size: float,
    n_estimators: int,
    max_depth: float,
    random_state: float,
) -> Tuple[Optional[plt.Figure], str]:
    """Train a random-forest regressor for ``TARGET_COL`` and report test metrics.

    Args:
        dataset_key: Label from ``REPO_CONFIG``.
        feature_columns: Names of the input columns.
        test_size: Fraction of the clean rows held out for evaluation.
        n_estimators: Number of trees (cast to ``int``).
        max_depth: Maximum tree depth (cast to ``int``); a falsy or
            non-positive value means unlimited depth.
        random_state: Seed (cast to ``int``) used for both the train/test
            split and the forest.

    Returns:
        ``(figure, markdown)``. ``figure`` is ``None`` and ``markdown`` holds
        an explanation when no features are selected, a requested column is
        not in the dataset, or fewer than 10 rows remain after dropping rows
        with missing values in the used columns.
    """
    if not feature_columns:
        return None, "### ❌ Please select at least one feature."

    df = load_dataset_df(dataset_key)

    # The picker value is refreshed by a separate event when the dataset
    # changes, so it may still name columns of another dataset; report that
    # instead of letting dropna() raise a KeyError.
    unavailable = [col for col in [*feature_columns, TARGET_COL] if col not in df.columns]
    if unavailable:
        listed = ", ".join(f"`{col}`" for col in unavailable)
        return None, f"### ❌ Columns not found in this dataset: {listed}"

    required_cols = list(feature_columns) + [TARGET_COL]
    train_df = df.dropna(subset=required_cols).copy()
    if len(train_df) < 10:
        return None, "### ❌ Not enough clean rows after filtering missing values."

    X = train_df[feature_columns]
    y = train_df[TARGET_COL]

    seed = int(random_state)
    depth = int(max_depth) if max_depth and int(max_depth) > 0 else None
    trees = int(n_estimators)

    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=test_size,
        random_state=seed,
    )

    model = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
            (
                "regressor",
                RandomForestRegressor(
                    n_estimators=trees,
                    max_depth=depth,
                    random_state=seed,
                    n_jobs=-1,
                ),
            ),
        ]
    )
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
    mae = float(mean_absolute_error(y_test, y_pred))
    r2 = float(r2_score(y_test, y_pred))

    regressor = model.named_steps["regressor"]
    importances = getattr(regressor, "feature_importances_", None)
    fig = make_regression_figure(y_test.to_numpy(), y_pred, list(feature_columns), importances)

    metric_rows = [
        f"**Rows used:** {len(train_df):,}",
        f"**Test size:** {test_size:.0%}",
        f"**RMSE:** {rmse:.4f}",
        f"**MAE:** {mae:.4f}",
        f"**R²:** {r2:.4f}",
    ]
    # "  \n" is the Markdown hard line break, so each metric gets its own line.
    results = (
        "### Regression results\n\n"
        + "  \n".join(metric_rows)
        + "\n\n"
        + "The closer the scatter points are to the diagonal line, the better the model."
    )
    return fig, results
# Stylesheet handed to demo.launch(): caps the page width and adds spacing
# above the footer.
CUSTOM_CSS = """
.gradio-container {
max-width: 1400px !important;
}
footer {
margin-top: 1rem;
}
"""
# UI layout and event wiring.
# NOTE(review): the emoji in the labels below were mis-encoded in the source
# (each showed up as a stray "π"). Only the Regression tab's glyph could be
# recovered from the remaining bytes; the others are best-guess replacements
# and should be confirmed.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# 🚀 {APP_TITLE}")
    gr.Markdown(APP_SUBTITLE)

    with gr.Tabs():
        with gr.TabItem("🔎 Explorer"):
            dataset_dropdown = gr.Dropdown(
                list(REPO_CONFIG.keys()),
                value="Amplitude Damping",
                label="Dataset",
            )
            split_dropdown = gr.Dropdown(
                ["train"],
                value="train",
                label="Split",
            )
            profile_box = gr.Markdown(value="### Loading dataset...")
            summary_box = gr.Markdown(value="### Loading split summary...")
            explorer_df = gr.Dataframe(label="Preview", interactive=False)
            with gr.Row():
                raw_qasm = gr.Code(label="Raw QASM", language=None)
                transpiled_qasm = gr.Code(label="Transpiled QASM", language=None)

        with gr.TabItem("🧠 Regression"):
            feature_picker = gr.CheckboxGroup(label="Input features", choices=[])
            test_size = gr.Slider(0.1, 0.4, value=0.2, step=0.05, label="Test split")
            n_estimators = gr.Slider(50, 400, value=200, step=10, label="Trees")
            max_depth = gr.Slider(1, 30, value=12, step=1, label="Max depth")
            seed = gr.Number(value=42, precision=0, label="Random seed")
            run_btn = gr.Button("Train & Evaluate", variant="primary")
            plot = gr.Plot()
            metrics = gr.Markdown()

        with gr.TabItem("📖 Guide"):
            gr.Markdown(load_guide_content())

    gr.Markdown("---")
    gr.Markdown(
        "### 🔗 Links\n"
        "[Website](https://qsbench.github.io) | "
        "[Hugging Face](https://huggingface.co/QSBench) | "
        "[GitHub](https://github.com/QSBench)"
    )

    # The explorer is refreshed by three triggers that share the same inputs
    # and outputs: dataset change, split change, and initial page load.
    explorer_inputs = [dataset_dropdown, split_dropdown]
    explorer_outputs = [
        split_dropdown,
        explorer_df,
        raw_qasm,
        transpiled_qasm,
        profile_box,
        summary_box,
    ]

    dataset_dropdown.change(refresh_explorer, explorer_inputs, explorer_outputs)
    split_dropdown.change(refresh_explorer, explorer_inputs, explorer_outputs)
    dataset_dropdown.change(sync_feature_picker, [dataset_dropdown], [feature_picker])

    run_btn.click(
        train_regressor,
        [dataset_dropdown, feature_picker, test_size, n_estimators, max_depth, seed],
        [plot, metrics],
    )

    demo.load(refresh_explorer, explorer_inputs, explorer_outputs)
    demo.load(sync_feature_picker, [dataset_dropdown], [feature_picker])

if __name__ == "__main__":
    demo.launch(theme=gr.themes.Soft(), css=CUSTOM_CSS)