import numpy as np
import time
from typing import Dict, Any, Optional, Tuple, Union, List
from enum import Enum

from tensor_core import TensorCoreArray


class VectorOperation(Enum):
    """Enumeration of supported vector operations."""
    ADD = "add"
    SUBTRACT = "subtract"
    MULTIPLY = "multiply"
    DIVIDE = "divide"
    DOT_PRODUCT = "dot_product"
    CROSS_PRODUCT = "cross_product"
    NORMALIZE = "normalize"
    MAGNITUDE = "magnitude"
class AIAccelerator:
    """
    AI Accelerator that simulates GPU-based AI computations using HTTP storage.

    This class leverages NumPy's optimized operations to simulate the parallel
    processing capabilities of the vGPU for AI workloads.
    """

    def __init__(self, vram=None, num_sms: int = 800, cores_per_sm: int = 222, storage=None):
        """Initialize the AI Accelerator with electron-speed awareness and shared HTTP storage."""
        from electron_speed import drift_velocity

        self.storage = storage  # Use the shared storage instance
        if self.storage is None:
            from http_storage import HTTPGPUStorage
            self.storage = HTTPGPUStorage()  # Create HTTP storage instead of WebSocket
            if not self.storage.wait_for_connection():
                raise RuntimeError("Could not connect to GPU storage server")

        self.vram = vram
        self.num_sms = num_sms
        self.cores_per_sm = cores_per_sm
        self.total_cores = num_sms * cores_per_sm

        # Configure for maximum parallel processing at electron speed:
        # all cores are exposed as tensor cores.
        total_tensor_cores = num_sms * cores_per_sm
        self.tensor_core_array = TensorCoreArray(
            num_tensor_cores=total_tensor_cores,
            bits=32,
            bandwidth_tbps=drift_velocity / 1e-12  # Bandwidth scaled to electron drift speed
        )
        self.tensor_cores_initialized = False

        # Model, tensor, and tokenizer tracking
        self.model_registry: Dict[str, Dict[str, Any]] = {}   # Track loaded models
        self.tensor_registry: Dict[str, Dict[str, Any]] = {}  # Track tensor metadata
        self.tokenizer_registry: Dict[str, Any] = {}          # Track tokenizers
        self.model_configs: Dict[str, Any] = {}               # Store model architectures
        self.model_loaded = False
        self.resource_monitor = {
            'vram_used': 0,
            'active_tensors': 0,
            'loaded_models': set()
        }

        # AI operation statistics
        self.operations_performed = 0
        self.total_compute_time = 0.0
        self.flops_performed = 0

        # HTTP-based memory management
        self.matrix_registry: Dict[str, str] = {}   # Track loaded matrices
        self.matrix_counter = 0
        self.activation_cache: Dict[str, str] = {}  # Cache activation outputs
        self.weight_cache: Dict[str, Any] = {}      # Cache preprocessed weights

        # Batch processing configuration
        self.max_batch_size = 64
        self.min_batch_size = 4
        self.dynamic_batching = True  # Enable automatic batch size adjustment
    def _serialize_model_config(self, config: Any) -> Any:
        """Convert a model config to a JSON-serializable format."""
        # Handle None first
        if config is None:
            return None
        # Handle Florence2LanguageConfig specifically
        if config.__class__.__name__ == "Florence2LanguageConfig":
            try:
                return {
                    "type": "Florence2LanguageConfig",
                    "model_type": getattr(config, "model_type", ""),
                    "architectures": getattr(config, "architectures", []),
                    "hidden_size": getattr(config, "hidden_size", 0),
                    "num_attention_heads": getattr(config, "num_attention_heads", 0),
                    "num_hidden_layers": getattr(config, "num_hidden_layers", 0),
                    "intermediate_size": getattr(config, "intermediate_size", 0),
                    "max_position_embeddings": getattr(config, "max_position_embeddings", 0),
                    "layer_norm_eps": getattr(config, "layer_norm_eps", 1e-12),
                    "vocab_size": getattr(config, "vocab_size", 0)
                }
            except Exception as e:
                print(f"Warning: Error serializing Florence2LanguageConfig: {e}")
                return {"type": "Florence2LanguageConfig", "error": str(e)}
        # Handle standard scalar types
        if isinstance(config, (int, float, str, bool)):
            return config
        # Handle lists and tuples
        if isinstance(config, (list, tuple)):
            return [self._serialize_model_config(item) for item in config]
        # Handle dictionaries
        if isinstance(config, dict):
            return {k: self._serialize_model_config(v) for k, v in config.items()}
        # Handle objects with __dict__
        if hasattr(config, '__dict__'):
            config_dict = {}
            for key, value in config.__dict__.items():
                # Skip private attributes
                if key.startswith('_'):
                    continue
                try:
                    config_dict[key] = self._serialize_model_config(value)
                except Exception as e:
                    print(f"Warning: Error serializing attribute {key}: {e}")
                    config_dict[key] = str(value)
            return config_dict
        # Fallback: convert to a string representation
        try:
            return str(config)
        except Exception as e:
            return f"<Unserializable object of type {type(config).__name__}: {str(e)}>"
    def store_model_state(self, model_name: str, model_info: Dict[str, Any]) -> bool:
        """Store model state in HTTP storage with proper serialization."""
        try:
            # Convert any non-serializable parts of model_info
            serializable_info = self._serialize_model_config(model_info)
            # Record it in the model registry
            self.model_registry[model_name] = serializable_info
            # Save to storage
            if self.storage:
                # Store model info
                info_success = self.storage.store_state(
                    "models",
                    f"{model_name}/info",
                    serializable_info
                )
                # Store model state
                state_success = self.storage.store_state(
                    "models",
                    f"{model_name}/state",
                    {"loaded": True, "timestamp": time.time()}
                )
                if info_success and state_success:
                    self.resource_monitor['loaded_models'].add(model_name)
                    return True
            return False
        except Exception as e:
            print(f"Error storing model state: {str(e)}")
            return False
    def initialize_tensor_cores(self):
        """Initialize tensor cores and verify they are ready for computation."""
        if self.tensor_cores_initialized:
            return True
        try:
            # Verify the tensor core array is properly initialized
            if not hasattr(self, 'tensor_core_array') or self.tensor_core_array is None:
                raise RuntimeError("Tensor core array not properly initialized")
            # Initialize tensor cores if needed
            if hasattr(self.tensor_core_array, 'initialize'):
                self.tensor_core_array.initialize()
            # Verify VRAM access
            if self.vram is None:
                raise RuntimeError("VRAM not properly configured")
            # Test tensor core functionality with a small computation
            test_input = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32)
            test_result = self.tensor_core_array.matmul(test_input, test_input)
            if test_result is None or not isinstance(test_result, (np.ndarray, list)) or len(test_result) == 0:
                raise RuntimeError("Tensor core test computation failed")
            self.tensor_cores_initialized = True
            return True
        except Exception as e:
            print(f"Failed to initialize tensor cores: {str(e)}")
            self.tensor_cores_initialized = False
            return False
    def set_vram(self, vram):
        """Set the VRAM reference."""
        self.vram = vram

    def allocate_matrix(self, shape: Tuple[int, ...], dtype=np.float32,
                        name: Optional[str] = None) -> str:
        """Allocate a zero-filled matrix in VRAM and return its ID."""
        if self.vram is None:
            raise RuntimeError("VRAM not available")
        if name is None:
            name = f"matrix_{self.matrix_counter}"
            self.matrix_counter += 1
        # Create the matrix data
        matrix_data = np.zeros(shape, dtype=dtype)
        # Store in VRAM using HTTP storage
        if self.storage.store_tensor(name, matrix_data):
            self.matrix_registry[name] = name
            return name
        else:
            raise RuntimeError(f"Failed to allocate matrix {name}")

    def load_matrix(self, matrix_data: np.ndarray, name: Optional[str] = None) -> str:
        """Load matrix data into VRAM and return its ID."""
        if name is None:
            name = f"matrix_{self.matrix_counter}"
            self.matrix_counter += 1
        # Store in VRAM using HTTP storage
        if self.storage.store_tensor(name, matrix_data):
            self.matrix_registry[name] = name
            return name
        else:
            raise RuntimeError(f"Failed to load matrix {name}")

    def get_matrix(self, matrix_id: str) -> Optional[np.ndarray]:
        """Retrieve matrix data from VRAM."""
        if matrix_id not in self.matrix_registry:
            return None
        return self.storage.load_tensor(matrix_id)
    def matrix_multiply(self, matrix_a_id: str, matrix_b_id: str,
                        result_id: Optional[str] = None) -> Optional[str]:
        """Perform matrix multiplication using simulated GPU parallelism."""
        start_time = time.time()
        # Retrieve matrices from VRAM via HTTP storage
        matrix_a = self.get_matrix(matrix_a_id)
        matrix_b = self.get_matrix(matrix_b_id)
        if matrix_a is None or matrix_b is None:
            print(f"Error: Could not retrieve matrices {matrix_a_id} or {matrix_b_id}")
            return None
        try:
            # Check that the matrices can be multiplied
            if matrix_a.shape[-1] != matrix_b.shape[0]:
                print(f"Error: Matrix dimensions incompatible for multiplication: "
                      f"{matrix_a.shape} x {matrix_b.shape}")
                return None
            # Route the multiplication through the virtual TensorCoreArray
            A = matrix_a.tolist()
            B = matrix_b.tolist()
            result = self.tensor_core_array.matmul(A, B)
            result_array = np.array(result)
            # Store the result in VRAM
            if result_id is None:
                result_id = f"result_{self.matrix_counter}"
                self.matrix_counter += 1
            result_matrix_id = self.load_matrix(result_array, result_id)
            # Update statistics
            compute_time = time.time() - start_time
            self.total_compute_time += compute_time
            self.operations_performed += 1
            # Calculate FLOPs (2 * M * N * K for matrix multiplication)
            m, k = matrix_a.shape
            _, n = matrix_b.shape
            flops = 2 * m * n * k
            self.flops_performed += flops
            print(f"Matrix multiplication completed: {matrix_a.shape} x {matrix_b.shape} "
                  f"= {result_array.shape} in {compute_time:.4f}s")
            print(f"Simulated {flops:,} FLOPs across {self.total_cores} cores")
            return result_matrix_id
        except Exception as e:
            print(f"Error in matrix multiplication: {e}")
            return None
    def vector_operation(self, operation: VectorOperation, vector_a_id: str,
                         vector_b_id: Optional[str] = None,
                         result_id: Optional[str] = None) -> Optional[str]:
        """Perform vector operations using simulated GPU parallelism."""
        start_time = time.time()
        # Retrieve vectors from VRAM via HTTP storage
        vector_a = self.get_matrix(vector_a_id)
        if vector_a is None:
            print(f"Error: Could not retrieve vector {vector_a_id}")
            return None
        vector_b = None
        if vector_b_id:
            vector_b = self.get_matrix(vector_b_id)
            if vector_b is None:
                print(f"Error: Could not retrieve vector {vector_b_id}")
                return None
        try:
            result = None
            flops = 0
            if operation == VectorOperation.ADD:
                if vector_b is None:
                    raise ValueError("Vector B required for addition")
                result = vector_a + vector_b
                flops = vector_a.size
            elif operation == VectorOperation.SUBTRACT:
                if vector_b is None:
                    raise ValueError("Vector B required for subtraction")
                result = vector_a - vector_b
                flops = vector_a.size
            elif operation == VectorOperation.MULTIPLY:
                if vector_b is None:
                    raise ValueError("Vector B required for multiplication")
                result = vector_a * vector_b
                flops = vector_a.size
            elif operation == VectorOperation.DIVIDE:
                if vector_b is None:
                    raise ValueError("Vector B required for division")
                result = vector_a / vector_b
                flops = vector_a.size
            elif operation == VectorOperation.DOT_PRODUCT:
                if vector_b is None:
                    raise ValueError("Vector B required for dot product")
                # Wrap the scalar in an array so it can be stored like any other tensor
                result = np.array([np.dot(vector_a.flatten(), vector_b.flatten())])
                flops = 2 * vector_a.size
            elif operation == VectorOperation.CROSS_PRODUCT:
                if vector_b is None:
                    raise ValueError("Vector B required for cross product")
                if vector_a.size != 3 or vector_b.size != 3:
                    raise ValueError("Cross product requires 3D vectors")
                result = np.cross(vector_a.flatten(), vector_b.flatten())
                flops = 6  # Cross product operations
            elif operation == VectorOperation.NORMALIZE:
                magnitude = np.linalg.norm(vector_a)
                if magnitude == 0:
                    result = vector_a
                else:
                    result = vector_a / magnitude
                flops = vector_a.size + 1  # Division + sqrt
            elif operation == VectorOperation.MAGNITUDE:
                result = np.array([np.linalg.norm(vector_a)])
                flops = vector_a.size + 1  # Sum of squares + sqrt
            else:
                raise ValueError(f"Unknown vector operation: {operation}")
            # Store the result
            if result_id is None:
                result_id = f"vector_result_{self.matrix_counter}"
                self.matrix_counter += 1
            result_vector_id = self.load_matrix(result, result_id)
            # Update statistics
            compute_time = time.time() - start_time
            self.total_compute_time += compute_time
            self.operations_performed += 1
            self.flops_performed += flops
            print(f"Vector operation {operation.value} completed in {compute_time:.4f}s")
            print(f"Simulated {flops:,} FLOPs across {self.total_cores} cores")
            return result_vector_id
        except Exception as e:
            print(f"Error in vector operation: {e}")
            return None
    def has_model(self, model_id: str) -> bool:
        """Check whether a model is loaded via HTTP storage."""
        return self.storage.is_model_loaded(model_id)

    def load_model(self, model_id: str, model=None, processor=None) -> bool:
        """Load a model via HTTP storage."""
        try:
            # Prepare model data for storage
            model_data = None
            if model is not None:
                # In a real implementation, this would serialize the model weights
                model_data = {
                    "model_type": type(model).__name__,
                    "config": self._serialize_model_config(getattr(model, 'config', None)),
                    "loaded_at": time.time()
                }
            # Use HTTP storage to load the model
            success = self.storage.load_model(model_id, model_data=model_data)
            if success:
                self.model_registry[model_id] = {
                    "model_data": model_data,
                    "processor": processor,
                    "loaded_at": time.time()
                }
                self.resource_monitor['loaded_models'].add(model_id)
                return True
            return False
        except Exception as e:
            print(f"Error loading model {model_id}: {str(e)}")
            return False
    def inference(self, model_id: str, input_tensor_id: str) -> Optional[np.ndarray]:
        """Run inference using HTTP storage."""
        try:
            # Load the input tensor
            input_data = self.storage.load_tensor(input_tensor_id)
            if input_data is None:
                print(f"Could not load input tensor {input_tensor_id}")
                return None
            # Run inference via the HTTP API
            result = self.storage.start_inference(model_id, input_data)
            if result and result.get('output') is not None:
                return result['output']
            print(f"Inference failed for model {model_id}")
            return None
        except Exception as e:
            print(f"Error during inference: {str(e)}")
            return None
    def get_stats(self) -> Dict[str, Any]:
        """Get AI accelerator statistics."""
        return {
            "operations_performed": self.operations_performed,
            "total_compute_time": self.total_compute_time,
            "flops_performed": self.flops_performed,
            "avg_ops_per_second": self.operations_performed / max(self.total_compute_time, 0.001),
            "tensor_cores_initialized": self.tensor_cores_initialized,
            "total_cores": self.total_cores,
            "loaded_models": list(self.resource_monitor['loaded_models']),
            "storage_status": self.storage.get_connection_status() if self.storage else None
        }
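

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the accelerator itself).
# It assumes an HTTPGPUStorage server is reachable and that
# TensorCoreArray.matmul accepts nested lists, as the methods above expect;
# the names `acc`, `a_id`, `b_id`, `v_id`, and `dot_id` are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    acc = AIAccelerator()  # Connects to the shared HTTP GPU storage

    # Load two small matrices into simulated VRAM and multiply them.
    a_id = acc.load_matrix(np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32))
    b_id = acc.load_matrix(np.array([[5.0, 6.0], [7.0, 8.0]], dtype=np.float32))
    result_id = acc.matrix_multiply(a_id, b_id)
    if result_id is not None:
        print("Product:\n", acc.get_matrix(result_id))

    # Run a vector dot product through the same storage-backed path.
    v_id = acc.load_matrix(np.array([1.0, 2.0, 3.0], dtype=np.float32))
    dot_id = acc.vector_operation(VectorOperation.DOT_PRODUCT, v_id, v_id)
    if dot_id is not None:
        print("Dot product:", acc.get_matrix(dot_id))

    # Inspect accumulated operation and FLOP statistics.
    print(acc.get_stats())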