|
import os |
|
import logging |
|
import json |
|
from huggingface_hub import model_info, InferenceClient |
|
from dotenv import load_dotenv |
|
from config.models_config import PREFERRED_PROVIDERS, DEFAULT_BENCHMARK_MODEL, ALTERNATIVE_BENCHMARK_MODELS |
|
|
|
|
|
# Load variables from a local .env file into the process environment
# (HF_TOKEN and HF_ORGANIZATION are read later via os.environ).
load_dotenv()




# Module-wide logging configuration: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

logger = logging.getLogger(__name__)
|
|
|
def prioritize_providers(providers):
    """Return *providers* reordered so preferred ones come first.

    No provider is dropped; within each group the original relative
    order is preserved (equivalent to a stable sort on membership in
    PREFERRED_PROVIDERS).
    """
    preferred = [p for p in providers if p in PREFERRED_PROVIDERS]
    others = [p for p in providers if p not in PREFERRED_PROVIDERS]
    return preferred + others
|
|
|
def test_provider(model_name: str, provider: str, verbose: bool = False) -> bool:
    """
    Test if a specific provider is available for a model using InferenceClient.

    Args:
        model_name: Name of the model
        provider: Provider to test
        verbose: Whether to log detailed information

    Returns:
        True if the provider is available, False otherwise
    """
    # Keep the probe cheap: a short client timeout bounds each attempt.
    timeout_seconds = 3
    try:
        # Re-read .env so a token added after process start is picked up.
        load_dotenv()

        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            if verbose:
                logger.warning("No HF_TOKEN found in environment variables. This will likely cause authentication failures.")
                print("WARNING: HF_TOKEN is missing. Most model providers require valid authentication.")
            # Without a token, the best we can do is an unauthenticated probe.
            return _test_provider_without_token(model_name, provider, verbose)

        hf_organization = os.environ.get("HF_ORGANIZATION")
        if not hf_organization and verbose:
            logger.warning("HF_ORGANIZATION not defined in environment")

        if verbose:
            logger.info(f"Testing provider {provider} for model {model_name}")

        try:
            client = InferenceClient(
                model=model_name,
                token=hf_token,
                provider=provider,
                timeout=timeout_seconds
            )

            try:
                # A minimal chat completion is the cheapest end-to-end
                # availability check for the (model, provider) pair.
                client.chat_completion(
                    messages=[{"role": "user", "content": "Hello"}],
                    max_tokens=5
                )
                if verbose:
                    logger.info(f"Provider {provider} is available for {model_name}")
                return True
            except Exception as e:
                # BUGFIX: bind and classify the error regardless of verbosity.
                # Previously `error_message` existed only under `if verbose:`,
                # so with verbose=False the 401/403 no-token fallback never ran.
                error_message = str(e)
                if verbose:
                    logger.warning(f"Error with provider {provider}: {error_message}")

                if "status_code=429" in error_message:
                    if verbose:
                        logger.warning(f"Provider {provider} rate limited. You may need to wait or upgrade your plan.")
                elif "status_code=401" in error_message or "status_code=403" in error_message:
                    if verbose:
                        logger.warning(f"Authentication failed for provider {provider}. Your HF_TOKEN may be invalid or expired.")
                        print(f"Authentication error with provider {provider}. Please check your HF_TOKEN.")
                        logger.info(f"Trying provider {provider} without authentication")
                    return _test_provider_without_token(model_name, provider, verbose)
                elif "status_code=503" in error_message:
                    if verbose:
                        logger.warning(f"Provider {provider} service unavailable. Model may be loading or provider is down.")
                elif "timed out" in error_message.lower():
                    if verbose:
                        # BUGFIX: message previously hard-coded "10 seconds"
                        # while the client timeout is 3; report the real value.
                        logger.warning(f"Timeout error with provider {provider} - request timed out after {timeout_seconds} seconds")
                return False
        except Exception as auth_error:
            if "401" in str(auth_error) or "Unauthorized" in str(auth_error):
                if verbose:
                    logger.warning(f"Authentication error with {provider}: {str(auth_error)}. Your HF_TOKEN may be invalid.")
                    print("Authentication error detected. Please verify your HF_TOKEN is valid and has appropriate permissions.")
                return _test_provider_without_token(model_name, provider, verbose)
            if verbose:
                logger.warning(f"Error creating client for {provider}: {str(auth_error)}")
            return False
    except Exception as e:
        # Defensive catch-all: provider probing must never crash the caller.
        if verbose:
            logger.warning(f"Error in test_provider: {str(e)}")
        return False
