"""
Task to create and save the configuration file
"""
import os
import pathlib
import threading
import time
import uuid
from typing import Any, Dict, List, Optional

import yaml
from loguru import logger

from tasks.get_available_model_provider import get_available_model_provider
from config.models_config import (
    DEFAULT_BENCHMARK_MODEL,
    BENCHMARK_MODEL_ROLES,
    DEFAULT_BENCHMARK_TIMEOUT,
    PREFERRED_PROVIDERS,
    ALTERNATIVE_BENCHMARK_MODELS,
)


class CreateBenchConfigTask:
    """
    Task to create and save a configuration file for YourbenchSimpleDemo
    """

    def __init__(self, session_uid: Optional[str] = None, timeout: Optional[float] = None):
        """
        Initialize the task with a session ID

        Args:
            session_uid: Optional session ID, will be generated if None
            timeout: Timeout in seconds for benchmark operations (if None, uses default)
        """
        self.session_uid = session_uid or str(uuid.uuid4())
        self.logs: List[str] = []
        self.is_completed = False
        self.is_running_flag = threading.Event()
        self.thread = None
        self.timeout = timeout if timeout is not None else DEFAULT_BENCHMARK_TIMEOUT
        self._add_log("[INFO] Initializing configuration creation task")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list

        Args:
            message: Log message to add
        """
        if message not in self.logs:
            self.logs.append(message)
            # Rebind to a fresh list object, presumably so state-diffing
            # consumers (e.g. a UI polling get_logs) detect the update
            self.logs = self.logs.copy()

        logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task

        Returns:
            List of log messages
        """
        return self.logs.copy()

    def save_uploaded_file(self, file_path: str) -> str:
        """
        Process the uploaded file that is already in the correct directory

        Args:
            file_path: Path to the uploaded file

        Returns:
            Path to the file (same as input)
        """
        try:
            # The upload step has already written the file to its final
            # location, so there is nothing to move or copy here
            self._add_log(f"[INFO] Processing file: {os.path.basename(file_path)}")
            return file_path
        except Exception as e:
            error_msg = f"Error processing file: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            raise RuntimeError(error_msg)

    def get_model_provider(self, model_name: str) -> Optional[str]:
        """
        Get the available provider for a model

        Args:
            model_name: Name of the model to check

        Returns:
            Available provider or None if none found
        """
        self._add_log(f"[INFO] Finding available provider for {model_name}")

        # First check the preferred providers
        provider = get_available_model_provider(model_name, verbose=True)

        if provider:
            self._add_log(f"[INFO] Found provider for {model_name}: {provider}")
            return provider

        # Imported lazily: only needed on the fallback path
        from huggingface_hub import model_info
        from tasks.get_available_model_provider import test_provider

        self._add_log(f"[WARNING] No preferred provider found for {model_name}, trying all available providers...")

        try:
            # Ask the Hub which inference providers serve this model
            info = model_info(model_name, expand="inferenceProviderMapping")
            if hasattr(info, "inference_provider_mapping"):
                providers = list(info.inference_provider_mapping.keys())

                # Skip providers that were already tried above
                other_providers = [p for p in providers if p not in PREFERRED_PROVIDERS]

                if other_providers:
                    self._add_log(f"[INFO] Testing additional providers: {', '.join(other_providers)}")

                    for provider in other_providers:
                        self._add_log(f"[INFO] Testing provider {provider}")
                        if test_provider(model_name, provider, verbose=True):
                            self._add_log(f"[INFO] Found alternative provider for {model_name}: {provider}")
                            return provider
        except Exception as e:
            self._add_log(f"[WARNING] Error while testing additional providers: {str(e)}")

        self._add_log(f"[WARNING] No available provider found for {model_name}")
        return None

    def generate_base_config(self, hf_org: str, hf_dataset_name: str) -> Dict[str, Any]:
        """
        Create the base configuration dictionary

        Args:
            hf_org: Hugging Face organization name (currently unused; the
                generated config references the $HF_ORGANIZATION placeholder)
            hf_dataset_name: Hugging Face dataset name

        Returns:
            Configuration dictionary
        """
        self._add_log(f"[INFO] Generating base configuration for {hf_dataset_name}")

        # Fail fast if the token required by the pipeline is missing
        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            raise RuntimeError("HF_TOKEN environment variable is not defined")

        # Try the default model first, then fall back to the alternatives
        provider = self.get_model_provider(DEFAULT_BENCHMARK_MODEL)
        selected_model = DEFAULT_BENCHMARK_MODEL

        if not provider:
            self._add_log(f"[WARNING] Primary model {DEFAULT_BENCHMARK_MODEL} not available. Trying alternatives...")
            for alt_model in ALTERNATIVE_BENCHMARK_MODELS:
                self._add_log(f"[INFO] Trying alternative model: {alt_model}")
                alt_provider = self.get_model_provider(alt_model)
                if alt_provider:
                    self._add_log(f"[INFO] Found working alternative model: {alt_model} with provider: {alt_provider}")
                    selected_model = alt_model
                    provider = alt_provider
                    break

        if not provider:
            error_msg = "No model with available provider found. Cannot proceed with benchmark."
            self._add_log(f"[ERROR] {error_msg}")
            raise RuntimeError(error_msg)

        model_list = [{
            "model_name": selected_model,
            "provider": provider,
            "api_key": "$HF_TOKEN",
            "max_concurrent_requests": 32,
        }]

        # If an alternative model was selected, point every role except
        # chunking at it; chunking keeps its dedicated model
        model_roles = dict(BENCHMARK_MODEL_ROLES)
        if selected_model != DEFAULT_BENCHMARK_MODEL:
            for role in model_roles:
                if role != "chunking":
                    model_roles[role] = [selected_model]

            self._add_log(f"[INFO] Updated model roles to use {selected_model}")

        self._add_log("[INFO] Finalizing provider check...")
        time.sleep(2)  # small pause, presumably to pace the streamed log output

        self._add_log("[SUCCESS] Stage completed: provider_check")

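        # "$HF_TOKEN" and "$HF_ORGANIZATION" below are literal placeholder
        # strings, presumably resolved against the environment by the
        # downstream yourbench pipeline rather than at config-creation time.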
        return {
            "hf_configuration": {
                "token": "$HF_TOKEN",
                "hf_organization": "$HF_ORGANIZATION",
                "private": True,
                "hf_dataset_name": hf_dataset_name,
                "concat_if_exist": False,
                "timeout": self.timeout,
            },
            "model_list": model_list,
            "model_roles": model_roles,
            "pipeline": {
                "ingestion": {
                    "source_documents_dir": f"uploaded_files/{self.session_uid}/uploaded_files/",
                    "output_dir": f"uploaded_files/{self.session_uid}/ingested",
                    "run": True,
                    "timeout": self.timeout,
                },
                "upload_ingest_to_hub": {
                    "source_documents_dir": f"uploaded_files/{self.session_uid}/ingested",
                    "run": True,
                    "timeout": self.timeout,
                },
                "summarization": {
                    "run": True,
                    "timeout": self.timeout,
                },
                "chunking": {
                    "run": True,
                    "timeout": self.timeout,
                    "chunking_configuration": {
                        "l_min_tokens": 64,
                        "l_max_tokens": 128,
                        "tau_threshold": 0.8,
                        "h_min": 2,
                        "h_max": 5,
                        "num_multihops_factor": 1,
                    },
                },
                "single_shot_question_generation": {
                    "run": True,
                    "timeout": self.timeout,
                    "additional_instructions": "Generate rich and creative questions to test a curious adult",
                    "chunk_sampling": {
                        "mode": "count",
                        "value": 5,
                        "random_seed": 123,
                    },
                },
                "multi_hop_question_generation": {
                    "run": False,
                    "timeout": self.timeout,
                },
                "lighteval": {
                    "run": False,
                    "timeout": self.timeout,
                },
            },
        }

    def save_yaml_file(self, config: Dict[str, Any], path: str) -> str:
        """
        Save the given configuration dictionary to a YAML file

        Args:
            config: Configuration dictionary
            path: Path to save the file

        Returns:
            Path to the saved file
        """
        try:
            # Create the parent directory if it does not exist yet
            os.makedirs(os.path.dirname(path), exist_ok=True)

            with open(path, "w") as file:
                yaml.dump(config, file, default_flow_style=False, sort_keys=False)

            self._add_log(f"[INFO] Configuration saved: {path}")
            return path
        except Exception as e:
            error_msg = f"Error saving configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            raise RuntimeError(error_msg)

    def _run_task(self, file_path: str) -> str:
        """
        Internal method that performs the work of the task
        (called synchronously by run())

        Args:
            file_path: Path to the uploaded file

        Returns:
            Path to the configuration file
        """
        try:
            org_name = os.getenv("HF_ORGANIZATION")

            # Fail fast if the required token is missing
            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                raise RuntimeError("HF_TOKEN environment variable is not defined")

            self._add_log(f"[INFO] Organization: {org_name}")

            # The short sleeps below presumably pace the streamed log output
            time.sleep(0.5)

            saved_file_path = self.save_uploaded_file(file_path)

            time.sleep(1)

            # All session artifacts live under uploaded_files/<session_uid>/
            config_dir = pathlib.Path(f"uploaded_files/{self.session_uid}")
            config_path = config_dir / "config.yml"

            dataset_name = f"yourbench_{self.session_uid}"
            self._add_log(f"[INFO] Dataset name: {dataset_name}")

            time.sleep(0.8)

            self._add_log("[INFO] Finding available providers for models...")

            config = self.generate_base_config(org_name, dataset_name)

            time.sleep(1.2)

            config_file_path = self.save_yaml_file(config, str(config_path))

            self._add_log(f"[INFO] Configuration generated successfully: {config_file_path}")

            time.sleep(1.5)
            self._add_log("[INFO] Starting ingestion")

            time.sleep(2)
            self._add_log(f"[INFO] Processing file: {dataset_name}")

            time.sleep(2)
            self._add_log("[SUCCESS] Stage completed: config_generation")

            self.mark_task_completed()

            return str(config_path)
        except Exception as e:
            error_msg = f"Error generating configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            self.mark_task_completed()
            raise RuntimeError(error_msg)

    def run(self, file_path: str, token: Optional[str] = None, timeout: Optional[float] = None) -> str:
        """
        Run the task to create and save the configuration file

        Note: despite the running flag, the work executes synchronously in
        the calling thread.

        Args:
            file_path: Path to the uploaded file
            token: Hugging Face token (not used; HF_TOKEN is read from the environment)
            timeout: Timeout in seconds for benchmark operations (if None, keeps the current value)

        Returns:
            Path to the configuration file
        """
        if timeout is not None:
            self.timeout = timeout

        # Signal that the task is in progress so is_running() reports True
        self.is_running_flag.set()

        try:
            config_path = self._run_task(file_path)
            return config_path
        except Exception as e:
            error_msg = f"Error generating configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            self.mark_task_completed()
            raise RuntimeError(error_msg)

    def is_running(self) -> bool:
        """
        Check if the task is running

        Returns:
            True if running, False otherwise
        """
        return self.is_running_flag.is_set() and not self.is_completed

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    def mark_task_completed(self) -> None:
        """
        Mark the task as completed
        """
        self.is_completed = True
        self.is_running_flag.clear()
        self._add_log("[INFO] Configuration generation task completed")