|
|
|
|
|
import functools
|
|
import time
|
|
import threading
|
|
from typing import Dict, Any, Optional
|
|
from .logging_config import logger
|
|
|
|
class PerformanceOptimizer:
|
|
"""Performance optimization utilities for the property verification system"""
|
|
|
|
def __init__(self):
|
|
self._cache = {}
|
|
self._cache_lock = threading.Lock()
|
|
self._cache_ttl = 300
|
|
self._cache_timestamps = {}
|
|
|
|
def cache_result(self, key: str, result: Any, ttl: int = None) -> None:
|
|
"""Cache a result with TTL"""
|
|
with self._cache_lock:
|
|
self._cache[key] = result
|
|
self._cache_timestamps[key] = time.time() + (ttl or self._cache_ttl)
|
|
|
|
def get_cached_result(self, key: str) -> Optional[Any]:
|
|
"""Get cached result if not expired"""
|
|
with self._cache_lock:
|
|
if key in self._cache:
|
|
if time.time() < self._cache_timestamps.get(key, 0):
|
|
return self._cache[key]
|
|
else:
|
|
|
|
del self._cache[key]
|
|
if key in self._cache_timestamps:
|
|
del self._cache_timestamps[key]
|
|
return None
|
|
|
|
def clear_cache(self) -> None:
|
|
"""Clear all cached results"""
|
|
with self._cache_lock:
|
|
self._cache.clear()
|
|
self._cache_timestamps.clear()
|
|
|
|
def get_cache_stats(self) -> Dict[str, Any]:
|
|
"""Get cache statistics"""
|
|
with self._cache_lock:
|
|
return {
|
|
'cache_size': len(self._cache),
|
|
'cache_keys': list(self._cache.keys()),
|
|
'cache_ttl': self._cache_ttl
|
|
}
|
|
|
|
|
|
performance_optimizer = PerformanceOptimizer()
|
|
|
|
def timed_function(func):
|
|
"""Decorator to time function execution"""
|
|
@functools.wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
start_time = time.time()
|
|
try:
|
|
result = func(*args, **kwargs)
|
|
execution_time = time.time() - start_time
|
|
logger.info(f"{func.__name__} executed in {execution_time:.2f} seconds")
|
|
return result
|
|
except Exception as e:
|
|
execution_time = time.time() - start_time
|
|
logger.error(f"{func.__name__} failed after {execution_time:.2f} seconds: {str(e)}")
|
|
raise
|
|
return wrapper
|
|
|
|
def cached_function(ttl: int = 300):
|
|
"""Decorator to cache function results"""
|
|
def decorator(func):
|
|
@functools.wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
|
|
cache_key = f"{func.__name__}:{hash(str(args) + str(sorted(kwargs.items())))}"
|
|
|
|
|
|
cached_result = performance_optimizer.get_cached_result(cache_key)
|
|
if cached_result is not None:
|
|
logger.debug(f"Cache hit for {func.__name__}")
|
|
return cached_result
|
|
|
|
|
|
result = func(*args, **kwargs)
|
|
performance_optimizer.cache_result(cache_key, result, ttl)
|
|
logger.debug(f"Cached result for {func.__name__}")
|
|
return result
|
|
return wrapper
|
|
return decorator
|
|
|
|
def optimize_model_loading():
|
|
"""Optimize model loading for better performance"""
|
|
try:
|
|
from .model_loader import load_model
|
|
|
|
|
|
import concurrent.futures
|
|
import threading
|
|
|
|
def load_model_async(model_name):
|
|
try:
|
|
model = load_model(model_name)
|
|
logger.info(f"Pre-loaded model: {model_name}")
|
|
return model
|
|
except Exception as e:
|
|
logger.warning(f"Failed to pre-load model {model_name}: {str(e)}")
|
|
return None
|
|
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
|
model_names = [
|
|
"zero-shot-classification",
|
|
"summarization"
|
|
]
|
|
|
|
futures = {executor.submit(load_model_async, name): name for name in model_names}
|
|
|
|
for future in concurrent.futures.as_completed(futures, timeout=30):
|
|
model_name = futures[future]
|
|
try:
|
|
future.result()
|
|
except Exception as e:
|
|
logger.error(f"Error pre-loading {model_name}: {str(e)}")
|
|
|
|
logger.info("Model pre-loading optimization completed")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in model loading optimization: {str(e)}")
|
|
|
|
def optimize_image_processing():
|
|
"""Optimize image processing for better performance"""
|
|
try:
|
|
from PIL import Image
|
|
import io
|
|
|
|
|
|
Image.MAX_IMAGE_PIXELS = None
|
|
|
|
|
|
def optimize_image(img, max_size=1024):
|
|
"""Optimize image for faster processing"""
|
|
if max(img.size) > max_size:
|
|
img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
|
|
return img
|
|
|
|
return optimize_image
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in image processing optimization: {str(e)}")
|
|
return lambda img, max_size=1024: img
|
|
|
|
def get_performance_metrics():
|
|
"""Get current performance metrics"""
|
|
import psutil
|
|
import os
|
|
|
|
try:
|
|
process = psutil.Process(os.getpid())
|
|
memory_info = process.memory_info()
|
|
|
|
return {
|
|
'memory_usage_mb': memory_info.rss / 1024 / 1024,
|
|
'cpu_percent': process.cpu_percent(),
|
|
'cache_stats': performance_optimizer.get_cache_stats(),
|
|
'thread_count': threading.active_count()
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting performance metrics: {str(e)}")
|
|
return {
|
|
'memory_usage_mb': 0,
|
|
'cpu_percent': 0,
|
|
'cache_stats': {},
|
|
'thread_count': 0
|
|
} |