# Source: propertyverification / models / performance_optimizer.py
# (Hugging Face file-viewer metadata removed: uploader "sksameermujahid",
#  commit "Upload 23 files", 6e3dbdb verified)
# models/performance_optimizer.py
import functools
import time
import threading
from typing import Dict, Any, Optional
from .logging_config import logger
class PerformanceOptimizer:
    """Thread-safe in-memory cache with per-entry TTL, used for
    memoization across the property verification system.

    Entries expire lazily: a stale entry is evicted the next time it is
    looked up. Note that ``get_cached_result`` cannot distinguish a
    cached ``None`` from a miss, so callers should avoid caching None.
    """

    def __init__(self) -> None:
        self._cache: Dict[str, Any] = {}
        self._cache_lock = threading.Lock()
        self._cache_ttl = 300  # default TTL in seconds (5 minutes)
        # key -> absolute expiry timestamp (time.time() based)
        self._cache_timestamps: Dict[str, float] = {}

    def cache_result(self, key: str, result: Any, ttl: Optional[int] = None) -> None:
        """Store *result* under *key*, expiring after *ttl* seconds.

        When *ttl* is None the default TTL applies. A *ttl* of 0 means
        the entry expires immediately (the previous ``ttl or default``
        form silently treated 0 as "use the default").
        """
        with self._cache_lock:
            self._cache[key] = result
            effective_ttl = self._cache_ttl if ttl is None else ttl
            self._cache_timestamps[key] = time.time() + effective_ttl

    def get_cached_result(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None if missing/expired.

        Expired entries are removed as a side effect (lazy eviction).
        """
        with self._cache_lock:
            if key in self._cache:
                if time.time() < self._cache_timestamps.get(key, 0):
                    return self._cache[key]
                # Evict the stale entry; pop() tolerates a missing timestamp.
                del self._cache[key]
                self._cache_timestamps.pop(key, None)
            return None

    def clear_cache(self) -> None:
        """Drop every cached entry and its expiry timestamp."""
        with self._cache_lock:
            self._cache.clear()
            self._cache_timestamps.clear()

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return a snapshot of cache size, keys and the default TTL."""
        with self._cache_lock:
            return {
                'cache_size': len(self._cache),
                'cache_keys': list(self._cache.keys()),
                'cache_ttl': self._cache_ttl,
            }
# Global performance optimizer instance shared by the decorators and
# metric helpers defined in this module.
performance_optimizer = PerformanceOptimizer()
def timed_function(func):
    """Decorator that logs how long *func* takes to run.

    Logs at INFO on success and at ERROR on failure; the original
    exception is always re-raised unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic, so the measured duration cannot
        # be skewed (or go negative) by system-clock adjustments, unlike
        # the time.time() the original used.
        start_time = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            execution_time = time.perf_counter() - start_time
            logger.error(f"{func.__name__} failed after {execution_time:.2f} seconds: {str(e)}")
            raise
        execution_time = time.perf_counter() - start_time
        logger.info(f"{func.__name__} executed in {execution_time:.2f} seconds")
        return result
    return wrapper
def cached_function(ttl: int = 300):
    """Decorator factory memoizing results via the global optimizer.

    Results are stored in ``performance_optimizer`` under a key built
    from the function name and a hash of its arguments, and expire
    after *ttl* seconds. NOTE: a function returning ``None`` is never
    treated as a cache hit, because misses are also reported as None.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # kwargs are sorted so keyword order does not change the key.
            arg_repr = str(args) + str(sorted(kwargs.items()))
            cache_key = f"{func.__name__}:{hash(arg_repr)}"

            hit = performance_optimizer.get_cached_result(cache_key)
            if hit is not None:
                logger.debug(f"Cache hit for {func.__name__}")
                return hit

            value = func(*args, **kwargs)
            performance_optimizer.cache_result(cache_key, value, ttl)
            logger.debug(f"Cached result for {func.__name__}")
            return value
        return wrapper
    return decorator
def optimize_model_loading():
    """Pre-load the most important ML pipelines in background threads.

    Best-effort: failure to load any individual model (or any error in
    the whole routine) is logged and swallowed so that application
    startup is never blocked.
    """
    try:
        from .model_loader import load_model
        import concurrent.futures  # removed unused local `import threading`

        def load_model_async(model_name):
            # Worker: load one model, logging (not raising) on failure.
            try:
                model = load_model(model_name)
                logger.info(f"Pre-loaded model: {model_name}")
                return model
            except Exception as e:
                logger.warning(f"Failed to pre-load model {model_name}: {str(e)}")
                return None

        # Only the two highest-value pipelines are pre-loaded, with a
        # small worker pool to limit memory pressure during startup.
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            model_names = [
                "zero-shot-classification",  # Most important
                "summarization",             # Second most important
            ]
            futures = {executor.submit(load_model_async, name): name for name in model_names}
            # 30-second cap so a hung model download cannot stall startup;
            # the TimeoutError lands in the outer except and is logged.
            for future in concurrent.futures.as_completed(futures, timeout=30):
                model_name = futures[future]
                try:
                    future.result()
                except Exception as e:
                    logger.error(f"Error pre-loading {model_name}: {str(e)}")
        logger.info("Model pre-loading optimization completed")
    except Exception as e:
        logger.error(f"Error in model loading optimization: {str(e)}")
def optimize_image_processing():
    """Configure PIL and return a downscaling helper for fast processing.

    Returns a callable ``optimize_image(img, max_size=1024)`` that
    shrinks *img* in place so its longest side is at most *max_size*.
    On any failure an identity fallback with the same signature is
    returned instead. (Removed the unused local ``import io``.)
    """
    try:
        from PIL import Image

        # NOTE(review): setting MAX_IMAGE_PIXELS to None disables
        # Pillow's decompression-bomb protection. That is risky if any
        # processed image comes from untrusted users — confirm inputs
        # are trusted or restore a finite limit.
        Image.MAX_IMAGE_PIXELS = None  # Allow large images

        def optimize_image(img, max_size=1024):
            """Optimize image for faster processing."""
            if max(img.size) > max_size:
                # thumbnail() resizes in place, preserving aspect ratio.
                img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            return img

        return optimize_image
    except Exception as e:
        logger.error(f"Error in image processing optimization: {str(e)}")
        return lambda img, max_size=1024: img
def get_performance_metrics():
    """Return a snapshot of process memory/CPU usage and cache stats.

    Returns a dict with keys ``memory_usage_mb``, ``cpu_percent``,
    ``cache_stats`` and ``thread_count``; on any failure a zeroed-out
    dict with the same keys is returned instead of raising.
    """
    try:
        # Imports live inside the try so that a missing or broken psutil
        # hits the fallback branch below — previously an ImportError
        # escaped the function and the promised fallback never fired.
        import os
        import psutil

        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()
        return {
            'memory_usage_mb': memory_info.rss / 1024 / 1024,  # rss is bytes
            'cpu_percent': process.cpu_percent(),
            'cache_stats': performance_optimizer.get_cache_stats(),
            'thread_count': threading.active_count(),
        }
    except Exception as e:
        logger.error(f"Error getting performance metrics: {str(e)}")
        return {
            'memory_usage_mb': 0,
            'cpu_percent': 0,
            'cache_stats': {},
            'thread_count': 0,
        }