#!/usr/bin/env python
# coding=utf-8
"""
Psc_Predict MCP Server
Perovskite Solar Cell Performance Prediction MCP Service
Using the FastMCP framework with SSE transport
Designed for HuggingFace Docker deployment
"""
import os
import re
import pickle
import logging
from typing import Dict, List, Optional, Any

import numpy as np
import torch
import torch.nn as nn
from fastmcp import FastMCP

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastMCP server
mcp = FastMCP("Psc_Predict")

# ============ CIF Parser ============
class CIFParser:
    """Extract crystallographic features from CIF content (93 dimensions)"""

    def __init__(self):
        self.elements = [
            'H', 'Li', 'Be', 'B', 'C', 'N', 'O', 'F', 'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'Cl', 'K', 'Ca',
            'Ti', 'V', 'Cr', 'Mn', 'Fe', 'Co', 'Ni', 'Cu', 'Zn', 'Ga', 'Ge', 'As', 'Se', 'Br', 'Rb',
            'Sr', 'Y', 'Zr', 'Nb', 'Mo', 'Tc', 'Ru', 'Rh', 'Pd', 'Ag', 'Cd', 'In', 'Sn', 'Sb', 'Te',
            'I', 'Cs', 'Ba', 'La', 'Ce', 'Pr', 'Nd', 'Pm', 'Sm', 'Eu', 'Gd', 'Tb', 'Dy', 'Ho', 'Er',
            'Tm', 'Yb', 'Lu', 'Hf', 'Ta', 'W', 'Re', 'Os', 'Ir', 'Pt', 'Au', 'Hg', 'Tl', 'Pb', 'Bi',
            'Po', 'At', 'Rn', 'Fr', 'Ra', 'Ac', 'Th', 'Pa', 'U'
        ]
        self.elem_to_idx = {e: i for i, e in enumerate(self.elements)}

    def parse(self, cif_text: str) -> np.ndarray:
        """Parse CIF string and return a 93-dimensional feature vector"""
        # Handle escaped newlines
        if "\\n" in cif_text:
            cif_text = cif_text.replace("\\n", "\n")

        # A. Extract lattice parameters (7 dimensions)
        patterns = {
            'a': r"_cell_length_a\s+([\d\.]+)",
            'b': r"_cell_length_b\s+([\d\.]+)",
            'c': r"_cell_length_c\s+([\d\.]+)",
            'alpha': r"_cell_angle_alpha\s+([\d\.]+)",
            'beta': r"_cell_angle_beta\s+([\d\.]+)",
            'gamma': r"_cell_angle_gamma\s+([\d\.]+)",
            'vol': r"_cell_volume\s+([\d\.]+)"
        }
        lattice_feats = []
        for key, pat in patterns.items():
            match = re.search(pat, cif_text)
            val = float(match.group(1)) if match else 0.0
            lattice_feats.append(val)

        # B. Extract element composition (86 dimensions)
        chem_match = re.search(r"_chemical_formula_sum\s+'?([^'\n]+)'?", cif_text)
        elem_vec = np.zeros(len(self.elements))
        if chem_match:
            formula = chem_match.group(1)
            parts = formula.replace("'", "").split()
            for part in parts:
                m = re.match(r"([A-Za-z]+)([\d\.]*)", part)
                if m:
                    el = m.group(1)
                    num = float(m.group(2)) if m.group(2) else 1.0
                    if el in self.elem_to_idx:
                        elem_vec[self.elem_to_idx[el]] = num

        # Normalize element vector
        total_atoms = np.sum(elem_vec)
        if total_atoms > 0:
            elem_vec = elem_vec / total_atoms

        return np.concatenate([lattice_feats, elem_vec])

    def get_feature_names(self) -> List[str]:
        return ['a', 'b', 'c', 'alpha', 'beta', 'gamma', 'vol'] + self.elements

# ============ Neural Network Model ============
class MaterialNN(nn.Module):
    """Neural network for material property prediction"""

    def __init__(self, input_dim, hidden_dims=[128, 64, 32]):
        super(MaterialNN, self).__init__()
        layers = []
        in_d = input_dim
        for h_d in hidden_dims:
            layers.append(nn.Linear(in_d, h_d))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm1d(h_d))
            in_d = h_d
        layers.append(nn.Linear(in_d, 1))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)

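# With the defaults (input_dim=93, hidden_dims=[128, 64, 32]) the stack expands to:
#   Linear(93, 128) -> ReLU -> BatchNorm1d(128)
#   Linear(128, 64) -> ReLU -> BatchNorm1d(64)
#   Linear(64, 32)  -> ReLU -> BatchNorm1d(32)
#   Linear(32, 1)
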
# ============ Model Manager ============
class ModelManager:
    """Manage all pretrained models (XGBoost, Random Forest, Neural Network)"""

    TARGETS = ['pce', 'dft_band_gap', 'energy_above_hull', 'stability_retention',
               'stability_t80', 'voc', 'jsc', 'ff']
    MODEL_TYPES = ['xgboost', 'random_forest', 'neural_network']
    TARGET_INFO = {
        'pce': {'name': 'Power Conversion Efficiency', 'unit': '%'},
        'dft_band_gap': {'name': 'DFT Band Gap', 'unit': 'eV'},
        'energy_above_hull': {'name': 'Energy Above Hull', 'unit': 'eV/atom'},
        'stability_retention': {'name': 'Stability Retention', 'unit': '%'},
        'stability_t80': {'name': 'T80 Lifetime', 'unit': 'hours'},
        'voc': {'name': 'Open Circuit Voltage', 'unit': 'V'},
        'jsc': {'name': 'Short Circuit Current Density', 'unit': 'mA/cm²'},
        'ff': {'name': 'Fill Factor', 'unit': ''}
    }

    def __init__(self, model_dir: str = "./models"):
        self.model_dir = model_dir
        self.models: Dict[str, Dict[str, Any]] = {
            'xgboost': {},
            'random_forest': {},
            'neural_network': {}
        }
        self.cif_parser = CIFParser()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self._load_all_models()

    def _load_all_models(self):
        """Load all available models"""
        # Load XGBoost models
        for target in self.TARGETS:
            model_path = os.path.join(
                self.model_dir,
                f"xgboost_{target}_layers-NA_seed-42_batch-32.pkl"
            )
            if os.path.exists(model_path):
                try:
                    with open(model_path, 'rb') as f:
                        self.models['xgboost'][target] = pickle.load(f)
                    logger.info(f"Loaded XGBoost model for {target}")
                except Exception as e:
                    logger.warning(f"Failed to load XGBoost model for {target}: {e}")

        # Load Random Forest models
        for target in self.TARGETS:
            model_path = os.path.join(
                self.model_dir,
                f"random_forest_{target}_layers-NA_seed-42_batch-32.pkl"
            )
            if os.path.exists(model_path):
                try:
                    with open(model_path, 'rb') as f:
                        self.models['random_forest'][target] = pickle.load(f)
                    logger.info(f"Loaded Random Forest model for {target}")
                except Exception as e:
                    logger.warning(f"Failed to load Random Forest model for {target}: {e}")

        # Load Neural Network models
        for target in self.TARGETS:
            model_path = os.path.join(
                self.model_dir,
                f"neural_network_{target}_layers-128-64-32_seed-42_batch-32.pth"
            )
            if os.path.exists(model_path):
                try:
                    model = MaterialNN(input_dim=93, hidden_dims=[128, 64, 32])
                    model.load_state_dict(torch.load(model_path, map_location=self.device))
                    model.to(self.device)
                    model.eval()
                    self.models['neural_network'][target] = model
                    logger.info(f"Loaded Neural Network model for {target}")
                except Exception as e:
                    logger.warning(f"Failed to load Neural Network model for {target}: {e}")

    def predict(self, cif_text: str, targets: Optional[List[str]] = None,
                model_type: str = 'xgboost') -> Dict[str, Optional[float]]:
        """Predict the specified targets using the selected model type"""
        if model_type not in self.models:
            raise ValueError(f"Unknown model type: {model_type}. Available: {self.MODEL_TYPES}")
        if targets is None:
            targets = list(self.models[model_type].keys())

        # Parse CIF into the 93-dimensional feature vector
        features = self.cif_parser.parse(cif_text)
        X = features.reshape(1, -1)

        # Predict each requested target
        results = {}
        for target in targets:
            if target in self.models[model_type]:
                model = self.models[model_type][target]
                if model_type == 'neural_network':
                    X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
                    with torch.no_grad():
                        pred = model(X_tensor).cpu().numpy().flatten()[0]
                else:
                    pred = model.predict(X)[0]
                results[target] = float(pred)
            else:
                results[target] = None
        return results

    def get_available_targets(self, model_type: str = 'xgboost') -> List[str]:
        """Return available prediction targets for a model type"""
        if model_type in self.models:
            return list(self.models[model_type].keys())
        return []

    def get_available_models(self) -> Dict[str, List[str]]:
        """Return all available models and their targets"""
        return {
            model_type: list(targets.keys())
            for model_type, targets in self.models.items()
            if targets
        }


# Global model manager
model_manager: Optional[ModelManager] = None


def get_model_manager() -> ModelManager:
    """Get or initialize the global model manager"""
    global model_manager
    if model_manager is None:
        model_dir = os.environ.get("MODEL_DIR", "./models")
        model_manager = ModelManager(model_dir)
    return model_manager

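# Usage sketch (assumes at least one XGBoost pickle is present in MODEL_DIR):
#   manager = get_model_manager()
#   manager.predict(get_example_cif(), targets=['pce'], model_type='xgboost')
#   -> {'pce': 17.3}   # numeric value is illustrative only
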
# ============ MCP Tools ============
# Tools are registered on the FastMCP server via the @mcp.tool() decorator.

# Valid model types
VALID_MODEL_TYPES = ['xgboost', 'random_forest', 'neural_network']
DEFAULT_MODEL_TYPE = 'xgboost'


@mcp.tool()
def parse_cif_features(cif: str) -> Dict[str, Any]:
    """
    Parse a CIF file and extract features for model prediction.

    Extracts 93-dimensional features:
    - 7 lattice parameters (a, b, c, alpha, beta, gamma, volume)
    - 86 element composition fractions

    Args:
        cif: Crystal structure text in CIF format

    Returns:
        Dictionary containing lattice parameters and element composition
    """
    manager = get_model_manager()
    features = manager.cif_parser.parse(cif)
    feature_names = manager.cif_parser.get_feature_names()

    # Separate lattice parameters and element composition
    lattice = dict(zip(feature_names[:7], features[:7].tolist()))

    # Only return non-zero elements
    composition = {}
    for i, elem in enumerate(feature_names[7:]):
        if features[7 + i] > 0:
            composition[elem] = float(features[7 + i])

    return {
        "lattice_parameters": lattice,
        "composition": composition,
        "feature_dim": len(features),
        "status": "success"
    }

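# Example output for the CsPbI3 structure from get_example_cif() (illustrative):
#   {
#     "lattice_parameters": {"a": 6.2894, "b": 6.2894, "c": 6.2894,
#                            "alpha": 90.0, "beta": 90.0, "gamma": 90.0, "vol": 248.89},
#     "composition": {"I": 0.6, "Cs": 0.2, "Pb": 0.2},
#     "feature_dim": 93,
#     "status": "success"
#   }
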
@mcp.tool()
def get_model_info() -> Dict[str, Any]:
    """
    Get model information and available prediction targets.

    Returns information about:
    - Available model types (XGBoost, Random Forest, Neural Network)
    - All 8 prediction targets and their availability
    - Input feature dimensions

    Returns:
        Dictionary containing model information
    """
    manager = get_model_manager()

    # Get available targets for each model type
    model_availability = {}
    for mt in VALID_MODEL_TYPES:
        available = manager.get_available_targets(model_type=mt)
        model_availability[mt] = {
            "available_targets": available,
            "count": len(available)
        }

    targets_info = []
    for target in ModelManager.TARGETS:
        info = ModelManager.TARGET_INFO.get(target, {})
        targets_info.append({
            "id": target,
            "name": info.get('name', target),
            "unit": info.get('unit', ''),
            "xgboost": target in model_availability['xgboost']['available_targets'],
            "random_forest": target in model_availability['random_forest']['available_targets'],
            "neural_network": target in model_availability['neural_network']['available_targets']
        })

    return {
        "available_model_types": VALID_MODEL_TYPES,
        "default_model_type": DEFAULT_MODEL_TYPE,
        "recommended_model_type": "xgboost",
        "input_features": 93,
        "targets": targets_info,
        "model_availability": model_availability,
        "total_targets": len(ModelManager.TARGETS)
    }

@mcp.tool()
def list_available_models() -> Dict[str, Any]:
    """
    List all available models and their status.

    Returns detailed information about which models are loaded and ready for inference.

    Returns:
        Dictionary containing model availability status for each target and model type
    """
    manager = get_model_manager()

    models_status = {}
    for mt in VALID_MODEL_TYPES:
        models_status[mt] = {}
        for target in ModelManager.TARGETS:
            # manager.models is keyed by model type, then by target
            is_loaded = target in manager.models.get(mt, {})
            models_status[mt][target] = {
                "loaded": is_loaded,
                "status": "ready" if is_loaded else "not_available"
            }

    return {
        "models": models_status,
        "model_types": VALID_MODEL_TYPES,
        "targets": ModelManager.TARGETS,
        "status": "success"
    }

@mcp.tool()
def predict_ensemble(cif: str, targets: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Predict using all three model types and return ensemble results with a comparison.

    Calls the XGBoost, Random Forest, and Neural Network models on the same input,
    allowing comparison of predictions across different model architectures.
    Also provides ensemble statistics (mean, std, min, max) for each target.

    Args:
        cif: Crystal structure text in CIF format
        targets: Optional list of specific targets to predict. If None, predicts all available targets.
                 Valid targets: pce, dft_band_gap, energy_above_hull, stability_retention,
                 stability_t80, voc, jsc, ff

    Returns:
        Dictionary containing predictions from all models and ensemble statistics
    """
    manager = get_model_manager()

    # Determine targets to predict
    if targets is None:
        targets = ModelManager.TARGETS

    # Collect predictions from all models
    all_predictions = {}
    for mt in VALID_MODEL_TYPES:
        try:
            result = manager.predict(cif, list(targets), model_type=mt)
            all_predictions[mt] = result
        except Exception as e:
            all_predictions[mt] = {"error": str(e)}

    # Calculate ensemble statistics for each target
    ensemble_results = {}
    for target in targets:
        values = []
        model_values = {}
        for mt in VALID_MODEL_TYPES:
            if mt in all_predictions and target in all_predictions[mt]:
                val = all_predictions[mt][target]
                if val is not None:
                    values.append(val)
                    model_values[mt] = val
                else:
                    model_values[mt] = None
            else:
                model_values[mt] = None

        info = ModelManager.TARGET_INFO.get(target, {})
        if values:
            ensemble_results[target] = {
                "name": info.get('name', target),
                "unit": info.get('unit', ''),
                "predictions": model_values,
                "ensemble": {
                    "mean": float(np.mean(values)),
                    "std": float(np.std(values)),
                    "min": float(np.min(values)),
                    "max": float(np.max(values)),
                    "range": float(np.max(values) - np.min(values)),
                    "n_models": len(values)
                },
                "recommendation": _get_best_prediction(target, model_values)
            }
        else:
            ensemble_results[target] = {
                "name": info.get('name', target),
                "unit": info.get('unit', ''),
                "predictions": model_values,
                "ensemble": None,
                "recommendation": None
            }

    return {
        "targets_predicted": list(targets),
        "models_used": VALID_MODEL_TYPES,
        "results": ensemble_results,
        "raw_predictions": all_predictions,
        "status": "success"
    }

def _get_best_prediction(target: str, model_values: Dict[str, Optional[float]]) -> Optional[Dict[str, Any]]:
    """
    Provide a recommendation based on model performance characteristics.

    XGBoost is generally recommended as it has the best overall performance.
    """
    # XGBoost is the recommended model based on benchmark results
    if model_values.get('xgboost') is not None:
        return {
            "model": "xgboost",
            "value": model_values['xgboost'],
            "reason": "XGBoost recommended - best overall performance in benchmarks"
        }
    elif model_values.get('random_forest') is not None:
        return {
            "model": "random_forest",
            "value": model_values['random_forest'],
            "reason": "Random Forest - fallback when XGBoost unavailable"
        }
    elif model_values.get('neural_network') is not None:
        return {
            "model": "neural_network",
            "value": model_values['neural_network'],
            "reason": "Neural Network - fallback option"
        }
    return None

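# The usage example in get_service_info() below refers to a predict_pce() tool that is not
# defined in this file. The following is a minimal sketch of such a single-target convenience
# tool, assuming the same ModelManager.predict interface used by predict_ensemble; it is an
# illustration, not the original implementation.
@mcp.tool()
def predict_pce(cif: str, model_type: str = DEFAULT_MODEL_TYPE) -> Dict[str, Any]:
    """Predict Power Conversion Efficiency (PCE, %) for a CIF structure (sketch)."""
    if model_type not in VALID_MODEL_TYPES:
        return {"status": "error", "message": f"Unknown model type: {model_type}"}
    manager = get_model_manager()
    # Single-target prediction; returns {'pce': value} or {'pce': None} if the model is missing
    result = manager.predict(cif, targets=['pce'], model_type=model_type)
    value = result.get('pce')
    info = ModelManager.TARGET_INFO['pce']
    return {
        "target": "pce",
        "name": info['name'],
        "unit": info['unit'],
        "value": value,
        "model_type": model_type,
        "status": "success" if value is not None else "model_not_available"
    }
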
# ============ MCP Resources ============
# Resource URIs below are placeholders; adjust them to match the client's expectations.
@mcp.resource("resource://service-info")
def get_service_info() -> str:
    """Service information"""
    return """
# Psc_Predict MCP Service

Perovskite Solar Cell Performance Prediction Service

## Features
- Predict 8 performance metrics from CIF crystal structures
- Support for single-target and multi-target prediction
- Multiple model types: XGBoost (recommended), Random Forest, Neural Network

## Available Model Types
1. **XGBoost** (default, recommended) - Best overall performance
2. **Random Forest** - Good interpretability
3. **Neural Network** - 3-layer MLP (128-64-32)

## Prediction Targets
1. PCE - Power Conversion Efficiency (%)
2. DFT Band Gap - DFT-calculated band gap (eV)
3. Energy Above Hull - Thermodynamic stability (eV/atom)
4. Stability Retention - Stability retention percentage (%)
5. Stability T80 - T80 lifetime (hours)
6. Voc - Open-circuit voltage (V)
7. Jsc - Short-circuit current density (mA/cm²)
8. FF - Fill factor

## Input Format
CIF (Crystallographic Information File) crystal structure text

## Usage Example
Call predict_pce(cif, model_type="xgboost") to predict PCE with the XGBoost model.
"""

@mcp.resource("resource://example-cif")
def get_example_cif() -> str:
    """Example CIF file for testing"""
    return """data_CsPbI3
_symmetry_space_group_name_H-M 'P m -3 m'
_cell_length_a 6.2894
_cell_length_b 6.2894
_cell_length_c 6.2894
_cell_angle_alpha 90.0
_cell_angle_beta 90.0
_cell_angle_gamma 90.0
_cell_volume 248.89
_chemical_formula_sum 'Cs1 Pb1 I3'
loop_
_atom_site_label
_atom_site_type_symbol
_atom_site_fract_x
_atom_site_fract_y
_atom_site_fract_z
Cs1 Cs 0.0 0.0 0.0
Pb1 Pb 0.5 0.5 0.5
I1 I 0.5 0.5 0.0
I2 I 0.5 0.0 0.5
I3 I 0.0 0.5 0.5
"""

# ============ Main Entry Point ============
if __name__ == "__main__":
    # Transport and bind address are selected via environment variables
    transport = os.environ.get("MCP_TRANSPORT", "sse")
    host = os.environ.get("HOST", "0.0.0.0")
    port = int(os.environ.get("PORT", 7860))

    if transport == "stdio":
        mcp.run()
    else:
        # SSE mode (default, for HuggingFace)
        mcp.run(transport="sse", host=host, port=port)
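
# Local run sketch (assumes dependencies are installed and pretrained models are in ./models;
# the file name "server.py" is a placeholder):
#   MCP_TRANSPORT=sse PORT=7860 python server.py    # SSE server on 0.0.0.0:7860
#   MCP_TRANSPORT=stdio python server.py            # stdio transport for local MCP clients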