|
|
|
def _test_provider_without_token(model_name: str, provider: str, verbose: bool = False) -> bool:
    """
    Try to test a provider without an authentication token.

    Args:
        model_name: Name of the model
        provider: Provider to test
        verbose: Whether to log detailed information

    Returns:
        True if the provider is available, False otherwise
    """
    try:
        if verbose:
            logger.info(f"Testing provider {provider} for model {model_name} without authentication")

        # No token argument: relies on the provider allowing anonymous access.
        client = InferenceClient(
            model=model_name,
            provider=provider,
            timeout=3
        )

        try:
            # Minimal request; the result is discarded — only success matters.
            client.chat_completion(
                messages=[{"role": "user", "content": "Hello"}],
                max_tokens=5
            )
            if verbose:
                logger.info(f"Provider {provider} is available for {model_name} without authentication")
            return True
        except Exception as e:
            if verbose:
                logger.warning(f"Error with provider {provider} without authentication: {str(e)}")
            return False
    except Exception as e:
        # Covers client-construction failures and any other unexpected error.
        if verbose:
            logger.warning(f"Error in _test_provider_without_token: {str(e)}")
        return False
|
|
|
def get_available_model_provider(model_name, verbose=False):
    """
    Get the first available provider for a given model.

    Args:
        model_name: Name of the model on the Hub
        verbose: Whether to log detailed information

    Returns:
        First available provider or None if none are available
    """
    try:
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            if verbose:
                logger.error("HF_TOKEN not defined in environment")
            # BUGFIX: previously raised ValueError, which was immediately
            # swallowed by this function's own `except Exception` below and
            # logged a second, confusing error. Return None directly instead.
            return None

        # Ask the Hub which inference providers serve this model.
        info = None
        try:
            try:
                if verbose:
                    logger.info(f"Trying to get model info for {model_name} with auth token")
                info = model_info(model_name, token=hf_token, expand="inferenceProviderMapping")
            except Exception as auth_error:
                if "401" in str(auth_error) or "Unauthorized" in str(auth_error):
                    # Bad token: retry anonymously before giving up on the API.
                    if verbose:
                        logger.warning(f"Authentication failed for {model_name}, trying without token")
                    try:
                        info = model_info(model_name, expand="inferenceProviderMapping")
                    except Exception as e:
                        if verbose:
                            logger.error(f"Failed to get model info without token: {str(e)}")
                            logger.warning(f"Using default providers list as fallback for {model_name}")
                        return _test_fallback_providers(model_name, verbose)
                else:
                    # Non-auth failure: handled by the outer except below.
                    raise auth_error

            if not info or not hasattr(info, "inference_provider_mapping"):
                if verbose:
                    logger.info(f"No inference providers found for {model_name}")
                return _test_fallback_providers(model_name, verbose)

            providers = list(info.inference_provider_mapping.keys())
            if not providers:
                if verbose:
                    logger.info(f"Empty list of providers for {model_name}")
                return _test_fallback_providers(model_name, verbose)

        except Exception as e:
            if verbose:
                logger.error(f"Error retrieving model info for {model_name}: {str(e)}")
            return _test_fallback_providers(model_name, verbose)

        # Probe providers in preference order; return the first that works.
        prioritized_providers = prioritize_providers(providers)

        if verbose:
            logger.info(f"Available providers for {model_name}: {', '.join(providers)}")
            logger.info(f"Prioritized providers: {', '.join(prioritized_providers)}")

        failed_providers = []
        for provider in prioritized_providers:
            if verbose:
                logger.info(f"Testing provider {provider} for {model_name}")

            try:
                if test_provider(model_name, provider, verbose):
                    if verbose:
                        logger.info(f"Provider {provider} is available for {model_name}")
                    return provider
                failed_providers.append(provider)
                if verbose:
                    logger.warning(f"Provider {provider} test failed for {model_name}")
            except Exception as e:
                failed_providers.append(provider)
                if verbose:
                    logger.error(f"Exception while testing provider {provider} for {model_name}: {str(e)}")

        # NOTE: prioritize_providers() returns every provider (merely
        # reordered), so the former "remaining non-prioritized providers"
        # second pass could never execute; it was removed as dead code.
        if verbose:
            logger.error(f"No available providers for {model_name}. Tried {len(failed_providers)} providers.")
        return None

    except Exception as e:
        # Defensive catch-all: callers expect None, never an exception.
        if verbose:
            logger.error(f"Error in get_available_model_provider: {str(e)}")
        return None
