|
""" |
|
Task to ingest and transform documents to markdown using yourbench |
|
""" |
|
import os |
|
import time |
|
import pathlib |
|
import subprocess |
|
import threading |
|
from typing import Optional, List, Tuple, Dict, Any |
|
import yaml |
|
|
|
from loguru import logger |
|
|
|
|
|
class CreateBenchTask:
    """
    Task to ingest and transform documents to markdown using yourbench.

    Spawns the ``yourbench run`` CLI as a subprocess, captures its combined
    stdout/stderr on a background thread, and exposes the collected logs plus
    running/completed state to callers (e.g. a frontend polling for progress).
    """

    def __init__(self, session_uid: str, config_path: Optional[str] = None):
        """
        Initialize the ingestion task.

        Args:
            session_uid: Session ID for this task.
            config_path: Path to the configuration file; when None, defaults to
                ``uploaded_files/<session_uid>/config.yml``.
        """
        self.session_uid = session_uid
        self.logs: List[str] = []
        self.is_completed = False
        self.process: Optional[subprocess.Popen] = None
        self.is_running_flag = threading.Event()

        if config_path is None:
            config_path = f"uploaded_files/{session_uid}/config.yml"
        self.config_path = config_path

        # Argument list (no shell) executed later by run().
        self.command = ["yourbench", "run", "--config", str(self.config_path)]

        self._add_log("[INFO] Initializing ingestion task")
        self._add_log(f"[INFO] Using configuration file: {self.config_path}")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list (deduplicated) and mirror it to loguru.

        Args:
            message: Log message to add.
        """
        if message not in self.logs:
            self.logs.append(message)
            # Rebind to a fresh list so consumers comparing object identity
            # (e.g. reactive UI state) observe a change — presumably
            # intentional; confirm before removing.
            self.logs = self.logs.copy()

        logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task.

        Returns:
            A copy of the list of log messages (safe for the caller to mutate).
        """
        return self.logs.copy()

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed.

        Returns:
            True if completed, False otherwise.
        """
        return self.is_completed

    def is_running(self) -> bool:
        """
        Check if the subprocess is running.

        Returns:
            True if running, False otherwise.
        """
        return self.is_running_flag.is_set()

    def stop(self) -> None:
        """
        Stop the subprocess if it is running.

        Tries a graceful terminate() first, escalating to kill() after a
        5-second grace period; always clears the running flag and marks the
        task completed.
        """
        if self.process and self.is_running():
            self._add_log("[INFO] Stopping ingestion process")
            try:
                self.process.terminate()
                # Give the process a short window to exit cleanly.
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self._add_log("[WARN] Process not responding, forcing termination")
                self.process.kill()
            finally:
                self.is_running_flag.clear()
                self.is_completed = True
                self._add_log("[INFO] Ingestion process stopped")

    def _capture_output(self) -> None:
        """
        Capture and process the output from the yourbench process.

        Runs on a background thread until the subprocess exits. Each output
        line is classified exactly once into ERROR/WARN/SUCCESS/INFO log
        entries; rate-limit and non-critical JSON errors are tracked to shape
        the final completion message.
        """
        self._add_log("[INFO] Starting output capture")

        rate_limit_detected = False
        json_errors_detected = False

        try:
            while self.is_running() and self.process:
                line = self.process.stdout.readline()
                if not line:
                    # Empty read: either EOF (process exited) or no data yet.
                    if self.process.poll() is not None:
                        self.is_running_flag.clear()
                        break
                    time.sleep(0.1)
                    continue

                line = line.strip()
                if not line:
                    continue

                lowered = line.lower()

                # Provider throttling indicators; the process keeps running,
                # so only remember the condition for the final status message.
                if ("too many requests" in lowered or
                        "rate limit" in lowered or
                        "429" in line or
                        "too many concurrent requests" in lowered):
                    rate_limit_detected = True
                    self._add_log("[ERROR] RATE_LIMIT_EXCEEDED: The demo is under heavy load at the moment.")

                # Malformed QA pairs are tolerated: record and skip the line.
                if ("JSONDecodeError" in line or
                        "Error processing QA pair" in line or
                        "'str' object has no attribute 'get'" in line):
                    json_errors_detected = True
                    self._add_log(f"[WARN] Non-critical JSON error: {line}")
                    continue

                self._add_log(f"[DEBUG] Raw output: {line}")

                # Single if/elif chain so each line is classified exactly once
                # (previously ERROR/WARNING lines were also re-logged through
                # the INFO fallback, producing duplicate entries).
                if "No valid questions produced in single_shot_question_generation" in line:
                    self._add_log("[ERROR] Failed to generate benchmark: The document does not contain enough information to generate a meaningful benchmark. Please try with a more detailed document.")
                elif "ERROR" in line:
                    self._add_log(f"[ERROR] {line}")
                elif "WARNING" in line:
                    self._add_log(f"[WARN] {line}")
                elif "Completed stage:" in line:
                    # yourbench usually quotes the stage name in single quotes;
                    # fall back to everything after the marker otherwise.
                    stage = line.split("'")[1] if "'" in line else line.split("Completed stage:")[1].strip()
                    stage = self._standardize_stage_name(stage)
                    self._add_log(f"[SUCCESS] Stage completed: {stage}")
                elif "Successfully completed 'upload_ingest_to_hub' stage" in line:
                    self._add_log("[SUCCESS] Stage completed: upload_ingest_to_hub")
                else:
                    self._add_log(f"[INFO] {line}")

            # Emit a final status message based on the exit code and the
            # error conditions observed while streaming.
            if self.process:
                exit_code = self.process.poll()
                if exit_code == 0 or json_errors_detected:
                    # Non-critical JSON errors do not invalidate the benchmark.
                    if json_errors_detected:
                        self._add_log("[INFO] Benchmark completed with non-critical JSON errors, considered successful")
                    else:
                        self._add_log("[SUCCESS] Benchmark process completed successfully")
                else:
                    if rate_limit_detected:
                        self._add_log("[ERROR] Benchmark process failed due to API rate limiting. The demo is under heavy load at the moment.")
                    self._add_log("[INFO] Benchmark process completed with errors")
        except Exception as e:
            self._add_log(f"[ERROR] Error during output capture: {str(e)}")
        finally:
            # Always release pollers, even if capture crashed.
            self.is_completed = True
            self.is_running_flag.clear()
            self._add_log("[INFO] Output capture completed")

    def _standardize_stage_name(self, stage_name: str) -> str:
        """
        Standardize the stage name to match the frontend expectations.

        Args:
            stage_name: Original stage name as printed by yourbench.

        Returns:
            Standardized stage name, or the input unchanged when no mapping
            key matches.
        """
        # Substring match (case-insensitive) against known yourbench stages.
        stage_mapping = {
            "ingest": "ingestion",
            "upload": "upload_ingest_to_hub",
            "summarize": "summarization",
            "chunk": "chunking",
            "generate_questions": "single_shot_question_generation",
        }

        for key, value in stage_mapping.items():
            if key in stage_name.lower():
                return value

        return stage_name

    def run(self, token: Optional[str] = None) -> None:
        """
        Run the ingestion task: validate the config, export credentials, and
        start the yourbench subprocess with a background output-capture thread.

        Args:
            token: Hugging Face token.
                NOTE(review): currently unused — the environment's HF_TOKEN is
                exported instead; confirm whether this parameter should take
                precedence.
        """
        try:
            self._add_log("[INFO] Starting ingestion process")

            if not os.path.exists(self.config_path):
                raise FileNotFoundError(f"Configuration file does not exist: {self.config_path}")

            # Best-effort preview of what will be processed; failures here are
            # logged as warnings and never abort the run.
            try:
                with open(self.config_path, 'r') as f:
                    config_yaml = yaml.safe_load(f)

                source_dir = config_yaml.get("pipeline", {}).get("ingestion", {}).get("source_documents_dir", "")
                output_dir = config_yaml.get("pipeline", {}).get("ingestion", {}).get("output_dir", "")

                if source_dir:
                    self._add_log(f"[INFO] Source directory: {source_dir}")
                if output_dir:
                    self._add_log(f"[INFO] Output directory: {output_dir}")

                if source_dir and os.path.exists(source_dir):
                    files = os.listdir(source_dir)
                    if files:
                        self._add_log(f"[INFO] Files to process: {', '.join(files)}")
                    else:
                        self._add_log("[WARN] No files found in source directory")

            except Exception as e:
                self._add_log(f"[WARN] Unable to read configuration: {str(e)}")

            # Child inherits our environment plus the HF credentials.
            env = os.environ.copy()

            hf_token = os.getenv("HF_TOKEN")
            if hf_token:
                # Both names are exported because downstream tooling reads
                # either variant.
                env["HF_TOKEN"] = hf_token
                env["HUGGING_FACE_HUB_TOKEN"] = hf_token
                env["HF_ORGANIZATION"] = os.getenv("HF_ORGANIZATION", "yourbench")
                self._add_log("[INFO] Environment variables exported")

            self._add_log(f"[INFO] Executing command: {' '.join(self.command)}")

            # stderr is merged into stdout so _capture_output sees one stream;
            # bufsize=1 gives line buffering in text mode.
            self.process = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
                env=env
            )

            self.is_running_flag.set()

            # Daemon thread: must not block interpreter shutdown.
            output_thread = threading.Thread(target=self._capture_output)
            output_thread.daemon = True
            output_thread.start()

            self._add_log(f"[INFO] Process started with PID: {self.process.pid}")

        except Exception as e:
            self._add_log(f"[ERROR] Error starting ingestion process: {str(e)}")
            self.is_completed = True