Spaces:
Running
Running
import httpx | |
import asyncio | |
import json | |
from typing import List, Dict, Optional, Any | |
# Assuming config.py is in the same directory level for Docker execution | |
import config as app_config | |
_model_cache: Optional[Dict[str, List[str]]] = None | |
_cache_lock = asyncio.Lock() | |
async def fetch_and_parse_models_config() -> Optional[Dict[str, List[str]]]: | |
""" | |
Fetches the model configuration JSON from the URL specified in app_config. | |
Parses it and returns a dictionary with 'vertex_models' and 'vertex_express_models'. | |
Returns None if fetching or parsing fails. | |
""" | |
if not app_config.MODELS_CONFIG_URL: | |
print("ERROR: MODELS_CONFIG_URL is not set in the environment/config.") | |
return None | |
print(f"Fetching model configuration from: {app_config.MODELS_CONFIG_URL}") | |
try: | |
async with httpx.AsyncClient() as client: | |
response = await client.get(app_config.MODELS_CONFIG_URL) | |
response.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx) | |
data = response.json() | |
# Basic validation of the fetched data structure | |
if isinstance(data, dict) and \ | |
"vertex_models" in data and isinstance(data["vertex_models"], list) and \ | |
"vertex_express_models" in data and isinstance(data["vertex_express_models"], list): | |
print("Successfully fetched and parsed model configuration.") | |
return { | |
"vertex_models": data["vertex_models"], | |
"vertex_express_models": data["vertex_express_models"] | |
} | |
else: | |
print(f"ERROR: Fetched model configuration has an invalid structure: {data}") | |
return None | |
except httpx.RequestError as e: | |
print(f"ERROR: HTTP request failed while fetching model configuration: {e}") | |
return None | |
except json.JSONDecodeError as e: | |
print(f"ERROR: Failed to decode JSON from model configuration: {e}") | |
return None | |
except Exception as e: | |
print(f"ERROR: An unexpected error occurred while fetching/parsing model configuration: {e}") | |
return None | |
async def get_models_config() -> Dict[str, List[str]]: | |
""" | |
Returns the cached model configuration. | |
If not cached, fetches and caches it. | |
Returns a default empty structure if fetching fails. | |
""" | |
global _model_cache | |
async with _cache_lock: | |
if _model_cache is None: | |
print("Model cache is empty. Fetching configuration...") | |
_model_cache = await fetch_and_parse_models_config() | |
if _model_cache is None: # If fetching failed, use a default empty structure | |
print("WARNING: Using default empty model configuration due to fetch/parse failure.") | |
_model_cache = {"vertex_models": [], "vertex_express_models": []} | |
return _model_cache | |
async def get_vertex_models() -> List[str]: | |
config = await get_models_config() | |
return config.get("vertex_models", []) | |
async def get_vertex_express_models() -> List[str]: | |
config = await get_models_config() | |
return config.get("vertex_express_models", []) | |
async def refresh_models_config_cache() -> bool: | |
""" | |
Forces a refresh of the model configuration cache. | |
Returns True if successful, False otherwise. | |
""" | |
global _model_cache | |
print("Attempting to refresh model configuration cache...") | |
async with _cache_lock: | |
new_config = await fetch_and_parse_models_config() | |
if new_config is not None: | |
_model_cache = new_config | |
print("Model configuration cache refreshed successfully.") | |
return True | |
else: | |
print("ERROR: Failed to refresh model configuration cache.") | |
# Optionally, decide if we want to clear the old cache or keep it | |
# _model_cache = {"vertex_models": [], "vertex_express_models": []} # To clear | |
return False |