# eval_benchmarks.py
"""Unified benchmarking script for ReCall, ZeroSearch, and R1‑Searcher
with optional multi‑threaded execution.
Example usage (single‑threaded)
-------------------------------
```bash
python eval_benchmark.py \
--dataset frames \
--agent r1-searcher \
--model-url http://0.0.0.0:1233 \
--out-base /tmp/evals \
--mode single
```
Example usage (multi‑threaded, 128 workers)
------------------------------------------
```bash
python eval_benchmark.py \
--dataset frames \
--agent recall \
--model-url http://0.0.0.0:1231 \
--out-base /tmp/evals \
--mode multi \
--workers 128
```
The script will:
1. Load the specified dataset JSONL file that contains objects with keys
`question` and `answer`.
2. Build the chosen agent wrapper (`recall`, `zerosearch`, or `r1-searcher`).
3. Stream one JSONL line per example with *all* details needed for analysis.
4. Optionally run the evaluation loop in parallel using a configurable number
of worker threads.
5. Automatically construct the output path as:
```
{out_base}/{model_name}/{dataset}.jsonl
```
where `model_name` is derived from the `--model-url` (characters after the
last `/`).
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import pathlib
import re
import sys
import threading
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict
from openai import OpenAI, APIStatusError
from tqdm import tqdm
# --------------------------------------------------------------------
# Agent imports (ensure PYTHONPATH is set appropriately)
# --------------------------------------------------------------------
from re_call import ReCall # user's wrapper
# from re_call import ZeroSearchInference, ZeroSearchConfig
# from re_call import R1Searcher, R1SearchConfig as R1Cfg
# from re_call import O1Cfg, O1Searcher
# from re_call import SDSCfg, SDSSearcher
# --------------------------------------------------------------------
# Environment Keys – override with real keys or environment variables
# --------------------------------------------------------------------
#for recall
# search_env = "from search_api import web_search, web_visit"
# search_schemas =[
# {
# "name": "web_search",
# "description": "Google search and return links to web-pages with a brief snippet given a text query",
# "parameters": {
# "type": "object",
# "properties": {
# "query": {"type": "string"},
# },
# "required": ["query"],
# },
# },
# {
# "name": "web_visit",
# "description": "Visit webpage and return its content",
# "parameters": {
# "type": "object",
# "properties": {
# "url": {"type": "string", "description": "The URL of the webpage to visit. Must be a single URL"},
# },
# "required": ["url"],
# },
# }
# ]
# for recall
search_env = "from search_api import search_urls, open_url, search_and_parse_query, query_url"
search_schemas = [
    {
        "name": "search_urls",
        "description": "Google search and return links to web-pages with a brief snippet given a text query",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "top_k": {"type": "integer", "default": 10},
            },
            "required": ["query"],
        },
    },
    {
        "name": "query_url",
        "description": "Visit a webpage and return evidence-based retrieval for the provided goal",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "The URL of the webpage to visit. Must be a single URL"},
                "goal": {"type": "string", "description": "The specific information goal for visiting the webpage"},
            },
            "required": ["url", "goal"],
        },
    },
]
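# Illustrative (assumed) shape of a tool call conforming to the `query_url`
# schema above; the exact wire format is internal to the ReCall wrapper:
#   {"name": "query_url",
#    "arguments": {"url": "https://example.com/page",
#                  "goal": "find the paper's publication year"}}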
EXECUTOR_URL = os.environ["HOST_SERPER_URL"]  # tool-executor endpoint for ReCall; raises KeyError if unset
DATA_ROOT = pathlib.Path("./eval_datasets")
SEM = threading.Semaphore(3) # limit concurrent judge calls
JUDGE_MODEL = "gpt-4.1-mini"
try:
base = Path(__file__).resolve().parent
except NameError: # e.g., REPL/Jupyter
base = Path.cwd()
TOKENIZER_DIR = (base / "tokenizer-info").resolve()
# ───────────────────────── tokenizer ────────────────────────────────────────
try:
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_DIR, trust_remote_code=True)
except Exception as e:
    sys.exit(f"❌ Could not load Qwen3 tokenizer: {e}")
def get_uid(sample: dict) -> str:
    """Generate a stable UID as the SHA-256 hash of the stripped question text."""
    return hashlib.sha256(sample["question"].strip().encode("utf-8")).hexdigest()
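# Example: UIDs depend only on the stripped question text, so re-running the
# same dataset yields stable ids, e.g.
#   get_uid({"question": " What is 2+2? "}) == get_uid({"question": "What is 2+2?"})  # True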
# --------------------------------------------------------------------
# Regex & utilities
# --------------------------------------------------------------------
def extract_answer_tagged(text: str) -> str:
    """Extract the last <answer>...</answer> span; fall back to the tail of the text."""
    ans_re = re.compile(r"<answer>(.*?)</answer>", re.S)
    matches = ans_re.findall(text)
    if matches:
        return matches[-1].strip().lower()
    print("No answer tags found")
    return text[-200:]  # fallback: o1-searcher sometimes fails to follow the format
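# e.g. extract_answer_tagged("<answer> Paris </answer>") -> "paris"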
def extract_answer_boxed(response: str) -> str:
    """Extract the contents of the last \\boxed{...} (or \\fbox{...}) in `response`.

    Raises if no boxed answer is present; callers catch this and fall back.
    """
    def remove_boxed(s: str) -> str:
        # Handle the brace-less "\boxed " form as well as "\boxed{...}".
        if "\\boxed " in s:
            left = "\\boxed "
            assert s[: len(left)] == left
            return s[len(left):]
        left = "\\boxed{"
        assert s[: len(left)] == left
        assert s[-1] == "}"
        return s[len(left):-1]

    def last_boxed_only_string(string: str):
        # Locate the last \boxed / \fbox and return it with balanced braces.
        idx = string.rfind("\\boxed")
        if "\\boxed " in string:
            return "\\boxed " + string.split("\\boxed ")[-1].split("$")[0]
        if idx < 0:
            idx = string.rfind("\\fbox")
            if idx < 0:
                return None
        i = idx
        right_brace_idx = None
        num_left_braces_open = 0
        while i < len(string):
            if string[i] == "{":
                num_left_braces_open += 1
            if string[i] == "}":
                num_left_braces_open -= 1
                if num_left_braces_open == 0:
                    right_brace_idx = i
                    break
            i += 1
        return None if right_brace_idx is None else string[idx:right_brace_idx + 1]

    return remove_boxed(last_boxed_only_string(response))
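# e.g. extract_answer_boxed(r"The capital is \boxed{Paris}.") -> "Paris"
# (raises AssertionError/TypeError when no \boxed{...} is present; callers catch this)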
JUDGE_SYS = """
You are an impartial judge evaluating the correctness of a model's answer against a ground-truth answer for a given question. Your task is to:
1. Compare the model's answer to the ground-truth answer.
2. Determine if the model's answer is correct or incorrect.
**Input Format:**
- Question: {question}
- Ground Truth: {ground_truth}
- Model Answer: {model_answer}
**Output Format:**
correct/incorrect/unknown
**Guidelines:**
- The model's answer is correct if it matches the ground-truth answer in meaning and content. The comparison is case-insensitive; ignore minor punctuation or formatting differences.
- If the model's answer contains additional information, it is still correct as long as the core answer matches the ground truth.
- Be precise: output a single word (correct / incorrect / unknown) and **nothing else**.
- For MCQ questions, match the option ID (A., B., C., or D.); if the option matches, the answer is correct.
"""
# - If the model's answer is partially correct or contains errors, it is incorrect.
# Per-thread OpenAI client cache: each worker thread lazily creates one client
# and reuses it (stored as an attribute on the current thread object).
def _oa() -> OpenAI:
    th = threading.current_thread()
    if not hasattr(th, "_oa"):
        th._oa = OpenAI()
    return th._oa
def judge(q: str, gt: str, pred: str) -> str:
    """Return the judge model's one-word verdict: correct / incorrect / unknown."""
    if pred == "":
        return "unknown"
    # JUDGE_SYS already contains the full instructions, so the filled-in
    # template is sent as a single user message.
    prompt = JUDGE_SYS.format(question=q, ground_truth=gt, model_answer=pred)
    try:
        with SEM:  # cap concurrent judge calls
            resp = _oa().chat.completions.create(
                model=JUDGE_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
                max_tokens=100,
            )
        return resp.choices[0].message.content.strip().lower()
    except APIStatusError:
        return "unknown"
# --------------------------------------------------------------------
# Agent factory
# --------------------------------------------------------------------
def build_agent(kind: str, model_url: str):
    """Instantiate the requested agent wrapper (only `recall` is active)."""
    kind = kind.lower()
    if kind == "recall":
        return ReCall(executor_url=EXECUTOR_URL)
    raise ValueError(f"Unknown agent kind: {kind}")
# if kind == "o1-search" or kind == "sds":
# cfg = O1Cfg()
# return O1Searcher(cfg, thinker_url=model_url)
# if kind == "zerosearch":
# cfg = ZeroSearchConfig(thinker_url=model_url)
# return ZeroSearchInference(cfg)
# if kind in ("r1-search", "r1-searcher", "r1"):
# cfg = R1Cfg(serper_api_key=os.getenv("SERPER_API_KEY", ""))
# return R1Searcher(cfg=cfg, model_url=model_url)
# raise ValueError(f"Unknown agent kind: {kind}")
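# e.g. build_agent("recall", model_url="http://0.0.0.0:1231") -> ReCall instance;
# model_url is ignored here (the ReCall wrapper receives it later via agent.run),
# and any other kind currently raises ValueError.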
# --------------------------------------------------------------------
# Core evaluation routine for a single example (thread‑safe)
# --------------------------------------------------------------------
def evaluate_example(example: Dict[str, str], agent_kind: str, model_url: str) -> Dict[str, str]:
    """Run one example through the pipeline and return the result row."""
    question = example["question"].strip()
    answer_gt = example["answer"].strip()
    idx = example["id"].strip()
    # Build a *fresh* agent per thread to avoid shared-state issues
    agent = build_agent(agent_kind, model_url=model_url)
    chat = None  # only populated by the local-server ReCall path below
    if agent_kind == "recall" and model_url == "deepseek-ai/DeepSeek-R1":
        transcript, tool_calls = agent.run_deepseek(
            env=search_env,
            func_schemas=search_schemas,
            question=question,
            model_name="deepseek-ai/DeepSeek-R1",
            temperature=0.6,
            max_tokens=40960,
        )
    elif agent_kind == "recall":
        transcript, tool_calls, chat = agent.run(
            env=search_env,
            func_schemas=search_schemas,
            question=question,
            model_url=model_url,
            temperature=0.6,
            max_new_tokens=40960,
            tokenizer=tokenizer,
        )
    else:  # zerosearch or r1-searcher
        transcript, tool_calls = agent.run(question)
    # Answer extraction: tag-based agents use <answer>...</answer>; the rest
    # (recall, SDS, o1-searcher, ...) use \boxed{...} with a tail fallback.
    if agent_kind in ("r1-searcher", "zerosearch"):
        pred = extract_answer_tagged(transcript)
    else:
        try:
            pred = extract_answer_boxed(transcript)
        except Exception:
            print("falling back to the last 200 characters")
            pred = transcript[-200:]
    verdict = judge(question, answer_gt.lower(), pred.lower())
    return {
        "id": idx,
        "question": question,
        "answer_gt": answer_gt,
        "model_answer": pred,
        "judge": verdict,
        "tool_calls": tool_calls,
        "transcript": transcript,
        "chat": chat,
    }
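# Each returned row becomes one JSONL record, e.g. (illustrative values only):
#   {"id": "3f9a...", "question": "...", "answer_gt": "Paris",
#    "model_answer": "paris", "judge": "correct",
#    "tool_calls": [...], "transcript": "...", "chat": [...] or None}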
# --------------------------------------------------------------------
# CLI entry‑point
# --------------------------------------------------------------------
def build_output_path(out_base, agent, dataset, name) -> pathlib.Path:
    """Construct the output path as {out_base}/{agent}/{dataset}-{name}.jsonl."""
    return out_base / f"{agent}" / f"{dataset}-{name}.jsonl"
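# e.g. build_output_path(pathlib.Path("/tmp/evals"), "recall", "frames", "run1")
#      -> PosixPath('/tmp/evals/recall/frames-run1.jsonl') on POSIX systems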
def normalize(s: str) -> str:
return unicodedata.normalize("NFKD", s.strip().lower())
def load_existing_results(path: pathlib.Path) -> tuple[list[dict], set[str]]:
    """Load previously written rows (for resuming a run) and the set of finished ids."""
    results: list[dict] = []
    uids: set[str] = set()
    if not path.exists():
        return results, uids
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            try:
                row = json.loads(line)
                if row["model_answer"] != "":
                    results.append(row)
                    uids.add(row["id"])
            except Exception:
                continue
    return results, uids
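# Sketch (not currently wired into main() below): to resume an interrupted run,
# filter out questions whose ids already appear in the output file, e.g.
#   _, done_ids = load_existing_results(out_path)
#   data = [ex for ex in data if ex["id"] not in done_ids]
# This pairs with the append-mode file handle used in multi-threaded mode.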
def main():
parser = argparse.ArgumentParser(description="Benchmark QA agents on a dataset (single or multi‑threaded)")
parser.add_argument("--dataset", required=True, help="dataset name (frames, …)")
parser.add_argument("--agent", required=True, choices=["recall", "zerosearch", "r1-searcher", "o1-search", "SDS", "deepseek-r1"], help="agent wrapper")
parser.add_argument("--out", required=True, help="base directory for outputs")
parser.add_argument("--model-url", required=False, help="URL of the model server")
parser.add_argument("--limit", type=int, default=0, help="optional cap on number of questions")
parser.add_argument("--mode", choices=["single", "multi"], default="single", help="execution mode")
parser.add_argument("--workers", type=int, default=8, help="number of worker threads for multi‑mode")
parser.add_argument("--name", type=str, default="", help="suffix for save dir")
args = parser.parse_args()
# ----------------------------------------------------------------
# Dataset loading
# ----------------------------------------------------------------
ds_path = DATA_ROOT / f"{args.dataset}.jsonl"
if not ds_path.exists():
raise FileNotFoundError(ds_path)
with ds_path.open() as f:
data = [json.loads(line) for line in f]
# ----------------------------------------------------------------
# Output path setup
# ----------------------------------------------------------------
out_base = pathlib.Path(args.out).expanduser().resolve()
out_path = build_output_path(out_base, args.agent, args.dataset, args.name)
print(out_path)
out_path.parent.mkdir(parents=True, exist_ok=True)
if args.limit:
data = data[: args.limit]
# data = data[246:]
correct = 0
start_time = time.perf_counter()
# ----------------------------------------------------------------
# SINGLE‑THREADED EXECUTION
# ----------------------------------------------------------------
if args.mode == "single":
with open(out_path, "w", encoding="utf-8") as fout:
for ex in tqdm(data, desc="QA loop (single)"):
row = evaluate_example(ex, args.agent, args.model_url)
if row["judge"] == "correct":
correct += 1
# context for row
row.update({"agent": args.agent, "dataset": args.dataset})
fout.write(json.dumps(row, ensure_ascii=False) + "\n")
fout.flush()
# ----------------------------------------------------------------
# MULTI‑THREADED EXECUTION
# ----------------------------------------------------------------
else:
workers = max(1, args.workers)
logging.info("Running in multi‑threaded mode with %d workers", workers)
with ThreadPoolExecutor(max_workers=workers) as executor, open(out_path, "a", encoding="utf-8") as fout:
futures = {executor.submit(evaluate_example, ex, args.agent, args.model_url): ex for ex in data}
for fut in tqdm(as_completed(futures), total=len(futures), desc="QA loop (multi)"):
try:
row = fut.result()
except Exception as exc:
logging.exception("Evaluation failed: %s", exc)
continue
# print(row['id'])
if row["judge"] == "correct":
correct += 1
row.update({"agent": args.agent, "dataset": args.dataset})
fout.write(json.dumps(row, ensure_ascii=False) + "\n")
fout.flush()
    elapsed = time.perf_counter() - start_time
    accuracy = correct / len(data) if data else 0.0
    print(f"Accuracy: {correct}/{len(data)} = {accuracy:.1%}")
    if data:
        print(f"Elapsed time: {elapsed:.2f}s ({elapsed/len(data):.2f}s per example)")
    else:
        print(f"Elapsed time: {elapsed:.2f}s")
if __name__ == "__main__":
main()