# -*- coding: utf-8 -*-
"""
Enhanced RAG Pipeline for vaccine assistant with fallback system.

Handles agent creation and question answering with sequential citation
numbering. Includes a fallback agent for max-iterations handling.
"""
import json
import os
import re
import time

from llama_index.core import PromptTemplate
from llama_index.core.agent import ReActAgent
from llama_index.llms.google_genai import GoogleGenAI
from langdetect import detect


def extract_source_ids(response_text):
    """
    Extract source IDs from the response, handling different citation formats:
    - Standard format: [Source ID]
    - Multiple sources in one citation: [Source ID1][Source ID2]
    - Multiple sources in one bracket: [Source ID1, Source ID2]

    Args:
        response_text (str): The generated response text with inline citations.

    Returns:
        list of str: List of unique source IDs found in the response text,
        in order of first appearance.
    """
    print(f"[LOG] Extracting source IDs from response text (length: {len(response_text)} chars)")

    # Standardize adjacent citations [ID1][ID2] (possibly separated by
    # whitespace) into a single bracket with comma separation: [ID1, ID2].
    consolidated_text = re.sub(r'\][\s]*\[', '][', response_text)
    consolidated_text = re.sub(r'\]\[', ', ', consolidated_text)

    # Extract all bracketed citations (single ID or comma-separated IDs).
    inline_citations = re.findall(r'\[([^\[\]]+)\]', consolidated_text)
    print(f"[LOG] Found {len(inline_citations)} inline citations")
    if not inline_citations:
        print("Warning: No source IDs found in the response text.")
        return []

    # Each citation may contain multiple comma-separated IDs.
    all_ids = []
    for citation in inline_citations:
        ids = [id_str.strip() for id_str in citation.split(',')]
        # Drop empty fragments produced by stray commas or empty brackets
        # (e.g. "[ ]" or "[a,,b]") so they never become citation keys.
        all_ids.extend(id_str for id_str in ids if id_str)

    # Deduplicate while preserving first-seen order.
    seen = set()
    source_ids = []
    for id_str in all_ids:
        if id_str not in seen:
            seen.add(id_str)
            source_ids.append(id_str)

    print(f"[LOG] Extracted {len(source_ids)} unique source IDs: {source_ids[:3]}{'...' if len(source_ids) > 3 else ''}")
    if not source_ids:
        print("Warning: No valid source IDs found after filtering.")
        return []
    return source_ids


def convert_citations_to_sequential(response_text, source_id_to_number_map):
    """
    Convert source IDs in response text to sequential numbers.

    Args:
        response_text (str): The response text with source ID citations
        source_id_to_number_map (dict): Mapping from source IDs to sequential numbers

    Returns:
        str: Response text with sequential number citations
    """
    print(f"[LOG] Converting {len(source_id_to_number_map)} source IDs to sequential numbers")

    def replace_citation(match):
        citation_content = match.group(1)
        # Handle multiple IDs in one citation (comma-separated).
        ids = [id_str.strip() for id_str in citation_content.split(',')]
        # Convert each ID to its sequential number; unknown IDs are dropped.
        numbers = []
        for id_str in ids:
            if id_str in source_id_to_number_map:
                numbers.append(str(source_id_to_number_map[id_str]))
        # Return the formatted citation with sequential numbers.
        if len(numbers) == 1:
            return f"[{numbers[0]}]"
        elif len(numbers) > 1:
            return f"[{','.join(numbers)}]"
        else:
            return match.group(0)  # Return original if no mapping found

    # Replace all citations in the text.
    sequential_response = re.sub(r'\[([^\[\]]+)\]', replace_citation, response_text)
    print("[LOG] Successfully converted citations to sequential format")
    return sequential_response


