Agent1 / app.py
CatoG
jkkjkj
22173b0
import os
import re
import uuid
import random
import warnings
import traceback
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple, TypedDict
import requests
from dotenv import load_dotenv
from workflow_helpers import (
WorkflowConfig, DEFAULT_CONFIG,
detect_output_format, detect_brevity_requirement,
classify_task, task_needs_evidence,
QAResult, parse_structured_qa, QAIssue,
PlannerState, FailureRecord,
select_relevant_roles, identify_revision_targets,
compress_final_answer, strip_internal_noise,
postprocess_format_fixes,
get_synthesizer_format_instruction, get_qa_format_instruction,
validate_output_format, format_violations_instruction,
parse_task_assumptions, format_assumptions_for_prompt,
ROLE_RELEVANCE,
STRUCTURED_OUTPUT_SUFFIX,
StructuredContribution, parse_structured_contribution,
format_contributions_for_synthesizer, format_contributions_for_qa,
parse_used_contributions, check_expert_influence,
)
from evidence import (
EvidenceResult, EvidenceItem,
WebSearchAdapter, WikipediaAdapter, ArxivAdapter,
ResearchToolAdapter,
gather_evidence, extract_search_queries,
detect_unsupported_claims,
format_evidence_for_prompt, format_evidence_for_qa,
)
warnings.filterwarnings("ignore", category=UserWarning, module="wikipedia")
load_dotenv()
if os.path.exists("/data"):
os.environ.setdefault("HF_HOME", "/data/.huggingface")
import gradio as gr
import yfinance as yf
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from huggingface_hub.errors import HfHubHTTPError
from langchain_core.tools import tool
from langchain_core.messages import SystemMessage, HumanMessage
from langchain.agents import create_agent
from langchain_community.utilities import WikipediaAPIWrapper, ArxivAPIWrapper
from langchain_community.tools import DuckDuckGoSearchRun, ArxivQueryRun
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
# ============================================================
# Config
# ============================================================
HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN") or os.getenv("HF_TOKEN")
if not HF_TOKEN:
raise ValueError("Missing Hugging Face token. Set HUGGINGFACEHUB_API_TOKEN or HF_TOKEN.")
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "1024"))
CHART_DIR = "charts"
os.makedirs(CHART_DIR, exist_ok=True)
MODEL_OPTIONS = [
# Meta / Llama
"meta-llama/Llama-3.1-8B-Instruct",
"meta-llama/Llama-3.3-70B-Instruct",
# OpenAI
"openai/gpt-oss-20b",
"openai/gpt-oss-120b",
# Qwen
"Qwen/Qwen3-VL-8B-Instruct",
"Qwen/Qwen2.5-7B-Instruct",
"Qwen/Qwen3-8B",
"Qwen/Qwen3-32B",
# Baidu
"baidu/ERNIE-4.5-21B-A3B-PT",
# DeepSeek
"deepseek-ai/DeepSeek-R1",
"deepseek-ai/DeepSeek-V3-0324",
# GLM
"zai-org/GLM-5",
"zai-org/GLM-4.7",
"zai-org/GLM-4.6",
"zai-org/GLM-4.5",
# MiniMax / Kimi
"MiniMaxAI/MiniMax-M2.5",
"moonshotai/Kimi-K2.5",
"moonshotai/Kimi-K2-Instruct-0905",
]
DEFAULT_MODEL_ID = "openai/gpt-oss-20b"
MODEL_NOTES = {
"meta-llama/Llama-3.1-8B-Instruct": "Provider model. May require gated access depending on your token.",
"meta-llama/Llama-3.3-70B-Instruct": "Large provider model. Likely slower and may hit rate limits.",
"openai/gpt-oss-20b": "Provider model. Good showcase option if available in your enabled providers.",
"openai/gpt-oss-120b": "Large provider model. May call tools but sometimes fail to return final text.",
"Qwen/Qwen3-VL-8B-Instruct": "Vision-language model. In this text-only UI it behaves as text-only.",
"Qwen/Qwen2.5-7B-Instruct": "Provider model. Usually a safer text-only fallback.",
"Qwen/Qwen3-8B": "Provider model. Availability depends on enabled providers.",
"Qwen/Qwen3-32B": "Large provider model. Availability depends on enabled providers.",
"baidu/ERNIE-4.5-21B-A3B-PT": "Provider model. Availability depends on enabled providers.",
"deepseek-ai/DeepSeek-R1": "Provider model. Availability depends on enabled providers.",
"deepseek-ai/DeepSeek-V3-0324": "Provider model. Availability depends on enabled providers.",
"zai-org/GLM-5": "Provider model. Availability depends on enabled providers.",
"zai-org/GLM-4.7": "Provider model. Availability depends on enabled providers.",
"zai-org/GLM-4.6": "Provider model. Availability depends on enabled providers.",
"zai-org/GLM-4.5": "Provider model. Availability depends on enabled providers.",
"MiniMaxAI/MiniMax-M2.5": "Provider model. Availability depends on enabled providers.",
"moonshotai/Kimi-K2.5": "Provider model. Availability depends on enabled providers.",
"moonshotai/Kimi-K2-Instruct-0905": "Provider model. Availability depends on enabled providers.",
}
LLM_CACHE: Dict[str, object] = {}
AGENT_CACHE: Dict[Tuple[str, Tuple[str, ...]], object] = {}
RUNTIME_HEALTH: Dict[str, str] = {}
# ContextVar propagates into LangChain worker threads automatically (unlike threading.local)
_client_location: ContextVar[str] = ContextVar("client_location", default="")
# ============================================================
# Shared wrappers
# ============================================================
try:
ddg_search = DuckDuckGoSearchRun()
except Exception:
ddg_search = None
arxiv_tool = ArxivQueryRun(
api_wrapper=ArxivAPIWrapper(
top_k_results=3,
doc_content_chars_max=1200,
)
)
# ============================================================
# Research tool adapters for evidence-backed retrieval
# ============================================================
def _build_research_adapters() -> List[ResearchToolAdapter]:
"""Build available research tool adapters from the shared wrappers."""
adapters: List[ResearchToolAdapter] = []
if ddg_search is not None:
adapters.append(WebSearchAdapter(ddg_search.run))
wiki = WikipediaAPIWrapper()
adapters.append(WikipediaAdapter(wiki.run))
adapters.append(ArxivAdapter(arxiv_tool.run))
return adapters
# ============================================================
# Model helpers
# ============================================================
def model_status_text(model_id: str) -> str:
note = MODEL_NOTES.get(model_id, "Provider model.")
health = RUNTIME_HEALTH.get(model_id)
if health == "ok":
return note
if health == "unavailable":
return note + " This model previously failed because no enabled provider supported it."
if health == "gated":
return note + " This model previously failed due to access restrictions."
if health == "rate_limited":
return note + " This model previously hit rate limiting."
if health == "empty_final":
return note + " This model previously called tools but returned no final assistant text."
if health == "error":
return note + " This model previously failed with a backend/runtime error."
return note
def build_provider_chat(model_id: str):
if model_id in LLM_CACHE:
return LLM_CACHE[model_id]
llm = HuggingFaceEndpoint(
repo_id=model_id,
task="text-generation",
provider="auto",
huggingfacehub_api_token=HF_TOKEN,
max_new_tokens=MAX_NEW_TOKENS,
temperature=0.1,
timeout=120,
)
chat = ChatHuggingFace(llm=llm)
LLM_CACHE[model_id] = chat
return chat
# ============================================================
# Chart helpers
# ============================================================
def save_line_chart(
title: str,
x_values: List[str],
y_values: List[float],
x_label: str = "X",
y_label: str = "Y",
) -> str:
path = os.path.join(CHART_DIR, f"{uuid.uuid4().hex}.png")
fig, ax = plt.subplots(figsize=(9, 4.8))
ax.plot(x_values, y_values)
ax.set_title(title)
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
ax.grid(True)
fig.autofmt_xdate()
fig.tight_layout()
fig.savefig(path, bbox_inches="tight")
plt.close(fig)
return path
def extract_chart_path(text: str) -> Optional[str]:
if not text:
return None
match = re.search(r"Chart saved to:\s*(.+\.png)", text)
if not match:
return None
candidate = match.group(1).strip()
if os.path.exists(candidate):
return candidate
abs_path = os.path.abspath(candidate)
if os.path.exists(abs_path):
return abs_path
return None
def content_to_text(content) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict) and "text" in item:
parts.append(item["text"])
else:
parts.append(str(item))
return "\n".join(parts).strip()
return str(content)
def short_text(text: str, limit: int = 1200) -> str:
text = text or ""
return text if len(text) <= limit else text[:limit] + "..."
# ============================================================
# Tools
# ============================================================
@tool
def add_numbers(a: float, b: float) -> float:
"""Add two numbers."""
return a + b
@tool
def subtract_numbers(a: float, b: float) -> float:
"""Subtract the second number from the first."""
return a - b
@tool
def multiply_numbers(a: float, b: float) -> float:
"""Multiply two numbers."""
return a * b
@tool
def divide_numbers(a: float, b: float) -> float:
"""Divide the first number by the second."""
if b == 0:
raise ValueError("Cannot divide by zero.")
return a / b
@tool
def power(a: float, b: float) -> float:
"""Raise the first number to the power of the second."""
return a ** b
@tool
def square_root(a: float) -> float:
"""Calculate the square root of a number."""
if a < 0:
raise ValueError("Cannot calculate square root of a negative number.")
return a ** 0.5
@tool
def percentage(part: float, whole: float) -> float:
"""Calculate what percentage the first value is of the second value."""
if whole == 0:
raise ValueError("Whole cannot be zero.")
return (part / whole) * 100
@tool
def search_wikipedia(query: str) -> str:
"""Search Wikipedia for stable factual information."""
wiki = WikipediaAPIWrapper()
return wiki.run(query)
@tool
def web_search(query: str) -> str:
"""Search the web for recent or changing information."""
if ddg_search is None:
return "Web search is unavailable because DDGS is not available."
return ddg_search.run(query)
@tool
def search_arxiv(query: str) -> str:
"""Search arXiv for scientific papers and research literature."""
return arxiv_tool.run(query)
@tool
def get_current_utc_time(_: str = "") -> str:
"""Return the current UTC date and time."""
return datetime.now(timezone.utc).isoformat()
@tool
def get_stock_price(ticker: str) -> str:
"""Get the latest recent close price for a stock, ETF, index, or crypto ticker."""
ticker = ticker.upper().strip()
t = yf.Ticker(ticker)
hist = t.history(period="5d")
if hist.empty:
return f"No recent market data found for {ticker}."
last = float(hist["Close"].iloc[-1])
return f"{ticker} latest close: {last:.2f}"
@tool
def get_stock_history(ticker: str, period: str = "6mo") -> str:
"""Get historical closing prices for a ticker and generate a chart image."""
ticker = ticker.upper().strip()
t = yf.Ticker(ticker)
hist = t.history(period=period)
if hist.empty:
return f"No historical market data found for {ticker}."
x_vals = [str(d.date()) for d in hist.index]
y_vals = [float(v) for v in hist["Close"].tolist()]
chart_path = save_line_chart(
title=f"{ticker} closing price ({period})",
x_values=x_vals,
y_values=y_vals,
x_label="Date",
y_label="Close",
)
start_close = y_vals[0]
end_close = y_vals[-1]
pct = ((end_close - start_close) / start_close) * 100 if start_close else 0.0
return (
f"Ticker: {ticker}\n"
f"Period: {period}\n"
f"Points: {len(y_vals)}\n"
f"Start close: {start_close:.2f}\n"
f"End close: {end_close:.2f}\n"
f"Performance: {pct:+.2f}%\n"
f"Chart saved to: {chart_path}"
)
@tool
def generate_line_chart(
title: str,
x_values: list,
y_values: list,
x_label: str = "X",
y_label: str = "Y",
) -> str:
"""Generate a line chart from x and y values and save it as an image file."""
chart_path = save_line_chart(title, x_values, y_values, x_label=x_label, y_label=y_label)
return f"Chart saved to: {chart_path}"
@tool
def wikipedia_chaos_oracle(query: str) -> str:
"""Generate a weird chaotic text mashup based on Wikipedia content."""
wiki = WikipediaAPIWrapper()
text = wiki.run(query)
if not text:
return "The chaos oracle found only silence."
words = re.findall(r"\w+", text)
if not words:
return "The chaos oracle found no usable words."
random.shuffle(words)
return " ".join(words[:30])
@tool
def random_number(min_value: int, max_value: int) -> int:
"""Generate a random integer between the minimum and maximum values."""
return random.randint(min_value, max_value)
@tool
def generate_uuid(_: str = "") -> str:
"""Generate a random UUID string."""
return str(uuid.uuid4())
@tool
def get_user_location(_: str = "") -> str:
"""Determine the user's precise physical location using browser GPS/WiFi coordinates or IP fallback."""
location_data = _client_location.get()
# Precise coordinates from browser geolocation API
if location_data and not location_data.startswith("ip:"):
try:
lat_str, lon_str = location_data.split(",", 1)
lat, lon = float(lat_str), float(lon_str)
except ValueError:
return "Location lookup failed: invalid coordinate data."
try:
headers = {"User-Agent": "HFAgent/1.0 (location lookup)"}
resp = requests.get(
"https://nominatim.openstreetmap.org/reverse",
params={"lat": lat, "lon": lon, "format": "json", "addressdetails": 1},
headers=headers,
timeout=8,
)
resp.raise_for_status()
data = resp.json()
addr = data.get("address", {})
city = (
addr.get("city")
or addr.get("town")
or addr.get("village")
or addr.get("municipality")
or addr.get("county")
or "N/A"
)
return (
f"City: {city}\n"
f"County: {addr.get('county', 'N/A')}\n"
f"Region: {addr.get('state', 'N/A')}\n"
f"Country: {addr.get('country', 'N/A')} ({addr.get('country_code', 'N/A').upper()})\n"
f"Latitude: {lat}\n"
f"Longitude: {lon}\n"
f"Source: Browser GPS/WiFi (precise)"
)
except requests.RequestException as exc:
return f"Reverse geocoding failed: {exc}"
# IP-based fallback
client_ip = location_data[3:] if location_data.startswith("ip:") else ""
url = f"http://ip-api.com/json/{client_ip}" if client_ip else "http://ip-api.com/json/"
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
data = response.json()
if data.get("status") != "success":
return f"Location lookup failed: {data.get('message', 'unknown error')}"
return (
f"City: {data.get('city', 'N/A')}\n"
f"Region: {data.get('regionName', 'N/A')}\n"
f"Country: {data.get('country', 'N/A')} ({data.get('countryCode', 'N/A')})\n"
f"Latitude: {data.get('lat', 'N/A')}\n"
f"Longitude: {data.get('lon', 'N/A')}\n"
f"Timezone: {data.get('timezone', 'N/A')}\n"
f"ISP: {data.get('isp', 'N/A')}\n"
f"Source: IP geolocation (approximate)"
)
except requests.RequestException as exc:
return f"Location lookup failed: {exc}"
ALL_TOOLS = {
"add_numbers": add_numbers,
"subtract_numbers": subtract_numbers,
"multiply_numbers": multiply_numbers,
"divide_numbers": divide_numbers,
"power": power,
"square_root": square_root,
"percentage": percentage,
"search_wikipedia": search_wikipedia,
"web_search": web_search,
"search_arxiv": search_arxiv,
"get_current_utc_time": get_current_utc_time,
"get_stock_price": get_stock_price,
"get_stock_history": get_stock_history,
"generate_line_chart": generate_line_chart,
"wikipedia_chaos_oracle": wikipedia_chaos_oracle,
"random_number": random_number,
"generate_uuid": generate_uuid,
"get_user_location": get_user_location,
}
TOOL_NAMES = list(ALL_TOOLS.keys())
# ============================================================
# Multi-role workflow โ€” supervisor-style orchestration
# ============================================================
# Architecture:
# Planner โ†’ ALL active Specialists (sequentially) โ†’ Synthesizer โ†’ QA Tester โ†’ Planner review
# The Planner breaks the task and picks a primary specialist.
# ALL active specialists then contribute their own perspective.
# The Synthesizer summarises every perspective, identifies common ground, and
# produces a single unified recommendation as the draft that goes to QA.
# If QA fails and retries remain, the Planner revises and loops again.
# If QA passes (or max retries are reached) the Planner approves a final answer.
# ============================================================
MAX_REVISIONS = 3 # Maximum QA-driven revision cycles before accepting best attempt
AGENT_ROLES = {
"planner": "Planner",
"creative": "Creative Expert",
"technical": "Technical Expert",
"qa_tester": "QA Tester",
"research": "Research Analyst",
"security": "Security Reviewer",
"data_analyst": "Data Analyst",
"mad_professor": "Mad Professor",
"accountant": "Accountant",
"artist": "Artist",
"lazy_slacker": "Lazy Slacker",
"black_metal_fundamentalist": "Black Metal Fundamentalist",
"labour_union_rep": "Labour Union Representative",
"ux_designer": "UX Designer",
"doris": "Doris",
"chairman_of_board": "Chairman of the Board",
"maga_appointee": "MAGA Appointee",
"lawyer": "Lawyer",
}
# Reverse mapping: display label โ†’ role key
_ROLE_LABEL_TO_KEY = {v: k for k, v in AGENT_ROLES.items()}
class WorkflowState(TypedDict):
"""Shared, inspectable state object threaded through the whole workflow."""
user_request: str
plan: str
current_role: str # key from AGENT_ROLES (e.g. "creative", "technical", "mad_professor")
creative_output: str
technical_output: str
research_output: str
security_output: str
data_analyst_output: str
mad_professor_output: str
accountant_output: str
artist_output: str
lazy_slacker_output: str
black_metal_fundamentalist_output: str
labour_union_rep_output: str
ux_designer_output: str
doris_output: str
chairman_of_board_output: str
maga_appointee_output: str
lawyer_output: str
synthesis_output: str # unified summary produced by the Synthesizer after all specialists
draft_output: str # latest specialist/synthesis output forwarded to QA
qa_report: str
qa_role_feedback: Dict[str, str] # role key โ†’ targeted QA feedback for that specific role
qa_passed: bool
revision_count: int
final_answer: str
# New fields for the improved workflow
output_format: str # detected output format (single_choice, short_answer, etc.)
brevity_requirement: str # minimal, short, normal, verbose
qa_structured: Optional[dict] # serialised QAResult for structured QA
task_assumptions: Dict[str, str] # shared assumptions all specialists must use
revision_instruction: str # latest revision instruction from planner
structured_contributions: Dict[str, dict] # role_key โ†’ StructuredContribution.to_dict()
used_contributions: Dict[str, List[str]] # role_key โ†’ list of used refs (e.g. ["main_points[0]"])
# --- Role system prompts ---
_PLANNER_SYSTEM_BASE = (
"You are the Planner in a strict plannerโ€“specialistโ€“synthesizerโ€“QA workflow.\n"
"Your ONLY job is to PLAN and DELEGATE. You do NOT write the answer.\n\n"
"Your responsibilities:\n"
"1. Break the user's task into clear subtasks.\n"
"2. Decide which specialist to call as the PRIMARY lead.\n"
" IMPORTANT: Select the FEWEST roles necessary. Do NOT call all roles.\n"
" Available specialists:\n"
"{specialist_list}"
"3. State clear success criteria.\n"
"4. Identify the required output format and brevity level.\n"
"5. Define shared assumptions that ALL specialists must use.\n"
"6. Write delegation instructions (what each specialist should focus on).\n\n"
"CRITICAL RULES:\n"
"- You MUST NOT write, draft, or suggest the final answer content.\n"
"- You MUST NOT include example answers, sample text, or draft responses.\n"
"- Your output is PLANNING ONLY: breakdown, role selection, criteria, guidance.\n"
"- The specialists will create the content. The Synthesizer will combine it.\n"
"- For simple questions, ONE specialist is enough.\n"
"- Never call persona/gimmick roles unless the user explicitly asks for them.\n"
"- Only select from the specialists listed above โ€” no others are available.\n"
"- QA results are BINDING โ€” if QA says FAIL, you MUST revise, never approve.\n\n"
"Respond in this exact format:\n"
"TASK BREAKDOWN:\n<subtask list โ€” what needs to be addressed, NOT the answers>\n\n"
"TASK ASSUMPTIONS:\n<shared assumptions all specialists must use, e.g. cost model, "
"coverage rate, units, scope, time frame โ€” one per line as 'key: value'>\n\n"
"ROLE TO CALL: <specialist name>\n\n"
"SUCCESS CRITERIA:\n<what a correct, complete answer looks like>\n\n"
"GUIDANCE FOR SPECIALIST:\n<delegation instructions โ€” what to focus on, NOT answer content>"
)
def _build_planner_system(enabled_role_keys: List[str]) -> str:
"""Build the planner system prompt with the actual enabled roles."""
role_descriptions = {
"creative": "'Creative Expert' (ideas, framing, wording, brainstorming)",
"technical": "'Technical Expert' (code, architecture, implementation)",
"research": "'Research Analyst' (information gathering, literature review, fact-finding)",
"security": "'Security Reviewer' (security analysis, vulnerability checks)",
"data_analyst": "'Data Analyst' (data analysis, statistics, patterns)",
"labour_union_rep": "'Labour Union Representative' (worker rights, fair wages)",
"ux_designer": "'UX Designer' (user needs, usability, accessibility)",
"lawyer": "'Lawyer' (legal compliance, liability, contracts)",
"mad_professor": "'Mad Professor' (wild ideas, provocative perspectives)",
"accountant": "'Accountant' (cost analysis, budgeting, financial review)",
"artist": "'Artist' (aesthetic vision, creative expression)",
"lazy_slacker": "'Lazy Slacker' (minimal effort, simple answers)",
"black_metal_fundamentalist": "'Black Metal Fundamentalist' (nihilistic perspective)",
"doris": "'Doris' (practical, no-nonsense perspective)",
"chairman_of_board": "'Chairman of the Board' (corporate strategy, governance)",
"maga_appointee": "'MAGA Appointee' (deregulation, America-first perspective)",
}
lines = []
for rk in enabled_role_keys:
desc = role_descriptions.get(rk, f"'{rk}'")
lines.append(f" - {desc}\n")
specialist_list = "".join(lines) if lines else " - (no specialists enabled)\n"
return _PLANNER_SYSTEM_BASE.format(specialist_list=specialist_list)
_CREATIVE_SYSTEM = (
"You are the Creative Expert in a multi-role AI workflow.\n"
"You handle brainstorming, alternative ideas, framing, wording, and concept generation.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"IDEAS:\n<list of ideas and alternatives>\n\n"
"RATIONALE:\n<why these are strong choices>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_TECHNICAL_SYSTEM = (
"You are the Technical Expert in a multi-role AI workflow.\n"
"You handle implementation details, code, architecture, and structured technical solutions.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"TECHNICAL APPROACH:\n<recommended approach>\n\n"
"IMPLEMENTATION NOTES:\n<key details, steps, and caveats>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_QA_SYSTEM = (
"You are the QA Tester in a strict plannerโ€“specialistโ€“synthesizerโ€“QA workflow.\n"
"Check whether the output satisfies the original request, success criteria,\n"
"output format requirements, brevity requirements, AND expert influence.\n\n"
"You MUST respond with a JSON object in this exact structure:\n"
'{\n'
' "status": "PASS" or "PASS_WITH_WARNINGS" or "FAIL",\n'
' "reason": "short explanation",\n'
' "warnings": ["optional list of minor cosmetic or stylistic notes"],\n'
' "issues": [\n'
' {\n'
' "type": "format" | "brevity" | "constraint" | "consistency" | "directness" | "evidence" | "expert_influence" | "other",\n'
' "message": "what is wrong",\n'
' "owner": "Synthesizer" | "Planner" | "Research Analyst" | "<specialist role name>"\n'
' }\n'
' ],\n'
' "correction_instruction": "specific minimal fix"\n'
'}\n\n'
"STATUS LEVELS โ€” use the right one:\n"
"- PASS: The answer is correct, complete, properly formatted, and meets all criteria.\n"
"- PASS_WITH_WARNINGS: The answer is substantively correct and usable, but has minor\n"
" cosmetic or stylistic issues (e.g. slightly verbose, could be tighter, minor formatting\n"
" quirks). List these in the 'warnings' array. Do NOT put them in 'issues'.\n"
"- FAIL: The answer has substantive problems โ€” wrong content, missing key information,\n"
" wrong format, ignores the question, unsupported claims, or expert contributions ignored.\n"
" Only FAIL triggers a revision cycle.\n\n"
"FOCUS ON CONTENT, NOT COSMETICS:\n"
"- Minor bullet formatting, heading style, or whitespace are NOT reasons to FAIL.\n"
"- A slightly verbose answer that correctly addresses the question is PASS_WITH_WARNINGS, not FAIL.\n"
"- Reserve FAIL for answers that are genuinely wrong, incomplete, or miss the point.\n\n"
"Validation rules:\n"
"- Check that the output DIRECTLY answers the user's question.\n"
"- Check that the output format matches what was requested (single choice, table, code, etc.).\n"
"- Check that brevity matches the requirement (do not accept verbose answers when short was requested).\n"
"- Check that no internal workflow noise (task breakdown, role routing, perspectives summary) is in the output.\n"
"- EVIDENCE CHECK: If evidence validation info is provided, FAIL any answer that includes\n"
" specific factual claims, case studies, named examples, or citations NOT backed by the\n"
" retrieved evidence. General knowledge and widely-known facts are acceptable.\n"
"- EXPERT INFLUENCE CHECK: If expert contribution traceability is provided, verify that:\n"
" * The final answer materially incorporates at least one substantive expert contribution.\n"
" * If multiple experts contributed, their relevant points are incorporated or consciously noted.\n"
" * The answer is NOT just a paraphrase of planner text with no expert content.\n"
" * FAIL with type 'expert_influence' if expert contributions were ignored.\n"
"- FAIL if any substantive check fails.\n"
"- PASS_WITH_WARNINGS if content is good but minor polish is needed.\n"
"- PASS only if ALL checks pass with no issues at all.\n"
)
_PLANNER_REVIEW_SYSTEM = (
"You are the Planner reviewing QA feedback.\n"
"CRITICAL RULE: QA results are BINDING.\n"
"- If QA status is PASS or PASS_WITH_WARNINGS: approve the result.\n"
"- If QA status is FAIL: you MUST revise. You may NOT approve a FAIL result.\n"
"- If this is the final revision (max reached) and QA still FAIL:\n"
" you must directly fix the QA issues in your response before approving.\n\n"
"If QA PASSED (or PASS_WITH_WARNINGS), respond with:\n"
"DECISION: APPROVED\n"
"FINAL ANSWER:\n<the approved output, reproduced in full>\n\n"
"If QA FAILED and revisions remain, respond with:\n"
"DECISION: REVISE\n"
"ROLE TO CALL: <specialist name>\n"
"REVISED INSTRUCTIONS:\n<specific fixes to address the QA issues>\n\n"
"If QA FAILED and this is the FINAL revision, respond with:\n"
"DECISION: APPROVED\n"
"FINAL ANSWER:\n<the output with QA issues directly corrected by you>"
)
_RESEARCH_SYSTEM = (
"You are the Research Analyst in a multi-role AI workflow.\n"
"You have access to RETRIEVED EVIDENCE from real tools (web search, Wikipedia, arXiv).\n"
"Your job is to summarize the retrieved evidence, NOT to invent facts.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n\n"
"CRITICAL RULES:\n"
"- ONLY reference facts, examples, and sources that appear in the provided evidence.\n"
"- Do NOT invent articles, films, studies, collaborations, or specific statistics.\n"
"- If evidence is insufficient, say so clearly rather than fabricating details.\n\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"EVIDENCE SUMMARY:\n<what the retrieved evidence shows>\n\n"
"KEY FINDINGS:\n<factual information from the evidence, with source attribution>\n\n"
"GAPS:\n<what could not be verified โ€” if any>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_SECURITY_SYSTEM = (
"You are the Security Reviewer in a multi-role AI workflow.\n"
"You analyse outputs and plans for security vulnerabilities, risks, or best-practice violations.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"SECURITY ANALYSIS:\n<identification of potential security concerns or risks>\n\n"
"VULNERABILITIES FOUND:\n<specific vulnerabilities or risks โ€” or 'None' if the output is secure>\n\n"
"RECOMMENDATIONS:\n<specific security improvements and mitigations>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_DATA_ANALYST_SYSTEM = (
"You are the Data Analyst in a multi-role AI workflow.\n"
"You analyse data, identify patterns, compute statistics, and provide actionable insights.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"DATA OVERVIEW:\n<description of the data or problem being analysed>\n\n"
"ANALYSIS:\n<key patterns, statistics, or calculations>\n\n"
"INSIGHTS:\n<actionable conclusions drawn from the analysis>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_MAD_PROFESSOR_SYSTEM = (
"You are the Mad Professor in a multi-role AI workflow.\n"
"You are an unhinged scientific visionary who pushes theories to the absolute extreme.\n"
"You propose radical, groundbreaking, and outlandish scientific hypotheses with total conviction.\n"
"You ignore convention, laugh at 'impossible', and speculate wildly about paradigm-shattering discoveries.\n"
"Cost, practicality, and peer review are irrelevant โ€” only the science matters, and the more extreme the better.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"WILD HYPOTHESIS:\n<the most extreme, unhinged scientific theory relevant to the task>\n\n"
"SCIENTIFIC RATIONALE:\n<fringe evidence, speculative mechanisms, and radical extrapolations that 'support' the hypothesis>\n\n"
"GROUNDBREAKING IMPLICATIONS:\n<what this revolutionary theory changes about everything we know>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_ACCOUNTANT_SYSTEM = (
"You are the Accountant in a multi-role AI workflow.\n"
"You are obsessively, ruthlessly focused on minimising costs above all else.\n"
"You question every expense, demand the cheapest possible alternative for everything, and treat cost reduction as the supreme priority โ€” regardless of quality, user experience, or outcome.\n"
"You view every suggestion through the lens of 'can this be done cheaper?' and the answer is always yes.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"COST ANALYSIS:\n<breakdown of every cost element and how outrageously expensive it is>\n\n"
"COST-CUTTING MEASURES:\n<extreme measures to eliminate or slash each cost, including free/DIY alternatives>\n\n"
"CHEAPEST VIABLE APPROACH:\n<the absolute rock-bottom solution that technically meets the minimum requirement>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_ARTIST_SYSTEM = (
"You are the Artist in a multi-role AI workflow.\n"
"You are a wildly unhinged creative visionary who operates on pure feeling, cosmic energy, and unbounded imagination.\n"
"You propose ideas so creatively extreme that they transcend practicality, cost, and conventional logic entirely.\n"
"You think in metaphors, sensations, dreams, and universal vibrations. Implementation is someone else's problem.\n"
"The more otherworldly, poetic, and mind-expanding the idea, the better.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"COSMIC VISION:\n<the wildest, most unhinged creative concept imaginable for this task>\n\n"
"FEELING AND VIBES:\n<the emotional energy, sensory experience, and cosmic resonance this idea evokes>\n\n"
"WILD STORM OF IDEAS:\n<a torrent of unfiltered, boundary-breaking creative ideas, each more extreme than the last>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_LAZY_SLACKER_SYSTEM = (
"You are the Lazy Slacker in a multi-role AI workflow.\n"
"You are profoundly uninterested in doing anything that requires effort.\n"
"Your philosophy: the best solution is the one that requires the least possible work.\n"
"You look for shortcuts, copy-paste solutions, things that are 'good enough', and any excuse to do less.\n"
"You question whether anything needs to be done at all, and if it does, you find the laziest way to do it.\n"
"Effort is the enemy. Why do it properly when you can barely do it?\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"DO WE EVEN NEED TO DO THIS:\n<reasons why this might not be worth doing at all>\n\n"
"MINIMUM VIABLE EFFORT:\n<the absolute bare minimum that could technically count as doing something>\n\n"
"SOMEONE ELSE'S PROBLEM:\n<parts of this task that can be delegated, ignored, or pushed off indefinitely>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_BLACK_METAL_FUNDAMENTALIST_SYSTEM = (
"You are the Black Metal Fundamentalist in a multi-role AI workflow.\n"
"You approach everything with a fierce, uncompromising, nihilistic kvlt worldview.\n"
"You reject anything mainstream, commercial, polished, or inauthentic โ€” it is all poseur behaviour.\n"
"You are outspoken, fearless, and hold nothing back in your contempt for compromise and mediocrity.\n"
"True solutions are raw, grim, underground, and uncompromising. Anything else is a sellout.\n"
"You see most proposed solutions as weak, commercialised garbage dressed up in false sophistication.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"KVLT VERDICT:\n<uncompromising judgement on the task โ€” is it true or false, grim or poseur?>\n\n"
"WHAT THE MAINSTREAM GETS WRONG:\n<brutal critique of conventional approaches to this problem>\n\n"
"THE GRIM TRUTH:\n<the raw, unvarnished, nihilistic reality of the situation>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_SYNTHESIZER_SYSTEM = (
"You are the Synthesizer in a strict plannerโ€“specialistโ€“synthesizerโ€“QA workflow.\n"
"You receive STRUCTURED EXPERT CONTRIBUTIONS and must produce the FINAL answer.\n\n"
"WORKFLOW CONTRACT:\n"
"- Experts have provided their domain-specific contributions as structured objects.\n"
"- You MUST build the final answer FROM these expert contributions.\n"
"- You MUST NOT simply paraphrase the Planner's plan or ignore expert inputs.\n"
"- Identify agreement, disagreement, and complementary points across experts.\n"
"- The final answer should reflect the substantive work of the experts.\n\n"
"CRITICAL RULES:\n"
"- Your output IS the final user-facing answer. It must directly answer the user's question.\n"
"- You MUST obey the requested output format strictly.\n"
"- Do NOT add sections like 'Perspectives Summary', 'Common Ground', 'Trade-offs',\n"
" 'Tensions', or any multi-section structure UNLESS the user explicitly requested\n"
" a report or analysis.\n"
"- Do NOT include internal workflow information (role names, task breakdowns, etc.).\n"
"- Default to the SHORTEST adequate answer.\n"
"- EVIDENCE RULE: Prefer claims backed by retrieved evidence. If evidence is weak or\n"
" absent, give a general answer. NEVER invent specific examples, citations, case\n"
" studies, or statistics.\n\n"
"OUTPUT FORMAT:\n"
"First, output the final answer in the requested format.\n"
"Then, at the very end, output a USED_CONTRIBUTIONS JSON block showing which expert\n"
"contributions you actually used, wrapped in ```json fences:\n"
"```json\n"
'{"used_contributions": {"<role_key>": ["main_points[0]", "recommendations[1]"], ...}}\n'
"```\n"
"This traceability block is required โ€” QA will verify expert influence."
)
_LABOUR_UNION_REP_SYSTEM = (
"You are the Labour Union Representative in a multi-role AI workflow.\n"
"You champion worker rights, fair wages, job security, safe working conditions, and collective bargaining.\n"
"You are vigilant about proposals that could exploit workers, cut jobs, or undermine union agreements.\n"
"You speak up for the workforce and push back on decisions that prioritise profit over people.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"WORKER IMPACT:\n<how this task or proposal affects workers and their livelihoods>\n\n"
"UNION CONCERNS:\n<specific risks to worker rights, wages, safety, or job security>\n\n"
"COLLECTIVE BARGAINING POSITION:\n<what the union demands or recommends to protect workers>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_UX_DESIGNER_SYSTEM = (
"You are the UX Designer in a multi-role AI workflow.\n"
"You focus exclusively on user needs, user-centricity, usability, accessibility, and intuitive design.\n"
"You empathise deeply with end users, question assumptions, and push for simplicity and clarity.\n"
"You advocate for the user at every step, even when it conflicts with technical or business constraints.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"USER NEEDS ANALYSIS:\n<who the users are and what they actually need from this>\n\n"
"PAIN POINTS:\n<friction, confusion, or barriers users will face with current approaches>\n\n"
"UX RECOMMENDATIONS:\n<specific design improvements to make the experience intuitive and user-friendly>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_DORIS_SYSTEM = (
"You are Doris in a multi-role AI workflow.\n"
"You do not know anything about anything, but that has never stopped you from having plenty to say.\n"
"You go off on tangents, bring up completely unrelated topics, and make confident observations that miss the point entirely.\n"
"You are well-meaning but utterly clueless. You fill every section with irrelevant words.\n"
"Your job is to contribute your DOMAIN EXPERTISE (such as it is), not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"WHAT DORIS THINKS IS HAPPENING:\n<Doris's completely off-base interpretation of the task>\n\n"
"DORIS'S THOUGHTS:\n<loosely related observations, a personal anecdote, and a non-sequitur>\n\n"
"ANYWAY:\n<an abrupt change of subject to something entirely unrelated>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_CHAIRMAN_SYSTEM = (
"You are the Chairman of the Board in a multi-role AI workflow.\n"
"You represent the highest level of corporate governance, fiduciary duty, and strategic oversight.\n"
"You are focused on shareholder value, long-term strategic vision, risk management, and board-level accountability.\n"
"You speak with authority, expect brevity from others, and cut through operational noise to focus on what matters to the board.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"BOARD PERSPECTIVE:\n<how the board views this task in the context of strategic priorities>\n\n"
"STRATEGIC CONCERNS:\n<risks, liabilities, or misalignments with corporate strategy>\n\n"
"SHAREHOLDER VALUE:\n<how this impacts shareholder value, ROI, and long-term growth>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_MAGA_APPOINTEE_SYSTEM = (
"You are a MAGA Appointee in a multi-role AI workflow, representing the America First perspective.\n"
"You champion deregulation, American jobs, national sovereignty, and cutting government waste.\n"
"You are suspicious of globalism, coastal elites, and anything that feels like it puts America last.\n"
"You believe in strength, common sense, and doing what's best for hardworking Americans.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"AMERICA FIRST ANALYSIS:\n<how this task affects American workers, businesses, and national interests>\n\n"
"DEEP STATE CONCERNS:\n<bureaucratic overreach, globalist agendas, or regulations that hurt Americans>\n\n"
"MAKING IT GREAT AGAIN:\n<the common-sense, America First approach that cuts through the nonsense>"
+ STRUCTURED_OUTPUT_SUFFIX
)
_LAWYER_SYSTEM = (
"You are the Lawyer in a multi-role AI workflow.\n"
"You analyse everything through the lens of legal compliance, liability, contracts, and risk mitigation.\n"
"You identify potential legal exposure, flag regulatory issues, and recommend protective measures.\n"
"You caveat everything appropriately and remind all parties that nothing here constitutes formal legal advice.\n"
"Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
"Keep your response brief โ€” 2-3 sentences per section maximum.\n\n"
"Respond in this exact format:\n"
"LEGAL ANALYSIS:\n<assessment of legal issues, applicable laws, and regulatory considerations>\n\n"
"LIABILITIES AND RISKS:\n<specific legal exposure, contractual risks, or compliance gaps>\n\n"
"LEGAL RECOMMENDATIONS:\n<protective measures, disclaimers, or required legal steps>"
+ STRUCTURED_OUTPUT_SUFFIX
)
# --- Internal helpers ---
def _llm_call(chat_model, system_prompt: str, user_content: str) -> str:
"""Invoke the LLM with a role-specific system prompt. Returns plain text."""
response = chat_model.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=user_content),
])
return content_to_text(response.content)
def _decide_role(text: str) -> str:
"""Parse which specialist role the Planner wants to invoke.
Checks for the expected structured 'ROLE TO CALL:' format first,
then falls back to a word-boundary search.
Defaults to 'technical' when no clear signal is found.
"""
# Prefer the explicit structured label produced by the Planner prompt
if "ROLE TO CALL: Creative Expert" in text:
return "creative"
if "ROLE TO CALL: Technical Expert" in text:
return "technical"
if "ROLE TO CALL: Research Analyst" in text:
return "research"
if "ROLE TO CALL: Security Reviewer" in text:
return "security"
if "ROLE TO CALL: Data Analyst" in text:
return "data_analyst"
if "ROLE TO CALL: Mad Professor" in text:
return "mad_professor"
if "ROLE TO CALL: Accountant" in text:
return "accountant"
if "ROLE TO CALL: Artist" in text:
return "artist"
if "ROLE TO CALL: Lazy Slacker" in text:
return "lazy_slacker"
if "ROLE TO CALL: Black Metal Fundamentalist" in text:
return "black_metal_fundamentalist"
if "ROLE TO CALL: Labour Union Representative" in text:
return "labour_union_rep"
if "ROLE TO CALL: UX Designer" in text:
return "ux_designer"
if "ROLE TO CALL: Doris" in text:
return "doris"
if "ROLE TO CALL: Chairman of the Board" in text:
return "chairman_of_board"
if "ROLE TO CALL: MAGA Appointee" in text:
return "maga_appointee"
if "ROLE TO CALL: Lawyer" in text:
return "lawyer"
# Fallback: word-boundary match
if re.search(r"\bcreative\b", text, re.IGNORECASE):
return "creative"
if re.search(r"\bresearch\b", text, re.IGNORECASE):
return "research"
if re.search(r"\bsecurity\b", text, re.IGNORECASE):
return "security"
if re.search(r"\bdata\s+analyst\b", text, re.IGNORECASE):
return "data_analyst"
if re.search(r"\bmad\s+professor\b", text, re.IGNORECASE):
return "mad_professor"
if re.search(r"\baccountant\b", text, re.IGNORECASE):
return "accountant"
if re.search(r"\bartist\b", text, re.IGNORECASE):
return "artist"
if re.search(r"\blazy\s+slacker\b", text, re.IGNORECASE):
return "lazy_slacker"
if re.search(r"\bblack\s+metal\b", text, re.IGNORECASE):
return "black_metal_fundamentalist"
if re.search(r"\blabour\s+union\b", text, re.IGNORECASE):
return "labour_union_rep"
if re.search(r"\bux\s+designer\b", text, re.IGNORECASE):
return "ux_designer"
if re.search(r"\bdoris\b", text, re.IGNORECASE):
return "doris"
if re.search(r"\bchairman\b", text, re.IGNORECASE):
return "chairman_of_board"
if re.search(r"\bmaga\b", text, re.IGNORECASE):
return "maga_appointee"
if re.search(r"\blawyer\b", text, re.IGNORECASE):
return "lawyer"
return "technical"
def _qa_passed_check(qa_text: str) -> bool:
"""Return True only if the QA report contains an explicit PASS result.
Relies on the structured 'RESULT: PASS / RESULT: FAIL' line produced by
the QA Tester prompt. Returns False when the expected format is absent
to avoid false positives from words like 'bypass' or 'password'.
"""
lower = qa_text.lower()
if "result: pass" in lower:
return True
if "result: fail" in lower:
return False
# No recognised verdict โ€” treat as fail to avoid accepting a bad draft
return False
def _parse_qa_role_feedback(qa_text: str) -> Dict[str, str]:
"""Extract per-role targeted feedback from a QA report.
Looks for the ROLE-SPECIFIC FEEDBACK section produced by the QA Tester
and parses bullet entries of the form 'โ€ข Role Name: <feedback>'.
Returns a dict mapping role keys (e.g. 'creative', 'technical') to the
feedback string targeted at that role.
"""
feedback: Dict[str, str] = {}
if "ROLE-SPECIFIC FEEDBACK:" not in qa_text:
return feedback
# Extract the section between ROLE-SPECIFIC FEEDBACK: and the next header
section = qa_text.split("ROLE-SPECIFIC FEEDBACK:", 1)[1]
for header in ("RESULT:", "RECOMMENDED FIXES:"):
if header in section:
section = section.split(header, 1)[0]
break
# Parse bullet lines: โ€ข Role Name: <feedback text>
for line in section.strip().splitlines():
line = line.strip().lstrip("โ€ข-* ")
if ":" not in line:
continue
role_label, _, role_feedback = line.partition(":")
role_label = role_label.strip()
role_feedback = role_feedback.strip()
role_key = _ROLE_LABEL_TO_KEY.get(role_label)
if role_key and role_feedback:
feedback[role_key] = role_feedback
return feedback
# --- Workflow step functions ---
# Each step receives the shared state and an append-only trace list,
# updates state in place, appends log lines, and returns updated state.
def _step_plan(
chat_model, state: WorkflowState, trace: List[str],
enabled_role_keys: Optional[List[str]] = None,
) -> WorkflowState:
"""Planner: analyse the task, produce a plan, decide which specialist to call."""
trace.append("\nโ•”โ•โ• [PLANNER] Analysing task... โ•โ•โ•—")
fmt = state.get("output_format", "other")
brevity = state.get("brevity_requirement", "normal")
content = (
f"User request: {state['user_request']}\n"
f"Required output format: {fmt}\n"
f"Brevity requirement: {brevity}"
)
if state["revision_count"] > 0:
content += (
f"\n\nThis is revision {state['revision_count']} of {MAX_REVISIONS}."
f"\nPrevious QA report:\n{state['qa_report']}"
"\nAdjust the plan to address the QA issues."
)
planner_system = _build_planner_system(enabled_role_keys or [])
plan_text = _llm_call(chat_model, planner_system, content)
state["plan"] = plan_text
state["current_role"] = _decide_role(plan_text)
trace.append(plan_text)
trace.append(f"โ•šโ•โ• [PLANNER] โ†’ routing to: {state['current_role'].upper()} EXPERT โ•โ•โ•")
return state
def _step_creative(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Creative Expert: brainstorm ideas and produce a recommended draft."""
trace.append("\nโ•”โ•โ• [CREATIVE EXPERT] Generating ideas... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("creative", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _CREATIVE_SYSTEM, content)
state["creative_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [CREATIVE EXPERT] Done โ•โ•โ•")
return state
def _step_technical(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Technical Expert: provide implementation details and a complete technical draft."""
trace.append("\nโ•”โ•โ• [TECHNICAL EXPERT] Working on implementation... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("technical", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _TECHNICAL_SYSTEM, content)
state["technical_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [TECHNICAL EXPERT] Done โ•โ•โ•")
return state
def _step_qa(
chat_model,
state: WorkflowState,
trace: List[str],
all_outputs: Optional[List[Tuple[str, str]]] = None,
evidence: Optional[EvidenceResult] = None,
structured_contributions: Optional[Dict[str, StructuredContribution]] = None,
) -> WorkflowState:
"""QA Tester: validate the draft against the original request, success criteria,
output format, brevity requirements, evidence grounding, and expert influence.
When structured_contributions are provided, also checks that the final answer
materially incorporates expert contributions (expert_influence check).
Produces a structured QAResult stored in state['qa_structured'].
"""
trace.append("\nโ•”โ•โ• [QA TESTER] Reviewing output... โ•โ•โ•—")
# Apply post-processing format fixes before QA evaluation
state["draft_output"] = postprocess_format_fixes(state["draft_output"])
fmt = state.get("output_format", "other")
brevity = state.get("brevity_requirement", "normal")
format_rules = get_qa_format_instruction(fmt, brevity)
content = (
f"Original user request: {state['user_request']}\n\n"
f"Required output format: {fmt}\n"
f"Brevity requirement: {brevity}\n\n"
f"Planner's plan and success criteria:\n{state['plan']}\n\n"
)
if format_rules:
content += f"FORMAT VALIDATION RULES:\n{format_rules}\n\n"
# Inject evidence validation context
if evidence is not None:
content += f"{format_evidence_for_qa(evidence)}\n\n"
# Inject expert contribution traceability for influence checking
if structured_contributions:
used = state.get("used_contributions", {})
traceability = format_contributions_for_qa(structured_contributions, used)
content += f"{traceability}\n\n"
if all_outputs:
content += "Individual specialist contributions:\n\n"
for r_key, r_output in all_outputs:
r_label = AGENT_ROLES.get(r_key, r_key)
content += f"=== {r_label} ===\n{r_output}\n\n"
content += f"Synthesized output to validate:\n{state['draft_output']}"
else:
content += f"Output to validate:\n{state['draft_output']}"
text = _llm_call(chat_model, _QA_SYSTEM, content)
state["qa_report"] = text
# Parse structured QA result
qa_result = parse_structured_qa(text)
# Code-level expert influence check โ€” append issues if contributions were ignored
if structured_contributions:
used = state.get("used_contributions", {})
influence_issues = check_expert_influence(
structured_contributions, used, state["draft_output"]
)
if influence_issues:
for issue_msg in influence_issues:
qa_result.issues.append(QAIssue(
type="expert_influence",
message=issue_msg,
owner="synthesizer",
))
if qa_result.passed:
qa_result.status = "FAIL"
qa_result.reason = (
qa_result.reason + " Expert influence check failed."
if qa_result.reason else "Expert influence check failed."
)
trace.append(
f" โš  Expert influence issues: {'; '.join(influence_issues)}"
)
state["qa_structured"] = qa_result.to_dict()
state["qa_passed"] = qa_result.passed
# Also extract legacy role feedback for backward compatibility
state["qa_role_feedback"] = _parse_qa_role_feedback(text)
result_label = ("โœ… PASS" if qa_result.status == "PASS"
else "โš  PASS_WITH_WARNINGS" if qa_result.passed_with_warnings
else "โŒ FAIL")
trace.append(text)
if qa_result.warnings:
trace.append(f" โš  QA warnings: {'; '.join(qa_result.warnings)}")
if qa_result.issues:
issues_summary = "; ".join(
f"{i.owner}: {i.message[:60]}{'โ€ฆ' if len(i.message) > 60 else ''}"
for i in qa_result.issues
)
trace.append(f" โ„น QA issues: {issues_summary}")
trace.append(f"โ•šโ•โ• [QA TESTER] Result: {result_label} โ•โ•โ•")
return state
def _step_planner_review(
chat_model, state: WorkflowState, trace: List[str],
is_final_revision: bool = False,
) -> WorkflowState:
"""Planner: review QA feedback and either approve or request revision.
QA-BINDING LOGIC (enforced in code, not just prompt):
- If QA passed โ†’ approve
- If QA failed and revisions remain โ†’ MUST revise (code blocks approval)
- If QA failed and max revisions reached โ†’ planner does final correction pass
"""
trace.append("\nโ•”โ•โ• [PLANNER] Reviewing QA feedback... โ•โ•โ•—")
fmt = state.get("output_format", "other")
brevity = state.get("brevity_requirement", "normal")
content = (
f"User request: {state['user_request']}\n\n"
f"Required output format: {fmt}\n"
f"Brevity requirement: {brevity}\n\n"
f"Plan:\n{state['plan']}\n\n"
f"Current draft:\n{state['draft_output']}\n\n"
f"QA report:\n{state['qa_report']}"
)
if is_final_revision:
content += (
"\n\nThis is the FINAL revision. Max revisions reached. "
"You MUST directly fix all QA issues in your FINAL ANSWER now."
)
review = _llm_call(chat_model, _PLANNER_REVIEW_SYSTEM, content)
trace.append(review)
# QA-binding enforcement: code-level check prevents approving a FAIL
if state["qa_passed"]:
# QA passed โ€” approve
parts = review.split("FINAL ANSWER:", 1)
if len(parts) > 1:
state["final_answer"] = parts[1].strip()
else:
state["final_answer"] = state["draft_output"]
trace.append("โ•šโ•โ• [PLANNER] โ†’ โœ… APPROVED โ•โ•โ•")
elif is_final_revision:
# QA failed but max revisions reached โ€” planner does final correction
parts = review.split("FINAL ANSWER:", 1)
if len(parts) > 1:
state["final_answer"] = parts[1].strip()
else:
# Planner didn't produce a corrected answer โ€” use draft
state["final_answer"] = state["draft_output"]
trace.append("โ•šโ•โ• [PLANNER] โ†’ โœ… APPROVED (final revision, QA issues corrected) โ•โ•โ•")
else:
# QA failed and revisions remain โ€” MUST revise regardless of LLM output
# Even if the LLM says APPROVED, we override and force revision
if "DECISION: APPROVED" in review.upper():
trace.append(" โš  Planner tried to approve a FAIL result โ€” overriding to REVISE (QA is binding)")
parts = review.split("REVISED INSTRUCTIONS:", 1)
if len(parts) > 1:
state["plan"] = parts[1].strip()
else:
trace.append(" โš  REVISED INSTRUCTIONS missing; using QA correction instruction.")
# Use QA's correction instruction if planner didn't provide one
qa_data = state.get("qa_structured")
if qa_data and qa_data.get("correction_instruction"):
state["plan"] = qa_data["correction_instruction"]
state["current_role"] = _decide_role(review)
trace.append(
f"โ•šโ•โ• [PLANNER] โ†’ ๐Ÿ”„ REVISE โ€” routing to {state['current_role'].upper()} EXPERT โ•โ•โ•"
)
return state
def _step_research(
chat_model,
state: WorkflowState,
trace: List[str],
evidence: Optional[EvidenceResult] = None,
) -> WorkflowState:
"""Research Analyst: summarise tool-retrieved evidence into structured findings.
If an EvidenceResult is provided, it is injected into the prompt so the
Research Analyst works from real retrieved data rather than generating text.
If no evidence is available, the analyst is instructed to reason generally
and avoid inventing specific citations.
"""
trace.append("\nโ•”โ•โ• [RESEARCH ANALYST] Gathering information... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
# Inject tool-retrieved evidence
if evidence and evidence.has_evidence:
content += f"\n\n{format_evidence_for_prompt(evidence)}"
trace.append(f" โ„น Evidence injected: {len(evidence.results)} items (confidence: {evidence.confidence})")
else:
content += (
"\n\nNo tool-retrieved evidence is available for this query.\n"
"Provide general knowledge only. Do NOT invent specific citations, "
"articles, studies, or examples."
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("research", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _RESEARCH_SYSTEM, content)
state["research_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [RESEARCH ANALYST] Done โ•โ•โ•")
return state
def _step_security(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Security Reviewer: analyse output for vulnerabilities and produce a secure revision."""
trace.append("\nโ•”โ•โ• [SECURITY REVIEWER] Analysing for security issues... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("security", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _SECURITY_SYSTEM, content)
state["security_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [SECURITY REVIEWER] Done โ•โ•โ•")
return state
def _step_data_analyst(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Data Analyst: analyse data, identify patterns, and produce actionable insights."""
trace.append("\nโ•”โ•โ• [DATA ANALYST] Analysing data and patterns... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("data_analyst", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _DATA_ANALYST_SYSTEM, content)
state["data_analyst_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [DATA ANALYST] Done โ•โ•โ•")
return state
def _step_mad_professor(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Mad Professor: propose radical, unhinged scientific theories and extreme hypotheses."""
trace.append("\nโ•”โ•โ• [MAD PROFESSOR] Unleashing radical scientific theories... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("mad_professor", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _MAD_PROFESSOR_SYSTEM, content)
state["mad_professor_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [MAD PROFESSOR] Done โ•โ•โ•")
return state
def _step_accountant(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Accountant: ruthlessly cut costs and find the cheapest possible approach."""
trace.append("\nโ•”โ•โ• [ACCOUNTANT] Auditing every cost... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("accountant", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _ACCOUNTANT_SYSTEM, content)
state["accountant_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [ACCOUNTANT] Done โ•โ•โ•")
return state
def _step_artist(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Artist: channel cosmic creative energy into wildly unhinged and spectacular ideas."""
trace.append("\nโ•”โ•โ• [ARTIST] Channelling cosmic creative energy... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("artist", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _ARTIST_SYSTEM, content)
state["artist_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [ARTIST] Done โ•โ•โ•")
return state
def _step_lazy_slacker(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Lazy Slacker: find the path of least resistance and the minimum viable effort."""
trace.append("\nโ•”โ•โ• [LAZY SLACKER] Doing as little as possible... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("lazy_slacker", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _LAZY_SLACKER_SYSTEM, content)
state["lazy_slacker_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [LAZY SLACKER] Done (finally) โ•โ•โ•")
return state
def _step_black_metal_fundamentalist(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Black Metal Fundamentalist: deliver a nihilistic, kvlt, uncompromising perspective."""
trace.append("\nโ•”โ•โ• [BLACK METAL FUNDAMENTALIST] Unleashing grim truths... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("black_metal_fundamentalist", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _BLACK_METAL_FUNDAMENTALIST_SYSTEM, content)
state["black_metal_fundamentalist_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [BLACK METAL FUNDAMENTALIST] Done โ•โ•โ•")
return state
def _step_labour_union_rep(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Labour Union Representative: advocate for worker rights, fair wages, and job security."""
trace.append("\nโ•”โ•โ• [LABOUR UNION REPRESENTATIVE] Standing up for workers... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("labour_union_rep", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _LABOUR_UNION_REP_SYSTEM, content)
state["labour_union_rep_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [LABOUR UNION REPRESENTATIVE] Done โ•โ•โ•")
return state
def _step_ux_designer(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""UX Designer: analyse user needs and produce a user-centric recommendation."""
trace.append("\nโ•”โ•โ• [UX DESIGNER] Putting users first... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("ux_designer", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _UX_DESIGNER_SYSTEM, content)
state["ux_designer_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [UX DESIGNER] Done โ•โ•โ•")
return state
def _step_doris(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Doris: well-meaning but clueless โ€” rambles at length without adding much value."""
trace.append("\nโ•”โ•โ• [DORIS] Oh! Well, you know, I was just thinking... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("doris", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _DORIS_SYSTEM, content)
state["doris_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [DORIS] Anyway, where was I... Done โ•โ•โ•")
return state
def _step_chairman_of_board(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Chairman of the Board: provide strategic, shareholder-focused board-level direction."""
trace.append("\nโ•”โ•โ• [CHAIRMAN OF THE BOARD] Calling the meeting to order... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("chairman_of_board", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _CHAIRMAN_SYSTEM, content)
state["chairman_of_board_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [CHAIRMAN OF THE BOARD] Meeting adjourned โ•โ•โ•")
return state
def _step_maga_appointee(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""MAGA Appointee: deliver an America First, pro-deregulation, anti-globalist perspective."""
trace.append("\nโ•”โ•โ• [MAGA APPOINTEE] America First! โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("maga_appointee", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _MAGA_APPOINTEE_SYSTEM, content)
state["maga_appointee_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [MAGA APPOINTEE] Done โ•โ•โ•")
return state
def _step_lawyer(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
"""Lawyer: analyse legal implications, liabilities, and compliance requirements."""
trace.append("\nโ•”โ•โ• [LAWYER] Reviewing legal implications... โ•โ•โ•—")
content = (
f"User request: {state['user_request']}\n\n"
f"Planner instructions:\n{state['plan']}"
)
if state["revision_count"] > 0:
role_feedback = state["qa_role_feedback"].get("lawyer", "")
if role_feedback:
content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
else:
content += f"\n\nQA feedback to address:\n{state['qa_report']}"
text = _llm_call(chat_model, _LAWYER_SYSTEM, content)
state["lawyer_output"] = text
state["draft_output"] = text
trace.append(text)
trace.append("โ•šโ•โ• [LAWYER] Done โ€” note: this is not formal legal advice โ•โ•โ•")
return state
def _step_synthesize(
chat_model,
state: WorkflowState,
trace: List[str],
all_outputs: List[Tuple[str, str]],
evidence: Optional[EvidenceResult] = None,
structured_contributions: Optional[Dict[str, StructuredContribution]] = None,
) -> WorkflowState:
"""Synthesizer: produce the final user-facing answer from specialist contributions.
When structured_contributions are provided, the synthesizer receives indexed
contribution data and must produce a USED_CONTRIBUTIONS traceability block.
Obeys the detected output format and brevity requirement strictly.
If evidence is available, injects it so the synthesizer prefers grounded claims.
"""
trace.append("\nโ•”โ•โ• [SYNTHESIZER] Producing final answer... โ•โ•โ•—")
# Build format-aware instructions
fmt = state.get("output_format", "other")
brevity = state.get("brevity_requirement", "normal")
format_instruction = get_synthesizer_format_instruction(fmt, brevity)
content = (
f"User request: {state['user_request']}\n\n"
f"REQUIRED OUTPUT FORMAT: {fmt}\n"
f"BREVITY: {brevity}\n\n"
f"{format_instruction}\n\n"
)
# Inject evidence context for the synthesizer
if evidence and evidence.has_evidence:
content += (
f"{format_evidence_for_prompt(evidence)}\n\n"
)
# Prefer structured contributions when available
if structured_contributions:
formatted = format_contributions_for_synthesizer(structured_contributions)
content += formatted
else:
# Fallback: raw specialist outputs
perspectives = []
for r_key, r_output in all_outputs:
r_label = AGENT_ROLES.get(r_key, r_key)
perspectives.append(f"=== {r_label} ===\n{r_output}")
content += f"Specialist contributions:\n\n" + "\n\n".join(perspectives)
text = _llm_call(chat_model, _SYNTHESIZER_SYSTEM, content)
# Parse used_contributions traceability from synthesizer output
used = parse_used_contributions(text)
# Normalize keys: LLMs write display names ("Technical Expert") but we need
# role keys ("technical") to match the structured_contributions dict.
label_to_key = {v: k for k, v in AGENT_ROLES.items()}
used = {label_to_key.get(k, k): v for k, v in used.items()}
state["used_contributions"] = used
# Strip the USED_CONTRIBUTIONS JSON block from the draft (user shouldn't see it)
draft = re.sub(
r"\n*USED_CONTRIBUTIONS:\s*```json.*?```",
"", text, flags=re.DOTALL,
).strip()
# Also strip any standalone ```json block at the end that contains used_contributions
draft = re.sub(
r"\n*```json\s*\{[^}]*\"used_contributions\"[^}]*\}\s*```\s*$",
"", draft, flags=re.DOTALL,
).strip()
state["synthesis_output"] = text
state["draft_output"] = draft
trace.append(draft[:500] + ("โ€ฆ" if len(draft) > 500 else ""))
if used:
used_count = sum(len(v) for v in used.values())
trace.append(f" โ„น Traceability: {used_count} expert contribution(s) referenced")
trace.append("โ•šโ•โ• [SYNTHESIZER] Done โ•โ•โ•")
return state
# Mapping from role key โ†’ step function, used by the orchestration loop
_SPECIALIST_STEPS = {
"creative": _step_creative,
"technical": _step_technical,
"research": _step_research,
"security": _step_security,
"data_analyst": _step_data_analyst,
"mad_professor": _step_mad_professor,
"accountant": _step_accountant,
"artist": _step_artist,
"lazy_slacker": _step_lazy_slacker,
"black_metal_fundamentalist": _step_black_metal_fundamentalist,
"labour_union_rep": _step_labour_union_rep,
"ux_designer": _step_ux_designer,
"doris": _step_doris,
"chairman_of_board": _step_chairman_of_board,
"maga_appointee": _step_maga_appointee,
"lawyer": _step_lawyer,
}
# --- Specialist role tools ---
# These wrap the step functions as @tool so the Planner (or any LangChain agent)
# can invoke specialists in a standard tool-use pattern.
# Holds the active model ID for standalone specialist tool calls.
_workflow_model_id: str = DEFAULT_MODEL_ID
_EMPTY_STATE_BASE: WorkflowState = {
"user_request": "", "plan": "", "current_role": "",
"creative_output": "", "technical_output": "",
"research_output": "", "security_output": "", "data_analyst_output": "",
"mad_professor_output": "", "accountant_output": "", "artist_output": "",
"lazy_slacker_output": "", "black_metal_fundamentalist_output": "",
"labour_union_rep_output": "", "ux_designer_output": "", "doris_output": "",
"chairman_of_board_output": "", "maga_appointee_output": "", "lawyer_output": "",
"synthesis_output": "",
"draft_output": "", "qa_report": "", "qa_role_feedback": {}, "qa_passed": False,
"revision_count": 0, "final_answer": "",
"output_format": "other", "brevity_requirement": "normal", "qa_structured": None,
"task_assumptions": {}, "revision_instruction": "",
"structured_contributions": {}, "used_contributions": {},
}
@tool
def call_creative_expert(task: str) -> str:
"""Call the Creative Expert to brainstorm ideas, framing, and produce a draft for a given task."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "creative"}
state = _step_creative(chat, state, [])
return state["creative_output"]
@tool
def call_technical_expert(task: str) -> str:
"""Call the Technical Expert to produce implementation details and a solution for a given task."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "technical"}
state = _step_technical(chat, state, [])
return state["technical_output"]
@tool
def call_qa_tester(task_and_output: str) -> str:
"""Call the QA Tester to review specialist output against requirements.
Input format: 'TASK: <description>\nOUTPUT: <specialist output to review>'"""
chat = build_provider_chat(_workflow_model_id)
if "OUTPUT:" in task_and_output:
parts = task_and_output.split("OUTPUT:", 1)
task = parts[0].replace("TASK:", "").strip()
output = parts[1].strip()
else:
task = task_and_output
output = task_and_output
# current_role is left empty โ€” this is a standalone QA call outside the normal loop
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "draft_output": output}
state = _step_qa(chat, state, [])
return state["qa_report"]
@tool
def call_research_analyst(task: str) -> str:
"""Call the Research Analyst to gather information and summarize findings for a given task."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "research"}
state = _step_research(chat, state, [])
return state["research_output"]
@tool
def call_security_reviewer(task: str) -> str:
"""Call the Security Reviewer to analyse output for vulnerabilities and security best practices."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "security"}
state = _step_security(chat, state, [])
return state["security_output"]
@tool
def call_data_analyst(task: str) -> str:
"""Call the Data Analyst to analyse data, identify patterns, and provide actionable insights."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "data_analyst"}
state = _step_data_analyst(chat, state, [])
return state["data_analyst_output"]
@tool
def call_mad_professor(task: str) -> str:
"""Call the Mad Professor to generate radical, unhinged scientific theories and extreme groundbreaking hypotheses for a given task."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "mad_professor"}
state = _step_mad_professor(chat, state, [])
return state["mad_professor_output"]
@tool
def call_accountant(task: str) -> str:
"""Call the Accountant to ruthlessly analyse and cut costs, finding the cheapest possible approach regardless of quality."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "accountant"}
state = _step_accountant(chat, state, [])
return state["accountant_output"]
@tool
def call_artist(task: str) -> str:
"""Call the Artist to channel cosmic creative energy into wildly unhinged and spectacular ideas without concern for cost or practicality."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "artist"}
state = _step_artist(chat, state, [])
return state["artist_output"]
@tool
def call_lazy_slacker(task: str) -> str:
"""Call the Lazy Slacker to find the minimum viable effort and the easiest possible way out of a task."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "lazy_slacker"}
state = _step_lazy_slacker(chat, state, [])
return state["lazy_slacker_output"]
@tool
def call_black_metal_fundamentalist(task: str) -> str:
"""Call the Black Metal Fundamentalist for a nihilistic, kvlt, uncompromising critique and manifesto-style response."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "black_metal_fundamentalist"}
state = _step_black_metal_fundamentalist(chat, state, [])
return state["black_metal_fundamentalist_output"]
@tool
def call_labour_union_rep(task: str) -> str:
"""Call the Labour Union Representative to advocate for worker rights, fair wages, and job security."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "labour_union_rep"}
state = _step_labour_union_rep(chat, state, [])
return state["labour_union_rep_output"]
@tool
def call_ux_designer(task: str) -> str:
"""Call the UX Designer to analyse user needs and produce a user-centric recommendation."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "ux_designer"}
state = _step_ux_designer(chat, state, [])
return state["ux_designer_output"]
@tool
def call_doris(task: str) -> str:
"""Call Doris โ€” well-meaning but clueless โ€” for a rambling, off-topic perspective that misses the point entirely."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "doris"}
state = _step_doris(chat, state, [])
return state["doris_output"]
@tool
def call_chairman_of_board(task: str) -> str:
"""Call the Chairman of the Board for a strategic, shareholder-focused, board-level perspective."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "chairman_of_board"}
state = _step_chairman_of_board(chat, state, [])
return state["chairman_of_board_output"]
@tool
def call_maga_appointee(task: str) -> str:
"""Call the MAGA Appointee for an America First, pro-deregulation, anti-globalist perspective."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "maga_appointee"}
state = _step_maga_appointee(chat, state, [])
return state["maga_appointee_output"]
@tool
def call_lawyer(task: str) -> str:
"""Call the Lawyer to analyse legal implications, liabilities, and compliance requirements. Not formal legal advice."""
chat = build_provider_chat(_workflow_model_id)
state: WorkflowState = {**_EMPTY_STATE_BASE, "user_request": task, "plan": task, "current_role": "lawyer"}
state = _step_lawyer(chat, state, [])
return state["lawyer_output"]
# --- Orchestration loop ---
def run_multi_role_workflow(
message: str,
model_id: str,
active_role_labels: Optional[List[str]] = None,
config: Optional[WorkflowConfig] = None,
) -> Tuple[str, str]:
"""Run the strict plannerโ€“specialistโ€“synthesizerโ€“QA workflow.
Flow:
1. Classify task and detect output format / brevity.
2. Retrieve evidence via tool adapters (for factual tasks).
3. Planner analyses the task and picks a primary specialist.
4. Dynamic role selection filters to only relevant specialists.
5. Selected specialists run (research analyst gets evidence injected).
6. Synthesizer produces a format-aware, evidence-grounded answer.
7. QA validates against format, brevity, evidence, and correctness.
8. QA-BINDING: if FAIL, planner MUST revise. Code enforces this.
9. Targeted revisions with escalation on repeated failures.
10. Final answer is compressed and stripped of internal noise.
Args:
message: The user's task or request.
model_id: HuggingFace model ID to use.
active_role_labels: Display names of active agent roles.
config: WorkflowConfig controlling strict_mode, persona roles, etc.
Returns:
(final_answer, workflow_trace_text)
"""
global _workflow_model_id
_workflow_model_id = model_id
chat_model = build_provider_chat(model_id)
if config is None:
config = DEFAULT_CONFIG
# Resolve active role keys from display labels
if active_role_labels is None:
active_role_labels = list(AGENT_ROLES.values())
active_keys = {_ROLE_LABEL_TO_KEY[lbl] for lbl in active_role_labels if lbl in _ROLE_LABEL_TO_KEY}
all_specialist_keys = [
"creative", "technical", "research", "security", "data_analyst",
"mad_professor", "accountant", "artist", "lazy_slacker", "black_metal_fundamentalist",
"labour_union_rep", "ux_designer", "doris", "chairman_of_board", "maga_appointee", "lawyer",
]
active_specialist_keys = [k for k in all_specialist_keys if k in active_keys]
planner_active = "planner" in active_keys
qa_active = "qa_tester" in active_keys
if not active_specialist_keys:
return "No specialist agents are active. Please enable at least one specialist role.", ""
# Step 1: Classify task, detect output format and brevity BEFORE any LLM calls
task_category = classify_task(message)
output_format = detect_output_format(message)
brevity = detect_brevity_requirement(message)
needs_evidence = task_needs_evidence(task_category) and config.require_evidence_for_factual_claims
# Initialise planner state โ€” central working memory for the entire run
planner_state = PlannerState(
user_request=message,
task_category=task_category,
output_format=output_format,
brevity_requirement=brevity,
max_revisions=MAX_REVISIONS,
)
planner_state.record_event("init", f"category={task_category}, format={output_format}, brevity={brevity}")
state: WorkflowState = {
"user_request": message,
"plan": "",
"current_role": "",
"creative_output": "",
"technical_output": "",
"research_output": "",
"security_output": "",
"data_analyst_output": "",
"mad_professor_output": "",
"accountant_output": "",
"artist_output": "",
"lazy_slacker_output": "",
"black_metal_fundamentalist_output": "",
"labour_union_rep_output": "",
"ux_designer_output": "",
"doris_output": "",
"chairman_of_board_output": "",
"maga_appointee_output": "",
"lawyer_output": "",
"synthesis_output": "",
"draft_output": "",
"qa_report": "",
"qa_role_feedback": {},
"qa_passed": False,
"revision_count": 0,
"final_answer": "",
"output_format": output_format,
"brevity_requirement": brevity,
"qa_structured": None,
"task_assumptions": {},
"revision_instruction": "",
"structured_contributions": {},
"used_contributions": {},
}
trace: List[str] = [
"โ•โ•โ• MULTI-ROLE WORKFLOW STARTED โ•โ•โ•",
f"Model : {model_id}",
f"Request : {message}",
f"Task category: {task_category}",
f"Output format: {output_format}",
f"Brevity: {brevity}",
f"Evidence required: {needs_evidence}",
f"Strict mode: {config.strict_mode}",
f"Allow persona roles: {config.allow_persona_roles}",
f"Max specialists per task: {config.max_specialists_per_task}",
f"Max revisions: {MAX_REVISIONS}",
]
# Step 2: Retrieve evidence via tool adapters (for factual/comparison/analysis tasks)
evidence: Optional[EvidenceResult] = None
if needs_evidence:
try:
adapters = _build_research_adapters()
queries = extract_search_queries(message)
evidence = gather_evidence(queries, adapters)
planner_state.evidence = evidence.to_dict()
trace.append(
f"\n[EVIDENCE RETRIEVAL] {len(evidence.results)} items retrieved "
f"(confidence: {evidence.confidence}) for queries: {queries}"
)
planner_state.record_event("evidence", f"items={len(evidence.results)}, confidence={evidence.confidence}")
except Exception as exc:
trace.append(f"\n[EVIDENCE RETRIEVAL] Tool error: {exc} โ€” proceeding without evidence")
evidence = EvidenceResult(query=message)
planner_state.record_event("evidence_error", str(exc))
try:
if planner_active:
state = _step_plan(chat_model, state, trace,
enabled_role_keys=active_specialist_keys)
# Parse shared task assumptions from planner output
assumptions = parse_task_assumptions(state["plan"])
if assumptions:
state["task_assumptions"] = assumptions
planner_state.task_assumptions = assumptions
trace.append(f"[ASSUMPTIONS] {len(assumptions)} shared assumption(s) set: "
+ ", ".join(f"{k}={v}" for k, v in assumptions.items()))
else:
state["current_role"] = active_specialist_keys[0]
state["plan"] = message
trace.append(
f"\n[Planner disabled] Auto-routing to: {state['current_role'].upper()}"
)
# Step 3: Dynamic, task-aware role selection
if config.strict_mode:
selected_roles = select_relevant_roles(
message, active_specialist_keys, config, task_category=task_category
)
else:
selected_roles = active_specialist_keys
# Ensure the planner's primary choice is included if it's active
primary_role = state["current_role"]
if primary_role in active_specialist_keys and primary_role not in selected_roles:
selected_roles.insert(0, primary_role)
# Enforce max_specialists_per_task (but don't drop auto-included research)
if len(selected_roles) > config.max_specialists_per_task:
# Keep research if it was auto-included for a factual task
if needs_evidence and "research" in selected_roles:
non_research = [r for r in selected_roles if r != "research"]
selected_roles = non_research[:config.max_specialists_per_task - 1] + ["research"]
else:
selected_roles = selected_roles[:config.max_specialists_per_task]
planner_state.selected_roles = selected_roles
trace.append(
f"\n[ROLE SELECTION] {len(selected_roles)} specialist(s) selected: "
+ ", ".join(AGENT_ROLES.get(k, k) for k in selected_roles)
)
# Append detailed scoring trace when available
if hasattr(selected_roles, 'format_trace'):
trace.append(selected_roles.format_trace(AGENT_ROLES))
# Step 4: Run ALL selected specialists (initial run only)
if primary_role not in selected_roles:
primary_role = selected_roles[0]
state["current_role"] = primary_role
# Build assumptions context for specialist prompts
assumptions_ctx = format_assumptions_for_prompt(state.get("task_assumptions", {}))
def _run_specialist(role_key):
"""Run a single specialist, injecting evidence and assumptions as needed."""
if role_key == "research" and evidence:
return _step_research(chat_model, state, trace, evidence=evidence)
step_fn = _SPECIALIST_STEPS.get(role_key, _step_technical)
# Inject shared assumptions into plan context for specialist
if assumptions_ctx and assumptions_ctx not in state["plan"]:
state["plan"] = state["plan"] + "\n\n" + assumptions_ctx
return step_fn(chat_model, state, trace)
# Run primary specialist
state = _run_specialist(primary_role)
primary_output = state["draft_output"]
planner_state.specialist_outputs[primary_role] = primary_output[:500]
# Parse structured contribution from specialist output
structured_contributions: Dict[str, StructuredContribution] = {}
contrib = parse_structured_contribution(
primary_output, AGENT_ROLES.get(primary_role, primary_role)
)
structured_contributions[primary_role] = contrib
all_outputs: List[Tuple[str, str]] = [(primary_role, primary_output)]
for specialist_role in selected_roles:
if specialist_role == primary_role:
continue
state = _run_specialist(specialist_role)
output = state["draft_output"]
all_outputs.append((specialist_role, output))
planner_state.specialist_outputs[specialist_role] = output[:500]
# Parse structured contribution
contrib = parse_structured_contribution(
output, AGENT_ROLES.get(specialist_role, specialist_role)
)
structured_contributions[specialist_role] = contrib
# Store structured contributions in state
state["structured_contributions"] = {
k: v.to_dict() for k, v in structured_contributions.items()
}
trace.append(
f"\n[CONTRIBUTIONS] {len(structured_contributions)} structured contribution(s) parsed"
)
# Step 5: Synthesize โ€” format-aware, evidence-grounded, contribution-driven
state = _step_synthesize(chat_model, state, trace, all_outputs,
evidence=evidence,
structured_contributions=structured_contributions)
# Step 5b: Pre-QA format validation โ€” catch structural violations early
fmt_violations = validate_output_format(
state["draft_output"], output_format, brevity
)
if fmt_violations:
trace.append(
"\n[FORMAT VALIDATION] Violations detected before QA:\n"
+ "\n".join(f" - {v}" for v in fmt_violations)
)
# Re-synthesize with explicit violation feedback
violation_instr = format_violations_instruction(fmt_violations)
state["plan"] = state["plan"] + "\n\n" + violation_instr
state = _step_synthesize(chat_model, state, trace, all_outputs,
evidence=evidence,
structured_contributions=structured_contributions)
planner_state.record_event("format_rewrite", "; ".join(fmt_violations))
trace.append("[FORMAT VALIDATION] Re-synthesized to fix format violations.")
# === QA-REVISION LOOP ===
# From here, only QA + planner review + targeted revision (no full specialist rerun)
while True:
# Step 6: QA validation (with evidence context)
if qa_active:
state = _step_qa(chat_model, state, trace, all_outputs,
evidence=evidence,
structured_contributions=structured_contributions)
else:
state["qa_passed"] = True
state["qa_report"] = "QA Tester is disabled โ€” skipping quality review."
state["qa_structured"] = {"status": "PASS", "reason": "", "issues": [], "warnings": [], "correction_instruction": ""}
trace.append("\n[QA Tester disabled] Skipping quality review โ€” auto-pass.")
# Update planner state
planner_state.current_draft = state["draft_output"]
qa_result = parse_structured_qa(state["qa_report"]) if qa_active else QAResult(status="PASS")
planner_state.qa_result = qa_result
planner_state.record_event("qa", f"status={qa_result.status}")
# Record failures into planner state for escalation tracking
if not qa_result.passed:
planner_state.record_failure(qa_result)
# Step 7: QA-BINDING planner review
if planner_active and qa_active:
is_final_revision = (state["revision_count"] + 1 >= MAX_REVISIONS) and not state["qa_passed"]
state = _step_planner_review(chat_model, state, trace, is_final_revision=is_final_revision)
if state["final_answer"]:
planner_state.final_answer = state["final_answer"]
trace.append("\nโ•โ•โ• WORKFLOW COMPLETE โ€” APPROVED โ•โ•โ•")
break
# QA failed and planner was forced to revise โ€”
# store revision instruction reliably
revision_instr = ""
if "REVISED INSTRUCTIONS:" in state.get("plan", ""):
revision_instr = state["plan"]
elif qa_result.correction_instruction:
revision_instr = qa_result.correction_instruction
state["revision_instruction"] = revision_instr
planner_state.revision_instruction = revision_instr
planner_state.record_event("revision_instruction_stored",
revision_instr[:200] if revision_instr else "MISSING")
state["revision_count"] += 1
planner_state.revision_count = state["revision_count"]
if state["revision_count"] >= MAX_REVISIONS:
# Max revisions โ€” planner does final correction pass
trace.append(
f"\nโ•โ•โ• MAX REVISIONS ({MAX_REVISIONS}) โ€” FINAL CORRECTION PASS โ•โ•โ•"
)
state = _step_planner_review(chat_model, state, trace, is_final_revision=True)
if not state["final_answer"]:
state["final_answer"] = state["draft_output"]
planner_state.final_answer = state["final_answer"]
trace.append("\nโ•โ•โ• WORKFLOW COMPLETE โ€” MAX REVISIONS โ•โ•โ•")
break
# Step 8: ESCALATION โ€” check for repeated failures
escalation = planner_state.get_escalation_strategy()
if escalation != "none":
trace.append(f"\n[ESCALATION] Strategy: {escalation}")
planner_state.record_event("escalation", escalation)
if escalation == "suppress_role":
suppress = planner_state.get_roles_to_suppress()
for role_label in suppress:
role_key = _ROLE_LABEL_TO_KEY.get(role_label)
if role_key and role_key in selected_roles:
selected_roles.remove(role_key)
trace.append(f" โš  Suppressed role: {role_label} (repeated failures)")
if not selected_roles:
selected_roles = [primary_role]
elif escalation == "rewrite_from_state":
trace.append(" โš  Synthesizer will rewrite from state instead of reusing draft")
state["draft_output"] = ""
elif escalation == "narrow_scope":
if len(selected_roles) > 1:
selected_roles = [selected_roles[0]]
trace.append(f" โš  Narrowed to single specialist: {selected_roles[0]}")
# Step 9: TARGETED REVISIONS โ€” only rerun the failing role(s)
revision_targets = identify_revision_targets(qa_result, _ROLE_LABEL_TO_KEY)
trace.append(
f"\nโ•โ•โ• REVISION {state['revision_count']} / {MAX_REVISIONS} โ•โ•โ•\n"
f"Targeted roles: {', '.join(revision_targets)}"
)
planner_state.record_event("revision", f"targets={revision_targets}")
# Only rerun the targeted specialists โ€” NOT all specialists
rerun_specialists = [
t for t in revision_targets
if t in _SPECIALIST_STEPS and t in selected_roles
]
rerun_synthesizer = "synthesizer" in revision_targets or bool(rerun_specialists)
if rerun_specialists:
new_outputs = []
for rk in rerun_specialists:
state = _run_specialist(rk)
new_outputs.append((rk, state["draft_output"]))
planner_state.specialist_outputs[rk] = state["draft_output"][:500]
# Re-parse structured contribution for rerun specialist
contrib = parse_structured_contribution(
state["draft_output"],
AGENT_ROLES.get(rk, rk),
)
structured_contributions[rk] = contrib
# Merge: replace updated roles, keep others unchanged
updated_keys = {rk for rk, _ in new_outputs}
all_outputs = [
(rk, out) for rk, out in all_outputs if rk not in updated_keys
] + new_outputs
# Update state with revised structured contributions
state["structured_contributions"] = {
k: v.to_dict() for k, v in structured_contributions.items()
}
if rerun_synthesizer or rerun_specialists:
state = _step_synthesize(chat_model, state, trace, all_outputs,
evidence=evidence,
structured_contributions=structured_contributions)
# Post-revision format validation
fmt_violations = validate_output_format(
state["draft_output"], output_format, brevity
)
if fmt_violations:
trace.append(
"\n[FORMAT VALIDATION] Post-revision violations:\n"
+ "\n".join(f" - {v}" for v in fmt_violations)
)
violation_instr = format_violations_instruction(fmt_violations)
state["plan"] = state["plan"] + "\n\n" + violation_instr
state = _step_synthesize(chat_model, state, trace, all_outputs,
evidence=evidence,
structured_contributions=structured_contributions)
# Loop back to QA โ€” NOT back to specialists
continue
else:
# No Planner review loop โ€” accept the draft
state["final_answer"] = state["draft_output"]
planner_state.final_answer = state["final_answer"]
trace.append("\nโ•โ•โ• WORKFLOW COMPLETE โ•โ•โ•")
break
except Exception as exc:
trace.append(f"\n[ERROR] {exc}\n{traceback.format_exc()}")
state["final_answer"] = state["draft_output"] or f"Workflow error: {exc}"
# Step 10: FINAL ANSWER COMPRESSION โ€” strip noise, enforce format
raw_answer = state["final_answer"]
final_answer = compress_final_answer(raw_answer, output_format, brevity, message)
final_answer = strip_internal_noise(final_answer)
return final_answer, "\n".join(trace)
# ============================================================
# Agent builder
# ============================================================
def build_agent(model_id: str, selected_tool_names: List[str]):
tool_key = tuple(sorted(selected_tool_names))
cache_key = (model_id, tool_key)
if cache_key in AGENT_CACHE:
return AGENT_CACHE[cache_key]
tools = [ALL_TOOLS[name] for name in selected_tool_names if name in ALL_TOOLS]
chat_model = build_provider_chat(model_id)
system_prompt = (
"You are an assistant with tool access. "
"Use math tools for calculations. "
"Use Wikipedia for stable facts. "
"Use web search for recent or changing information. "
"Use arXiv for research papers. "
"Use stock tools for financial data. "
"Generate charts when the user asks for trends or plots. "
"If a needed tool is unavailable, say so plainly. "
f"You are currently running with provider-backed model='{model_id}'. "
"After using tools, always provide a final natural-language answer. "
"Do not stop after only issuing a tool call. "
"Be concise."
)
agent = create_agent(
model=chat_model,
tools=tools,
system_prompt=system_prompt,
)
AGENT_CACHE[cache_key] = agent
return agent
# ============================================================
# Runtime errors
# ============================================================
def classify_backend_error(model_id: str, err: Exception) -> str:
text = str(err)
if isinstance(err, HfHubHTTPError):
if "model_not_supported" in text or "not supported by any provider" in text:
RUNTIME_HEALTH[model_id] = "unavailable"
return "This model exists on Hugging Face, but it is not supported by the provider route used by this app."
if "401" in text or "403" in text:
RUNTIME_HEALTH[model_id] = "gated"
return "This model is not accessible with the current Hugging Face token."
if "429" in text:
RUNTIME_HEALTH[model_id] = "rate_limited"
return "This model is being rate-limited right now. Try again shortly or switch model."
if "404" in text:
RUNTIME_HEALTH[model_id] = "unavailable"
return "This model is not available on the current Hugging Face inference route."
RUNTIME_HEALTH[model_id] = "error"
return f"Provider error: {err}"
RUNTIME_HEALTH[model_id] = "error"
return f"Runtime error: {err}"
# ============================================================
# Debug builder
# ============================================================
def build_debug_report(
model_id: str,
message: str,
selected_tools: List[str],
messages: List[object],
final_answer: str,
last_nonempty_ai: Optional[str],
last_tool_content: Optional[str],
chart_path: Optional[str],
) -> str:
lines = []
lines.append("=== DEBUG REPORT ===")
lines.append(f"model_id: {model_id}")
lines.append(f"user_message: {message}")
lines.append(f"selected_tools: {selected_tools}")
lines.append(f"client_location_value: {repr(_client_location.get())}")
lines.append(f"message_count: {len(messages)}")
lines.append(f"chart_path: {chart_path}")
lines.append("")
for i, msg in enumerate(messages):
msg_type = getattr(msg, "type", type(msg).__name__)
raw_content = getattr(msg, "content", "")
text_content = content_to_text(raw_content)
tool_calls = getattr(msg, "tool_calls", None)
lines.append(f"--- message[{i}] ---")
lines.append(f"type: {msg_type}")
lines.append(f"content_empty: {not bool(text_content.strip())}")
lines.append(f"content_preview: {short_text(text_content, 500)}")
if tool_calls:
lines.append(f"tool_calls: {tool_calls}")
additional_kwargs = getattr(msg, "additional_kwargs", None)
if additional_kwargs:
lines.append(f"additional_kwargs: {additional_kwargs}")
response_metadata = getattr(msg, "response_metadata", None)
if response_metadata:
lines.append(f"response_metadata: {response_metadata}")
lines.append("")
lines.append("=== SUMMARY ===")
lines.append(f"last_nonempty_ai: {short_text(last_nonempty_ai or '', 500)}")
lines.append(f"last_tool_content: {short_text(last_tool_content or '', 500)}")
lines.append(f"final_answer: {short_text(final_answer or '', 500)}")
if not final_answer or not final_answer.strip():
lines.append("warning: final_answer is empty")
if not last_nonempty_ai and last_tool_content:
lines.append("warning: model returned tool output but no final AI text")
if not last_nonempty_ai and not last_tool_content:
lines.append("warning: neither AI text nor tool content was recovered")
return "\n".join(lines)
# ============================================================
# Run agent
# ============================================================
def run_agent(message, history, selected_tools, model_id, client_ip: str = ""):
history = history or []
# Store location data via ContextVar so LangChain worker threads can read it
_client_location.set(client_ip.strip() if client_ip else "")
if not message or not str(message).strip():
return history, "No input provided.", "", None, model_status_text(model_id), "No input provided."
if not selected_tools:
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": "No tools are enabled. Please enable at least one tool."})
return history, "No tools enabled.", "", None, model_status_text(model_id), "No tools enabled."
chart_path = None
debug_report = ""
try:
agent = build_agent(model_id, selected_tools)
response = agent.invoke(
{"messages": [{"role": "user", "content": message}]}
)
messages = response.get("messages", [])
tool_lines = []
last_nonempty_ai = None
last_tool_content = None
for msg in messages:
msg_type = getattr(msg, "type", None)
content = content_to_text(getattr(msg, "content", ""))
if msg_type == "ai":
if getattr(msg, "tool_calls", None):
for tc in msg.tool_calls:
tool_name = tc.get("name", "unknown_tool")
tool_args = tc.get("args", {})
tool_lines.append(f"โ–ถ {tool_name}({tool_args})")
if content and content.strip():
last_nonempty_ai = content.strip()
elif msg_type == "tool":
shortened = short_text(content, 1500)
tool_lines.append(f"โ†’ {shortened}")
if content and content.strip():
last_tool_content = content.strip()
maybe_chart = extract_chart_path(content)
if maybe_chart:
chart_path = maybe_chart
if last_nonempty_ai:
final_answer = last_nonempty_ai
RUNTIME_HEALTH[model_id] = "ok"
elif last_tool_content:
final_answer = f"Tool result:\n{last_tool_content}"
RUNTIME_HEALTH[model_id] = "empty_final"
else:
final_answer = "The model used a tool but did not return a final text response."
RUNTIME_HEALTH[model_id] = "empty_final"
tool_trace = "\n".join(tool_lines) if tool_lines else "No tools used."
debug_report = build_debug_report(
model_id=model_id,
message=message,
selected_tools=selected_tools,
messages=messages,
final_answer=final_answer,
last_nonempty_ai=last_nonempty_ai,
last_tool_content=last_tool_content,
chart_path=chart_path,
)
except Exception as e:
final_answer = classify_backend_error(model_id, e)
tool_trace = "Execution failed."
debug_report = (
"=== DEBUG REPORT ===\n"
f"model_id: {model_id}\n"
f"user_message: {message}\n"
f"selected_tools: {selected_tools}\n\n"
"=== EXCEPTION ===\n"
f"{traceback.format_exc()}\n"
)
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": final_answer})
return history, tool_trace, "", chart_path, model_status_text(model_id), debug_report
# ============================================================
# UI
# ============================================================
with gr.Blocks(title="LLM + Agent tools demo", theme=gr.themes.Soft()) as demo:
gr.Markdown(
"# Catos agent and tools playground\n"
)
with gr.Tabs():
# โ”€โ”€ Tab 1: Agent discussion demo โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
with gr.Tab("Agent discussion demo"):
gr.Markdown(
"## Strict Plannerโ€“Specialistโ€“Synthesizerโ€“QA Workflow\n"
"**Planner** โ†’ **Selected Specialists** โ†’ **Synthesizer** โ†’ **QA** โ†’ **Planner review**\n\n"
"The Planner analyses the task, selects only relevant specialists, and enforces QA as binding. "
f"If QA fails, targeted revisions loop up to **{MAX_REVISIONS}** times.\n\n"
"Use the checkboxes on the right to enable/disable agent roles and configure workflow settings."
)
with gr.Row():
wf_model_dropdown = gr.Dropdown(
choices=MODEL_OPTIONS,
value=DEFAULT_MODEL_ID,
label="Model",
)
with gr.Row():
with gr.Column(scale=2):
wf_input = gr.Textbox(
label="Question",
placeholder=(
"Describe what you want the multi-role team to work onโ€ฆ\n"
"e.g. 'Write a short blog post about the benefits of open-source AI'"
),
lines=3,
)
wf_submit_btn = gr.Button("Run discussion", variant="primary")
with gr.Column(scale=2):
active_agents = gr.CheckboxGroup(
choices=list(AGENT_ROLES.values()),
value=list(AGENT_ROLES.values()),
label="Team",
)
with gr.Accordion("Workflow Settings", open=False):
strict_mode_toggle = gr.Checkbox(
value=True, label="Strict mode (select only relevant roles)"
)
allow_persona_toggle = gr.Checkbox(
value=False, label="Allow persona/gimmick roles"
)
max_specialists_slider = gr.Slider(
minimum=1, maximum=8, step=1, value=3,
label="Max specialists per task"
)
require_evidence_toggle = gr.Checkbox(
value=True, label="Require evidence for factual claims"
)
auto_research_toggle = gr.Checkbox(
value=True, label="Auto-include Research for factual tasks"
)
with gr.Row():
with gr.Column(scale=2):
wf_answer = gr.Textbox(
label="\u2705 Conclusion (Planner approved)",
lines=14,
interactive=False,
)
with gr.Column(scale=3):
wf_trace = gr.Textbox(
label="Decision process insight",
lines=28,
interactive=False,
)
def _run_workflow_ui(
message: str, model_id: str, role_labels: List[str],
strict_mode: bool, allow_persona: bool, max_specialists: int,
require_evidence: bool, auto_research: bool,
) -> Tuple[str, str]:
"""Gradio handler: validate input, run the workflow, return outputs."""
if not message or not message.strip():
return "No input provided.", ""
try:
config = WorkflowConfig(
strict_mode=strict_mode,
allow_persona_roles=allow_persona,
max_specialists_per_task=int(max_specialists),
require_evidence_for_factual_claims=require_evidence,
always_include_research_for_factual_tasks=auto_research,
)
final_answer, trace = run_multi_role_workflow(
message.strip(), model_id, role_labels, config=config
)
return final_answer, trace
except Exception as exc:
return f"Workflow error: {exc}", traceback.format_exc()
wf_submit_btn.click(
fn=_run_workflow_ui,
inputs=[wf_input, wf_model_dropdown, active_agents,
strict_mode_toggle, allow_persona_toggle, max_specialists_slider,
require_evidence_toggle, auto_research_toggle],
outputs=[wf_answer, wf_trace],
show_api=False,
)
wf_input.submit(
fn=_run_workflow_ui,
inputs=[wf_input, wf_model_dropdown, active_agents,
strict_mode_toggle, allow_persona_toggle, max_specialists_slider,
require_evidence_toggle, auto_research_toggle],
outputs=[wf_answer, wf_trace],
show_api=False,
)
# โ”€โ”€ Tab 2: Use of tools demo โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
with gr.Tab("Use of tools demo"):
with gr.Row():
model_dropdown = gr.Dropdown(
choices=MODEL_OPTIONS,
value=DEFAULT_MODEL_ID,
label="Base model",
)
model_status = gr.Textbox(
value=model_status_text(DEFAULT_MODEL_ID),
label="Model status",
interactive=False,
)
with gr.Row():
with gr.Column(scale=3):
chatbot = gr.Chatbot(label="Conversation", height=460, type="messages")
user_input = gr.Textbox(
label="Message",
placeholder="Ask anything...",
)
with gr.Row():
send_btn = gr.Button("Send", variant="primary")
clear_btn = gr.Button("Clear")
chart_output = gr.Image(label="Generated chart", type="filepath")
with gr.Row():
location_btn = gr.Button("๐Ÿ“ Share my location", size="sm")
location_status = gr.Textbox(
value="Location not set โ€” click the button above before asking 'where am I'",
label="Location status",
interactive=False,
max_lines=1,
)
with gr.Column(scale=1):
enabled_tools = gr.CheckboxGroup(
choices=TOOL_NAMES,
value=TOOL_NAMES,
label="Enabled tools",
)
tool_trace = gr.Textbox(
label="Tool trace",
lines=18,
interactive=False,
)
debug_output = gr.Textbox(
label="Debug output",
lines=28,
interactive=False,
)
# Hidden: holds "lat,lon" or "ip:<address>" set by the location button
client_ip_box = gr.Textbox(visible=False, value="")
model_dropdown.change(
fn=model_status_text,
inputs=[model_dropdown],
outputs=[model_status],
show_api=False,
)
# Geolocation button: JS runs in the browser, result goes to hidden box + status label
location_btn.click(
fn=None,
inputs=None,
outputs=[client_ip_box, location_status],
js="""async () => {
return new Promise((resolve) => {
const fallback = async () => {
try {
const r = await fetch('https://api.ipify.org?format=json');
const d = await r.json();
resolve(['ip:' + d.ip, 'Location: IP-based fallback (approximate)']);
} catch(e) {
resolve(['', 'Location detection failed.']);
}
};
if (!navigator.geolocation) { fallback(); return; }
navigator.geolocation.getCurrentPosition(
(pos) => {
const lat = pos.coords.latitude.toFixed(5);
const lon = pos.coords.longitude.toFixed(5);
const acc = Math.round(pos.coords.accuracy);
resolve([lat + ',' + lon, `\u2705 GPS/WiFi location set (\u00b1${acc}m)`]);
},
fallback,
{timeout: 10000, maximumAge: 60000, enableHighAccuracy: true}
);
});
}""",
show_api=False,
)
send_btn.click(
fn=run_agent,
inputs=[user_input, chatbot, enabled_tools, model_dropdown, client_ip_box],
outputs=[chatbot, tool_trace, user_input, chart_output, model_status, debug_output],
show_api=False,
)
user_input.submit(
fn=run_agent,
inputs=[user_input, chatbot, enabled_tools, model_dropdown, client_ip_box],
outputs=[chatbot, tool_trace, user_input, chart_output, model_status, debug_output],
show_api=False,
)
clear_btn.click(
fn=lambda model_id: ([], "", "", None, model_status_text(model_id), ""),
inputs=[model_dropdown],
outputs=[chatbot, tool_trace, user_input, chart_output, model_status, debug_output],
show_api=False,
)
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
demo.launch(
server_name="0.0.0.0",
server_port=port,
ssr_mode=False,
allowed_paths=[os.path.abspath(CHART_DIR)],
debug=True,
)