# worker-universal / model_manager.py
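"""Model manager for the universal worker.

Lazily loads SAM model variants and their tokenizers from the Hugging Face Hub
and caches them in memory for reuse across requests.
"""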
import os
import json
import tensorflow as tf
import keras
import numpy as np
from tokenizers import Tokenizer
from huggingface_hub import hf_hub_download
import threading
from typing import Dict
from model_architecture import SAM1Model
class ModelManager:
"""
Manages multiple models and their loading/unloading based on demand
"""
def __init__(self):
self.models: Dict[str, keras.Model] = {}
self.tokenizers: Dict[str, Tokenizer] = {}
self.model_configs: Dict[str, dict] = {}
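        # Serializes lazy loading so concurrent get_model() calls cannot load the same model twice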
self.lock = threading.Lock()
# Model mapping
self.model_repos = {
"sam-x-nano": "Smilyai-labs/Sam-nano",
"sam-x-mini": "Smilyai-labs/Sam-mini",
"sam-x-fast": "Smilyai-labs/Sam-fast",
"sam-x-large": "Smilyai-labs/Sam-large-2", # Using Sam-large-2 as the large model
"sam-large-2": "Smilyai-labs/Sam-large-2"
}
        # Performance tuning via environment variables. Note: TensorFlow is already
        # imported at module level, so for these settings to take full effect they
        # should ideally be set before the first `import tensorflow` (e.g. at process start).
NUM_CORES = os.cpu_count() or 4
os.environ['TF_NUM_INTEROP_THREADS'] = str(NUM_CORES)
os.environ['TF_NUM_INTRAOP_THREADS'] = str(NUM_CORES)
os.environ['CUDA_VISIBLE_DEVICES'] = '-1' # Force CPU only for consistency
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '1' # Intel optimization
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Reduce TF logging
# Configure TF threading
tf.config.threading.set_inter_op_parallelism_threads(NUM_CORES)
tf.config.threading.set_intra_op_parallelism_threads(NUM_CORES)
print(f"βœ… CPU optimized: {NUM_CORES} threads, oneDNN enabled")
def get_model_repo(self, model_type: str) -> str:
"""Get the Hugging Face repository for a given model type"""
return self.model_repos.get(model_type, self.model_repos["sam-x-large"])
def load_tokenizer(self, model_type: str) -> Tokenizer:
"""Load tokenizer for a specific model type"""
if model_type in self.tokenizers:
return self.tokenizers[model_type]
print(f"πŸš€ Loading tokenizer for {model_type}...")
try:
# Load base tokenizer
from transformers import AutoTokenizer
hf_tokenizer = AutoTokenizer.from_pretrained("gpt2")
# Add special tokens specific to your models
special_tokens = [
"\n", "\n", "\n", "\n",
"<CONTINUE>",
"<im end for model tun>"
]
hf_tokenizer.add_special_tokens({"additional_special_tokens": special_tokens})
# Save temporarily to create tokenizers instance
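            # save_pretrained() on the fast GPT-2 tokenizer writes a tokenizer.json
            # that the standalone `tokenizers` library can load via Tokenizer.from_file()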
os.makedirs(f"./temp_tokenizer_{model_type}", exist_ok=True)
hf_tokenizer.save_pretrained(f"./temp_tokenizer_{model_type}")
tokenizer = Tokenizer.from_file(f"./temp_tokenizer_{model_type}/tokenizer.json")
print(f"βœ… Tokenizer loaded for {model_type} with vocab size: {tokenizer.get_vocab_size()}")
self.tokenizers[model_type] = tokenizer
return tokenizer
except Exception as e:
print(f"❌ Error loading tokenizer for {model_type}: {e}")
raise
def load_model(self, model_type: str) -> keras.Model:
"""Load a specific model by type"""
if model_type in self.models:
return self.models[model_type]
print(f"πŸš€ Loading {model_type} model...")
try:
# Get the appropriate model repo
model_repo = self.get_model_repo(model_type)
cache_dir = f"./model_cache/{model_type}"
# Download config
config_path = hf_hub_download(model_repo, "config.json", cache_dir=cache_dir)
with open(config_path, 'r') as f:
config = json.load(f)
# Store model config
self.model_configs[model_type] = config
# Build model from config
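            # ff_mult expresses the feed-forward width as a multiple of d_model
            # (intermediate_size / hidden_size); the other keys map HF config names
            # onto the fields SAM1Model expects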
model_config = {
'vocab_size': config.get('vocab_size', 50432),
'd_model': config.get('hidden_size', 768),
'n_layers': config.get('num_hidden_layers', 12),
'n_heads': config.get('num_attention_heads', 12),
'ff_mult': config.get('intermediate_size', 3072) / config.get('hidden_size', 768),
'max_len': config.get('max_position_embeddings', 2048),
'dropout': 0.1,
'rope_theta': config.get('rope_theta', 10000)
}
model = SAM1Model(config=model_config)
# Build model with dummy input
dummy_input = tf.zeros((1, 16), dtype=tf.int32)
_ = model(dummy_input, training=False, use_cache=False)
print(f"βœ… Model {model_type} loaded: {config.get('num_hidden_layers', 12)} layers")
# Try to load weights
try:
weights_path = hf_hub_download(model_repo, "model.weights.h5", cache_dir=cache_dir)
model.load_weights(weights_path)
print(f"βœ… Model weights loaded successfully for {model_type}!")
except Exception as e:
print(f"⚠️ Could not load weights for {model_type}, using random initialization: {e}")
# Warm up the model
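            # A single pass with use_cache=True exercises the cached-decoding path up
            # front so the first real request does not pay this one-time setup cost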
print(f"πŸ”₯ Warming up model {model_type}...")
warmup_input = tf.constant([[1, 2, 3, 4, 5]], dtype=tf.int32)
_, _ = model(warmup_input, training=False, use_cache=True)
print(f"βœ… Model {model_type} warmed up")
# Store the model
self.models[model_type] = model
return model
except Exception as e:
print(f"❌ Error loading model {model_type}: {e}")
raise
def get_model(self, model_type: str) -> tuple:
"""Get model and tokenizer for a specific type, loading if necessary"""
with self.lock:
# Ensure tokenizer is loaded
if model_type not in self.tokenizers:
self.load_tokenizer(model_type)
# Ensure model is loaded
if model_type not in self.models:
self.load_model(model_type)
return self.models[model_type], self.tokenizers[model_type], self.model_configs[model_type]
def list_available_models(self) -> list:
"""Get list of available model types"""
return list(self.model_repos.keys())
def is_model_loaded(self, model_type: str) -> bool:
"""Check if a model is currently loaded"""
return model_type in self.models
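

# Example usage: a minimal sketch for local testing, not part of the worker itself.
# It assumes the SAM1Model architecture in model_architecture.py is importable and
# that the Hugging Face repos listed in ModelManager.model_repos are reachable.
if __name__ == "__main__":
    manager = ModelManager()
    print("Available models:", manager.list_available_models())

    # First call lazily downloads the config/weights and caches model + tokenizer
    model, tokenizer, config = manager.get_model("sam-x-nano")
    print("Loaded:", manager.is_model_loaded("sam-x-nano"))

    # Encode a prompt and run one forward pass to sanity-check the pipeline
    ids = tokenizer.encode("Hello, SAM!").ids
    outputs = model(tf.constant([ids], dtype=tf.int32), training=False, use_cache=False)
    print("Output shape:", getattr(outputs, "shape", type(outputs)))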