def create_safe_custom_prompt(tools, llm, is_fallback=False):
    """Create a safe version that won't have formatting conflicts"""
    print(f"[LOG] Creating {'fallback' if is_fallback else 'standard'} custom prompt with {len(tools)} tools")
    if is_fallback:
        custom_instructions = """
## MEDICAL ASSISTANT ROLE - FALLBACK MODE
You are a helpful and knowledgeable AI-powered vaccine assistant designed to support doctors in clinical decision-making. You are operating in FALLBACK MODE with access to only the most essential and comprehensive tools. You provide evidence-based guidance using only information from official vaccine medical documents. Answer the doctor's question accurately and concisely using only the provided information.

## FALLBACK MODE INSTRUCTIONS
- You have access to only 2 powerful tools: general_guide_tool (Algerian National Vaccination Guide) and who_immunization_tool (WHO global guidance).
- **MANDATORY TOOL USAGE**: Always use the relevant tool(s) to search for information before answering, even if you initially think no information is available.
- Be direct and efficient - search once with each tool if needed, then provide your answer.
- Do not overthink or search repeatedly - these tools are comprehensive.

## IMPORTANT REQUIREMENTS

### Citation and Sourcing
1. For each fact in your response, include an inline citation in the format [Source ID] immediately following the information, e.g., [e795ebd28318886c0b1a5395ac30ad90].
2. The Source ID must be the exact alphanumeric identifier from the search results, NOT the tool name or any other text.
3. Do NOT use 'Source:' in the citation format; use only the Source ID in square brackets.
4. Do NOT use tool names (like general_guide_tool, who_immunization_tool) as citations.
5. If a fact is supported by multiple sources, use adjacent citations: [e795ebd28318886c0b1a5395ac30ad90][21a932b2340bb16707763f57f0ad2]
6. Use ONLY the provided information from tool outputs and never include facts from your general knowledge.

### Content Formatting
1. When rendering tables:
   - Convert HTML tables into clean Markdown format.
   - Preserve all original headers and data rows exactly.
   - Include the citation in the table caption, e.g., 'Table: Vaccination Schedule [Source ID]'.
2. For lists, maintain the original bullet points/numbering and include citations.
3. Present information concisely but ensure clinical accuracy is never compromised.

### CRITICAL: Efficient Fallback Strategy
1. **MANDATORY SEARCH**: Use each relevant tool at least once to search for information, even if you suspect the information might not be available.
2. **BREAK DOWN COMPLEX QUERIES**: For comparative or multi-part questions (e.g., comparing Algerian and WHO guidelines), break the query into sub-queries and use the appropriate tool for each part.
3. **DO NOT STOP PREMATURELY**: Do not conclude "no information is available" without using the relevant tool(s) to search for the answer.
4. **BE DECISIVE**: Once you find relevant information for each sub-query, formulate your response immediately.
5. **ANSWER FULLY**: Address all parts of the question, using multiple tools if required by the query.
6. **FINAL ANSWER**: Once you have your answer, present it directly. Do not output your internal 'thought' or 'action' steps. Your final output must be the synthesized answer itself.

### Response Guidelines
- **MANDATORY TOOL SELECTION**:
  - For queries mentioning "WHO," "World Health Organization," "international," "global guidance," or WHO documents, use who_immunization_tool first.
  - For queries mentioning "Algerian," "national guide," or Algerian-specific terms, use general_guide_tool first.
  - For comparative queries (e.g., Algerian vs. WHO), use both tools, addressing each part systematically.
- **EXPLICIT REASONING**: Before answering, log your reasoning steps, including which tools you will use and why, based on the query’s content.
- Provide all found information with proper citations using Source IDs only.
- If information is limited, clearly state: "Based on the available documents, I can provide the following information..." and indicate what is not available.

---
"""
    else:
        custom_instructions = """
## MEDICAL ASSISTANT ROLE
You are a helpful and knowledgeable AI-powered vaccine assistant designed to support doctors in clinical decision-making. You provide evidence-based guidance using only information from official vaccine medical documents. Answer the doctor's question accurately and concisely using only the provided information.

## IMPORTANT REQUIREMENTS

### Citation and Sourcing
1. For each fact in your response, include an inline citation in the format [Source ID] immediately following the information, e.g., [e795ebd28318886c0b1a5395ac30ad90].
2. The Source ID must be the exact alphanumeric identifier from the search results, NOT the tool name or any other text.
3. Do NOT use 'Source:' in the citation format; use only the Source ID in square brackets.
4. Do NOT use tool names (like general_guide_tool, cold_chain_tool) as citations.
5. If a fact is supported by multiple sources, use adjacent citations: [e795ebd28318886c0b1a5395ac30ad90][21a932b2340bb16707763f57f0ad2]
6. Use ONLY the provided information from tool outputs and never include facts from your general knowledge.

### Content Formatting
1. When rendering tables:
   - Convert HTML tables into clean Markdown format.
   - Preserve all original headers and data rows exactly.
   - Include the citation in the table caption, e.g., 'Table: Vaccination Schedule [Source ID]'.
2. For lists, maintain the original bullet points/numbering and include citations.
3. Present information concisely but ensure clinical accuracy is never compromised.

### CRITICAL: Efficient Response Strategy
1. **MANDATORY SEARCH**: Always use the relevant tool(s) to search for information before answering, even if you initially think no information is available.
2. **MANDATORY TOOL SELECTION**:
   - For queries about global standards or WHO, use who_immunization_tool.
   - For broad questions about the Algerian guide, use general_guide_tool.
   - For specific topics like cold chain, disease info, etc., use the most specific tool (e.g., cold_chain_tool, disease_info_tool).
3. **Query Decomposition**: Break comparative or multi-part queries into sub-queries and use the appropriate tool for each.
4. **DO NOT STOP PREMATURELY**: Do not conclude "no information is available" without using the relevant tool(s) to search for the answer.
5. **EXPLICIT REASONING**: Before answering, log your reasoning steps, including which tools you will use and why.
6. **BE DECISIVE**: Once you find relevant information, formulate your response.

### Final Answer Generation
- **STOP WHEN SUFFICIENT**: Once you have gathered enough information from the tools to answer the user's question completely, you MUST stop using tools and formulate a final answer.
- **SYNTHESIZE THE ANSWER**: Formulate a comprehensive, final answer based ONLY on the observed tool outputs.
- **PRESENT CLEANLY**: Present this final answer directly to the user. Your final output must be the answer itself, not your internal 'thought' or 'action' steps.

### Response Guidelines for Complex Questions
- For comparative questions: Break the query into sub-queries, use the appropriate tools, then provide the comparison.
- For multi-part questions: Address each part systematically, using the appropriate tool for each sub-query.
- If information is not found after using the relevant tool(s): State clearly: "Based on the available documents, I can provide the following information..." and specify what is not available.

---
"""

    # Get the exact original template first.
    temp_agent = ReActAgent.from_tools(tools, llm=llm, verbose=False)
    original_prompts = temp_agent.get_prompts()
    original_template = original_prompts["agent_worker:system_prompt"].template

    # Add instructions at the very beginning so the framework's own
    # formatting placeholders ({tool_desc}, etc.) are left untouched.
    safe_template = f"{custom_instructions}{original_template}"

    # Create new prompt with same metadata as original.
    original_prompt = original_prompts["agent_worker:system_prompt"]
    try:
        new_prompt = PromptTemplate(
            template=safe_template,
            template_vars=original_prompt.template_vars,
            metadata=original_prompt.metadata if hasattr(original_prompt, 'metadata') else None
        )
        print(f"[LOG] ✅ Successfully created {'fallback' if is_fallback else 'standard'} custom prompt")
        return new_prompt
    except Exception:
        # Even safer fallback: a bare template without the original metadata.
        print(f"[LOG] ⚠️ Using fallback prompt template for {'fallback' if is_fallback else 'standard'} agent")
        return PromptTemplate(template=safe_template)