|
|
|
def _test_fallback_providers(model_name, verbose=False):
    """
    Fallback helper that probes a hard-coded list of common providers
    without consulting the Hub API.

    Args:
        model_name: Name of the model
        verbose: Whether to log detailed information

    Returns:
        The first available provider, or None if none respond
    """
    fallback_candidates = [
        "huggingface",
        "sambanova",
        "novita",
        "fireworks-ai",
        "together",
        "openai",
        "anthropic",
    ]

    if verbose:
        logger.warning(f"Using fallback providers list for {model_name}: {', '.join(fallback_candidates)}")

    for candidate in fallback_candidates:
        if verbose:
            logger.info(f"Testing fallback provider {candidate} for {model_name}")
        try:
            if test_provider(model_name, candidate, verbose):
                if verbose:
                    logger.info(f"FALLBACK: Provider {candidate} is available for {model_name}")
                return candidate
        except Exception as e:
            # A failing candidate is not fatal — move on to the next one.
            if verbose:
                logger.warning(f"FALLBACK: Error testing provider {candidate} for {model_name}: {str(e)}")

    return None
|
|
|
def _validate_hf_token(hf_token):
    """Probe the HF inference API to verify that *hf_token* is accepted.

    Purely diagnostic: prints its findings and never raises. Imports
    `requests` locally so the module has no hard dependency on it.
    """
    import requests
    try:
        # A small public model keeps the validation request cheap.
        test_model = "gpt2"
        api_url = f"https://api-inference.huggingface.co/models/{test_model}"

        print(f"Testing token with inference API on public model {test_model}...")

        headers = {"Authorization": f"Bearer {hf_token}"}
        payload = {"inputs": "Hello, how are you?"}

        response = requests.post(api_url, headers=headers, json=payload, timeout=10)

        # 503 means the model is still loading — the token itself is valid.
        if response.status_code in [200, 503]:
            print(f"✅ HF_TOKEN validated - Token accepted by the inference API! Status: {response.status_code}")
            if response.status_code == 503:
                print("ℹ️ Model is loading, but token is valid")

            # Optional extra info: who the token belongs to.
            try:
                whoami_response = requests.get(
                    "https://huggingface.co/api/whoami",
                    headers={"Authorization": f"Bearer {hf_token}"}
                )

                if whoami_response.status_code == 200:
                    user_info = whoami_response.json()
                    print(f"✅ Additional info - Authenticated as: {user_info.get('name', 'Unknown user')}")

                    if user_info.get('canPay', False):
                        print("✅ Your account has payment methods configured - you may have access to premium models")
                    else:
                        print("ℹ️ Your account does not have payment methods configured - access to premium models may be limited")
            except Exception:
                # whoami is purely informational; ignore failures.
                pass
        else:
            print(f"❌ HF_TOKEN validation failed with status code: {response.status_code}")
            error_message = "Unknown error"
            try:
                error_data = response.json()
                if "error" in error_data:
                    error_message = error_data["error"]
                print(f"❌ Error message: {error_message}")
            except Exception:
                # BUGFIX: was a bare `except:`; narrowed so KeyboardInterrupt
                # and SystemExit are no longer swallowed.
                print(f"❌ Error message: {response.text}")

            print("⚠️ Most model providers will not work with invalid credentials")

            # Secondary check: some tokens can still reach the status endpoint.
            try:
                print("Attempting alternative validation with status endpoint...")
                status_url = "https://api-inference.huggingface.co/status"
                status_response = requests.get(status_url, headers=headers, timeout=10)

                if status_response.status_code == 200:
                    print("✅ Token can access the status endpoint. This is partially good news.")
                else:
                    print(f"❌ Status endpoint test also failed: {status_response.status_code}")
            except Exception as e:
                print(f"❌ Alternative validation also failed: {str(e)}")
    except Exception as e:
        print(f"❌ Error validating HF_TOKEN with inference API: {str(e)}")


