"""3-stage LLM Council orchestration."""

from typing import List, Dict, Any, Tuple

from .openrouter import query_models_parallel, query_model, query_model_stream
from .config import COUNCIL_MODELS, CHAIRMAN_MODEL


async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]:
    """
    Stage 1: Collect individual responses from all council models.

    Args:
        user_query: The user's question

    Returns:
        List of dicts with 'model' and 'response' keys
    """
    print("STAGE 1: Collecting individual responses from council members...")
    messages = [{"role": "user", "content": user_query}]

    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    stage1_results = []
    for model, response in responses.items():
        if response is not None:
            stage1_results.append({"model": model, "response": response.get("content", "")})

    print(f"STAGE 1 COMPLETE: Received {len(stage1_results)} responses.")
    return stage1_results


async def stage2_collect_rankings(
    user_query: str, stage1_results: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
    """
    Stage 2: Each model ranks the anonymized responses.

    Args:
        user_query: The original user query
        stage1_results: Results from Stage 1

    Returns:
        Tuple of (rankings list, label_to_model mapping)
    """
    print("STAGE 2: Council members are ranking each other's responses...")

    labels = [chr(65 + i) for i in range(len(stage1_results))]  # "A", "B", "C", ...

    label_to_model = {f"Response {label}": result["model"] for label, result in zip(labels, stage1_results)}

    responses_text = "\n\n".join(
        [f"Response {label}:\n{result['response']}" for label, result in zip(labels, stage1_results)]
    )

    ranking_prompt = f"""You are evaluating different responses to the following question:

Question: {user_query}

Here are the responses from different models (anonymized):

{responses_text}

Your task:
1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
2. Then, at the very end of your response, provide a final ranking.

IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
- Start with the line "FINAL RANKING:" (all caps, with colon)
- Then list the responses from best to worst as a numbered list
- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
- Do not add any other text or explanations in the ranking section

Example of the correct format for your ENTIRE response:

Response A provides good detail on X but misses Y...
Response B is accurate but lacks depth on Z...
Response C offers the most comprehensive answer...

FINAL RANKING:
1. Response C
2. Response A
3. Response B

Now provide your evaluation and ranking:"""

    messages = [{"role": "user", "content": ranking_prompt}]

    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    stage2_results = []
    for model, response in responses.items():
        if response is not None:
            full_text = response.get("content", "")
            parsed = parse_ranking_from_text(full_text)
            stage2_results.append({"model": model, "ranking": full_text, "parsed_ranking": parsed})

    print("STAGE 2 COMPLETE: Rankings collected.")
    return stage2_results, label_to_model


async def stage3_synthesize_final(
    user_query: str, stage1_results: List[Dict[str, Any]], stage2_results: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Stage 3: Chairman synthesizes final response.

    Args:
        user_query: The original user query
        stage1_results: Individual model responses from Stage 1
        stage2_results: Rankings from Stage 2

    Returns:
        Dict with 'model' and 'response' keys
    """
    print("STAGE 3: Chairman is synthesizing the final answer...")

    stage1_text = "\n\n".join(
        [f"Model: {result['model']}\nResponse: {result['response']}" for result in stage1_results]
    )

    stage2_text = "\n\n".join(
        [f"Model: {result['model']}\nRanking: {result['ranking']}" for result in stage2_results]
    )

    chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.

Original Question: {user_query}

STAGE 1 - Individual Responses:
{stage1_text}

STAGE 2 - Peer Rankings:
{stage2_text}

Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
- The individual responses and their insights
- The peer rankings and what they reveal about response quality
- Any patterns of agreement or disagreement

Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""

    messages = [{"role": "user", "content": chairman_prompt}]

    response = await query_model(CHAIRMAN_MODEL, messages)

    if response is None:
        print("STAGE 3 ERROR: Unable to generate final synthesis.")
        return {"model": CHAIRMAN_MODEL, "response": "Error: Unable to generate final synthesis."}

    print("STAGE 3 COMPLETE: Final answer synthesized.")
    return {"model": CHAIRMAN_MODEL, "response": response.get("content", "")}


async def stage3_synthesize_final_stream(
    user_query: str, stage1_results: List[Dict[str, Any]], stage2_results: List[Dict[str, Any]]
):
    """
    Stage 3: Chairman synthesizes final response (Streaming).
    Yields chunks of text.
    """
    print("STAGE 3: Chairman is synthesizing the final answer (Streaming)...")

    stage1_text = "\n\n".join(
        [f"Model: {result['model']}\nResponse: {result['response']}" for result in stage1_results]
    )

    stage2_text = "\n\n".join(
        [f"Model: {result['model']}\nRanking: {result['ranking']}" for result in stage2_results]
    )

    chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.

Original Question: {user_query}

STAGE 1 - Individual Responses:
{stage1_text}

STAGE 2 - Peer Rankings:
{stage2_text}

Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
- The individual responses and their insights
- The peer rankings and what they reveal about response quality
- Any patterns of agreement or disagreement

Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""

    messages = [{"role": "user", "content": chairman_prompt}]

    async for chunk in query_model_stream(CHAIRMAN_MODEL, messages):
        yield chunk

    print("STAGE 3 COMPLETE: Final answer stream finished.")


def parse_ranking_from_text(ranking_text: str) -> List[str]:
    """
    Parse the FINAL RANKING section from the model's response.

    Args:
        ranking_text: The full text response from the model

    Returns:
        List of response labels in ranked order
    """
    import re

    if "FINAL RANKING:" in ranking_text:
        parts = ranking_text.split("FINAL RANKING:")
        if len(parts) >= 2:
            ranking_section = parts[1]

            # Prefer strictly numbered entries (e.g. "1. Response A").
            numbered_matches = re.findall(r"\d+\.\s*Response [A-Z]", ranking_section)
            if numbered_matches:
                return [re.search(r"Response [A-Z]", m).group() for m in numbered_matches]

            # Fall back to any "Response X" mentions within the ranking section.
            matches = re.findall(r"Response [A-Z]", ranking_section)
            return matches

    # Last resort: pull "Response X" mentions from the whole text, in order of appearance.
    matches = re.findall(r"Response [A-Z]", ranking_text)
    return matches


def calculate_aggregate_rankings(
    stage2_results: List[Dict[str, Any]], label_to_model: Dict[str, str]
) -> List[Dict[str, Any]]:
    """
    Calculate aggregate rankings across all models.

    Args:
        stage2_results: Rankings from each model
        label_to_model: Mapping from anonymous labels to model names

    Returns:
        List of dicts with model name and average rank, sorted best to worst
    """
    from collections import defaultdict

    model_positions = defaultdict(list)

    for ranking in stage2_results:
        ranking_text = ranking["ranking"]

        parsed_ranking = parse_ranking_from_text(ranking_text)

        # Positions are 1-based: 1 = ranked best.
        for position, label in enumerate(parsed_ranking, start=1):
            if label in label_to_model:
                model_name = label_to_model[label]
                model_positions[model_name].append(position)

    aggregate = []
    for model, positions in model_positions.items():
        if positions:
            avg_rank = sum(positions) / len(positions)
            aggregate.append(
                {"model": model, "average_rank": round(avg_rank, 2), "rankings_count": len(positions)}
            )

    # Sort ascending: the lowest average rank is the best-rated model.
    aggregate.sort(key=lambda x: x["average_rank"])

    return aggregate


async def generate_conversation_title(user_query: str) -> str:
    """
    Generate a short title for a conversation based on the first user message.

    Args:
        user_query: The first user message

    Returns:
        A short title (3-5 words)
    """
    title_prompt = f"""Generate a very short title (3-5 words maximum) that summarizes the following question.
The title should be concise and descriptive. Do not use quotes or punctuation in the title.

Question: {user_query}

Title:"""

    messages = [{"role": "user", "content": title_prompt}]

    response = await query_model("google/gemini-2.5-flash", messages, timeout=30.0)

    if response is None:
        return "New Conversation"

    title = response.get("content", "New Conversation").strip()

    # Remove any surrounding quotes the model may have added.
    title = title.strip("\"'")

    # Truncate overly long titles to 50 characters (47 + ellipsis).
    if len(title) > 50:
        title = title[:47] + "..."

    return title


async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]:
    """
    Run the complete 3-stage council process.

    Args:
        user_query: The user's question

    Returns:
        Tuple of (stage1_results, stage2_results, stage3_result, metadata)
    """
    stage1_results = await stage1_collect_responses(user_query)

    if not stage1_results:
        return [], [], {"model": "error", "response": "All models failed to respond. Please try again."}, {}

    stage2_results, label_to_model = await stage2_collect_rankings(user_query, stage1_results)

    aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)

    stage3_result = await stage3_synthesize_final(user_query, stage1_results, stage2_results)

    metadata = {"label_to_model": label_to_model, "aggregate_rankings": aggregate_rankings}

    return stage1_results, stage2_results, stage3_result, metadata
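

# Minimal usage sketch (assumptions: this module sits in a package next to the
# .openrouter and .config modules it imports, and whatever OpenRouter credentials
# those modules expect are configured; run with e.g. `python -m <package>.council`).
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        stage1, stage2, final, meta = await run_full_council("What is the capital of France?")
        print(f"Council responses collected: {len(stage1)}")
        print(f"Rankings collected: {len(stage2)}")
        print(f"Aggregate rankings: {meta.get('aggregate_rankings')}")
        print(f"Final answer from {final['model']}:\n{final['response']}")

    asyncio.run(_demo())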