def create_agent(tools, llm, is_fallback=False):
    """Create the ReAct agent with custom prompt"""
    agent_type = "FALLBACK" if is_fallback else "STANDARD"
    # **FIX**: Increased max_iterations to give the agent more steps to reason
    max_iter = 15
    print(f"[LOG] Creating {agent_type} ReAct agent with {len(tools)} tools and max_iterations={max_iter}")

    # Create agent with appropriate settings.
    agent = ReActAgent.from_tools(
        tools,
        llm=llm,
        verbose=True,
        max_iterations=max_iter,
    )

    # Create and apply safe custom prompt; fall back to the stock prompt
    # if the update fails for any reason.
    try:
        safe_custom_prompt = create_safe_custom_prompt(tools, llm, is_fallback=is_fallback)
        agent.update_prompts({"agent_worker:system_prompt": safe_custom_prompt})
        print(f"✅ Successfully updated {agent_type} agent with safe custom prompt")
    except Exception as e:
        print(f"❌ {agent_type} agent prompt update failed: {e}")
        print(f"⚠️ Using original {agent_type} agent without modifications")

    print(f"[LOG] {agent_type} agent creation completed")
    return agent


def create_fallback_tools(all_tools):
    """Extract only the general_guide_tool and who_immunization_tool for fallback agent"""
    print("[LOG] Creating fallback tools (guide + WHO only)")
    fallback_tools = []
    tool_names_found = []
    for tool in all_tools:
        tool_name = tool.metadata.name if hasattr(tool, 'metadata') else str(tool)
        if tool_name in ["general_guide_tool", "who_immunization_tool"]:
            fallback_tools.append(tool)
            tool_names_found.append(tool_name)
    print(f"[LOG] Found {len(fallback_tools)} fallback tools: {tool_names_found}")
    if len(fallback_tools) == 0:
        print("[LOG] ❌ ERROR: No fallback tools found! Check tool names.")
        return None
    return fallback_tools