def test_models(verbose=True):
    """
    Test the default benchmark model and the alternative models, then
    return a summary of the results.

    Args:
        verbose: Whether to print detailed information

    Returns:
        A dict with keys: "default_model", "working_model", "provider",
        "all_models" (model -> provider or None), "available_models"
        (list of (model, provider) tuples) and "unavailable_models".
    """
    results = {
        "default_model": None,
        "working_model": None,
        "provider": None,
        "all_models": {},
        "available_models": [],
        "unavailable_models": []
    }

    print("\n===== Checking HuggingFace Authentication =====")

    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        print("✅ HF_TOKEN is available")

        # HF tokens conventionally start with "hf_"; warn on anything else.
        if not hf_token.startswith("hf_"):
            print("⚠️ WARNING: Your HF_TOKEN does not start with 'hf_' which is unusual. Please verify its format.")

        # (Removed an unused `masked_token` placeholder that was never printed.)
        _validate_hf_token(hf_token)
    else:
        print("❌ HF_TOKEN is missing - authentication to HuggingFace API will fail")
        print("⚠️ Most models and providers require authentication")

    hf_organization = os.environ.get("HF_ORGANIZATION")
    if hf_organization:
        print(f"✅ HF_ORGANIZATION is available: {hf_organization}")
    else:
        print("ℹ️ HF_ORGANIZATION is not set")

    if verbose:
        print(f"\n===== Testing main default model: {DEFAULT_BENCHMARK_MODEL} =====")

    provider = get_available_model_provider(DEFAULT_BENCHMARK_MODEL, verbose=verbose)

    if provider:
        if verbose:
            print(f"\n✅ SUCCESS: Found provider for default model {DEFAULT_BENCHMARK_MODEL}: {provider}")
        results["default_model"] = DEFAULT_BENCHMARK_MODEL
        results["working_model"] = DEFAULT_BENCHMARK_MODEL
        results["provider"] = provider
    else:
        if verbose:
            print(f"\n❌ DEFAULT MODEL FAILED: No provider found for {DEFAULT_BENCHMARK_MODEL}")
            print("Trying alternative models...")

        # Try alternatives in order; for/else fires when none of them works.
        for alt_model in ALTERNATIVE_BENCHMARK_MODELS:
            if verbose:
                print(f"\nTrying alternative model: {alt_model}")
            alt_provider = get_available_model_provider(alt_model, verbose=verbose)
            if alt_provider:
                if verbose:
                    print(f"\n✅ SUCCESS: Found provider for alternative model {alt_model}: {alt_provider}")
                results["working_model"] = alt_model
                results["provider"] = alt_provider
                break
            elif verbose:
                print(f"❌ Failed to find provider for alternative model: {alt_model}")
        else:
            if verbose:
                print("\n❌ ALL MODELS FAILED: No provider found for any model")
                print("\n⚠️ This is likely due to authentication issues with your HF_TOKEN")
                print("⚠️ Please check your token or try using models that don't require authentication")

    # Fixed survey list, independent of the benchmark configuration.
    models = [
        "Qwen/QwQ-32B",
        "Qwen/Qwen2.5-72B-Instruct",
        "Qwen/Qwen2.5-32B-Instruct",
        "meta-llama/Llama-3.1-8B-Instruct",
        "meta-llama/Llama-3.3-70B-Instruct",
        "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
        "mistralai/Mistral-Small-24B-Instruct-2501",
    ]

    if verbose:
        print("\n===== Testing all available models =====")

    for model in models:
        provider = get_available_model_provider(model, verbose)
        results["all_models"][model] = provider
        if provider:
            results["available_models"].append((model, provider))
        else:
            results["unavailable_models"].append(model)

    if verbose:
        print("\n===== Results Summary =====")
        if results["available_models"]:
            print("Models with available providers:")
            for model, provider in results["available_models"]:
                print(f"✅ Model: {model}, Provider: {provider}")
        else:
            print("❌ No models with available providers found")
            print("⚠️ Please check your HF_TOKEN and permissions")

        if results["unavailable_models"]:
            print("\nModels with no available providers:")
            for model in results["unavailable_models"]:
                print(f"❌ {model}")

        print(f"\nTotal Available Models: {len(results['available_models'])}")
        print(f"Total Unavailable Models: {len(results['unavailable_models'])}")

    return results
|
|
|
if __name__ == "__main__":

    # Run the full provider-availability check when executed as a script.
    test_results = test_models(verbose=True)