"""
Script to benchmark the performance of different providers for a given model.

Usage: python model_provider_benchmark.py [--model "model_name"] [--output results.json] [--questions 5]
"""

import argparse
import json
import logging
import os
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import requests
from dotenv import load_dotenv
from huggingface_hub import model_info

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("provider_benchmark")

DEFAULT_MODELS = [
    "Qwen/Qwen2.5-72B-Instruct",
    "meta-llama/Llama-3.3-70B-Instruct",
    "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
    "Qwen/QwQ-32B",
    "mistralai/Mistral-Small-24B-Instruct-2501"
]

DEFAULT_QUESTIONS = [
    "What are the key benefits of using distributed systems?",
    "Explain the concept of quantum computing in simple terms.",
    "What are the ethical considerations in artificial intelligence?",
    "Compare and contrast supervised and unsupervised learning.",
    "How does blockchain technology ensure security and transparency?"
]
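
# Note: --model overrides DEFAULT_MODELS, and --questions N truncates
# DEFAULT_QUESTIONS to its first N prompts (see main()).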


def get_model_providers(model_name: str) -> List[str]:
    """
    Gets all available providers for a given model.

    Args:
        model_name: Name of the model on the Hub

    Returns:
        List of available providers
    """
    try:
        info = model_info(model_name, expand="inferenceProviderMapping")
        # Guard against the mapping being absent, None, or empty.
        if getattr(info, "inference_provider_mapping", None):
            return list(info.inference_provider_mapping.keys())
        logger.warning(f"No providers available for {model_name}")
        return []
    except Exception as e:
        logger.error(f"Error while retrieving providers for {model_name}: {e}")
        return []
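
# Example (the returned providers depend on the Hub's current mapping):
#   get_model_providers("Qwen/Qwen2.5-72B-Instruct")  # e.g. ["together", "fireworks-ai", ...]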


def query_model(
    model: str,
    provider: str,
    prompt: str,
    token: str
) -> Tuple[str, float]:
    """
    Sends a request to a model via the Hugging Face Inference API.

    Args:
        model: Model name
        provider: Provider name
        prompt: Question to ask
        token: HF token for authentication

    Returns:
        Tuple containing the response and execution time
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 100,
            "temperature": 0.7,
            "top_p": 0.9,
            "do_sample": True,
            "provider": provider
        }
    }

    api_url = f"https://api-inference.huggingface.co/models/{model}"

    # Short pause between requests to stay clear of rate limits; kept outside
    # the timed section so it does not inflate the measured latency.
    time.sleep(0.5)

    start_time = time.time()
    try:
        # The timeout is a defensive choice so one hung provider cannot stall the run.
        response = requests.post(api_url, headers=headers, json=payload, timeout=120)

        if response.status_code != 200:
            try:
                error_data = response.json()
                error_msg = error_data.get("error", str(error_data))
            except ValueError:
                error_msg = response.text
            logger.error(f"Error for {model} ({provider}): {error_msg}")
            return f"ERROR: {error_msg}", 0

        result = response.json()

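        # A typical text-generation response looks like [{"generated_text": "..."}],
        # but the exact shape can vary by provider, hence the fallbacks below.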
        if isinstance(result, list) and len(result) > 0:
            if "generated_text" in result[0]:
                answer = result[0]["generated_text"]
            else:
                answer = str(result)
        elif isinstance(result, dict):
            if "generated_text" in result:
                answer = result["generated_text"]
            else:
                answer = str(result)
        else:
            answer = str(result)

    except requests.exceptions.RequestException as e:
        error_msg = str(e)
        logger.error(f"Error for {model} ({provider}): {error_msg}")
        return f"ERROR: {error_msg}", 0
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error for {model} ({provider}): {error_msg}")
        return f"ERROR: {error_msg}", 0

    end_time = time.time()
    execution_time = end_time - start_time

    return answer, execution_time


def run_benchmark(
    model: str,
    questions: List[str] = DEFAULT_QUESTIONS,
    output_file: str = None
) -> Optional[List[Dict[str, Any]]]:
    """
    Runs a benchmark for all model/provider combinations.

    Args:
        model: Name of the model to test
        questions: List of questions to ask
        output_file: Path to the output JSON file (optional, currently unused)

    Returns:
        List of ranked providers or None in case of error
    """
    # Load HF_TOKEN from a .env file if one is present.
    load_dotenv()

    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        logger.error("HF_TOKEN not defined")
        return None

    providers = get_model_providers(model)
    if not providers:
        logger.warning(f"No providers for {model}")
        return None

    logger.info(f"Testing {model} with providers: {', '.join(providers)}")

    results = {
        "providers": {}
    }

    for provider in providers:
        logger.info(f"Provider: {provider}")
        provider_results = {
            "questions": [],
            "total_time": 0,
            "average_time": 0,
            "success_rate": 0
        }

        successful_queries = 0
        total_time = 0

        for question in questions:
            answer, execution_time = query_model(
                model=model,
                provider=provider,
                prompt=question,
                token=hf_token
            )

            is_error = answer.startswith("ERROR:")
            if not is_error:
                successful_queries += 1
                total_time += execution_time

            provider_results["questions"].append({
                "question": question,
                "time": execution_time,
                "success": not is_error,
                "answer": answer[:100] + "..." if len(answer) > 100 else answer
            })

        provider_results["total_time"] = total_time
        provider_results["average_time"] = total_time / successful_queries if successful_queries > 0 else 0
        provider_results["success_rate"] = successful_queries / len(questions)

        results["providers"][provider] = provider_results

    if not any(data["success_rate"] > 0 for data in results["providers"].values()):
        logger.warning(f"No successful providers for {model}")
        return None

    # Rank providers by total benchmark time; providers with no successful
    # query sort last.
    sorted_providers = sorted(
        results["providers"].items(),
        key=lambda x: x[1]["total_time"] if x[1]["success_rate"] > 0 else float('inf')
    )

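    # Each entry summarizes one provider, fastest first, for example:
    #   {"provider": "<name>", "total_time": 12.3, "success_rate": 1.0, "average_time": 2.46}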
    return [
        {
            "provider": provider,
            "total_time": data["total_time"],
            "success_rate": data["success_rate"],
            "average_time": data["average_time"]
        }
        for provider, data in sorted_providers
    ]


def display_results(model: str, results: List[Dict[str, Any]]) -> None:
    """
    Displays benchmark results in a readable format.

    Args:
        model: Model name
        results: List of ranked providers
    """
    print(f"\n===== Benchmark Results for {model} =====")
    print(f"Number of providers tested: {len(results)}")

    print("\nProvider Rankings (fastest to slowest):")
    print("-" * 80)
    print(f"{'Rank':<6} {'Provider':<20} {'Success Rate':<15} {'Total Time (s)':<20} {'Avg Time (s)':<15}")
    print("-" * 80)

    for i, provider_data in enumerate(results, 1):
        print(f"{i:<6} {provider_data['provider']:<20} {provider_data['success_rate']*100:>6.1f}%        {provider_data['total_time']:>8.2f}s           {provider_data['average_time']:>6.2f}s")


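# Note: display_results(), calculate_model_rankings() and display_final_rankings()
# are standalone helpers; main() currently only writes the JSON file and prints
# the summary from display_final_summary().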
def calculate_model_rankings(all_results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Calculates model rankings based on their performance.

    Args:
        all_results: Complete benchmark results

    Returns:
        List of models ranked by performance
    """
    model_rankings = []

    for model_name, results in all_results["models"].items():
        if results is None:
            continue

        # Pick the fastest provider among those with at least an 80% success rate.
        best_provider = None
        best_time = float('inf')
        best_success_rate = 0
        best_average_time = 0

        for provider_data in results:
            if provider_data["success_rate"] >= 0.8:
                if provider_data["total_time"] < best_time:
                    best_time = provider_data["total_time"]
                    best_success_rate = provider_data["success_rate"]
                    best_average_time = provider_data["average_time"]
                    best_provider = provider_data["provider"]

        if best_provider:
            model_rankings.append({
                "model": model_name,
                "best_provider": best_provider,
                "total_time": best_time,
                "success_rate": best_success_rate,
                "average_time": best_average_time
            })

    return sorted(model_rankings, key=lambda x: x["total_time"])


def display_final_rankings(model_rankings: List[Dict[str, Any]]) -> None:
    """
    Displays the final model rankings.

    Args:
        model_rankings: List of ranked models
    """
    print("\n" + "="*80)
    print("FINAL MODEL RANKINGS (fastest to slowest)")
    print("="*80)
    print(f"{'Rank':<6} {'Model':<40} {'Provider':<20} {'Total Time (s)':<15} {'Success Rate':<15}")
    print("-"*80)

    for i, model_data in enumerate(model_rankings, 1):
        print(f"{i:<6} {model_data['model']:<40} {model_data['best_provider']:<20} "
              f"{model_data['total_time']:>8.2f}s       {model_data['success_rate']*100:>6.1f}%")


def display_final_summary(all_results: Dict[str, Any]) -> None:
    """
    Displays a final summary with ranked providers for each model.

    Args:
        all_results: Complete benchmark results
    """
    print("\n" + "="*100)
    print("FINAL SUMMARY OF PROVIDERS BY MODEL")
    print("="*100)

    for model_name, results in all_results["models"].items():
        if results is None:
            print(f"\n{model_name}:")
            print("  No successful providers found")
            continue

        print(f"\n{model_name}:")
        print("  Successful providers:")
        for provider_data in results:
            if provider_data["success_rate"] > 0:
                print(f"  - {provider_data['provider']} (Success rate: {provider_data['success_rate']*100:.1f}%, Avg time: {provider_data['average_time']:.2f}s)")

        failed_providers = [p for p in results if p["success_rate"] == 0]
        if failed_providers:
            print("  Failed providers:")
            for provider_data in failed_providers:
                print(f"  - {provider_data['provider']}")


def main():
    """
    Main entry point for the script.
    """
    parser = argparse.ArgumentParser(description="Tests the performance of model providers.")
    parser.add_argument("--model", type=str, help="Name of the model to test (if not specified, all default models will be tested)")
    parser.add_argument("--output", type=str, default="benchmark_results.json", help="Path to the output JSON file")
    parser.add_argument("--questions", type=int, default=5, help="Number of questions to ask (default: 5)")

    args = parser.parse_args()

    # Use the first N default questions, capped at the number available.
    num_questions = min(args.questions, len(DEFAULT_QUESTIONS))
    questions = DEFAULT_QUESTIONS[:num_questions]

    # Benchmark a single model when --model is given, otherwise all defaults.
    models_to_test = [args.model] if args.model else DEFAULT_MODELS

    all_results = {
        "timestamp": datetime.now().isoformat(),
        "models": {}
    }

    for model in models_to_test:
        logger.info(f"\nModel: {model}")
        results = run_benchmark(
            model=model,
            questions=questions,
            output_file=None
        )
        all_results["models"][model] = results

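    # The output JSON maps each model name to its ranked provider list
    # (or null when no provider succeeded), plus a top-level timestamp.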
    with open(args.output, "w") as f:
        json.dump(all_results, f, indent=2)
    logger.info(f"\nResults saved to {args.output}")

    display_final_summary(all_results)


if __name__ == "__main__":
    main()