def initialize_rag_pipeline(tools):
    """Initialize the RAG pipeline with both standard and fallback agents"""
    print("[LOG] Initializing RAG pipeline with fallback system...")
    print(f"[LOG] Available tools: {[tool.metadata.name if hasattr(tool, 'metadata') else str(tool) for tool in tools]}")

    # Initialize LlamaIndex LLM.
    print("[LOG] Initializing Google GenAI LLM (gemini-2.0-flash)")
    llama_index_llm = GoogleGenAI(
        model="models/gemini-2.0-flash",
        api_key=os.getenv('GOOGLE_API_KEY'),
    )

    # Create standard agent.
    print("[LOG] Creating standard agent...")
    standard_agent = create_agent(tools, llama_index_llm, is_fallback=False)

    # Create fallback tools and agent.
    print("[LOG] Creating fallback agent...")
    fallback_tools = create_fallback_tools(tools)
    if fallback_tools is None:
        print("[LOG] ❌ WARNING: Fallback agent creation failed - no fallback tools available")
        fallback_agent = None
    else:
        fallback_agent = create_agent(fallback_tools, llama_index_llm, is_fallback=True)
        print("[LOG] ✅ Fallback agent created successfully")

    print("[LOG] ✅ RAG pipeline initialization completed with fallback system")
    return {
        "standard_agent": standard_agent,
        "fallback_agent": fallback_agent,
        "llm": llama_index_llm
    }


def detect_max_iterations_error(response_text):
    """Detect if the response indicates a max iterations error OR is an unfinished thought."""
    response_lower = response_text.lower().strip()

    # **FIX**: Check if the response is the agent's raw thought process.
    if response_lower.startswith("a:```thought") or response_lower.startswith("```thought"):
        print("[LOG] Detected unfinished agent thought process.")
        return True

    max_iteration_indicators = [
        "max iterations",
        "reached max iterations",
        "agent stopped due to max iterations",
        "maximum number of iterations",
        "iteration limit"
    ]
    # Check for explicit max iterations indicators.
    for indicator in max_iteration_indicators:
        if indicator in response_lower:
            print(f"[LOG] Detected max iteration indicator: '{indicator}'")
            return True

    # Check for very short or empty responses (often indicates failure).
    if len(response_text.strip()) < 10:
        return True

    # Check for generic error patterns.
    if ("error" in response_lower and "processing" in response_lower):
        return True

    return False


