Aglimate / app /utils /model_manager.py
nexusbert's picture
Refactor climate advisory agent to support video input and improve model loading. Update requirements to use the latest transformers from GitHub and add new utility dependencies. Clean up code by removing unnecessary comments and enhancing descriptions for clarity.
c7ece81
import os
import logging
import torch
from typing import Optional
from functools import lru_cache
# Configure root logging at import time so loader progress messages are emitted.
logging.basicConfig(level=logging.INFO)

# Module-level registry of lazily loaded objects. Every slot starts as None
# and is filled in by the matching ``load_*`` function.
_models = dict.fromkeys(
    (
        "expert_model",
        "expert_tokenizer",
        "multimodal_model",
        "multimodal_processor",
        "translation_model",
        "translation_tokenizer",
        "embedder",
        "lang_identifier",
        "classifier",
    )
)

# Device string returned by get_device() and passed to the embedder loader.
_device = "cpu"
def get_device():
    """Return the module-level device string (``_device``)."""
    return _device
def load_expert_model(model_name: str, use_quantization: bool = True):
    """Lazily load the text-only expert causal LM together with its tokenizer.

    The first successful call loads both objects on the CPU in float32 and
    stores them in the module-level ``_models`` registry. Later calls return
    the stored pair without looking at ``model_name`` again.

    Args:
        model_name: Model identifier passed to ``from_pretrained``.
        use_quantization: Accepted for backward compatibility only. The model
            is always loaded unquantized in float32, so this flag currently
            has no effect.

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    cached_model = _models["expert_model"]
    if cached_model is not None:
        return _models["expert_tokenizer"], cached_model

    # Heavy / project-local imports are deferred until a load is really needed.
    from transformers import AutoTokenizer, AutoModelForCausalLM
    from app.utils import config

    logging.info(f"Loading expert model ({model_name})...")

    # Looked up once and shared by the tokenizer and the model.
    cache_dir = getattr(config, 'hf_cache', '/models/huggingface')

    tokenizer = AutoTokenizer.from_pretrained(
        model_name,
        use_fast=True,
        cache_dir=cache_dir,
    )

    logging.info("Loading model in float32 for CPU compatibility")
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        cache_dir=cache_dir,
        torch_dtype=torch.float32,
        device_map="cpu",
        low_cpu_mem_usage=True,
    )
    # Switch to inference mode.
    model.eval()

    _models["expert_model"] = model
    _models["expert_tokenizer"] = tokenizer
    logging.info("Expert model loaded successfully")
    return tokenizer, model
def load_multimodal_model(model_name: str):
    """
    Lazy load multimodal Qwen2-VL model (vision-language).
    Used for photo/video-aware advisory.

    The first successful call loads the processor and model on the CPU in
    float32 and stores them in the module-level ``_models`` registry. Later
    calls return the stored pair.

    Args:
        model_name: Model identifier passed to ``from_pretrained``.

    Returns:
        A ``(processor, model)`` tuple, or ``(None, None)`` when the
        multimodal stack cannot be imported or loaded, so the caller can fall
        back to the text-only expert model.
    """
    if _models["multimodal_model"] is not None:
        return _models["multimodal_processor"], _models["multimodal_model"]

    from app.utils import config

    logging.info(f"Loading multimodal expert model ({model_name})...")
    cache_dir = getattr(config, "hf_cache", "/models/huggingface")
    try:
        # Imported inside the try block on purpose: a transformers build that
        # does not ship Qwen2VLForConditionalGeneration raises ImportError
        # here, and that must take the same fallback path as a failed load
        # instead of propagating to the caller.
        from transformers import AutoProcessor, Qwen2VLForConditionalGeneration

        processor = AutoProcessor.from_pretrained(
            model_name,
            cache_dir=cache_dir,
        )
        model = Qwen2VLForConditionalGeneration.from_pretrained(
            model_name,
            torch_dtype=torch.float32,  # CPU deployment
            cache_dir=cache_dir,
            device_map="cpu",
            low_cpu_mem_usage=True,
        )
        model.eval()
        _models["multimodal_model"] = model
        _models["multimodal_processor"] = processor
        logging.info("Multimodal expert model loaded successfully")
        return processor, model
    except Exception as e:
        # Deliberate best-effort: any failure is logged and reported as
        # (None, None) rather than raised.
        logging.error(
            f"Failed to load multimodal model {model_name}: {e}. "
            "Falling back to text-only expert model."
        )
        _models["multimodal_model"] = None
        _models["multimodal_processor"] = None
        return None, None
def load_translation_model(model_name: str):
    """Return the translation ``(tokenizer, model)`` pair, loading it on first use.

    The pair is kept in the module-level ``_models`` registry; once present it
    is returned as-is and ``model_name`` is not consulted again.
    """
    already_loaded = _models["translation_model"]
    if already_loaded is not None:
        return _models["translation_tokenizer"], already_loaded

    from transformers import AutoModelForSeq2SeqLM, NllbTokenizer
    from app.utils import config

    logging.info(f"Loading translation model ({model_name})...")
    hf_cache = getattr(config, 'hf_cache', '/models/huggingface')

    nllb_tokenizer = NllbTokenizer.from_pretrained(model_name, cache_dir=hf_cache)

    # CPU uses float32
    load_options = {
        "torch_dtype": torch.float32,
        "cache_dir": hf_cache,
        "device_map": "cpu",
        "low_cpu_mem_usage": True,
    }
    seq2seq = AutoModelForSeq2SeqLM.from_pretrained(model_name, **load_options)
    seq2seq.eval()

    _models["translation_model"] = seq2seq
    _models["translation_tokenizer"] = nllb_tokenizer
    logging.info("Translation model loaded successfully")
    return nllb_tokenizer, seq2seq
def load_embedder(model_name: str):
    """Return the shared sentence-transformer embedder, creating it on first use.

    The instance is stored in the module-level ``_models`` registry and reused
    by every later call.
    """
    existing = _models["embedder"]
    if existing is not None:
        return existing

    from sentence_transformers import SentenceTransformer
    from app.utils import config

    logging.info(f"Loading embedder ({model_name})...")
    st_model = SentenceTransformer(
        model_name,
        device=_device,
        cache_folder=getattr(config, 'hf_cache', '/models/huggingface'),
    )

    _models["embedder"] = st_model
    logging.info("Embedder loaded successfully")
    return st_model
def load_lang_identifier(repo_id: str, filename: str = "model.bin"):
    """Return the FastText language identifier, downloading and loading it once.

    ``filename`` is fetched from the Hub repository ``repo_id`` and loaded with
    ``fasttext.load_model``. The loaded object is stored in the module-level
    ``_models`` registry and reused by later calls.
    """
    existing = _models["lang_identifier"]
    if existing is not None:
        return existing

    import fasttext
    from huggingface_hub import hf_hub_download
    from app.utils import config

    logging.info(f"Loading language identifier ({repo_id})...")
    local_model_file = hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        cache_dir=getattr(config, 'hf_cache', '/models/huggingface'),
    )
    identifier = fasttext.load_model(local_model_file)

    _models["lang_identifier"] = identifier
    logging.info("Language identifier loaded successfully")
    return identifier
def load_classifier(classifier_path: str):
    """Return the intent classifier, loading it from ``classifier_path`` once.

    Returns ``None`` (after logging) when the file does not exist or cannot be
    loaded. A failed attempt is not stored, so the next call tries again.
    """
    cached = _models["classifier"]
    if cached is not None:
        return cached

    import joblib
    from pathlib import Path

    logging.info(f"Loading classifier ({classifier_path})...")

    model_file = Path(classifier_path)
    if not model_file.exists():
        logging.warning(f"Classifier not found at {classifier_path}")
        return None

    try:
        # NOTE: joblib.load unpickles the file, so classifier_path must point
        # at a trusted artifact.
        loaded = joblib.load(classifier_path)
        _models["classifier"] = loaded
        logging.info("Classifier loaded successfully")
        return loaded
    except Exception as e:
        logging.error(f"Failed to load classifier: {e}")
        return None
def clear_model_cache():
    """Clear all loaded models from memory.

    Every slot of the module-level ``_models`` registry is reset to ``None``
    (keys and their order are kept), then a garbage-collection pass is run and
    a log line is emitted.
    """
    # Overwrite the values in place. Deleting and re-inserting keys while
    # iterating the same dict reorders/resizes it mid-iteration, which can
    # skip entries (leaving models loaded) or raise RuntimeError. Assigning
    # to an existing key is safe during iteration.
    for key in _models:
        _models[key] = None

    import gc
    gc.collect()
    logging.info("Model cache cleared")
def get_model_memory_usage():
    """Report approximate memory figures for the models that are loaded.

    Returns a dict mapping a registry slot name to a fixed, human-readable
    estimate string. Only slots whose entry in ``_models`` is not ``None``
    are included; nothing is measured at runtime.
    """
    # Hand-written estimates, in reporting order.
    # expert_model rough estimate: 4B params * 4 bytes = 16 GB
    estimates = (
        ("expert_model", "~16 GB"),
        ("translation_model", "~2-5 GB"),
        ("embedder", "~1 GB"),
        ("lang_identifier", "~200 MB"),
    )
    return {
        slot: approx
        for slot, approx in estimates
        if _models[slot] is not None
    }