"""Unified benchmarking script for ReCall, ZeroSearch, and R1‑Searcher |
|
|
with optional multi‑threaded execution. |
|
|
|
|
|
Example usage (single‑threaded) |
|
|
------------------------------- |
|
|
```bash |
|
|
python eval_benchmark.py \ |
|
|
--dataset frames \ |
|
|
--agent r1-searcher \ |
|
|
--model-url http://0.0.0.0:1233 \ |
|
|
--out-base /tmp/evals \ |
|
|
--mode single |
|
|
``` |
|
|
|
|
|
Example usage (multi‑threaded, 128 workers) |
|
|
------------------------------------------ |
|
|
```bash |
|
|
python eval_benchmark.py \ |
|
|
--dataset frames \ |
|
|
--agent recall \ |
|
|
--model-url http://0.0.0.0:1231 \ |
|
|
--out-base /tmp/evals \ |
|
|
--mode multi \ |
|
|
--workers 128 |
|
|
``` |
|
|
The script will: |
|
|
1. Load the specified dataset JSONL file that contains objects with keys |
|
|
`question` and `answer`. |
|
|
2. Build the chosen agent wrapper (`recall`, `zerosearch`, or `r1-searcher`). |
|
|
3. Stream one JSONL line per example with *all* details needed for analysis. |
|
|
4. Optionally run the evaluation loop in parallel using a configurable number |
|
|
of worker threads. |
|
|
5. Automatically construct the output path as: |
|
|
``` |
|
|
{out_base}/{model_name}/{dataset}.jsonl |
|
|
``` |
|
|
where `model_name` is derived from the `--model-url` (characters after the |
|
|
last `/`). |
|
|
""" |
|
|
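# Expected dataset format (one JSON object per line). The keys below are the
# ones this script reads; the values are illustrative only:
#   {"id": "0", "question": "Who ...?", "answer": "..."}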
from __future__ import annotations

import argparse
import hashlib
import json
import logging
import os
import pathlib
import re
import threading
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List

from openai import OpenAI, APIStatusError
from tqdm import tqdm

from re_call import ReCall

# Sandbox environment and tool schemas exposed to the agent.
search_env = "from search_api import search_urls, open_url, search_and_parse_query, query_url"
search_schemas = [
    {
        "name": "search_urls",
        "description": "Google search that returns links to web pages with a brief snippet for a given text query",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "top_k": {"type": "integer", "default": 10},
            },
            "required": ["query"],
        },
    },
    {
        "name": "query_url",
        "description": "Visit a webpage and return evidence-based retrieval for the provided goal",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "The URL of the webpage to visit. Must be a single URL"},
                "goal": {"type": "string", "description": "The specific information goal for visiting the webpage"},
            },
            "required": ["url", "goal"],
        },
    },
]

EXECUTOR_URL = os.environ["HOST_SERPER_URL"]  # sandbox executor endpoint (required env var)
DATA_ROOT = pathlib.Path("./eval_datasets")
SEM = threading.Semaphore(3)  # limits concurrent judge API calls
JUDGE_MODEL = "gpt-4.1-mini"

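# `search_env` and `search_schemas` are passed verbatim to the ReCall agent
# (see `evaluate_example`) as its `env` and `func_schemas` arguments.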
try:
    base = Path(__file__).resolve().parent
except NameError:
    base = Path.cwd()

TOKENIZER_DIR = (base / "tokenizer-info").resolve()

try:
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_DIR, trust_remote_code=True)
except Exception as e:
    import sys

    sys.exit(f"❌ Could not load Qwen3 tokenizer: {e}")

def get_uid(sample: dict) -> str:
    """Generate a UID from the SHA-256 hash of the question."""
    return hashlib.sha256(sample["question"].strip().encode("utf-8")).hexdigest()

def extract_answer_tagged(text: str) -> str:
    """Return the content of the last <answer>...</answer> block, lower-cased."""
    ANS_RE = re.compile(r"<answer>(.*?)</answer>", re.S)
    matches = ANS_RE.findall(text)
    if matches:
        return matches[-1].strip().lower()
    print("No answer tags found")
    return text[-200:]

def extract_answer_boxed(response):
    def remove_boxed(s):
        if "\\boxed " in s:
            left = "\\boxed "
            assert s[: len(left)] == left
            return s[len(left):]

        left = "\\boxed{"
        assert s[: len(left)] == left
        assert s[-1] == "}"
        return s[len(left):-1]

    def last_boxed_only_string(string):
        idx = string.rfind("\\boxed")
        if "\\boxed " in string:
            return "\\boxed " + string.split("\\boxed ")[-1].split("$")[0]
        if idx < 0:
            idx = string.rfind("\\fbox")
            if idx < 0:
                return None

        i = idx
        right_brace_idx = None
        num_left_braces_open = 0
        while i < len(string):
            if string[i] == "{":
                num_left_braces_open += 1
            if string[i] == "}":
                num_left_braces_open -= 1
                if num_left_braces_open == 0:
                    right_brace_idx = i
                    break
            i += 1

        if right_brace_idx is None:
            retval = None
        else:
            retval = string[idx:right_brace_idx + 1]

        return retval

    answer = remove_boxed(last_boxed_only_string(response))
    return answer

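# Illustrative example (assuming a transcript that ends in a boxed answer):
#   extract_answer_boxed("The final answer is \\boxed{Paris}.")  ->  "Paris"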
JUDGE_SYS = """
You are an impartial judge evaluating the correctness of a model's answer against a ground-truth answer for a given question. Your task is to:
1. Compare the model's answer to the ground-truth answer.
2. Determine if the model's answer is correct or incorrect.

**Input Format:**
- Question: {question}
- Ground Truth: {ground_truth}
- Model Answer: {model_answer}

**Output Format:**
correct/incorrect/unknown

**Guidelines:**
- The model's answer is correct if it matches the ground-truth answer in meaning and content; the comparison is case-insensitive and ignores minor punctuation or formatting differences.
- If the model's answer contains additional information, it is still correct as long as the core answer matches the ground truth.
- Be precise: output a single word, correct / incorrect / unknown, and **nothing else**.
- For MCQ questions, match the option ID (A, B, C, or D); if the option ID matches, the answer is correct.
"""

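# A per-thread OpenAI client is cached on the thread object so that worker
# threads in multi mode do not share a single client instance.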
def _oa() -> OpenAI:
    th = threading.current_thread()
    if not hasattr(th, "_oa"):
        th._oa = OpenAI()
    return th._oa

def judge(q: str, gt: str, pred: str) -> str:
    if pred == "":
        return "unknown"
    prompt = JUDGE_SYS.format(question=q, ground_truth=gt, model_answer=pred)
    try:
        with SEM:
            resp = _oa().chat.completions.create(
                model=JUDGE_MODEL,
                messages=[
                    {"role": "system", "content": JUDGE_SYS},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.0,
                max_tokens=100,
            )
        return resp.choices[0].message.content.strip().lower()
    except APIStatusError:
        return "unknown"

def build_agent(kind: str, model_url: str):
    """Instantiate the requested agent wrapper (only `recall` is wired up here)."""
    kind = kind.lower()
    print(kind)
    if kind == "recall":
        return ReCall(executor_url=EXECUTOR_URL)
    else:
        raise ValueError(f"Unknown agent kind: {kind}")

def evaluate_example(example: Dict[str, str], agent_kind: str, model_url: str) -> Dict[str, str]:
    """Run one example through the pipeline and return the result row."""
    question = example["question"].strip()
    answer_gt = example["answer"].strip()
    idx = example["id"].strip()

    agent = build_agent(agent_kind, model_url=model_url)

    chat = None  # only the ReCall chat API returns a chat object
    if agent_kind == "recall" and model_url == "deepseek-ai/DeepSeek-R1":
        transcript, tool_calls = agent.run_deepseek(
            env=search_env,
            func_schemas=search_schemas,
            question=question,
            model_name="deepseek-ai/DeepSeek-R1",
            temperature=0.6,
            max_tokens=40960,
        )
    elif agent_kind == "recall":
        transcript, tool_calls, chat = agent.run(
            env=search_env,
            func_schemas=search_schemas,
            question=question,
            model_url=model_url,
            temperature=0.6,
            max_new_tokens=40960,
            tokenizer=tokenizer,
        )
    else:
        transcript, tool_calls = agent.run(question)

    if agent_kind in ["r1-searcher", "zerosearch"]:
        # These agents emit <answer>...</answer> tags.
        pred = extract_answer_tagged(transcript)
    else:
        # recall, SDS, o1-searcher, and the rest emit \boxed{...} answers.
        try:
            pred = extract_answer_boxed(transcript)
        except Exception:
            print("falling back to the last 200 characters")
            pred = transcript[-200:]

    verdict = judge(question, answer_gt.lower(), pred.lower())

    return {
        "id": idx,
        "question": question,
        "answer_gt": answer_gt,
        "model_answer": pred,
        "judge": verdict,
        "tool_calls": tool_calls,
        "transcript": transcript,
        "chat": chat,
    }

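# Each row written to the output JSONL contains the fields returned above
# ("id", "question", "answer_gt", "model_answer", "judge", "tool_calls",
# "transcript", "chat") plus "agent" and "dataset", which main() adds.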
def build_output_path(out_base, agent, dataset, name) -> pathlib.Path:
    """Construct the output path as {out_base}/{agent}/{dataset}-{name}.jsonl."""
    return out_base / f"{agent}" / f"{dataset}-{name}.jsonl"


def normalize(s: str) -> str:
    return unicodedata.normalize("NFKD", s.strip().lower())

def load_existing_results(path: pathlib.Path) -> tuple[list[dict], set[str]]:
    """Load previously written rows that have a non-empty answer, plus their ids."""
    results = []
    uids = set()
    if not path.exists():
        return results, uids
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            try:
                row = json.loads(line)
                if row["model_answer"] != "":
                    results.append(row)
                    uids.add(row["id"])
            except Exception:
                continue
    return results, uids

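# Note: get_uid() and load_existing_results() are not currently called from
# main(); they appear intended to support resuming a partially completed run.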
def main():
    parser = argparse.ArgumentParser(description="Benchmark QA agents on a dataset (single or multi-threaded)")
    parser.add_argument("--dataset", required=True, help="dataset name (frames, …)")
    parser.add_argument("--agent", required=True, choices=["recall", "zerosearch", "r1-searcher", "o1-search", "SDS", "deepseek-r1"], help="agent wrapper")
    parser.add_argument("--out", required=True, help="base directory for outputs")
    parser.add_argument("--model-url", required=False, help="URL of the model server")
    parser.add_argument("--limit", type=int, default=0, help="optional cap on the number of questions")
    parser.add_argument("--mode", choices=["single", "multi"], default="single", help="execution mode")
    parser.add_argument("--workers", type=int, default=8, help="number of worker threads for multi mode")
    parser.add_argument("--name", type=str, default="", help="suffix for the output file name")

    args = parser.parse_args()

    ds_path = DATA_ROOT / f"{args.dataset}.jsonl"
    if not ds_path.exists():
        raise FileNotFoundError(ds_path)

    with ds_path.open() as f:
        data = [json.loads(line) for line in f]

    out_base = pathlib.Path(args.out).expanduser().resolve()
    out_path = build_output_path(out_base, args.agent, args.dataset, args.name)
    print(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    if args.limit:
        data = data[: args.limit]

    correct = 0
    start_time = time.perf_counter()

    if args.mode == "single":
        with open(out_path, "w", encoding="utf-8") as fout:
            for ex in tqdm(data, desc="QA loop (single)"):
                row = evaluate_example(ex, args.agent, args.model_url)
                if row["judge"] == "correct":
                    correct += 1

                row.update({"agent": args.agent, "dataset": args.dataset})
                fout.write(json.dumps(row, ensure_ascii=False) + "\n")
                fout.flush()

    else:
        workers = max(1, args.workers)
        logging.info("Running in multi-threaded mode with %d workers", workers)
        with ThreadPoolExecutor(max_workers=workers) as executor, open(out_path, "a", encoding="utf-8") as fout:
            futures = {executor.submit(evaluate_example, ex, args.agent, args.model_url): ex for ex in data}
            for fut in tqdm(as_completed(futures), total=len(futures), desc="QA loop (multi)"):
                try:
                    row = fut.result()
                except Exception as exc:
                    logging.exception("Evaluation failed: %s", exc)
                    continue

                if row["judge"] == "correct":
                    correct += 1
                row.update({"agent": args.agent, "dataset": args.dataset})
                fout.write(json.dumps(row, ensure_ascii=False) + "\n")
                fout.flush()

    elapsed = time.perf_counter() - start_time
    accuracy = correct / len(data) if data else 0.0
    per_example = elapsed / len(data) if data else 0.0
    print(f"Accuracy: {correct}/{len(data)} = {accuracy:.1%}")
    print(f"Elapsed time: {elapsed:.2f}s ({per_example:.2f}s per example)")


if __name__ == "__main__":
    main()