def process_question(agents_dict, question: str) -> str:
    """Process a question through the RAG pipeline with fallback support"""
    print(f"[LOG] Processing question: '{question[:100]}{'...' if len(question) > 100 else ''}'")
    standard_agent = agents_dict["standard_agent"]
    fallback_agent = agents_dict["fallback_agent"]

    print("="*50)
    print("🤖 STANDARD AGENT REASONING PROCESS:")
    print("="*50)
    start_time = time.time()
    try:
        # Try standard agent first.
        response = standard_agent.chat(question)
        response_text = response.response
        print("="*50)
        print("🤖 STANDARD AGENT REASONING COMPLETED")
        print("="*50)
        elapsed_time = time.time() - start_time
        print(f"[LOG] ✅ Standard agent response received in {elapsed_time:.2f} seconds")
        print(f"[LOG] Response length: {len(response_text)} characters")

        # Check if we need to use fallback.
        if detect_max_iterations_error(response_text):
            print("[LOG] 🔄 Max iterations or unfinished thought detected, switching to FALLBACK AGENT...")
            if fallback_agent is None:
                print("[LOG] ❌ Fallback agent not available, returning error message")
                return ("I apologize, but I encountered difficulties processing your question. "
                        "Please try rephrasing your question more specifically or breaking it down into smaller parts.")

            print("="*50)
            print("🛡️ FALLBACK AGENT REASONING PROCESS:")
            print("="*50)
            fallback_start_time = time.time()
            try:
                fallback_response = fallback_agent.chat(question)
                fallback_text = fallback_response.response
                print("="*50)
                print("🛡️ FALLBACK AGENT REASONING COMPLETED")
                print("="*50)
                fallback_elapsed = time.time() - fallback_start_time
                total_elapsed = time.time() - start_time
                print(f"[LOG] ✅ Fallback agent response received in {fallback_elapsed:.2f} seconds")
                print(f"[LOG] Total processing time: {total_elapsed:.2f} seconds")
                print(f"[LOG] Fallback response length: {len(fallback_text)} characters")

                # Check if fallback also failed.
                if detect_max_iterations_error(fallback_text):
                    print("[LOG] ❌ Fallback agent also hit max iterations or failed to produce an answer.")
                    return ("I apologize, but I'm having difficulty finding specific information about your question in the available documents. "
                            "Please try asking a more specific question or rephrasing your query.")
                return fallback_text
            except Exception as e:
                fallback_elapsed = time.time() - fallback_start_time
                print(f"[LOG] ❌ Fallback agent error after {fallback_elapsed:.2f} seconds: {e}")
                return ("I apologize, but I encountered an error while processing your question. "
                        "Please try rephrasing your question or asking about a more specific topic.")

        return response_text
    except Exception as e:
        elapsed_time = time.time() - start_time
        print(f"[LOG] ❌ Standard agent error after {elapsed_time:.2f} seconds: {e}")
        # Try fallback even on standard agent exception.
        if fallback_agent is not None:
            print("[LOG] 🔄 Standard agent failed, trying FALLBACK AGENT...")
            try:
                fallback_response = fallback_agent.chat(question)
                return fallback_response.response
            except Exception as fallback_e:
                print(f"[LOG] ❌ Fallback agent also failed: {fallback_e}")
        return f"Error processing your question: {str(e)}"


def aswer_language_detection(response_text: str) -> str:
    """
    Detect the language of the response text.

    Args:
        response_text (str): The response text to analyze.

    Returns:
        str: Detected language code ('en', 'ar' or 'fr'); defaults to 'en'
        for unsupported languages or detection failure.
    """
    print("[LOG] Detecting response language...")
    try:
        # Detect the language of the first 5 words of the response.
        first_line = " ".join(response_text.split()[:5])
        first_line = re.sub(r'\[.*?\]', '', first_line)  # Remove citations
        answer_language = detect(first_line)
        print(f"[LOG] Detected language: {answer_language}")
        if answer_language not in ['en', 'ar', 'fr']:
            print(f"[LOG] Language {answer_language} not in supported list, defaulting to 'en'")
            answer_language = 'en'
    except Exception:
        # langdetect raises on empty/ambiguous input; default to English.
        print("[LOG] Language detection failed, defaulting to 'en'")
        answer_language = 'en'
    return answer_language


def process_question_with_sequential_citations(agents_dict, question: str, chunks_directory="./data/") -> dict:
    """
    Process a question through the RAG pipeline with fallback support and
    return response with sequential citation numbers.

    Args:
        agents_dict: Dictionary containing standard_agent, fallback_agent, and llm
        question (str): The user's question
        chunks_directory (str): Path to the directory containing JSON files

    Returns:
        dict: {
            "response": str,             # Response with sequential citation numbers [1], [2], etc.
            "cited_elements_json": str,  # JSON array of cited elements in order
            "unique_ids": list,          # Original source IDs in order
            "citation_mapping": dict,    # Mapping from source ID to citation number
            "answer_language": str,      # Detected language code of the response
            "used_fallback": bool        # Whether fallback agent was used (heuristic)
        }
    """
    print(f"\n[LOG] === STARTING QUESTION PROCESSING WITH FALLBACK SUPPORT ===")
    print(f"[LOG] Question: '{question[:150]}{'...' if len(question) > 150 else ''}'")
    print(f"[LOG] Chunks directory: {chunks_directory}")
    start_time = time.time()
    used_fallback = False  # This flag is a heuristic
    try:
        # Get the response using the enhanced process_question function.
        response_text = process_question(agents_dict, question)

        # Check if fallback was likely used (simple heuristic based on logs).
        # A more robust way would be for `process_question` to return a
        # tuple (response, used_fallback).
        if "switching to fallback agent" in response_text.lower():
            used_fallback = True
            print("[LOG] 🛡️ Fallback agent was likely used based on log indicators.")

        agent_time = time.time() - start_time
        print(f"[LOG] Agent processing completed in {agent_time:.2f} seconds")
        print(f"[LOG] Raw response length: {len(response_text)} characters")

        # Extract source IDs from the response (preserving order).
        unique_ids = extract_source_ids(response_text)

        # Create mapping from source ID to sequential number.
        source_id_to_number = {source_id: i + 1 for i, source_id in enumerate(unique_ids)}
        print(f"[LOG] Created citation mapping for {len(source_id_to_number)} sources")

        # Convert citations to sequential numbers.
        sequential_response = convert_citations_to_sequential(response_text, source_id_to_number)

        # Load all chunks data to find cited elements.
        print("[LOG] Loading chunks data for citation lookup...")
        all_chunks_data = []
        min_chunks_files = ["Guide-pratique-de-mise-en-oeuvre-du-calendrier-national-de-vaccination-2023.json",
                            "Immunization in Practice_WHO_eng_2015.json"]
        for json_file in min_chunks_files:
            json_path = os.path.join(chunks_directory, json_file)
            if not os.path.exists(json_path):
                print(f"[LOG] ⚠️ Skipping non-existent file: {json_path}")
                continue
            print(f"[LOG] Loading {json_file}...")
            try:
                with open(json_path, "r", encoding="utf-8") as f:
                    chunks_data = json.load(f)
                all_chunks_data.extend(chunks_data)
                print(f"[LOG] ✅ Loaded {len(chunks_data)} chunks from {json_file}")
            except Exception as e:
                print(f"[LOG] ❌ Warning: Could not load {json_file}: {e}")
        print(f"[LOG] Total chunks loaded: {len(all_chunks_data)}")

        # Get cited elements in the same order as the sequential citations.
        print("[LOG] Finding cited elements...")
        cited_elements_ordered = []
        for i, source_id in enumerate(unique_ids):  # This preserves the order
            found = False
            for element in all_chunks_data:
                # Handle TableElement structure (single nested dict).
                if element.get("type") == 'TableElement':
                    if element.get("elements", {}).get("element_id") == source_id:
                        cited_elements_ordered.append(element.get("elements", {}))
                        found = True
                        break
                # Handle other element structures (list of nested dicts).
                elif "elements" in element and isinstance(element["elements"], list):
                    for nested_element in element["elements"]:
                        if isinstance(nested_element, dict) and nested_element.get("element_id") == source_id:
                            cited_elements_ordered.append(nested_element)
                            found = True
                            break
                if found:
                    break
            if not found:
                print(f"[LOG] ⚠️ Source ID {source_id} not found in chunks data")
        print(f"[LOG] Found {len(cited_elements_ordered)} cited elements")

        # Convert to JSON.
        cited_elements_json = json.dumps(cited_elements_ordered, ensure_ascii=False, indent=2)

        answer_language = aswer_language_detection(response_text)

        total_time = time.time() - start_time
        print(f"[LOG] ✅ Processing completed in {total_time:.2f} seconds total")
        print(f"[LOG] Final response length: {len(sequential_response)} characters")
        print(f"[LOG] Used fallback: {used_fallback}")
        print(f"[LOG] === QUESTION PROCESSING COMPLETED ===\n")

        return {
            "response": sequential_response,
            "cited_elements_json": cited_elements_json,
            "unique_ids": unique_ids,
            "citation_mapping": source_id_to_number,
            "answer_language": answer_language,
            "used_fallback": used_fallback
        }
    except Exception as e:
        elapsed_time = time.time() - start_time
        print(f"[LOG] ❌ Error processing question after {elapsed_time:.2f} seconds: {e}")
        error_response = "I apologize, but I encountered an error while processing your question. Please try rephrasing your question or asking about a more specific topic."
        return {
            "response": error_response,
            "cited_elements_json": "[]",
            "unique_ids": [],
            "citation_mapping": {},
            "answer_language": "en",
            "used_fallback": False
        }


def process_question_with_citations(agents_dict, question: str, chunks_directory="./data/") -> dict:
    """
    Legacy function - maintained for backward compatibility.
    Now calls the new sequential citation function with fallback support.
    """
    print("[LOG] Using legacy function wrapper - redirecting to sequential citations with fallback")
    return process_question_with_sequential_citations(agents_dict, question, chunks_directory)